-
Notifications
You must be signed in to change notification settings - Fork 13
/
03-export-csv-uniswap-v2-v3-ethereum-top-100-sniffed-agg.py
278 lines (224 loc) · 11.7 KB
/
03-export-csv-uniswap-v2-v3-ethereum-top-100-sniffed-agg.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
"""Download top 100 liquid pairs from Uniswap v2, Uniswap v3, Sushi and output OHLCV as CSV, filter with TokenSniffer
- Aggregate output across multiple trading pairs
- See `export-csv-uniswap-v2-v3-ethereum-top-100-sniffer.py` for more details
"""
from collections import Counter
import os
from pathlib import Path
import pandas as pd
from tradingstrategy.pair import DEXPair
from tradingstrategy.pair import PandasPairUniverse
from tradingstrategy.chain import ChainId
from tradingstrategy.client import Client
from tradingstrategy.timebucket import TimeBucket
from tradingstrategy.utils.time import floor_pandas_week
from tradingstrategy.utils.forward_fill import forward_fill
from tradingstrategy.utils.wrangle import fix_dex_price_data
from tradingstrategy.utils.liquidity_filter import build_liquidity_summary, get_top_liquidity_pairs_by_base_token
from tradingstrategy.utils.token_filter import filter_pairs_default
from tradingstrategy.utils.aggregate_ohlcv import aggregate_ohlcv_across_pairs
from tradingstrategy.utils.wrangle import examine_anomalies
from eth_defi.token_analysis.tokensniffer import CachedTokenSniffer, is_tradeable_token, KNOWN_GOOD_TOKENS
TOKENSNIFFER_API_KEY = os.environ.get("TOKENSNIFFER_API_KEY")
assert TOKENSNIFFER_API_KEY, "TOKENSNIFFER_API_KEY env missing"
def main():
#
# Set up filtering and output parameters
#
chain_id = ChainId.ethereum
time_bucket = TimeBucket.d1 # OHCLV data frequency
liquidity_time_bucket = TimeBucket.d1 # TVL data for Uniswap v3 is only sampled daily, more fine granular is not needed
exchange_slugs = {"uniswap-v3", "uniswap-v2", "sushi"}
exported_top_pair_count = 500
liquidity_comparison_date = floor_pandas_week(pd.Timestamp.now() - pd.Timedelta(days=7)) # What date we use to select top 100 liquid pairs
tokensniffer_threshold = 24 # We want our TokenSniffer score to be higher than this for base tokens
min_liquidity_threshold = 1_000_000 # Prefilter pairs with this liquidity before calling token sniffer
allowed_pairs_for_token_sniffer = 450 # How many pairs we let to go through TokenSniffer filtering process (even if still above min_liquidity_threshold)
#
# Set up output files - use Trading Strategy client's cache folder
#
client = Client.create_jupyter_client()
cache_path = client.transport.cache_path
fname = "uniswap-v2-v3-ethereum-top-sniffed-agg"
os.makedirs(f"{cache_path}/prefiltered", exist_ok=True)
liquidity_output_fname = Path(f"{cache_path}/prefiltered/liquidity-{fname}.csv")
price_output_fname = Path(f"{cache_path}/prefiltered/price-{fname}.csv")
#
# Setup TokenSniffer
#
db_file = Path(cache_path) / "tokensniffer.sqlite"
sniffer = CachedTokenSniffer(
db_file,
TOKENSNIFFER_API_KEY,
)
#
# Set out trading pair universe
#
print("Downloading/opening exchange dataset")
exchange_universe = client.fetch_exchange_universe()
# Resolve uniswap-v3 internal id
exchanges = [exchange_universe.get_by_chain_and_slug(chain_id, exchange_slug) for exchange_slug in exchange_slugs]
exchange_ids = [exchange.exchange_id for exchange in exchanges]
print(f"Exchange {exchange_slugs} ids are {exchange_ids}")
# We need pair metadata to know which pairs belong to Polygon
print("Downloading/opening pairs dataset")
pairs_df = client.fetch_pair_universe().to_pandas()
pairs_df = filter_pairs_default(
pairs_df,
chain_id=chain_id,
exchange_ids=exchange_ids,
)
our_chain_pair_ids = pairs_df["pair_id"]
pair_universe = PandasPairUniverse(pairs_df)
print(f"We have data for {len(our_chain_pair_ids)} trading pairs on {fname} set")
#
# Filter by liquidity
#
# Download all liquidity data, extract
# trading pairs that exceed our prefiltering threshold
print(f"Downloading/opening TVL/liquidity dataset {liquidity_time_bucket}")
liquidity_df = client.fetch_all_liquidity_samples(liquidity_time_bucket).to_pandas()
print(f"Setting up per-pair liquidity filtering, raw liquidity data os {len(liquidity_df):,} entries")
liquidity_df = liquidity_df.loc[liquidity_df.pair_id.isin(our_chain_pair_ids)]
liquidity_df = liquidity_df.set_index("timestamp").groupby("pair_id")
print(f"Forward-filling liquidity, before forward-fill the size is {len(liquidity_df)} samples, target frequency is {liquidity_time_bucket.to_frequency()}")
liquidity_df = forward_fill(liquidity_df, liquidity_time_bucket.to_frequency(), columns=("close",)) # Only daily close liq needed for analysis, don't bother resample other cols
# Get top liquidity for all of our pairs
print(f"Filtering out historical liquidity of pairs")
pair_liquidity_max_historical, pair_liquidity_today = build_liquidity_summary(liquidity_df, our_chain_pair_ids)
print(f"Chain {chain_id.name} has liquidity data for {len(pair_liquidity_max_historical)} pairs at {liquidity_comparison_date}")
# Check how many pairs did not have good values for liquidity
broken_pairs = {pair_id for pair_id, liquidity in pair_liquidity_max_historical.items() if liquidity < 0}
print(f"Liquidity data is broken for {len(broken_pairs)} trading pairs")
# Remove duplicate pairs
print("Prefiltering and removing duplicate pairs")
top_liquid_pairs_filtered = Counter()
processed_base_tokens = set()
# List of base token addresses that have reached the threshold.
# Is ordered.
all_base_tokens = list()
for pair_id, liquidity in pair_liquidity_max_historical.most_common():
pair_metadata = pair_universe.get_pair_by_id(pair_id)
base_token_symbol = pair_metadata.base_token_symbol
if liquidity < min_liquidity_threshold:
# Prefilter pairs
continue
if base_token_symbol in processed_base_tokens:
# This pair is already in the dataset under a different pool
# with more liquidity
continue
top_liquid_pairs_filtered[pair_id] = liquidity
all_base_tokens.append(pair_metadata.base_token_address)
# Remove duplicat base tokens, maintain the list order
seen = set()
included_base_tokens = [x for x in all_base_tokens if not (x in seen or seen.add(x))]
print(f"After prefilter, we have {len(top_liquid_pairs_filtered):,} pairs left")
print(f"This is {len(all_base_tokens)} pairs with {len(included_base_tokens)} included base tokens")
# Remove tokens failing sniff test
print(f"Sniffing out bad trading pairs, max allowed {allowed_pairs_for_token_sniffer}")
safe_top_liquid_pairs_filtered = Counter()
base_token_liquidity = get_top_liquidity_pairs_by_base_token(
pair_universe,
top_liquid_pairs_filtered,
good_base_tokens=included_base_tokens,
count=allowed_pairs_for_token_sniffer
)
print(f"Base token mapped liquidity has {len(base_token_liquidity)} pairs")
for pair_id, liquidity in base_token_liquidity:
dex_pair = pair_universe.get_pair_by_id(pair_id)
ticker = dex_pair.get_ticker()
base_token_symbol = dex_pair.base_token_symbol
address = dex_pair.base_token_address
try:
sniffed_data = sniffer.fetch_token_info(chain_id.value, address)
except Exception as e:
if "404" in str(e):
# No idea what's going on with these ones
# RuntimeError: Could not verify OLAS-WETH-uniswap-v2-30bps, address 0x0001a500a6b18995b03f44bb040a5ffc28e45cb0: TokeSniffer replied: <Response [404]>: {"message":"Contract not found"}
print(f"WARN: TokenSniffer 404 not found {ticker}, address {address} as the TokenSniffer score {score} is below our risk threshold, liquidity is {liquidity:,.2f} USD")
continue
raise RuntimeError(f"Could not sniff {ticker}, address {address}: {e}") from e
if not is_tradeable_token(
sniffed_data,
symbol=base_token_symbol,
risk_score_threshold=tokensniffer_threshold):
score = sniffed_data["score"]
whitelisted = base_token_symbol in KNOWN_GOOD_TOKENS
print(f"WARN: Skipping pair {ticker}, address {address} as the TokenSniffer score {score} (whitelisted {whitelisted}) is below our risk threshold, liquidity is {liquidity:,.2f} USD")
continue
safe_top_liquid_pairs_filtered[pair_id] = liquidity
skipped_tokens = len(top_liquid_pairs_filtered) - len(safe_top_liquid_pairs_filtered)
print(f"We skip {skipped_tokens:,} in TokenSniffer filter")
print(f"Token sniffer info is:\n{sniffer.get_diagnostics()}")
print(f"Top liquid 10 pairs, by historical peak, sorted by base token")
for idx, tpl in enumerate(safe_top_liquid_pairs_filtered.most_common(10), start=1):
pair_id, liquidity = tpl
pair_metadata = pair_universe.get_pair_by_id(pair_id)
ticker = f"{pair_metadata.get_ticker()} with fee {pair_metadata.fee} BPS on {pair_metadata.exchange_slug}"
print(f"{idx}. {ticker}: {liquidity:,.2f} USD")
top_liquid_pair_ids = {key for key, _ in safe_top_liquid_pairs_filtered.most_common(exported_top_pair_count)}
# Check how much liquidity we can address
total_liq = 0
for pair_id in top_liquid_pair_ids:
total_liq += pair_liquidity_max_historical[pair_id]
print(f"Historical tradeable liquidity for {len(top_liquid_pair_ids)} pairs is {total_liq:,.2f} USD")
total_liq = 0
for pair_id in top_liquid_pair_ids:
total_liq += pair_liquidity_today[pair_id]
print(f"Today's tradeable liquidity for {len(top_liquid_pair_ids)} pairs is {total_liq:,.2f} USD")
# After we know pair ids that fill the liquidity criteria,
# we can build OHLCV dataset for these pairs
print(f"Downloading/opening OHLCV dataset {time_bucket}")
price_df = client.fetch_all_candles(time_bucket).to_pandas()
print(f"Filtering out {len(top_liquid_pair_ids)} pairs")
price_df = price_df.loc[price_df.pair_id.isin(top_liquid_pair_ids)]
print("Wrangling DEX price data")
price_df = price_df.set_index("timestamp", drop=False).groupby("pair_id")
price_df = fix_dex_price_data(
price_df,
freq=time_bucket.to_frequency(),
forward_fill=True,
)
pair_universe_left = pair_universe.limit_to_pairs(top_liquid_pair_ids)
print("Checking price data issues")
examine_anomalies(
pair_universe_left,
price_df.obj,
pair_id_column=None,
)
print(f"Building OHLCVL aggregate data across {pair_universe_left.get_count()} pairs, down from {pair_universe.get_count()} pairs")
agg_df = aggregate_ohlcv_across_pairs(
pair_universe_left,
price_df,
liquidity_df["close"],
)
# Convert pair id list to comma-separated strings
agg_df["pair_ids"] = agg_df["pair_ids"].apply(lambda x: str(x))
print("Checking aggregate data issues")
examine_anomalies(
None,
agg_df,
pair_id_column="aggregate_id",
)
print(f"The aggregate price/liquidity dataset contains total {len(agg_df['base'].unique())} base tokens, {len(agg_df):,} rows")
# Export data, make sure we got columns in an order we want
print(f"Writing OHLCV CSV")
column_order = (
"base",
"quote",
"open",
"high",
"low",
"close",
"volume",
"liquidity",
"aggregate_id",
"pair_ids"
)
agg_df = agg_df.reindex(columns=column_order) # Sort columns in a specific order
agg_df.to_csv(
price_output_fname,
)
print(f"Wrote {price_output_fname}, {price_output_fname.stat().st_size:,} bytes")
if __name__ == "__main__":
main()