Update dashboard.py

Removed the need to check for each custom classes, will check if the class have `toJson`, if `True` then use it.
This commit is contained in:
Donald Zou 2025-01-19 14:04:59 +08:00
parent 645db97c14
commit 8956355e57

View File

@ -1,23 +1,24 @@
import itertools, random, shutil, sqlite3, configparser, hashlib, ipaddress, json, traceback, os, secrets, subprocess
import smtplib
import random, shutil, sqlite3, configparser, hashlib, ipaddress, json, os, secrets, subprocess
import time, re, urllib.error, uuid, bcrypt, psutil, pyotp, threading
from uuid import uuid4
from zipfile import ZipFile
from datetime import datetime, timedelta
from typing import Any
from jinja2 import Template
from flask import Flask, request, render_template, session, g, send_file
from flask import Flask, request, render_template, session, send_file
from json import JSONEncoder
from flask_cors import CORS
from icmplib import ping, traceroute
from flask.json.provider import DefaultJSONProvider
from Utilities import (
RegexMatch, GetRemoteEndpoint, StringToBoolean,
ValidateIPAddressesWithRange, ValidateIPAddresses, ValidateDNSAddress,
RegexMatch, GetRemoteEndpoint, StringToBoolean,
ValidateIPAddressesWithRange, ValidateDNSAddress,
GenerateWireguardPublicKey, GenerateWireguardPrivateKey
)
from Email import EmailSender
from modules.Email import EmailSender
from modules.Log import Log
from modules.DashboardLogger import DashboardLogger
from modules.PeerJobLogger import PeerJobLogger
DASHBOARD_VERSION = 'v4.2.0'
@ -44,12 +45,7 @@ class CustomJsonEncoder(DefaultJSONProvider):
super().__init__(app)
def default(self, o):
if (isinstance(o, WireguardConfiguration)
or isinstance(o, Peer)
or isinstance(o, PeerJob)
or isinstance(o, Log)
or isinstance(o, DashboardAPIKey)
or isinstance(o, PeerShareLink)):
if callable(getattr(o, "toJson", None)):
return o.toJson()
return super().default(self, o)
app.json = CustomJsonEncoder(app)
@ -64,117 +60,58 @@ def ResponseObject(status=True, message=None, data=None) -> Flask.response_class
"data": data
})
response.content_type = "application/json"
return response
"""
Log Class
"""
class Log:
def __init__(self, LogID: str, JobID: str, LogDate: str, Status: str, Message: str):
self.LogID = LogID
self.JobID = JobID
self.LogDate = LogDate
self.Status = Status
self.Message = Message
def toJson(self):
return {
"LogID": self.LogID,
"JobID": self.JobID,
"LogDate": self.LogDate,
"Status": self.Status,
"Message": self.Message
}
def __dict__(self):
return self.toJson()
"""
Dashboard Logger Class
"""
class DashboardLogger:
def __init__(self):
self.loggerdb = sqlite3.connect(os.path.join(CONFIGURATION_PATH, 'db', 'wgdashboard_log.db'),
isolation_level=None,
check_same_thread=False)
self.loggerdb.row_factory = sqlite3.Row
self.__createLogDatabase()
self.log(Message="WGDashboard started")
def __createLogDatabase(self):
with self.loggerdb:
loggerdbCursor = self.loggerdb.cursor()
existingTable = loggerdbCursor.execute("SELECT name from sqlite_master where type='table'").fetchall()
existingTable = [t['name'] for t in existingTable]
if "DashboardLog" not in existingTable:
loggerdbCursor.execute(
"CREATE TABLE DashboardLog (LogID VARCHAR NOT NULL, LogDate DATETIME DEFAULT (strftime('%Y-%m-%d %H:%M:%S','now', 'localtime')), URL VARCHAR, IP VARCHAR, Status VARCHAR, Message VARCHAR, PRIMARY KEY (LogID))")
if self.loggerdb.in_transaction:
self.loggerdb.commit()
def log(self, URL: str = "", IP: str = "", Status: str = "true", Message: str = "") -> bool:
try:
loggerdbCursor = self.loggerdb.cursor()
loggerdbCursor.execute(
"INSERT INTO DashboardLog (LogID, URL, IP, Status, Message) VALUES (?, ?, ?, ?, ?);", (str(uuid.uuid4()), URL, IP, Status, Message,))
loggerdbCursor.close()
self.loggerdb.commit()
return True
except Exception as e:
print(f"[WGDashboard] Access Log Error: {str(e)}")
return False
return response
"""
Peer Job Logger
"""
class PeerJobLogger:
def __init__(self):
self.loggerdb = sqlite3.connect(os.path.join(CONFIGURATION_PATH, 'db', 'wgdashboard_log.db'),
check_same_thread=False)
self.loggerdb.row_factory = sqlite3.Row
self.logs:list(Log) = []
self.__createLogDatabase()
def __createLogDatabase(self):
with self.loggerdb:
loggerdbCursor = self.loggerdb.cursor()
existingTable = loggerdbCursor.execute("SELECT name from sqlite_master where type='table'").fetchall()
existingTable = [t['name'] for t in existingTable]
if "JobLog" not in existingTable:
loggerdbCursor.execute("CREATE TABLE JobLog (LogID VARCHAR NOT NULL, JobID NOT NULL, LogDate DATETIME DEFAULT (strftime('%Y-%m-%d %H:%M:%S','now', 'localtime')), Status VARCHAR NOT NULL, Message VARCHAR, PRIMARY KEY (LogID))")
if self.loggerdb.in_transaction:
self.loggerdb.commit()
def log(self, JobID: str, Status: bool = True, Message: str = "") -> bool:
try:
with self.loggerdb:
loggerdbCursor = self.loggerdb.cursor()
loggerdbCursor.execute(f"INSERT INTO JobLog (LogID, JobID, Status, Message) VALUES (?, ?, ?, ?)",
(str(uuid.uuid4()), JobID, Status, Message,))
if self.loggerdb.in_transaction:
self.loggerdb.commit()
except Exception as e:
print(f"[WGDashboard] Peer Job Log Error: {str(e)}")
return False
return True
def getLogs(self, all: bool = False, configName = None) -> list[Log]:
logs: list[Log] = []
try:
allJobs = AllPeerJobs.getAllJobs(configName)
allJobsID = ", ".join([f"'{x.JobID}'" for x in allJobs])
with self.loggerdb:
loggerdbCursor = self.loggerdb.cursor()
table = loggerdbCursor.execute(f"SELECT * FROM JobLog WHERE JobID IN ({allJobsID}) ORDER BY LogDate DESC").fetchall()
self.logs.clear()
for l in table:
logs.append(
Log(l["LogID"], l["JobID"], l["LogDate"], l["Status"], l["Message"]))
except Exception as e:
return logs
return logs
# class PeerJobLogger:
# def __init__(self):
# self.loggerdb = sqlite3.connect(os.path.join(CONFIGURATION_PATH, 'db', 'wgdashboard_log.db'),
# check_same_thread=False)
# self.loggerdb.row_factory = sqlite3.Row
# self.logs:list(Log) = []
# self.__createLogDatabase()
#
# def __createLogDatabase(self):
# with self.loggerdb:
# loggerdbCursor = self.loggerdb.cursor()
#
# existingTable = loggerdbCursor.execute("SELECT name from sqlite_master where type='table'").fetchall()
# existingTable = [t['name'] for t in existingTable]
#
# if "JobLog" not in existingTable:
# loggerdbCursor.execute("CREATE TABLE JobLog (LogID VARCHAR NOT NULL, JobID NOT NULL, LogDate DATETIME DEFAULT (strftime('%Y-%m-%d %H:%M:%S','now', 'localtime')), Status VARCHAR NOT NULL, Message VARCHAR, PRIMARY KEY (LogID))")
# if self.loggerdb.in_transaction:
# self.loggerdb.commit()
# def log(self, JobID: str, Status: bool = True, Message: str = "") -> bool:
# try:
# with self.loggerdb:
# loggerdbCursor = self.loggerdb.cursor()
# loggerdbCursor.execute(f"INSERT INTO JobLog (LogID, JobID, Status, Message) VALUES (?, ?, ?, ?)",
# (str(uuid.uuid4()), JobID, Status, Message,))
# if self.loggerdb.in_transaction:
# self.loggerdb.commit()
# except Exception as e:
# print(f"[WGDashboard] Peer Job Log Error: {str(e)}")
# return False
# return True
#
# def getLogs(self, all: bool = False, configName = None) -> list[Log]:
# logs: list[Log] = []
# try:
# allJobs = AllPeerJobs.getAllJobs(configName)
# allJobsID = ", ".join([f"'{x.JobID}'" for x in allJobs])
# with self.loggerdb:
# loggerdbCursor = self.loggerdb.cursor()
# table = loggerdbCursor.execute(f"SELECT * FROM JobLog WHERE JobID IN ({allJobsID}) ORDER BY LogDate DESC").fetchall()
# self.logs.clear()
# for l in table:
# logs.append(
# Log(l["LogID"], l["JobID"], l["LogDate"], l["Status"], l["Message"]))
# except Exception as e:
# return logs
# return logs
"""
Peer Job
@ -357,9 +294,13 @@ class PeerJobs:
f"Peer {fp.id} from {c.Name} failed {job.Action}ed."
)
else:
needToDelete.append(job)
JobLogger.log(job.JobID, False,
f"Somehow can't find this peer {job.Peer} from {c.Name} failed {job.Action}ed."
)
else:
needToDelete.append(job)
JobLogger.log(job.JobID, False,
f"Somehow can't find this peer {job.Peer} from {job.Configuration} failed {job.Action}ed."
)
for j in needToDelete:
self.deleteJob(j)
@ -3257,8 +3198,8 @@ def InitAmneziaWireguardConfigurationsList(startup: bool = False):
AllPeerShareLinks: PeerShareLinks = PeerShareLinks()
AllPeerJobs: PeerJobs = PeerJobs()
JobLogger: PeerJobLogger = PeerJobLogger()
DashboardLogger: DashboardLogger = DashboardLogger()
JobLogger: PeerJobLogger = PeerJobLogger(CONFIGURATION_PATH, AllPeerJobs)
DashboardLogger: DashboardLogger = DashboardLogger(CONFIGURATION_PATH)
_, app_ip = DashboardConfig.GetConfig("Server", "app_ip")
_, app_port = DashboardConfig.GetConfig("Server", "app_port")
_, WG_CONF_PATH = DashboardConfig.GetConfig("Server", "wg_conf_path")