forked from dagrha/pypia
-
Notifications
You must be signed in to change notification settings - Fork 0
/
pia_toolkit.py
executable file
·144 lines (119 loc) · 4.55 KB
/
pia_toolkit.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
#!/usr/bin/env python3
"""
Randomly select a Private Internet Access VPN connection.
With command line arguments, you can connect to a random US, non-US, or
worldwide connection, connect to the server with the lowest ping latency,
shuffle your connection, or disconnect completely.
Usage:
-----
if the script is executable ($ chmod +x pia_toolkit.py):
./pia_toolkit.py [-r {us, all, int}] [-d, --disconnect] [-f, --fastest]
otherwise, if it's not executable, you can run:
python3 pia_toolkit.py [-r {us, all, int}] [-d, --disconnect] [-f, --fastest]
"""
import os
from random import randint
import subprocess
import re
import argparse
from threading import Thread
from queue import Queue
from pypia import get_vpn_configs
def get_pia_connections(region, sys_cons='/etc/NetworkManager/system-connections/'):
    """Return the PIA connection names found in a connections directory.

    Args:
        region: 'us' keeps only names containing 'PIA - US'; 'int' keeps
            names containing 'PIA - ' that do not contain 'US'; any other
            value (for example 'all') keeps every name containing 'PIA - '.
        sys_cons: Directory whose entry names are treated as connection
            names. Defaults to NetworkManager's system-connections
            directory, which is what the original code always used.

    Returns:
        A list of the matching entry names, in the order os.listdir
        reported them.

    Raises:
        OSError: If the directory cannot be listed.
    """
    search_term = 'PIA - US' if region == 'us' else 'PIA - '
    names = os.listdir(sys_cons)
    if region == 'int':
        # "International" means any PIA entry that is not a US one.
        return [name for name in names
                if search_term in name and 'US' not in name]
    return [name for name in names if search_term in name]
def pick_rand_con(cons):
    """Return one randomly chosen element of cons.

    Args:
        cons: A non-empty sequence of connection names.

    Returns:
        The element at a random index between 0 and len(cons) - 1.

    Raises:
        ValueError: If cons is empty. The exception type matches what
            randint raised for the resulting empty range, but the message
            now says what actually went wrong.
    """
    if not cons:
        raise ValueError('no connections to choose from')
    return cons[randint(0, len(cons) - 1)]
def make_connection(con):
    """Announce and bring up the NetworkManager connection whose id is con.

    Args:
        con: Connection id passed to `nmcli con up id`.
    """
    announcement = 'Activating {}...'.format(con)
    print(announcement)
    command = ['nmcli', 'con', 'up', 'id', con]
    subprocess.call(command)
def connect_vpn(region, disconnected):
    """Activate a random PIA connection, avoiding those just disconnected.

    Args:
        region: Region selector forwarded to get_pia_connections
            ('us', 'int' or 'all').
        disconnected: Collection of connection names that were just taken
            down and should be avoided when possible.

    Raises:
        ValueError: If no PIA connection exists for the region.
    """
    cons = get_pia_connections(region)
    if not cons:
        raise ValueError(
            'no PIA connections found for region {!r}'.format(region))
    # Filter up front instead of re-drawing until a different connection
    # comes up: re-drawing never terminates when every candidate was just
    # disconnected (for example when only one connection is configured).
    fresh = [con for con in cons if con not in disconnected]
    # If nothing else is available, reconnecting to a previous connection
    # is better than not connecting at all.
    make_connection(pick_rand_con(fresh or cons))
def disconnect_vpn():
    """Take down every active VPN connection whose name starts with 'PIA'.

    Reads the output of `nmcli con show --active`, selects the lines that
    contain a whitespace-delimited 'vpn' field, and runs
    `nmcli con down id <name>` for each PIA connection found.

    Returns:
        A list of the connection names that were asked to disconnect.

    Raises:
        subprocess.CalledProcessError: If the nmcli listing command exits
            with a non-zero status.
    """
    listing = subprocess.check_output(
        ['nmcli', 'con', 'show', '--active']).decode('utf-8')
    active_pia = []
    for line in listing.split('\n'):
        # Raw strings keep the regex escapes from being treated as
        # (invalid) Python string escapes.
        if not re.search(r'\svpn\s', line):
            continue
        # The name is everything before the first 36-character token
        # (presumably the connection UUID column); names may contain spaces.
        vpn = re.split(r'\s+\S{36}\s+', line)[0]
        if vpn.startswith('PIA'):
            active_pia.append(vpn)
    for name in active_pia:
        print('Disconnecting {}...'.format(name))
        subprocess.call(['nmcli', 'con', 'down', 'id', name])
    return active_pia
class Latencies:
    """Ping the PIA servers of a region and remember the fastest one.

    Constructing an instance loads the VPN configs, queues the ping
    address of every configured server that matches the region, pings them
    from one thread per address, and stores the result.

    Attributes set by the methods below:
        configs: Whatever get_vpn_configs() returned.
        q: Queue of IP addresses waiting to be pinged.
        latencies: Maps IP address -> average round-trip time from ping.
        ip_addresses: Maps IP address -> server name from the configs.
        fastest: 'PIA - <name>' of the server with the lowest latency.
    """

    def __init__(self, region):
        """Collect addresses for region, ping them all, pick the fastest.

        Raises:
            ValueError: If no server could be pinged successfully.
        """
        self.configs = get_vpn_configs()
        self.q = Queue()
        self.latencies = {}
        self.get_ip_addresses(region)
        print('Pinging all PIA servers. This might take a few seconds...\n')
        self.threaded_pings()
        self.get_fastest()

    def get_ip_addresses(self, region):
        """Fill ip_addresses and the work queue for the region's servers."""
        # Connection names look like 'PIA - <name>'; keep the trailing part
        # so it can be compared with the config entries' 'name' field.
        wanted = {con.split(' - ')[-1] for con in get_pia_connections(region)}
        self.ip_addresses = {}
        for key in self.configs:
            config = self.configs[key]
            if not isinstance(config, dict):
                continue
            target = config.get('ping')
            if not target or config['name'] not in wanted:
                continue
            # Only the part of the 'ping' value before the first ':' is used.
            ip = target.split(':')[0]
            self.ip_addresses[ip] = config['name']
            self.q.put(ip)

    def ping(self):
        """Ping one queued address and record its average latency.

        A host that cannot be pinged (non-zero exit status, ping binary
        unavailable, unparsable output) is skipped. task_done() is always
        called so that q.join() in threaded_pings cannot block forever.
        """
        ip = self.q.get()
        try:
            result = subprocess.check_output(
                ['ping', '-c', '3', ip]).decode('utf-8')
            # Take the text after the last '=' and read its second
            # '/'-separated field (the avg of a min/avg/max/... summary).
            self.latencies[ip] = float(result.split('=')[-1].split('/')[1])
        except (subprocess.CalledProcessError, OSError,
                IndexError, ValueError):
            print('Could not measure latency for {}; skipping.'.format(ip))
        finally:
            self.q.task_done()

    def threaded_pings(self):
        """Start one worker thread per queued address and wait for all."""
        for _ in range(self.q.qsize()):
            Thread(target=self.ping).start()
        self.q.join()

    def get_fastest(self):
        """Store the connection name of the lowest-latency server.

        Raises:
            ValueError: If there are no latency measurements.
        """
        if not self.latencies:
            raise ValueError('no latency measurements available')
        best_ip = min(self.latencies, key=self.latencies.get)
        self.fastest = 'PIA - ' + self.ip_addresses[best_ip]

    def __repr__(self):
        return str(self.latencies)

    def __str__(self):
        """Return a text table of servers ordered from fastest to slowest."""
        lines = [
            ' {:<20} {:<17} {:>11} \n'.format('name', 'ip', 'ping (ms)'),
            '-' * 52 + '\n',
        ]
        for ip in sorted(self.latencies, key=self.latencies.get):
            lines.append('| {:<18} | {:<15} | {:>9.2f} |\n'.format(
                self.ip_addresses[ip], ip, self.latencies[ip]))
        return ''.join(lines)
if __name__ == '__main__':
    # Command-line interface: an optional region plus two mutually
    # prioritised actions (--disconnect wins over --fastest).
    cli = argparse.ArgumentParser()
    cli.add_argument('-r', '--region', choices=['us', 'all', 'int'],
                     help='"us" for US only, "int" for non-US, "all" for worldwide')
    cli.add_argument('-d', '--disconnect', action='store_true',
                     help='disconnect current PIA vpn connection')
    cli.add_argument('-f', '--fastest', action='store_true',
                     help='connect to network with lowest ping latency')
    args = cli.parse_args()

    # No region given means US only.
    region = args.region or 'us'

    if args.disconnect:
        disconnect_vpn()
    elif args.fastest:
        # Drop any active PIA connection first, then ping and connect to
        # the quickest server.
        disconnect_vpn()
        ranking = Latencies(region)
        print(ranking)
        make_connection(ranking.fastest)
    else:
        # Default action: shuffle to a random connection other than the
        # ones that were just taken down.
        connect_vpn(region, disconnect_vpn())