"""Helpers for driving the Wi-Fi client interface and the NordVPN CLI.

The functions shell out to ``iwlist``/``iwgetid``, ``ifup``/``ifdown``,
``systemctl`` and ``nordvpn`` and rewrite ``/etc/network/interfaces`` and
``/etc/wpa_supplicant/wpa_supplicant.conf``.  Failures of external commands
are logged and reported through ``None``/``False`` return values.
"""
import logging
import re
import shlex
import subprocess
import time

from config import ClientInterface

# wpa_supplicant.conf templates.  ``{{``/``}}`` are literal braces; the ``{}``
# placeholders are filled with the SSID (and passphrase) via ``str.format``.
wpafile_wpa = """country=GB # Your 2-digit country code
ctrl_interface=DIR=/var/run/wpa_supplicant GROUP=netdev
network={{
    ssid="{}"
    psk="{}"
    key_mgmt=WPA-PSK
}}
"""

wpafile_nowpa = """country=GB # Your 2-digit country code
ctrl_interface=DIR=/var/run/wpa_supplicant GROUP=netdev
network={{
    ssid="{}"
    key_mgmt=NONE
}}
"""

wpafile_none = """country=GB # Your 2-digit country code
ctrl_interface=DIR=/var/run/wpa_supplicant GROUP=netdev
"""

_INTERFACES_FILE = "/etc/network/interfaces"
_WPA_SUPPLICANT_CONF = "/etc/wpa_supplicant/wpa_supplicant.conf"

# iwlist starts every scan record with a line such as
# "          Cell 01 - Address: AA:BB:CC:DD:EE:FF".  Anchoring on that shape
# (instead of a bare " Cell " substring) keeps an ESSID like "Bob Cell 5G"
# from being mistaken for the start of a new record.
_CELL_HEADER = re.compile(r"^\s*Cell \d+\b")
_LEADING_NUMBER = re.compile(r"\s*(\d+)")

# Prefixes of SSIDs that are treated as blank: a raw NUL character, or the
# four literal characters backslash-x-0-0.
_BLANK_SSID_PREFIXES = ("\x00", "\\x00")


def get_logger():
    """Return the root logger with INFO-level basic configuration applied.

    ``logging.basicConfig`` does nothing once the root logger has handlers,
    so calling this repeatedly is harmless.
    """
    logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")
    return logging.getLogger()


def run_subprocess(cmd, check=True, delay=0):
    """Run a command line (without a shell) and return its standard output.

    Args:
        cmd (str): Command line; split into arguments with ``shlex.split``.
        check (bool): When true, a non-zero exit status counts as a failure.
        delay (float): Seconds to sleep after a successful run.

    Returns:
        str | None: Captured stdout, or ``None`` when the command failed
        (non-zero exit with ``check`` set) or could not be started.
    """
    try:
        completed = subprocess.run(
            shlex.split(cmd),
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            check=check,
            text=True,
            encoding="utf-8",
        )
    except subprocess.CalledProcessError as e:
        get_logger().error("Subprocess command failed: %s", e)
        return None
    except OSError as e:
        # E.g. the executable is missing or not runnable: report it through
        # the same "None means failure" convention instead of crashing.
        get_logger().error("Subprocess command could not be started: %s", e)
        return None
    if delay > 0:
        time.sleep(delay)
    return completed.stdout


def run_subprocess_interface(cmd, check=True):
    """Run ``cmd`` after substituting ``ClientInterface`` for its ``{}`` placeholder."""
    return run_subprocess(cmd.format(ClientInterface), check)


def _split_cells(iwlist_output):
    """Group scan output into one list of detail lines per ``Cell`` record."""
    records = []
    current = None
    for line in iwlist_output.splitlines():
        if _CELL_HEADER.match(line):
            # The header line itself only carries the cell number/address,
            # which is not reported, so it is not kept.
            current = []
            records.append(current)
        elif current is not None:
            # Lines before the first header (e.g. "Scan completed :") are
            # not part of any record and are ignored.
            current.append(line)
    return records


def _parse_cell(lines, current_sid):
    """Extract SSID, connection flag, signal and encryption from one record."""
    cell = {}
    for raw_line in lines:
        line = raw_line.strip()
        if "ESSID:" in line:
            ssid = line.partition("ESSID:")[2].strip('"')
            cell["SSID"] = ssid
            cell["Connected"] = "Yes" if ssid == current_sid else ""
        quality = line.partition("Quality=")[2].split("/")[0]
        if quality:
            cell["Signal"] = quality + "/70"
        if "Encryption key:" in line:
            cell["WPA"] = "WPA2" if ":on" in line else "None"
    return cell


def _signal_value(cell):
    """Return the numeric quality of a cell, or -1 when it is missing/unparsable."""
    match = _LEADING_NUMBER.match(cell.get("Signal", ""))
    return int(match.group(1)) if match else -1


def parse_iwlist(iwlist_output, current_sid):
    """Parse ``iwlist <interface> scan`` output into a list of networks.

    Args:
        iwlist_output (str): Raw text printed by the scan.
        current_sid (str): SSID of the currently connected network ("" if none).

    Returns:
        list[dict]: One dictionary per distinct visible SSID, strongest
        signal first.  Keys are ``"SSID"``, ``"Connected"`` (``"Yes"`` or
        ``""``), ``"Signal"`` (``"<quality>/70"``) and ``"WPA"``
        (``"WPA2"`` or ``"None"``); ``"Signal"``/``"WPA"`` are only present
        when the corresponding line appeared in the scan record.
    """
    parsed = [_parse_cell(lines, current_sid) for lines in _split_cells(iwlist_output)]

    # Drop records without a usable name: no ESSID line, an empty (hidden)
    # ESSID, or one that starts with a NUL byte / an escaped "\x00".
    visible = [
        cell
        for cell in parsed
        if cell.get("SSID") and not cell["SSID"].startswith(_BLANK_SSID_PREFIXES)
    ]

    # Compare the quality numerically; as strings "9/70" would rank above "70/70".
    visible.sort(key=_signal_value, reverse=True)

    # Keep only the strongest entry for each SSID (several access points may
    # announce the same network name).
    seen = set()
    unique = []
    for cell in visible:
        if cell["SSID"] not in seen:
            seen.add(cell["SSID"])
            unique.append(cell)
    return unique


def scan_networks():
    """Scan for Wi-Fi networks on ``ClientInterface``.

    The scan is retried until it succeeds; after each failed attempt the
    interface is brought up with ``ifup`` before trying again.

    Returns:
        list[dict]: Networks as produced by ``parse_iwlist``.
    """
    output = run_subprocess("sudo iwgetid")
    if output is None:
        get_logger().error("Failed to get current network SSID.")
        current = ""
    else:
        current = output.partition("ESSID:")[2].strip().strip('"')

    output = None
    while output is None:
        output = run_subprocess(f"sudo iwlist {ClientInterface} scan")
        if output is None:
            get_logger().info("Failed to scan networks, trying to bring interface up.")
            ifup_output = run_subprocess(f"sudo ifup {ClientInterface}")
            if ifup_output is None:
                get_logger().error("Failed to bring interface up.")
                # Avoid a tight loop if `ifup` fails
                time.sleep(5)
            else:
                time.sleep(1)

    return parse_iwlist(output, current)


def _rewrite_interface_mode(current_mode, new_mode):
    """Switch ``ClientInterface``'s ``iface ... inet <mode>`` line in the interfaces file."""
    pattern = re.compile(
        r"^iface {} inet {}".format(re.escape(str(ClientInterface)), current_mode)
    )
    replacement = f"iface {ClientInterface} inet {new_mode}"
    with open(_INTERFACES_FILE, "r") as source:
        lines = source.readlines()
    with open(_INTERFACES_FILE, "w") as target:
        # A function replacement inserts the text verbatim (no backslash
        # escape processing of the replacement string).
        target.writelines(pattern.sub(lambda _match: replacement, line) for line in lines)


def _has_line_break(value):
    """Return True when ``value`` is a string containing CR or LF."""
    return isinstance(value, str) and ("\n" in value or "\r" in value)


def connect_network(ssid, security, password):
    """Configure wpa_supplicant for a network and bring the interface up with DHCP.

    Args:
        ssid (str): Network name.
        security (str): ``"WPA2"`` for a passphrase-protected network; any
            other value configures an open network.
        password (str): Passphrase; only used when ``security`` is ``"WPA2"``.

    Returns:
        bool: ``True`` on success, ``False`` when a step failed (the failing
        step is logged).
    """
    # The values are written verbatim into wpa_supplicant.conf; a line break
    # would let them inject additional configuration lines, so refuse them.
    if _has_line_break(ssid) or (security == "WPA2" and _has_line_break(password)):
        get_logger().error("Refusing to write SSID or password containing a line break.")
        return False

    _rewrite_interface_mode("manual", "dhcp")

    if run_subprocess_interface("sudo /usr/sbin/ifdown {}") is None:
        get_logger().error("Failed to bring interface down.")
        return False

    if run_subprocess("sudo /usr/bin/systemctl stop wpa_supplicant") is None:
        get_logger().error("Failed to stop wpa_supplicant.")
        return False

    with open(_WPA_SUPPLICANT_CONF, "wt") as f:
        if security == "WPA2":
            f.write(wpafile_wpa.format(ssid, password))
        else:
            f.write(wpafile_nowpa.format(ssid))

    if run_subprocess("sudo /usr/bin/systemctl start wpa_supplicant") is None:
        get_logger().error("Failed to start wpa_supplicant.")
        return False

    if run_subprocess_interface("sudo /usr/sbin/ifup {}") is None:
        get_logger().error("Failed to bring interface up.")
        return False

    return True


def _reset_network():
    """Remove the configured network and return the interface to manual mode.

    Shared implementation of ``disconnect_network`` and ``cleanup_network``.
    """
    if run_subprocess_interface("sudo /usr/sbin/ifdown {}") is None:
        get_logger().error("Failed to bring interface down.")
        return False

    with open(_WPA_SUPPLICANT_CONF, "wt") as f:
        f.write(wpafile_none)

    if run_subprocess("sudo /usr/bin/systemctl stop wpa_supplicant") is None:
        get_logger().error("Failed to stop wpa_supplicant.")
        return False

    _rewrite_interface_mode("dhcp", "manual")

    if run_subprocess("sudo /usr/bin/systemctl start wpa_supplicant") is None:
        get_logger().error("Failed to start wpa_supplicant.")
        return False

    if run_subprocess_interface("sudo /usr/sbin/ifup {}") is None:
        get_logger().error("Failed to bring interface up.")
        return False

    return True


def disconnect_network():
    """Disconnect from the current network.

    Returns:
        bool: ``True`` on success, ``False`` when a step failed.
    """
    return _reset_network()


def cleanup_network():
    """Clean up the network configuration (same steps as ``disconnect_network``).

    Returns:
        bool: ``True`` on success, ``False`` when a step failed.
    """
    return _reset_network()


def vpn_connected():
    """Return True unless ``nordvpn status`` fails or reports "Disconnected"."""
    output = run_subprocess("nordvpn status")
    if output is None:
        return False
    return "Disconnected" not in output


def vpn_connect():
    """Run ``nordvpn c``; return True when its output contains "connected to"."""
    output = run_subprocess("nordvpn c")
    if output is None:
        return False
    return "connected to" in output


def vpn_disconnect():
    """Run ``nordvpn d``; return True when the disconnect confirmation is printed."""
    output = run_subprocess("nordvpn d")
    if output is None:
        return False
    return "You are disconnected from NordVPN" in output


def killswitch_status():
    """Return whether the kill switch is enabled according to ``nordvpn settings``.

    When the settings cannot be read the kill switch is reported as enabled.
    """
    output = run_subprocess("nordvpn settings")
    if output is None:
        return True  # Assume enabled for safety
    return "Kill Switch: disabled" not in output


def killswich_enable():
    """Turn the kill switch on; return True when the CLI confirms the change."""
    output = run_subprocess("nordvpn set killswitch on", delay=2)
    if output is None:
        return False
    return "Kill Switch is set to 'enabled' successfully" in output


def killswich_disaable():
    """Turn the kill switch off; return True when the CLI confirms the change."""
    output = run_subprocess("nordvpn set killswitch off", delay=2)
    if output is None:
        return False
    return "Kill Switch is set to 'disabled' successfully" in output