323 lines
12 KiB
Python
323 lines
12 KiB
Python
import re
|
|
import subprocess
|
|
|
|
from config import RPI, ClientInterface, Debug
|
|
from flask import flash, redirect, render_template, url_for
|
|
from flask_login import current_user, login_required, login_user, logout_user
|
|
|
|
from app import app
|
|
from app.forms import LoginForm, WPAForm
|
|
from app.models import Passwords, User
|
|
|
|
CMD_SCAN = "sudo nmcli -t -f SSID,SIGNAL,IN-USE,SECURITY -e yes -m tab device wifi list ifname wlan1 --rescan yes"
|
|
CMD_JOIN = "sudo nmcli device wifi connect *SSID* ifname wlan1"
|
|
CMD_JOINPW = "sudo nmcli device wifi connect *SSID* ifname wlan1 password *PASSWORD*"
|
|
CMD_DISCONNECT = "sudo nmcli device disconnect wlan1"
|
|
# Error: Connection activation failed: (7) Secrets were required, but not provided.
|
|
# Device 'wlxf81a6719febb' successfully activated with '11111-1111-11111-111111-11111111'
|
|
|
|
|
|
def parse_iwlist(iwlist_output, current):
|
|
data = []
|
|
cell = []
|
|
for line in iwlist_output.splitlines():
|
|
if line.find(" Cell ") != -1 and cell != []:
|
|
data.append(cell)
|
|
cell = []
|
|
elif line.find("Scan completed :") > 0:
|
|
pass
|
|
else:
|
|
cell.append(line)
|
|
|
|
try:
|
|
del data[0][0]
|
|
except:
|
|
pass
|
|
|
|
cells = []
|
|
|
|
for a in data:
|
|
cell = {}
|
|
for line in a:
|
|
line = line.strip()
|
|
if line.find("ESSID:") != -1:
|
|
if line.partition("ESSID:")[2].strip('"') != "":
|
|
cell["SSID"] = line.partition("ESSID:")[2].strip('"')
|
|
if cell["SSID"] == current:
|
|
cell["Connected"] = "Yes"
|
|
else:
|
|
cell["Connected"] = ""
|
|
if line.partition("Signal level=")[2].split("/")[0] != "" != -1:
|
|
cell["Signal"] = line.partition("Signal level=")[2].split("/")[0]
|
|
if line.find("Encryption key:") != -1:
|
|
if line.find(":on") != -1:
|
|
cell["WPA"] = "WPA2"
|
|
else:
|
|
cell["WPA"] = "None"
|
|
cells.append(cell)
|
|
|
|
return cells
|
|
|
|
|
|
def scan_networks():
|
|
scan = []
|
|
|
|
if not RPI:
|
|
output = subprocess.run(CMD_SCAN.split(" "), stdout=subprocess.PIPE).stdout.decode("utf-8")
|
|
for line in output.splitlines():
|
|
t = line.split(":")
|
|
if t[0] != "":
|
|
scan.append(line)
|
|
else:
|
|
output = subprocess.run(["sudo", "iwgetid"], stdout=subprocess.PIPE).stdout.decode("utf-8")
|
|
current = output.partition("ESSID:")[2].strip().strip('"')
|
|
|
|
output = subprocess.run(["sudo", "iwlist", ClientInterface, "scan"], stdout=subprocess.PIPE).stdout.decode("utf-8")
|
|
scan = parse_iwlist(output, current)
|
|
# a = 0
|
|
# scan = []
|
|
# for line in output.splitlines():
|
|
# if a == 0:
|
|
# connected = " "
|
|
# if line.partition("ESSID:")[2].strip('"') != "":
|
|
# a = 1
|
|
# ssid = line.partition("ESSID:")[2].strip('"')
|
|
# if ssid == current:
|
|
# connected = "*"
|
|
# if a == 1:
|
|
# if line.find("Encryption key:off") != -1:
|
|
# password = ""
|
|
# a = 2
|
|
# elif line.find("Encryption key:on") != -1:
|
|
# password = "WPA2"
|
|
# a = 2
|
|
# if a == 2:
|
|
# if line.partition("Signal level=")[2].split("/")[0] != "":
|
|
# signal = line.partition("Signal level=")[2].split("/")[0]
|
|
# a = 0
|
|
# scan.append(ssid + ":" + signal + ":" + connected + ":" + password)
|
|
return scan
|
|
|
|
|
|
@app.route("/")
|
|
@app.route("/index")
|
|
@login_required
|
|
def index():
|
|
results = []
|
|
scan = ["rpi:100: :WPA2", "Home:94:*:WPA2", "HOME2:48: :WPA2", "BT:23: :"]
|
|
if not Debug:
|
|
scan = scan_networks()
|
|
|
|
# for network in scan:
|
|
# item = [
|
|
# network.split(":", maxsplit=1)[0],
|
|
# network.split(":")[1],
|
|
# "Yes" if network.split(":")[2] == "*" else "",
|
|
# "None" if network.split(":")[3].strip() == "" else "WPA2",
|
|
# ]
|
|
# results.append(item)
|
|
|
|
item = []
|
|
for network in scan:
|
|
item = [network["SSID"], network["Signal"], network["Connected"], network["WPA"]]
|
|
results.append(item)
|
|
|
|
# table = Networks(results)
|
|
# table.border = True
|
|
t = {}
|
|
for a in results:
|
|
t["ssid"] = a[0]
|
|
|
|
return render_template("index.html", index_table=results)
|
|
|
|
|
|
@app.route("/login", methods=["GET", "POST"])
|
|
def login():
|
|
if current_user.is_authenticated:
|
|
return redirect(url_for("index"))
|
|
form = LoginForm()
|
|
if form.validate_on_submit():
|
|
user = User.query.filter_by(username=form.username.data).first()
|
|
if user is None or not user.check_password(form.password.data):
|
|
flash("Invalid username or password")
|
|
return redirect(url_for("login"))
|
|
login_user(user)
|
|
return redirect(url_for("index"))
|
|
return render_template("login.html", title="Sign In", form=form)
|
|
|
|
|
|
@app.route("/logout")
|
|
def logout():
|
|
logout_user()
|
|
return redirect(url_for("index"))
|
|
|
|
|
|
@app.route("/wpa/<ssid>", methods=["GET", "POST"])
|
|
@login_required
|
|
def wpa(ssid):
|
|
# wlan0: flags=4098<BROADCAST,MULTICAST> mtu 1500
|
|
# wlan1: flags=4163<UP,BROADCAST,RUNNING,MULTICAST> mtu 1500
|
|
if not RPI:
|
|
form = WPAForm()
|
|
if form.validate_on_submit():
|
|
cmd = CMD_JOINPW.replace("*SSID*", ssid)
|
|
cmd = cmd.replace("*PASSWORD*", form.password.data)
|
|
|
|
output = subprocess.run(cmd.split(" "), stdout=subprocess.PIPE, stderr=subprocess.PIPE).stdout.decode("utf-8")
|
|
if output.find("Error") == -1 and output != "":
|
|
return render_template("message.html", message="Successfully connected to {}".format(ssid))
|
|
|
|
return render_template("message.html", message="Failed to connect to {}".format(ssid))
|
|
|
|
return render_template("wpa.html", title="WPA Password", form=form)
|
|
else:
|
|
form = WPAForm()
|
|
if form.validate_on_submit():
|
|
wpafile = """country=GB # Your 2-digit country code
|
|
ctrl_interface=DIR=/var/run/wpa_supplicant GROUP=netdev
|
|
network={{
|
|
ssid="{}"
|
|
psk="{}"
|
|
key_mgmt=WPA-PSK
|
|
}}
|
|
""".format(
|
|
ssid, form.password.data
|
|
)
|
|
|
|
with open("/etc/network/interfaces", "r") as sources:
|
|
lines = sources.readlines()
|
|
with open("/etc/network/interfaces", "w") as sources:
|
|
for line in lines:
|
|
sources.write(re.sub(r"^iface wlan0 inet manual", "iface wlan0 inet dhcp", line))
|
|
|
|
output = subprocess.run(
|
|
["sudo", "/usr/sbin/ifdown", ClientInterface], stdout=subprocess.PIPE, stderr=subprocess.PIPE
|
|
).stdout.decode("utf-8")
|
|
output = subprocess.run(
|
|
["sudo", "/usr/bin/systemctl", "stop", "wpa_supplicant"], stdout=subprocess.PIPE, stderr=subprocess.PIPE
|
|
).stdout.decode("utf-8")
|
|
|
|
with open("/etc/wpa_supplicant/wpa_supplicant.conf", "wt") as f:
|
|
f.write(wpafile)
|
|
|
|
try:
|
|
output = subprocess.run(
|
|
["sudo", "/usr/bin/systemctl", "start", "wpa_supplicant"],
|
|
stdout=subprocess.PIPE,
|
|
stderr=subprocess.PIPE,
|
|
timeout=20,
|
|
).stdout.decode("utf-8")
|
|
output = subprocess.run(
|
|
["sudo", "/usr/sbin/ifup", ClientInterface], stdout=subprocess.PIPE, stderr=subprocess.PIPE, timeout=20
|
|
).stdout.decode("utf-8")
|
|
except:
|
|
return render_template("message.html", message="Failt to connected to {}".format(ssid))
|
|
|
|
return render_template("message.html", message="Successfully connected to {}".format(ssid))
|
|
|
|
return render_template("wpa.html", title="WPA Password", form=form)
|
|
|
|
|
|
@app.route("/connect/<string:ssid>&<string:security>", methods=["GET", "POST"])
|
|
@login_required
|
|
def connect(ssid, security):
|
|
if security == "WPA2":
|
|
return redirect(url_for("wpa", ssid=ssid))
|
|
|
|
# network={
|
|
# ssid="my ssid with spaces"
|
|
# key_mgmt=NONE
|
|
# }
|
|
|
|
if not RPI:
|
|
cmd = CMD_JOIN.replace("*SSID*", ssid)
|
|
|
|
output = subprocess.run(cmd.split(" "), stdout=subprocess.PIPE, stderr=subprocess.PIPE).stdout.decode("utf-8")
|
|
if output.find("Error") == -1:
|
|
return render_template("message.html", message="Successfully connected to {}".format(ssid))
|
|
|
|
return render_template("message.html", message="Failed to connect to {}".format(ssid))
|
|
else:
|
|
wpafile = """country=GB # Your 2-digit country code
|
|
ctrl_interface=DIR=/var/run/wpa_supplicant GROUP=netdev
|
|
network={{
|
|
ssid="{}"
|
|
key_mgmt=NONE
|
|
}}
|
|
""".format(
|
|
ssid
|
|
)
|
|
|
|
with open("/etc/network/interfaces", "r") as sources:
|
|
lines = sources.readlines()
|
|
with open("/etc/network/interfaces", "w") as sources:
|
|
for line in lines:
|
|
sources.write(re.sub(r"^iface wlan0 inet manual", "iface wlan0 inet dhcp", line))
|
|
|
|
output = subprocess.run(
|
|
["sudo", "/usr/sbin/ifdown", ClientInterface], stdout=subprocess.PIPE, stderr=subprocess.PIPE
|
|
).stdout.decode("utf-8")
|
|
output = subprocess.run(
|
|
["sudo", "/usr/bin/systemctl", "stop", "wpa_supplicant"], stdout=subprocess.PIPE, stderr=subprocess.PIPE
|
|
).stdout.decode("utf-8")
|
|
|
|
with open("/etc/wpa_supplicant/wpa_supplicant.conf", "wt") as f:
|
|
f.write(wpafile)
|
|
|
|
try:
|
|
output = subprocess.run(
|
|
["sudo", "/usr/bin/systemctl", "start", "wpa_supplicant"],
|
|
stdout=subprocess.PIPE,
|
|
stderr=subprocess.PIPE,
|
|
timeout=20,
|
|
).stdout.decode("utf-8")
|
|
output = subprocess.run(
|
|
["sudo", "/usr/sbin/ifup", ClientInterface], stdout=subprocess.PIPE, stderr=subprocess.PIPE, timeout=20
|
|
).stdout.decode("utf-8")
|
|
except:
|
|
return render_template("message.html", message="Failt to connected to {}".format(ssid))
|
|
|
|
return render_template("message.html", message="Successfully connected to {}".format(ssid))
|
|
|
|
|
|
@app.route("/disconnect/<string:ssid>", methods=["GET", "POST"])
|
|
@login_required
|
|
def disconnect(ssid):
|
|
if not RPI:
|
|
output = subprocess.run(CMD_DISCONNECT.split(" "), stdout=subprocess.PIPE, stderr=subprocess.PIPE).stdout.decode("utf-8")
|
|
if output.find("successfully disconnected") != -1:
|
|
return render_template("message.html", message="Sucessfully disconnected from {}".format(ssid))
|
|
|
|
return render_template("message.html", message="Failed to Disconnect from {}".format(ssid))
|
|
else:
|
|
output = subprocess.run(
|
|
["sudo", "/usr/sbin/ifdown", ClientInterface], stdout=subprocess.PIPE, stderr=subprocess.PIPE
|
|
).stdout.decode("utf-8")
|
|
|
|
wpafile = """country=GB # Your 2-digit country code
|
|
ctrl_interface=DIR=/var/run/wpa_supplicant GROUP=netdev
|
|
"""
|
|
with open("/etc/wpa_supplicant/wpa_supplicant.conf", "wt") as f:
|
|
f.write(wpafile)
|
|
|
|
with open("/etc/network/interfaces", "r") as sources:
|
|
lines = sources.readlines()
|
|
with open("/etc/network/interfaces", "w") as sources:
|
|
for line in lines:
|
|
sources.write(re.sub(r"^iface wlan0 inet dhcp", "iface wlan0 inet manual", line))
|
|
|
|
try:
|
|
output = subprocess.run(
|
|
["sudo", "/usr/bin/systemctl", "restart", "wpa_supplicant"],
|
|
stdout=subprocess.PIPE,
|
|
stderr=subprocess.PIPE,
|
|
timeout=20,
|
|
).stdout.decode("utf-8")
|
|
output = subprocess.run(
|
|
["sudo", "/usr/sbin/ifup", ClientInterface], stdout=subprocess.PIPE, stderr=subprocess.PIPE, timeout=20
|
|
).stdout.decode("utf-8")
|
|
except:
|
|
return render_template("message.html", message="Failt to disconnect from {}".format(ssid))
|
|
|
|
return render_template("message.html", message="Sucessfully disconnected from {}".format(ssid))
|