From fb3606c2b7b95e11cbb4cc08ca46a23813d3ca4c Mon Sep 17 00:00:00 2001
From: Galonza Peter
Date: Sun, 2 Jan 2022 16:35:39 +0300
Subject: [PATCH] Refactored
Added docstrings like sphinx style, small refactoring and added TODO
---
src/dashboard.py | 533 +++++++++++++++++++++++++++++++++++++++--------
1 file changed, 444 insertions(+), 89 deletions(-)
diff --git a/src/dashboard.py b/src/dashboard.py
index 4db5c87..c0742b4 100644
--- a/src/dashboard.py
+++ b/src/dashboard.py
@@ -27,7 +27,7 @@ from icmplib import ping, traceroute
from tinydb import TinyDB, Query
# Import other python files
-from util import regex_match, check_DNS, check_Allowed_IPs, check_remote_endpoint,\
+from util import regex_match, check_DNS, check_Allowed_IPs, check_remote_endpoint, \
check_IP_with_range, clean_IP_with_range
# Dashboard Version
@@ -54,37 +54,61 @@ QRcode(app)
sem = threading.RLock()
+# TODO use class and object oriented programming
+
# Read / Write Dashboard Config File
def get_dashboard_conf():
+ """Dashboard Configuration Related
+
+ :return: A config parser object
+ :rtype: configparser.ConfigParser
"""
- Dashboard Configuration Related
- """
+
config = configparser.ConfigParser(strict=False)
config.read(DASHBOARD_CONF)
return config
def set_dashboard_conf(config):
+ """Configuration writer
+
+ :param config: A config parser object
+ :type config: configparser.ConfigParser
+ """
+
with open(DASHBOARD_CONF, "w", encoding='utf-8') as conf_object:
config.write(conf_object)
# Get all keys from a configuration
def get_conf_peer_key(config_name):
+ """Get the peers keys of wireguard interface.
+
+ :param config_name: Name of WG interface
+ :type config_name: str
+ :return: Return list of peers keys or text if configuration not running
+ :rtype: list, str
"""
- Configuration Related
- """
+
try:
- peer_key = subprocess.run(f"wg show {config_name} peers",
- check=True, shell=True, capture_output=True).stdout
- peer_key = peer_key.decode("UTF-8").split()
- return peer_key
+ peers_keys = subprocess.run(f"wg show {config_name} peers",
+ check=True, shell=True, capture_output=True).stdout
+ peers_keys = peers_keys.decode("UTF-8").split()
+ return peers_keys
except subprocess.CalledProcessError:
return config_name + " is not running."
# Get numbers of connected peer of a configuration
def get_conf_running_peer_number(config_name):
+ """Get number of running peers on wireguard interface.
+
+ :param config_name: Name of WG interface
+ :type config_name: str
+ :return: Number of running peers, or test if configuration not running
+ :rtype: int, str
+ """
+
running = 0
# Get latest handshakes
try:
@@ -104,8 +128,17 @@ def get_conf_running_peer_number(config_name):
return running
+# TODO use modules for working with ini(configparser or wireguard)
# Read [Interface] section from configuration file
def read_conf_file_interface(config_name):
+ """Get interface settings.
+
+ :param config_name: Name of WG interface
+ :type config_name: str
+ :return: Dictionary with interface settings
+ :rtype: dict
+ """
+
conf_location = WG_CONF_PATH + "/" + config_name + ".conf"
with open(conf_location, 'r', encoding='utf-8') as file_object:
file = file_object.read().split("\n")
@@ -120,8 +153,17 @@ def read_conf_file_interface(config_name):
return data
+# TODO use modules for working with ini(configparser or wireguard)
# Read the whole configuration file
def read_conf_file(config_name):
+ """Get configurations from file of wireguard interface.
+
+ :param config_name: Name of WG interface
+ :type config_name: str
+ :return: Dictionary with interface and peers settings
+ :rtype: dict
+ """
+
# Read Configuration File Start
conf_location = WG_CONF_PATH + "/" + config_name + ".conf"
f = open(conf_location, 'r')
@@ -162,6 +204,17 @@ def read_conf_file(config_name):
# Get latest handshake from all peers of a configuration
def get_latest_handshake(config_name, db, peers):
+ """Update latest handshake of peers.
+
+ :param config_name: Name of WG interface
+ :type config_name: str
+ :param db: Connector for database
+ :type db: tinydb.TinyDB
+ :param peers: TODO
+ :return: Return string if stopped or None if OK.
+ :rtype: string, None
+ """
+
# Get latest handshakes
try:
data_usage = subprocess.run(f"wg show {config_name} latest-handshakes",
@@ -190,6 +243,17 @@ def get_latest_handshake(config_name, db, peers):
# Get transfer from all peers of a configuration
def get_transfer(config_name, db, peers):
+ """Update transfer data values of peers.
+
+ :param config_name: Name of WG interface
+ :type config_name: str
+ :param db: Connector for database
+ :type db: tinydb.TinyDB
+ :param peers: TODO
+ :return: Return string if stopped or None if OK.
+ :rtype: string, None
+ """
+
# Get transfer
try:
data_usage = subprocess.run(f"wg show {config_name} transfer",
@@ -233,6 +297,17 @@ def get_transfer(config_name, db, peers):
# Get endpoint from all peers of a configuration
def get_endpoint(config_name, db, peers):
+ """Get endpoint address for peers.
+
+ :param config_name: Name of WG interface
+ :type config_name: str
+ :param db: Connector for database
+ :type db: tinydb.TinyDB
+ :param peers: TODO
+ :return: Return string if stopped or None if OK.
+ :rtype: string, None
+ """
+
# Get endpoint
try:
data_usage = subprocess.run(f"wg show {config_name} endpoints",
@@ -250,6 +325,15 @@ def get_endpoint(config_name, db, peers):
# Get allowed ips from all peers of a configuration
def get_allowed_ip(db, peers, conf_peer_data):
+ """Get allowed subnets or ips for peers.
+
+ :param peers: TODO
+ :param db: Connector for database
+ :type db: tinydb.TinyDB
+ :param conf_peer_data: Dictionary with configurations fot peers and interfaces
+ :type conf_peer_data" dict
+ """
+
# Get allowed ip
for i in conf_peer_data["Peers"]:
db.update({"allowed_ip": i.get('AllowedIPs', '(None)')}, peers.id == i["PublicKey"])
@@ -257,6 +341,12 @@ def get_allowed_ip(db, peers, conf_peer_data):
# Look for new peers from WireGuard
def get_all_peers_data(config_name):
+ """Get all settings fot peer.
+
+ :param config_name: Name of WG interface
+ :type config_name: str
+ """
+
sem.acquire(timeout=1)
db = TinyDB(os.path.join(DB_PATH, config_name + '.json'))
peers = Query()
@@ -303,7 +393,7 @@ def get_all_peers_data(config_name):
if "keepalive" not in search[0]:
update_db['keepalive'] = config.get("Peers", "peer_keep_alive")
if "remote_endpoint" not in search[0]:
- update_db['remote_endpoint'] = config.get("Peers","remote_endpoint")
+ update_db['remote_endpoint'] = config.get("Peers", "remote_endpoint")
if "preshared_key" not in search[0]:
if "PresharedKey" in i.keys():
update_db['preshared_key'] = i["PresharedKey"]
@@ -332,8 +422,15 @@ def get_all_peers_data(config_name):
# Search for peers
def get_peers(config_name, search, sort_t):
- """
- Frontend Related Functions
+ """Get all peers.
+
+ :param config_name: Name of WG interface
+ :type config_name: str
+ :param search: Search string
+ :type search: str
+ :param sort_t: TODO
+ :type sort_t: str
+ :return: TODO
"""
get_all_peers_data(config_name)
@@ -358,6 +455,14 @@ def get_peers(config_name, search, sort_t):
# Get configuration public key
def get_conf_pub_key(config_name):
+ """Get public key for configuration.
+
+ :param config_name: Name of WG interface
+ :type config_name: str
+ :return: Return public key or empty string
+ :rtype: str
+ """
+
try:
conf = configparser.ConfigParser(strict=False)
conf.read(WG_CONF_PATH + "/" + config_name + ".conf")
@@ -371,6 +476,14 @@ def get_conf_pub_key(config_name):
# Get configuration listen port
def get_conf_listen_port(config_name):
+ """Get listen port number.
+
+ :param config_name: Name of WG interface
+ :type config_name: str
+ :return: Return number of port or empty string
+ :rtype: str
+ """
+
conf = configparser.ConfigParser(strict=False)
conf.read(WG_CONF_PATH + "/" + config_name + ".conf")
port = ""
@@ -387,6 +500,14 @@ def get_conf_listen_port(config_name):
# Get configuration total data
def get_conf_total_data(config_name):
+ """Get total transferred data via wireguard interface.
+
+ :param config_name: Name of WG interface
+ :type config_name: str
+ :return: Dictionary with total_sent, total_receive, total io
+ :rtype: dict
+ """
+
sem.acquire(timeout=1)
db = TinyDB(os.path.join(DB_PATH, config_name + ".json"))
upload_total = 0
@@ -417,6 +538,12 @@ def get_conf_status(config_name):
# Get all configuration as a list
def get_conf_list():
+ """Get all wireguard interfaces with status.
+
+ :return: Return a list of dicts with interfaces and its statuses
+ :rtype: list
+ """
+
conf = []
for i in os.listdir(WG_CONF_PATH):
if regex_match("^(.{1,}).(conf)$", i):
@@ -434,9 +561,15 @@ def get_conf_list():
# Generate private key
def gen_private_key():
- gen = subprocess.check_output('wg genkey > private_key.txt && wg pubkey < private_key.txt > public_key.txt',
- shell=True)
- gen_psk = subprocess.check_output('wg genpsk', shell=True)
+ """Generate the private key.
+
+ :return: Return dict with private, public and preshared keys
+ :rtype: dict
+ """
+
+ subprocess.run('wg genkey > private_key.txt && wg pubkey < private_key.txt > public_key.txt',
+ check=True, shell=True)
+ gen_psk = subprocess.run('wg genpsk', shell=True, check=True, capture_output=True).stdout
preshare_key = gen_psk.decode("UTF-8").strip()
with open('private_key.txt', encoding='utf-8') as file_object:
private_key = file_object.readline().strip()
@@ -448,10 +581,18 @@ def gen_private_key():
# Generate public key
def gen_public_key(private_key):
+ """Generate the public key.
+
+ :param private_key: Pricate key
+ :type private_key: str
+ :return: Return dict with public key or error message
+ :rtype: dict
+ """
+
with open('private_key.txt', 'w', encoding='utf-8') as file_object:
file_object.write(private_key)
try:
- check = subprocess.check_output("wg pubkey < private_key.txt > public_key.txt", shell=True)
+ subprocess.run("wg pubkey < private_key.txt > public_key.txt", check=True, shell=True)
with open('public_key.txt', encoding='utf-8') as file_object:
public_key = file_object.readline().strip()
os.remove('private_key.txt')
@@ -464,6 +605,18 @@ def gen_public_key(private_key):
# Check if private key and public key match
def f_check_key_match(private_key, public_key, config_name):
+ """TODO
+
+ :param private_key: Private key
+ :type private_key: str
+ :param public_key: Public key
+ :type public_key: str
+ :param config_name: Name of WG interface
+ :type config_name: str
+ :return: Return dictionary with status
+ :rtype: dict
+ """
+
result = gen_public_key(private_key)
if result['status'] == 'failed':
return result
@@ -487,8 +640,21 @@ def f_check_key_match(private_key, public_key, config_name):
print("RuntimeError: cannot release un-acquired lock")
return {'status': 'success'}
+
# Check if there is repeated allowed IP
def check_repeat_allowed_ip(public_key, ip, config_name):
+ """Check for the existence of an allowed ip.
+
+ :param public_key: Public key
+ :type public_key: str
+ :param ip: allowed ip
+ :type ip: str
+ :param config_name: Name of WG interface
+ :type config_name: str
+ :return: Return dictionary with status
+ :rtype: dict
+ """
+
sem.acquire(timeout=1)
db = TinyDB(os.path.join(DB_PATH, config_name + ".json"))
peers = Query()
@@ -521,6 +687,12 @@ Flask Functions
# Before request
@app.before_request
def auth_req():
+ """ TODO
+
+ :return: TODO
+ :rtype: str, None
+ """
+
conf = get_dashboard_conf()
req = conf.get("Server", "auth_req")
session['update'] = UPDATE
@@ -555,6 +727,12 @@ Sign In / Sign Out
# Sign In
@app.route('/signin', methods=['GET'])
def signin():
+ """Sign in request.
+
+ :return: TODO
+ :rtype: TODO
+ """
+
message = ""
if "message" in session:
message = session['message']
@@ -565,6 +743,12 @@ def signin():
# Sign Out
@app.route('/signout', methods=['GET'])
def signout():
+ """Sign out request.
+
+ :return: TODO
+ :rtype: TODO
+ """
+
if "username" in session:
session.pop("username")
message = "Sign out successfully!"
@@ -574,6 +758,12 @@ def signout():
# Authentication
@app.route('/auth', methods=['POST'])
def auth():
+ """Authentication request.
+
+ :return: TODO
+ :rtype: TODO
+ """
+
config = get_dashboard_conf()
password = hashlib.sha256(request.form['password'].encode())
if password.hexdigest() == config["Account"]["password"] \
@@ -589,8 +779,10 @@ def auth():
@app.route('/', methods=['GET'])
def index():
- """
- Index Page Related
+ """Index Page Related.
+
+ :return: TODO
+ :rtype: TODO
"""
return render_template('index.html', conf=get_conf_list())
@@ -599,8 +791,10 @@ def index():
# Setting Page
@app.route('/settings', methods=['GET'])
def settings():
- """
- Setting Page Related
+ """Setting Page Related.
+
+ :return: TODO
+ :rtype: TODO
"""
message = ""
status = ""
@@ -624,6 +818,12 @@ def settings():
# Update account username
@app.route('/update_acct', methods=['POST'])
def update_acct():
+ """Change account user name.
+
+ :return: TODO
+ :rtype: TODO
+ """
+
if len(request.form['username']) == 0:
session['message'] = "Username cannot be empty."
session['message_status'] = "danger"
@@ -647,6 +847,12 @@ def update_acct():
# Update peer default settting
@app.route('/update_peer_default_config', methods=['POST'])
def update_peer_default_config():
+ """Change default configurations for peers.
+
+ :return: TODO
+ :rtype: TODO
+ """
+
config = get_dashboard_conf()
if len(request.form['peer_endpoint_allowed_ip']) == 0 or \
len(request.form['peer_global_DNS']) == 0 or \
@@ -712,6 +918,12 @@ def update_peer_default_config():
# Update dashboard password
@app.route('/update_pwd', methods=['POST'])
def update_pwd():
+ """Change account password.
+
+ :return: TODO
+ :rtype: TODO
+ """
+
config = get_dashboard_conf()
if hashlib.sha256(request.form['currentpass'].encode()).hexdigest() == config.get("Account", "password"):
if hashlib.sha256(request.form['newpass'].encode()).hexdigest() == hashlib.sha256(
@@ -743,6 +955,9 @@ def update_pwd():
# Update dashboard IP and port
@app.route('/update_app_ip_port', methods=['POST'])
def update_app_ip_port():
+ """Change port number of dashboard.
+ """
+
config = get_dashboard_conf()
config.set("Server", "app_ip", request.form['app_ip'])
config.set("Server", "app_port", request.form['app_port'])
@@ -754,6 +969,9 @@ def update_app_ip_port():
# Update WireGuard configuration file path
@app.route('/update_wg_conf_path', methods=['POST'])
def update_wg_conf_path():
+ """Change path to dashboard configuration.
+ """
+
config = get_dashboard_conf()
config.set("Server", "wg_conf_path", request.form['wg_conf_path'])
set_dashboard_conf(config)
@@ -766,9 +984,9 @@ def update_wg_conf_path():
# Update configuration sorting
@app.route('/update_dashboard_sort', methods=['POST'])
def update_dashbaord_sort():
+ """Configuration Page Related
"""
- Configuration Page Related
- """
+
config = get_dashboard_conf()
data = request.get_json()
sort_tag = ['name', 'status', 'allowed_ip']
@@ -784,6 +1002,12 @@ def update_dashbaord_sort():
# Update configuration refresh interval
@app.route('/update_dashboard_refresh_interval', methods=['POST'])
def update_dashboard_refresh_interval():
+ """Change the refresh time.
+
+ :return: Return text with result
+ :rtype: str
+ """
+
preset_interval = ["5000", "10000", "30000", "60000"]
if request.form["interval"] in preset_interval:
config = get_dashboard_conf()
@@ -798,6 +1022,14 @@ def update_dashboard_refresh_interval():
# Configuration Page
@app.route('/configuration/', methods=['GET'])
def configuration(config_name):
+ """Show wireguard interface view.
+
+ :param config_name: Name of WG interface
+ :type config_name: str
+ :return: TODO
+ :rtype: TODO
+ """
+
config = get_dashboard_conf()
conf_data = {
"name": config_name,
@@ -830,6 +1062,14 @@ def configuration(config_name):
# Get configuration details
@app.route('/get_config/', methods=['GET'])
def get_conf(config_name):
+ """Get configuration setting of wireguard interface.
+
+ :param config_name: Name of WG interface
+ :type config_name: str
+ :return: TODO
+ :rtype: TODO
+ """
+
config_interface = read_conf_file_interface(config_name)
search = request.args.get('search')
if len(search) == 0:
@@ -838,7 +1078,7 @@ def get_conf(config_name):
config = get_dashboard_conf()
sort = config.get("Server", "dashboard_sort")
peer_display_mode = config.get("Peers", "peer_display_mode")
- wg_ip = config.get("Peers","remote_endpoint")
+ wg_ip = config.get("Peers", "remote_endpoint")
if "Address" not in config_interface:
conf_address = "N/A"
else:
@@ -867,9 +1107,18 @@ def get_conf(config_name):
# return render_template('get_conf.html', conf_data=conf_data, wg_ip=config.get("Peers","remote_endpoint"), sort_tag=sort,
# dashboard_refresh_interval=int(config.get("Server", "dashboard_refresh_interval")), peer_display_mode=peer_display_mode)
+
# Turn on / off a configuration
@app.route('/switch/', methods=['GET'])
def switch(config_name):
+ """On/off the wireguard interface.
+
+ :param config_name: Name of WG interface
+ :type config_name: str
+ :return: TODO
+ :rtype: TODO
+ """
+
if "username" not in session:
print("not loggedin")
return redirect(url_for("signin"))
@@ -888,9 +1137,18 @@ def switch(config_name):
return redirect('/')
return redirect(request.referrer)
+
# Add peer
@app.route('/add_peer/', methods=['POST'])
def add_peer(config_name):
+ """Add new peer.
+
+ :param config_name: Name of WG interface
+ :type config_name: str
+ :return: Return result of action or recommendations
+ :rtype: str
+ """
+
sem.acquire(timeout=1)
db = TinyDB(os.path.join(DB_PATH, config_name + ".json"))
peers = Query()
@@ -944,24 +1202,25 @@ def add_peer(config_name):
print("RuntimeError: cannot release un-acquired lock")
return "Endpoint Allowed IPs format is incorrect."
if len(data['MTU']) == 0 or not data['MTU'].isdigit():
- db.close()
- try:
- sem.release()
- except RuntimeError as e:
- print("RuntimeError: cannot release un-acquired lock")
- return "MTU format is not correct."
+ db.close()
+ try:
+ sem.release()
+ except RuntimeError as e:
+ print("RuntimeError: cannot release un-acquired lock")
+ return "MTU format is not correct."
if len(data['keep_alive']) == 0 or not data['keep_alive'].isdigit():
- db.close()
- try:
- sem.release()
- except RuntimeError as e:
- print("RuntimeError: cannot release un-acquired lock")
- return "Persistent Keepalive format is not correct."
+ db.close()
+ try:
+ sem.release()
+ except RuntimeError as e:
+ print("RuntimeError: cannot release un-acquired lock")
+ return "Persistent Keepalive format is not correct."
try:
if enable_preshared_key == True:
key = subprocess.check_output("wg genpsk > tmp_psk.txt", shell=True)
- status = subprocess.check_output(f"wg set {config_name} peer {public_key} allowed-ips {allowed_ips} preshared-key tmp_psk.txt",
- shell=True, stderr=subprocess.STDOUT)
+ status = subprocess.check_output(
+ f"wg set {config_name} peer {public_key} allowed-ips {allowed_ips} preshared-key tmp_psk.txt",
+ shell=True, stderr=subprocess.STDOUT)
os.remove("tmp_psk.txt")
elif enable_preshared_key == False:
status = subprocess.check_output(f"wg set {config_name} peer {public_key} allowed-ips {allowed_ips}",
@@ -990,6 +1249,14 @@ def add_peer(config_name):
# Remove peer
@app.route('/remove_peer/', methods=['POST'])
def remove_peer(config_name):
+ """Remove peer.
+
+ :param config_name: Name of WG interface
+ :type config_name: str
+ :return: Return result of action or recommendations
+ :rtype: str
+ """
+
if get_conf_status(config_name) == "stopped":
return "Your need to turn on " + config_name + " first."
sem.acquire(timeout=1)
@@ -1003,30 +1270,39 @@ def remove_peer(config_name):
if delete_key not in keys:
db.close()
return "This key does not exist"
- else:
+
+ try:
+ subprocess.run(f"wg set {config_name} peer {delete_key} remove",
+ shell=True, check=True, stderr=subprocess.STDOUT)
+ subprocess.run(f"wg-quick save {config_name}",
+ shell=True, check=True, stderr=subprocess.STDOUT)
+ db.remove(peers.id == delete_key)
+ db.close()
try:
- remove_wg = subprocess.check_output(f"wg set {config_name} peer {delete_key} remove",
- shell=True, stderr=subprocess.STDOUT)
- save_wg = subprocess.check_output(f"wg-quick save {config_name}",
- shell=True, stderr=subprocess.STDOUT)
- db.remove(peers.id == delete_key)
- db.close()
- try:
- sem.release()
- except RuntimeError as e:
- print("RuntimeError: cannot release un-acquired lock")
- return "true"
- except subprocess.CalledProcessError as exc:
- db.close()
- try:
- sem.release()
- except RuntimeError as e:
- print("RuntimeError: cannot release un-acquired lock")
- return exc.output.strip()
+ sem.release()
+ except RuntimeError as e:
+ print("RuntimeError: cannot release un-acquired lock")
+ return "true"
+ except subprocess.CalledProcessError as exc:
+ db.close()
+ try:
+ sem.release()
+ except RuntimeError as e:
+ print("RuntimeError: cannot release un-acquired lock")
+ return exc.output.strip()
+
# Save peer settings
@app.route('/save_peer_setting/', methods=['POST'])
def save_peer_setting(config_name):
+ """Save peer configuration.
+
+ :param config_name: Name of WG interface
+ :type config_name: str
+ :return: Return status of action and text with recommendations
+ :rtype: TODO
+ """
+
data = request.get_json()
id = data['id']
name = data['name']
@@ -1088,8 +1364,8 @@ def save_peer_setting(config_name):
tmp_psk = open("tmp_edit_psk.txt", "w+")
tmp_psk.write(preshared_key)
tmp_psk.close()
- change_psk = subprocess.check_output(f"wg set {config_name} peer {id} preshared-key tmp_edit_psk.txt",
- shell=True, stderr=subprocess.STDOUT)
+ change_psk = subprocess.run(f"wg set {config_name} peer {id} preshared-key tmp_edit_psk.txt",
+ shell=True, check=True, stderr=subprocess.STDOUT)
if change_psk.decode("UTF-8") != "":
db.close()
try:
@@ -1100,9 +1376,9 @@ def save_peer_setting(config_name):
if allowed_ip == "":
allowed_ip = '""'
allowed_ip = allowed_ip.replace(" ", "")
- change_ip = subprocess.check_output(f"wg set {config_name} peer {id} allowed-ips {allowed_ip}",
- shell=True, stderr=subprocess.STDOUT)
- subprocess.check_output(f'wg-quick save {config_name}', shell=True, stderr=subprocess.STDOUT)
+ change_ip = subprocess.run(f"wg set {config_name} peer {id} allowed-ips {allowed_ip}",
+ shell=True, check=True, stderr=subprocess.STDOUT)
+ subprocess.run(f'wg-quick save {config_name}', shell=True, check=True, stderr=subprocess.STDOUT)
if change_ip.decode("UTF-8") != "":
db.close()
try:
@@ -1112,12 +1388,12 @@ def save_peer_setting(config_name):
return jsonify({"status": "failed", "msg": change_ip.decode("UTF-8")})
db.update(
{
- "name": name,
- "private_key": private_key,
- "DNS": dns_addresses,
- "endpoint_allowed_ip": endpoint_allowed_ip,
- "mtu": data['MTU'],
- "keepalive":data['keep_alive'], "preshared_key": preshared_key
+ "name": name,
+ "private_key": private_key,
+ "DNS": dns_addresses,
+ "endpoint_allowed_ip": endpoint_allowed_ip,
+ "mtu": data['MTU'],
+ "keepalive": data['keep_alive'], "preshared_key": preshared_key
}, peers.id == id)
db.close()
try:
@@ -1132,18 +1408,26 @@ def save_peer_setting(config_name):
except RuntimeError as e:
print("RuntimeError: cannot release un-acquired lock")
return jsonify({"status": "failed", "msg": str(exc.output.decode("UTF-8").strip())})
- else:
- db.close()
- try:
- sem.release()
- except RuntimeError as e:
- print("RuntimeError: cannot release un-acquired lock")
- return jsonify({"status": "failed", "msg": "This peer does not exist."})
+
+ db.close()
+ try:
+ sem.release()
+ except RuntimeError as e:
+ print("RuntimeError: cannot release un-acquired lock")
+ return jsonify({"status": "failed", "msg": "This peer does not exist."})
# Get peer settings
@app.route('/get_peer_data/', methods=['POST'])
def get_peer_name(config_name):
+ """Get peer settings.
+
+ :param config_name: Name of WG interface
+ :type config_name: str
+ :return: Return settings of peer
+ :rtype: TODO
+ """
+
data = request.get_json()
id = data['id']
sem.acquire(timeout=1)
@@ -1165,12 +1449,24 @@ def get_peer_name(config_name):
# Generate a private key
@app.route('/generate_peer', methods=['GET'])
def generate_peer():
+ """Generate the private key for peer.
+
+ :return: Return dict with private, public and preshared keys
+ :rtype: TODO
+ """
+
return jsonify(gen_private_key())
# Generate a public key from a private key
@app.route('/generate_public_key', methods=['POST'])
def generate_public_key():
+ """Generate the public key.
+
+ :return: Return dict with public key or error message
+ :rtype: TODO
+ """
+
data = request.get_json()
private_key = data['private_key']
return jsonify(gen_public_key(private_key))
@@ -1179,6 +1475,14 @@ def generate_public_key():
# Check if both key match
@app.route('/check_key_match/', methods=['POST'])
def check_key_match(config_name):
+ """TODO
+
+ :param config_name: Name of WG interface
+ :type config_name: str
+ :return: Return dictionary with status
+ :rtype: TODO
+ """
+
data = request.get_json()
private_key = data['private_key']
public_key = data['public_key']
@@ -1187,6 +1491,14 @@ def check_key_match(config_name):
@app.route("/qrcode/", methods=['GET'])
def generate_qrcode(config_name):
+ """generate the QR with peer configuration.
+
+ :param config_name: Name of WG interface
+ :type config_name: str
+ :return: TODO
+ :rtype: TODO
+ """
+
id = request.args.get('id')
sem.acquire(timeout=1)
db = TinyDB(os.path.join(DB_PATH, config_name + ".json"))
@@ -1224,24 +1536,35 @@ def generate_qrcode(config_name):
except RuntimeError as e:
print("RuntimeError: cannot release un-acquired lock")
- result = "[Interface]\nPrivateKey = "+conf['private_key']+"\nAddress = "+conf['allowed_ip']+"\nMTU = "+conf['mtu']+"\nDNS = "+conf['DNS']\
- +"\n\n[Peer]\nPublicKey = "+conf['public_key']+"\nAllowedIPs = "+conf['endpoint_allowed_ip']+"\nPersistentKeepalive = "+conf['keepalive']+"\nEndpoint = "+conf['endpoint']
+ result = "[Interface]\nPrivateKey = " + conf['private_key'] + "\nAddress = " + conf[
+ 'allowed_ip'] + "\nMTU = " + conf['mtu'] + "\nDNS = " + conf['DNS'] \
+ + "\n\n[Peer]\nPublicKey = " + conf['public_key'] + "\nAllowedIPs = " + conf[
+ 'endpoint_allowed_ip'] + "\nPersistentKeepalive = " + conf['keepalive'] + "\nEndpoint = " + \
+ conf['endpoint']
if preshared_key != "":
- result += "\nPresharedKey = "+preshared_key
+ result += "\nPresharedKey = " + preshared_key
return render_template("qrcode.html", i=result)
- else:
- db.close()
- try:
- sem.release()
- except RuntimeError as e:
- print("RuntimeError: cannot release un-acquired lock")
- return redirect("/configuration/" + config_name)
+
+ db.close()
+ try:
+ sem.release()
+ except RuntimeError as e:
+ print("RuntimeError: cannot release un-acquired lock")
+ return redirect("/configuration/" + config_name)
# Download configuration file
@app.route('/download/', methods=['GET'])
def download(config_name):
+ """Download client configuration file.
+
+ :param config_name: Name of WG interface
+ :type config_name: str
+ :return: TODO
+ :rtype: TODO
+ """
+
print(request.headers.get('User-Agent'))
id = request.args.get('id')
sem.acquire(timeout=1)
@@ -1279,7 +1602,7 @@ def download(config_name):
filename = filename + "_" + config_name
psk = ""
if preshared_key != "":
- psk = "\nPresharedKey = "+preshared_key
+ psk = "\nPresharedKey = " + preshared_key
db.close()
try:
sem.release()
@@ -1289,7 +1612,8 @@ def download(config_name):
dns_addresses + "\nMTU = " + mtu_value + "\n\n[Peer]\nPublicKey = " + \
public_key + "\nAllowedIPs = " + endpoint_allowed_ip + "\nEndpoint = " + \
endpoint + "\nPersistentKeepalive = " + keepalive + psk
- return app.response_class((yield result), mimetype='text/conf', headers={"Content-Disposition": "attachment;filename=" + filename + ".conf"})
+ return app.response_class((yield result), mimetype='text/conf',
+ headers={"Content-Disposition": "attachment;filename=" + filename + ".conf"})
db.close()
return redirect("/configuration/" + config_name)
@@ -1297,6 +1621,14 @@ def download(config_name):
# Switch peer display mode
@app.route('/switch_display_mode/', methods=['GET'])
def switch_display_mode(mode):
+ """Change display view style.
+
+ :param mode: Mode name
+ :type mode: str
+ :return: Return text with result
+ :rtype: str
+ """
+
if mode in ['list', 'grid']:
config = get_dashboard_conf()
config.set("Peers", "peer_display_mode", mode)
@@ -1314,6 +1646,12 @@ Dashboard Tools Related
# Get all IP for ping
@app.route('/get_ping_ip', methods=['POST'])
def get_ping_ip():
+ """Get ips for network testing.
+
+ :return: TODO
+ :rtype: TODO
+ """
+
config = request.form['config']
sem.acquire(timeout=1)
db = TinyDB(os.path.join(DB_PATH, config + ".json"))
@@ -1340,6 +1678,12 @@ def get_ping_ip():
# Ping IP
@app.route('/ping_ip', methods=['POST'])
def ping_ip():
+ """Execute ping command.
+
+ :return: Return text with result
+ :rtype: str
+ """
+
try:
result = ping('' + request.form['ip'] + '', count=int(request.form['count']), privileged=True, source=None)
returnjson = {
@@ -1362,6 +1706,12 @@ def ping_ip():
# Traceroute IP
@app.route('/traceroute_ip', methods=['POST'])
def traceroute_ip():
+ """Execute ping traceroute command.
+
+ :return: Return text with result
+ :rtype: str
+ """
+
try:
result = traceroute('' + request.form['ip'] + '', first_hop=1, max_hops=30, count=1, fast=True)
returnjson = []
@@ -1383,6 +1733,9 @@ Dashboard Initialization
def init_dashboard():
+ """Create dashboard default configuration.
+ """
+
# Set Default INI File
if not os.path.isfile(DASHBOARD_CONF):
conf_file = open(DASHBOARD_CONF, "w+")
@@ -1397,7 +1750,7 @@ def init_dashboard():
# Defualt dashboard server setting
if "Server" not in config:
config['Server'] = {}
- if 'wg_conf_path' not in config['Server']:
+ if 'wg_conf_path' not in config['Serxecute ping traceroute command.ver']:
config['Server']['wg_conf_path'] = '/etc/wireguard'
# TODO: IPv6 for the app IP might need to configure with Gunicorn...
if 'app_ip' not in config['Server']:
@@ -1432,8 +1785,10 @@ def init_dashboard():
def check_update():
- """
- Dashboard check update
+ """Dashboard check update
+
+ :return: Retunt text with result
+ :rtype: str
"""
config = get_dashboard_conf()
data = urllib.request.urlopen("https://api.github.com/repos/donaldzou/WGDashboard/releases").read()