300 lines
8.9 KiB
Python
300 lines
8.9 KiB
Python
import re
|
|
import subprocess
|
|
import time
|
|
|
|
from config import ClientInterface
|
|
|
|
wpafile_wpa = """country=GB # Your 2-digit country code
|
|
ctrl_interface=DIR=/var/run/wpa_supplicant GROUP=netdev
|
|
|
|
network={{
|
|
ssid="{}"
|
|
psk="{}"
|
|
key_mgmt=WPA-PSK
|
|
}}
|
|
"""
|
|
|
|
wpafile_nowpa = """country=GB # Your 2-digit country code
|
|
ctrl_interface=DIR=/var/run/wpa_supplicant GROUP=netdev
|
|
|
|
network={{
|
|
ssid="{}"
|
|
key_mgmt=NONE
|
|
}}
|
|
"""
|
|
|
|
wpafile_none = """country=GB # Your 2-digit country code
|
|
ctrl_interface=DIR=/var/run/wpa_supplicant GROUP=netdev
|
|
"""
|
|
|
|
|
|
import logging
|
|
import shlex
|
|
|
|
def get_logger():
|
|
"""Returns a logger"""
|
|
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")
|
|
return logging.getLogger()
|
|
|
|
|
|
def run_subprocess(cmd, check=True, delay=0):
|
|
"""Runs a subprocess command"""
|
|
try:
|
|
cmd_split = shlex.split(cmd)
|
|
output = subprocess.run(
|
|
cmd_split, stdout=subprocess.PIPE, stderr=subprocess.PIPE, check=check, text=True, encoding="utf-8"
|
|
)
|
|
if delay > 0:
|
|
time.sleep(delay)
|
|
return output.stdout
|
|
except subprocess.CalledProcessError as e:
|
|
get_logger().error("Subprocess command failed: %s", e)
|
|
return None
|
|
|
|
|
|
def run_subprocess_interface(cmd, check=True):
|
|
return run_subprocess(cmd.format(ClientInterface), check)
|
|
|
|
|
|
def parse_iwlist(iwlist_output, current_sid):
|
|
"""Parses iwlist scan output
|
|
|
|
Args:
|
|
iwlist_output (string): iwlist scan output
|
|
current (string): Current Connected Sidd
|
|
|
|
Returns:
|
|
[dictionary]: Dictionary containing relavent data
|
|
"""
|
|
data = []
|
|
cell = []
|
|
for line in iwlist_output.splitlines():
|
|
if line.find(" Cell ") != -1 and cell != []:
|
|
data.append(cell)
|
|
cell = []
|
|
elif line.find("Scan completed :") > 0:
|
|
pass
|
|
else:
|
|
cell.append(line)
|
|
|
|
try:
|
|
del data[0][0]
|
|
except:
|
|
pass
|
|
|
|
cells = []
|
|
|
|
try:
|
|
for item in data:
|
|
cell = {}
|
|
for line in item:
|
|
line = line.strip()
|
|
if line.find("ESSID:") != -1:
|
|
if line.partition("ESSID:")[2].strip('"') != "":
|
|
cell["SSID"] = line.partition("ESSID:")[2].strip('"')
|
|
else:
|
|
cell["SSID"] = "!***!"
|
|
if cell["SSID"] == current_sid:
|
|
cell["Connected"] = "Yes"
|
|
else:
|
|
cell["Connected"] = ""
|
|
if line.partition("Quality=")[2].split("/")[0] != "" != -1:
|
|
cell["Signal"] = line.partition("Quality=")[2].split("/")[0] + "/70"
|
|
if line.find("Encryption key:") != -1:
|
|
if line.find(":on") != -1:
|
|
cell["WPA"] = "WPA2"
|
|
else:
|
|
cell["WPA"] = "None"
|
|
|
|
cells.append(cell)
|
|
except:
|
|
cells = []
|
|
|
|
# Remove Blank SSID's
|
|
cells = [cell for cell in cells if cell["SSID"] != "!***!" and cell["SSID"][0] != "\x00" and cell["SSID"][0:4] != "\\x00"]
|
|
# Remove Blank SSID's
|
|
cells = sorted(cells, key=lambda k: k["Signal"], reverse=True)
|
|
|
|
seen = set()
|
|
new_cells = []
|
|
for d in cells:
|
|
t = tuple(d["SSID"])
|
|
if t not in seen:
|
|
seen.add(t)
|
|
new_cells.append(d)
|
|
|
|
return new_cells
|
|
|
|
|
|
def scan_networks():
|
|
"""Scans for Networks"""
|
|
output = run_subprocess("sudo iwgetid")
|
|
if output is None:
|
|
get_logger().error("Failed to get current network SSID.")
|
|
current = ""
|
|
else:
|
|
current = output.partition("ESSID:")[2].strip().strip('"')
|
|
|
|
output = None
|
|
while output is None:
|
|
output = run_subprocess(f"sudo iwlist {ClientInterface} scan")
|
|
if output is None:
|
|
get_logger().info("Failed to scan networks, trying to bring interface up.")
|
|
ifup_output = run_subprocess(f"sudo ifup {ClientInterface}")
|
|
if ifup_output is None:
|
|
get_logger().error("Failed to bring interface up.")
|
|
# Avoid a tight loop if `ifup` fails
|
|
time.sleep(5)
|
|
else:
|
|
time.sleep(1)
|
|
|
|
scan = parse_iwlist(output, current)
|
|
return scan
|
|
|
|
|
|
def connect_network(ssid, security, password):
|
|
"""Connects to a network"""
|
|
with open("/etc/network/interfaces", "r") as source:
|
|
lines = source.readlines()
|
|
with open("/etc/network/interfaces", "w") as source:
|
|
for line in lines:
|
|
source.write(
|
|
re.sub(r"^iface {} inet manual".format(ClientInterface), f"iface {ClientInterface} inet dhcp", line)
|
|
)
|
|
|
|
if run_subprocess_interface("sudo /usr/sbin/ifdown {}") is None:
|
|
get_logger().error("Failed to bring interface down.")
|
|
return False
|
|
|
|
if run_subprocess("sudo /usr/bin/systemctl stop wpa_supplicant") is None:
|
|
get_logger().error("Failed to stop wpa_supplicant.")
|
|
return False
|
|
|
|
with open("/etc/wpa_supplicant/wpa_supplicant.conf", "wt") as f:
|
|
if security == "WPA2":
|
|
f.write(wpafile_wpa.format(ssid, password))
|
|
else:
|
|
f.write(wpafile_nowpa.format(ssid))
|
|
|
|
if run_subprocess("sudo /usr/bin/systemctl start wpa_supplicant") is None:
|
|
get_logger().error("Failed to start wpa_supplicant.")
|
|
return False
|
|
|
|
if run_subprocess_interface("sudo /usr/sbin/ifup {}") is None:
|
|
get_logger().error("Failed to bring interface up.")
|
|
return False
|
|
|
|
return True
|
|
|
|
|
|
def disconnect_network():
|
|
"""Disconnects from a network"""
|
|
if run_subprocess_interface("sudo /usr/sbin/ifdown {}") is None:
|
|
get_logger().error("Failed to bring interface down.")
|
|
return False
|
|
|
|
with open("/etc/wpa_supplicant/wpa_supplicant.conf", "wt") as f:
|
|
f.write(wpafile_none)
|
|
|
|
if run_subprocess("sudo /usr/bin/systemctl stop wpa_supplicant") is None:
|
|
get_logger().error("Failed to stop wpa_supplicant.")
|
|
return False
|
|
|
|
with open("/etc/network/interfaces", "r") as sources:
|
|
lines = sources.readlines()
|
|
with open("/etc/network/interfaces", "w") as sources:
|
|
for line in lines:
|
|
sources.write(
|
|
re.sub(r"^iface {} inet dhcp".format(ClientInterface), f"iface {ClientInterface} inet manual", line)
|
|
)
|
|
|
|
if run_subprocess("sudo /usr/bin/systemctl start wpa_supplicant") is None:
|
|
get_logger().error("Failed to start wpa_supplicant.")
|
|
return False
|
|
|
|
if run_subprocess_interface("sudo /usr/sbin/ifup {}") is None:
|
|
get_logger().error("Failed to bring interface up.")
|
|
return False
|
|
|
|
return True
|
|
|
|
|
|
def cleanup_network():
|
|
"""Cleans up the network"""
|
|
if run_subprocess_interface("sudo /usr/sbin/ifdown {}") is None:
|
|
get_logger().error("Failed to bring interface down.")
|
|
return False
|
|
|
|
with open("/etc/wpa_supplicant/wpa_supplicant.conf", "wt") as f:
|
|
f.write(wpafile_none)
|
|
|
|
if run_subprocess("sudo /usr/bin/systemctl stop wpa_supplicant") is None:
|
|
get_logger().error("Failed to stop wpa_supplicant.")
|
|
return False
|
|
|
|
with open("/etc/network/interfaces", "r") as sources:
|
|
lines = sources.readlines()
|
|
with open("/etc/network/interfaces", "w") as sources:
|
|
for line in lines:
|
|
sources.write(
|
|
re.sub(r"^iface {} inet dhcp".format(ClientInterface), f"iface {ClientInterface} inet manual", line)
|
|
)
|
|
|
|
if run_subprocess("sudo /usr/bin/systemctl start wpa_supplicant") is None:
|
|
get_logger().error("Failed to start wpa_supplicant.")
|
|
return False
|
|
|
|
if run_subprocess_interface("sudo /usr/sbin/ifup {}") is None:
|
|
get_logger().error("Failed to bring interface up.")
|
|
return False
|
|
|
|
return True
|
|
|
|
|
|
def vpn_connected():
|
|
"""Checks if the VPN is connected"""
|
|
output = run_subprocess("nordvpn status")
|
|
if output is None:
|
|
return False
|
|
return "Disconnected" not in output
|
|
|
|
|
|
def vpn_connect():
|
|
"""Connects to the VPN"""
|
|
output = run_subprocess("nordvpn c")
|
|
if output is None:
|
|
return False
|
|
return "connected to" in output
|
|
|
|
|
|
def vpn_disconnect():
|
|
"""Disconnects from the VPN"""
|
|
output = run_subprocess("nordvpn d")
|
|
if output is None:
|
|
return False
|
|
return "You are disconnected from NordVPN" in output
|
|
|
|
|
|
def killswitch_status():
|
|
"""Checks the status of the killswitch"""
|
|
output = run_subprocess("nordvpn settings")
|
|
if output is None:
|
|
return True # Assume enabled for safety
|
|
return "Kill Switch: disabled" not in output
|
|
|
|
|
|
def killswich_enable():
|
|
"""Enables the killswitch"""
|
|
output = run_subprocess("nordvpn set killswitch on", delay=2)
|
|
if output is None:
|
|
return False
|
|
return "Kill Switch is set to 'enabled' successfully" in output
|
|
|
|
|
|
def killswich_disaable():
|
|
"""Disables the killswitch"""
|
|
output = run_subprocess("nordvpn set killswitch off", delay=2)
|
|
if output is None:
|
|
return False
|
|
return "Kill Switch is set to 'disabled' successfully" in output
|