erm
This commit is contained in:
@@ -28,12 +28,28 @@ ctrl_interface=DIR=/var/run/wpa_supplicant GROUP=netdev
|
||||
"""
|
||||
|
||||
|
||||
import logging
|
||||
import shlex
|
||||
|
||||
def get_logger():
|
||||
"""Returns a logger"""
|
||||
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")
|
||||
return logging.getLogger()
|
||||
|
||||
|
||||
def run_subprocess(cmd, check=True, delay=0):
|
||||
cmd_split = cmd.split(" ")
|
||||
output = subprocess.run(cmd_split, stdout=subprocess.PIPE, stderr=subprocess.PIPE, check=check).stdout.decode("utf-8")
|
||||
if delay > 0:
|
||||
time.sleep(delay)
|
||||
return output
|
||||
"""Runs a subprocess command"""
|
||||
try:
|
||||
cmd_split = shlex.split(cmd)
|
||||
output = subprocess.run(
|
||||
cmd_split, stdout=subprocess.PIPE, stderr=subprocess.PIPE, check=check, text=True, encoding="utf-8"
|
||||
)
|
||||
if delay > 0:
|
||||
time.sleep(delay)
|
||||
return output.stdout
|
||||
except subprocess.CalledProcessError as e:
|
||||
get_logger().error("Subprocess command failed: %s", e)
|
||||
return None
|
||||
|
||||
|
||||
def run_subprocess_interface(cmd, check=True):
|
||||
@@ -112,38 +128,47 @@ def parse_iwlist(iwlist_output, current_sid):
|
||||
|
||||
def scan_networks():
|
||||
"""Scans for Networks"""
|
||||
output = run_subprocess("sudo iwgetid", check=False)
|
||||
output = run_subprocess("sudo iwgetid")
|
||||
if output is None:
|
||||
get_logger().error("Failed to get current network SSID.")
|
||||
current = ""
|
||||
else:
|
||||
current = output.partition("ESSID:")[2].strip().strip('"')
|
||||
|
||||
current = output.partition("ESSID:")[2].strip().strip('"')
|
||||
|
||||
worked = False
|
||||
while not worked:
|
||||
try:
|
||||
output = run_subprocess("sudo iwlist {} scan".format(ClientInterface))
|
||||
worked = True
|
||||
except:
|
||||
try:
|
||||
output = run_subprocess("sudo ifup {}".format(ClientInterface))
|
||||
output = None
|
||||
while output is None:
|
||||
output = run_subprocess(f"sudo iwlist {ClientInterface} scan")
|
||||
if output is None:
|
||||
get_logger().info("Failed to scan networks, trying to bring interface up.")
|
||||
ifup_output = run_subprocess(f"sudo ifup {ClientInterface}")
|
||||
if ifup_output is None:
|
||||
get_logger().error("Failed to bring interface up.")
|
||||
# Avoid a tight loop if `ifup` fails
|
||||
time.sleep(5)
|
||||
else:
|
||||
time.sleep(1)
|
||||
except:
|
||||
pass
|
||||
scan = parse_iwlist(output, current)
|
||||
|
||||
scan = parse_iwlist(output, current)
|
||||
return scan
|
||||
|
||||
|
||||
def connect_network(ssid, security, password):
|
||||
"""Connects to a network"""
|
||||
with open("/etc/network/interfaces", "r") as source:
|
||||
lines = source.readlines()
|
||||
with open("/etc/network/interfaces", "w") as source:
|
||||
for line in lines:
|
||||
source.write(
|
||||
re.sub(r"^iface {} inet manual".format(ClientInterface), "iface {} inet dhcp".format(ClientInterface), line)
|
||||
re.sub(r"^iface {} inet manual".format(ClientInterface), f"iface {ClientInterface} inet dhcp", line)
|
||||
)
|
||||
|
||||
run_subprocess_interface("sudo /usr/sbin/ifdown {}")
|
||||
if run_subprocess_interface("sudo /usr/sbin/ifdown {}") is None:
|
||||
get_logger().error("Failed to bring interface down.")
|
||||
return False
|
||||
|
||||
run_subprocess("sudo /usr/bin/systemctl stop wpa_supplicant")
|
||||
if run_subprocess("sudo /usr/bin/systemctl stop wpa_supplicant") is None:
|
||||
get_logger().error("Failed to stop wpa_supplicant.")
|
||||
return False
|
||||
|
||||
with open("/etc/wpa_supplicant/wpa_supplicant.conf", "wt") as f:
|
||||
if security == "WPA2":
|
||||
@@ -151,96 +176,124 @@ def connect_network(ssid, security, password):
|
||||
else:
|
||||
f.write(wpafile_nowpa.format(ssid))
|
||||
|
||||
try:
|
||||
run_subprocess("sudo /usr/bin/systemctl start wpa_supplicant")
|
||||
run_subprocess_interface("sudo /usr/sbin/ifup {}")
|
||||
except:
|
||||
if run_subprocess("sudo /usr/bin/systemctl start wpa_supplicant") is None:
|
||||
get_logger().error("Failed to start wpa_supplicant.")
|
||||
return False
|
||||
|
||||
if run_subprocess_interface("sudo /usr/sbin/ifup {}") is None:
|
||||
get_logger().error("Failed to bring interface up.")
|
||||
return False
|
||||
|
||||
return True
|
||||
|
||||
|
||||
def disconnect_network():
|
||||
run_subprocess_interface("sudo /usr/sbin/ifdown {}")
|
||||
"""Disconnects from a network"""
|
||||
if run_subprocess_interface("sudo /usr/sbin/ifdown {}") is None:
|
||||
get_logger().error("Failed to bring interface down.")
|
||||
return False
|
||||
|
||||
with open("/etc/wpa_supplicant/wpa_supplicant.conf", "wt") as f:
|
||||
f.write(wpafile_none)
|
||||
|
||||
run_subprocess("sudo /usr/bin/systemctl stop wpa_supplicant")
|
||||
if run_subprocess("sudo /usr/bin/systemctl stop wpa_supplicant") is None:
|
||||
get_logger().error("Failed to stop wpa_supplicant.")
|
||||
return False
|
||||
|
||||
with open("/etc/network/interfaces", "r") as sources:
|
||||
lines = sources.readlines()
|
||||
with open("/etc/network/interfaces", "w") as sources:
|
||||
for line in lines:
|
||||
sources.write(
|
||||
re.sub(r"^iface {} inet dhcp".format(ClientInterface), "iface {} inet manual".format(ClientInterface), line)
|
||||
re.sub(r"^iface {} inet dhcp".format(ClientInterface), f"iface {ClientInterface} inet manual", line)
|
||||
)
|
||||
|
||||
try:
|
||||
run_subprocess("sudo /usr/bin/systemctl start wpa_supplicant")
|
||||
run_subprocess_interface("sudo /usr/sbin/ifup {}")
|
||||
except:
|
||||
if run_subprocess("sudo /usr/bin/systemctl start wpa_supplicant") is None:
|
||||
get_logger().error("Failed to start wpa_supplicant.")
|
||||
return False
|
||||
|
||||
if run_subprocess_interface("sudo /usr/sbin/ifup {}") is None:
|
||||
get_logger().error("Failed to bring interface up.")
|
||||
return False
|
||||
|
||||
return True
|
||||
|
||||
|
||||
def cleanup_network():
|
||||
run_subprocess_interface("sudo /usr/sbin/ifdown {}")
|
||||
"""Cleans up the network"""
|
||||
if run_subprocess_interface("sudo /usr/sbin/ifdown {}") is None:
|
||||
get_logger().error("Failed to bring interface down.")
|
||||
return False
|
||||
|
||||
with open("/etc/wpa_supplicant/wpa_supplicant.conf", "wt") as f:
|
||||
f.write(wpafile_none)
|
||||
|
||||
run_subprocess("sudo /usr/bin/systemctl stop wpa_supplicant")
|
||||
if run_subprocess("sudo /usr/bin/systemctl stop wpa_supplicant") is None:
|
||||
get_logger().error("Failed to stop wpa_supplicant.")
|
||||
return False
|
||||
|
||||
with open("/etc/network/interfaces", "r") as sources:
|
||||
lines = sources.readlines()
|
||||
with open("/etc/network/interfaces", "w") as sources:
|
||||
for line in lines:
|
||||
sources.write(
|
||||
re.sub(r"^iface {} inet dhcp".format(ClientInterface), "iface {} inet manual".format(ClientInterface), line)
|
||||
re.sub(r"^iface {} inet dhcp".format(ClientInterface), f"iface {ClientInterface} inet manual", line)
|
||||
)
|
||||
|
||||
try:
|
||||
run_subprocess("sudo /usr/bin/systemctl start wpa_supplicant")
|
||||
run_subprocess_interface("sudo /usr/sbin/ifup {}")
|
||||
except:
|
||||
if run_subprocess("sudo /usr/bin/systemctl start wpa_supplicant") is None:
|
||||
get_logger().error("Failed to start wpa_supplicant.")
|
||||
return False
|
||||
|
||||
if run_subprocess_interface("sudo /usr/sbin/ifup {}") is None:
|
||||
get_logger().error("Failed to bring interface up.")
|
||||
return False
|
||||
|
||||
return True
|
||||
|
||||
|
||||
def vpn_connected():
|
||||
if run_subprocess("nordvpn status").find("Disconnected") != -1:
|
||||
"""Checks if the VPN is connected"""
|
||||
output = run_subprocess("nordvpn status")
|
||||
if output is None:
|
||||
return False
|
||||
return True
|
||||
return "Disconnected" not in output
|
||||
|
||||
|
||||
def vpn_connect():
|
||||
if run_subprocess("nordvpn c").find("connected to") != -1:
|
||||
return True
|
||||
return False
|
||||
"""Connects to the VPN"""
|
||||
output = run_subprocess("nordvpn c")
|
||||
if output is None:
|
||||
return False
|
||||
return "connected to" in output
|
||||
|
||||
|
||||
def vpn_disconnect():
|
||||
if run_subprocess("nordvpn d").find("You are disconnected from NordVPN") != -1:
|
||||
return True
|
||||
return False
|
||||
"""Disconnects from the VPN"""
|
||||
output = run_subprocess("nordvpn d")
|
||||
if output is None:
|
||||
return False
|
||||
return "You are disconnected from NordVPN" in output
|
||||
|
||||
|
||||
def killswitch_status():
|
||||
if run_subprocess("nordvpn settings").find("Kill Switch: disabled") != -1:
|
||||
return False
|
||||
return True
|
||||
"""Checks the status of the killswitch"""
|
||||
output = run_subprocess("nordvpn settings")
|
||||
if output is None:
|
||||
return True # Assume enabled for safety
|
||||
return "Kill Switch: disabled" not in output
|
||||
|
||||
|
||||
def killswich_enable():
|
||||
if run_subprocess("nordvpn set killswitch on", delay=2).find("Kill Switch is set to 'enabled' successfully") != -1:
|
||||
return True
|
||||
return False
|
||||
"""Enables the killswitch"""
|
||||
output = run_subprocess("nordvpn set killswitch on", delay=2)
|
||||
if output is None:
|
||||
return False
|
||||
return "Kill Switch is set to 'enabled' successfully" in output
|
||||
|
||||
|
||||
def killswich_disaable():
|
||||
if run_subprocess("nordvpn set killswitch off", delay=2).find("Kill Switch is set to 'disabled' successfully") != -1:
|
||||
return True
|
||||
return False
|
||||
"""Disables the killswitch"""
|
||||
output = run_subprocess("nordvpn set killswitch off", delay=2)
|
||||
if output is None:
|
||||
return False
|
||||
return "Kill Switch is set to 'disabled' successfully" in output
|
||||
|
||||
Reference in New Issue
Block a user