-
Notifications
You must be signed in to change notification settings - Fork 13
/
main.py
162 lines (155 loc) · 7.42 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
import csv
import logging
import requests
import json
import time
from parameters import DATA
from concurrent.futures import ThreadPoolExecutor
from threading import current_thread
class WQSession(requests.Session):
def __init__(self, json_fn='credentials.json'):
super().__init__()
for handler in logging.root.handlers:
logging.root.removeHandler(handler)
logging.basicConfig(encoding='utf-8', level=logging.INFO, format='%(asctime)s: %(message)s')
self.json_fn = json_fn
self.login()
old_get, old_post = self.get, self.post
def new_get(*args, **kwargs):
try: return old_get(*args, **kwargs)
except: return new_get(*args, **kwargs)
def new_post(*args, **kwargs):
try: return old_post(*args, **kwargs)
except: return new_post(*args, **kwargs)
self.get, self.post = new_get, new_post
self.login_expired = False
self.rows_processed = []
def login(self):
with open(self.json_fn, 'r') as f:
creds = json.loads(f.read())
email, password = creds['email'], creds['password']
self.auth = (email, password)
r = self.post('https://api.worldquantbrain.com/authentication')
if 'user' not in r.json():
if 'inquiry' in r.json():
input(f"Please complete biometric authentication at {r.url}/persona?inquiry={r.json()['inquiry']} before continuing...")
self.post(f"{r.url}/persona", json=r.json())
else:
print(f'WARNING! {r.json()}')
input('Press enter to quit...')
logging.info('Logged in to WQBrain!')
def simulate(self, data):
self.rows_processed = []
def process_simulation(writer, f, simulation):
if self.login_expired: return # expired crendentials
thread = current_thread().name
alpha = simulation['code'].strip()
delay = simulation.get('delay', 1)
universe = simulation.get('universe', 'TOP3000')
truncation = simulation.get('truncation', 0.1)
region = simulation.get('region', 'USA')
decay = simulation.get('decay', 6)
neutralization = simulation.get('neutralization', 'SUBINDUSTRY').upper()
pasteurization = simulation.get('pasteurization', 'ON')
nan = simulation.get('nanHandling', 'OFF')
logging.info(f"{thread} -- Simulating alpha: {alpha}")
while True:
# keep sending a post request until the simulation link is found
try:
r = self.post('https://api.worldquantbrain.com/simulations', json={
'regular': alpha,
'type': 'REGULAR',
'settings': {
"nanHandling":nan,
"instrumentType":"EQUITY",
"delay":delay,
"universe":universe,
"truncation":truncation,
"unitHandling":"VERIFY",
"pasteurization":pasteurization,
"region":region,
"language":"FASTEXPR",
"decay":decay,
"neutralization":neutralization,
"visualization":False
}
})
nxt = r.headers['Location']
break
except:
try:
if 'credentials' in r.json()['detail']:
self.login_expired = True
return
except:
logging.info(f'{thread} -- {r.content}') # usually gateway timeout
return
logging.info(f'{thread} -- Obtained simulation link: {nxt}')
ok = True
while True:
r = self.get(nxt).json()
if 'alpha' in r:
alpha_link = r['alpha']
break
try:
logging.info(f"{thread} -- Waiting for simulation to end ({int(100*r['progress'])}%)")
except Exception as e:
ok = (False, r['message']); break
time.sleep(10)
if ok != True:
logging.info(f'{thread} -- Issue when sending simulation request: {ok[1]}')
row = [
0, delay, region,
neutralization, decay, truncation,
0, 0, 0, 'FAIL', 0, -1, universe, nxt, alpha
]
else:
r = self.get(f'https://api.worldquantbrain.com/alphas/{alpha_link}').json()
logging.info(f'{thread} -- Obtained alpha link: https://platform.worldquantbrain.com/alpha/{alpha_link}')
passed = 0
for check in r['is']['checks']:
passed += check['result'] == 'PASS'
if check['name'] == 'CONCENTRATED_WEIGHT':
weight_check = check['result']
if check['name'] == 'LOW_SUB_UNIVERSE_SHARPE':
subsharpe = check['value']
try: subsharpe
except: subsharpe = -1
row = [
passed, delay, region,
neutralization, decay, truncation,
r['is']['sharpe'],
r['is']['fitness'],
round(100*r['is']['turnover'], 2),
weight_check, subsharpe, -1,
universe, f'https://platform.worldquantbrain.com/alpha/{alpha_link}', alpha
]
writer.writerow(row)
f.flush()
self.rows_processed.append(simulation)
logging.info(f'{thread} -- Result added to CSV!')
try:
for handler in logging.root.handlers:
logging.root.removeHandler(handler)
csv_file = f"data/api_{str(time.time()).replace('.', '_')}.csv"
logging.basicConfig(encoding='utf-8', level=logging.INFO, format='%(asctime)s: %(message)s', filename=csv_file.replace('csv', 'log'))
logging.info(f'Creating CSV file: {csv_file}')
with open(csv_file, 'w', newline='') as f:
writer = csv.writer(f)
header = [
'passed', 'delay', 'region', 'neutralization', 'decay', 'truncation',
'sharpe', 'fitness', 'turnover', 'weight',
'subsharpe', 'correlation', 'universe', 'link', 'code'
]
writer.writerow(header)
with ThreadPoolExecutor(max_workers=10) as executor: # 10 threads, only 3 can go in concurrently so this is no harm
_ = executor.map(lambda sim: process_simulation(writer, f, sim), data)
except Exception as e:
print(f'Issue occurred! {type(e).__name__}: {e}')
return [sim for sim in data if sim not in self.rows_processed]
if __name__ == '__main__':
TOTAL_ROWS = len(DATA)
while DATA:
wq = WQSession()
print(f'{TOTAL_ROWS-len(DATA)}/{TOTAL_ROWS} alpha simulations...')
DATA = wq.simulate(DATA)