Skip to content

Commit

Permalink
Defi datasets cacheability
Browse files Browse the repository at this point in the history
  • Loading branch information
sergiimk committed Dec 10, 2024
1 parent 7cb6cc3 commit 82b61d2
Show file tree
Hide file tree
Showing 10 changed files with 133 additions and 51 deletions.
6 changes: 3 additions & 3 deletions com.defillama/chains.tvl.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,14 @@ content:
- kind: SetPollingSource
fetch:
kind: Container
image: "ghcr.io/kamu-data/fetch-com.defillama:0.1.5"
image: "ghcr.io/kamu-data/fetch-com.defillama:0.2.0"
args:
- --request-interval
- '${{ env.request_interval }}'
- '${{ env.request_interval || 1 }}'
- chains
- tvl
- --top-n
- '${{ env.top_n_chains }}'
- '${{ env.top_n_chains || 0 }}'
read:
kind: NdJson
schema:
Expand Down
2 changes: 1 addition & 1 deletion com.defillama/image/Makefile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
IMAGE = ghcr.io/kamu-data/fetch-com.defillama:0.1.5
IMAGE = ghcr.io/kamu-data/fetch-com.defillama:0.2.0

.PHONY: requirements
requirements:
Expand Down
88 changes: 80 additions & 8 deletions com.defillama/image/fetch.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,39 @@ def api_request(url, **kwargs):
raise


def get_last_updated():
    """Return the timestamp of the previous ingest iteration, or None.

    Reads the ODF_ETAG environment variable — presumably the ISO-8601 etag
    persisted by ``write_last_updated`` on a prior run (TODO confirm the ODF
    harness loops the new etag back) — and parses it into a datetime.
    Returns None on a first run, when no etag is present.
    """
    raw = os.environ.get('ODF_ETAG')
    return dt.datetime.fromisoformat(raw) if raw else None


def write_last_updated(last_updated):
    """Persist *last_updated* (a datetime) as the new etag for the next run.

    Writes the ISO-8601 representation to the file named by the
    ODF_NEW_ETAG_PATH environment variable; silently does nothing when the
    variable is unset (e.g. when run outside the ODF harness).
    """
    target = os.environ.get("ODF_NEW_ETAG_PATH")
    if not target:
        return
    with open(target, 'w') as out:
        out.write(last_updated.isoformat())


def is_up_to_date(last_updated, now):
    """Return True when less than a full day has passed since *last_updated*.

    On the up-to-date path the unchanged etag is deliberately re-written via
    ``write_last_updated`` so the harness still receives an etag for this run.
    A falsy *last_updated* (first run) is never considered up-to-date.
    """
    if not last_updated:
        return False
    elapsed = now - last_updated
    log("Time since last update:", elapsed)
    if elapsed.days >= 1:
        return False
    log("Considering up-to-date")
    write_last_updated(last_updated)
    return True


def write_has_more():
    """Signal to the harness that more data remains to be ingested.

    Creates (truncates) an empty marker file at the path given by the
    ODF_NEW_HAS_MORE_DATA_PATH environment variable; a no-op when unset.
    """
    marker = os.environ.get("ODF_NEW_HAS_MORE_DATA_PATH")
    if marker:
        open(marker, 'w').close()


#################################
# Protocols
#################################
Expand Down Expand Up @@ -53,6 +86,11 @@ def protocols(args):


def protocols_chain_tvls(args):
now = dt.datetime.now(dt.timezone.utc).replace(hour=0, minute=0, second=0, microsecond=0)
last_updated = get_last_updated()
if is_up_to_date(last_updated, now):
return

protocols = protocols_list(top_n_tvl = args.top_n)
log("Fetching chain tvls for protocols:", protocols)

Expand All @@ -67,6 +105,8 @@ def protocols_chain_tvls(args):

time.sleep(args.request_interval)

write_last_updated(now)


#################################
# Chains
Expand Down Expand Up @@ -95,6 +135,11 @@ def chains(args):


def chains_tvl(args):
now = dt.datetime.now(dt.timezone.utc).replace(hour=0, minute=0, second=0, microsecond=0)
last_updated = get_last_updated()
if is_up_to_date(last_updated, now):
return

chains = chains_list(top_n_tvl = args.top_n)

for c in chains:
Expand All @@ -104,6 +149,8 @@ def chains_tvl(args):

time.sleep(args.request_interval)

write_last_updated(now)


#################################
# Pools
Expand Down Expand Up @@ -159,6 +206,11 @@ def pool_yield(pool):


def pools_yield(args):
now = dt.datetime.now(dt.timezone.utc).replace(hour=0, minute=0, second=0, microsecond=0)
last_updated = get_last_updated()
if is_up_to_date(last_updated, now):
return

pools = pools_list(top_n_tvl = args.top_n_tvl, predefined_subset = args.predefined_subset)

for p in pools:
Expand Down Expand Up @@ -189,6 +241,8 @@ def pools_yield(args):

time.sleep(args.request_interval)

write_last_updated(now)


#################################
# Tokens
Expand Down Expand Up @@ -218,6 +272,25 @@ def chunks(iterable, size):


def token_prices(args):
# Align to midnight to make sure we don't ingest data that might still be updated
now = dt.datetime.now(dt.timezone.utc).replace(hour=0, minute=0, second=0, microsecond=0)
now_ts = now.timestamp()

last_updated = get_last_updated()
if is_up_to_date(last_updated, now):
return

if not last_updated:
last_updated = dt.datetime.fromisoformat(args.start)

est_data_points = (now - last_updated).days
has_more = False
if est_data_points > args.span:
now = last_updated + dt.timedelta(days = args.span)
has_more = True

log(f"Ingest iteration start: {last_updated.isoformat()}, end: {now.isoformat()}, span: {args.span}, estimated: {est_data_points}, has_more: {has_more}")

tokens = [
(chain, address)
for token in tokens_list()
Expand All @@ -232,7 +305,7 @@ def token_prices(args):

resp = api_request(
f"https://coins.llama.fi/chart/{coins}",
params={"end": args.end, "span": args.span, "period": args.period},
params={"start": last_updated.timestamp(), "span": args.span, "period": args.period},
)

for chain_addr, coin in resp["coins"].items():
Expand All @@ -250,6 +323,10 @@ def token_prices(args):

time.sleep(args.request_interval)

write_last_updated(now)
if has_more:
write_has_more()

#################################


Expand Down Expand Up @@ -290,13 +367,8 @@ def token_prices(args):
sp_tokens = p_tokens.add_subparsers(dest='scmd', required=False)

p_tokens_prices = sp_tokens.add_parser('prices')
# By default we will request data with 1-day period and will align the end of the requested interval
# with midnight UTC to ideally get the same values upon next ingestion and keep the ledger from skewing
now_utc = dt.datetime.now(dt.timezone.utc)
midnight_utc = now_utc.replace(hour=0, minute=0, second=0, microsecond=0)
midnight_utc = int(midnight_utc.timestamp())
p_tokens_prices.add_argument('--end', type=int, default=midnight_utc)
p_tokens_prices.add_argument('--span', type=int, default=365 * 3)
p_tokens_prices.add_argument('--start', type=str, default='2021-01-01:00:00:00Z')
p_tokens_prices.add_argument('--span', type=int, default=365)
p_tokens_prices.add_argument('--period', default="1d")
p_tokens_prices.add_argument('--batch', type=int, default=1)

Expand Down
2 changes: 1 addition & 1 deletion com.defillama/pools.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ content:
- kind: SetPollingSource
fetch:
kind: Container
image: "ghcr.io/kamu-data/fetch-com.defillama:0.1.5"
image: "ghcr.io/kamu-data/fetch-com.defillama:0.2.0"
args:
- pools
- --predefined-subset
Expand Down
4 changes: 2 additions & 2 deletions com.defillama/pools.yield.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,10 @@ content:
- kind: SetPollingSource
fetch:
kind: Container
image: "ghcr.io/kamu-data/fetch-com.defillama:0.1.5"
image: "ghcr.io/kamu-data/fetch-com.defillama:0.2.0"
args:
- --request-interval
- '${{ env.request_interval }}'
- '${{ env.request_interval || 1 }}'
- pools
- yield
- --predefined-subset
Expand Down
6 changes: 3 additions & 3 deletions com.defillama/protocols.chain-tvls.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,14 @@ content:
- kind: SetPollingSource
fetch:
kind: Container
image: "ghcr.io/kamu-data/fetch-com.defillama:0.1.5"
image: "ghcr.io/kamu-data/fetch-com.defillama:0.2.0"
args:
- --request-interval
- '${{ env.request_interval }}'
- '${{ env.request_interval || 1 }}'
- protocols
- chain-tvls
- --top-n
- '${{ env.top_n_protocols }}'
- '${{ env.top_n_protocols || 200 }}'
read:
kind: NdJson
schema:
Expand Down
10 changes: 7 additions & 3 deletions com.defillama/tokens.prices.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,18 @@ content:
- kind: SetPollingSource
fetch:
kind: Container
image: "ghcr.io/kamu-data/fetch-com.defillama:0.1.5"
image: "ghcr.io/kamu-data/fetch-com.defillama:0.2.0"
args:
- --request-interval
- '${{ env.request_interval }}'
- ${{ env.request_interval || 1 }}
- tokens
- prices
- --start
- ${{ env.tokens_start || '2021-01-01:00:00:00Z' }}
- --span
- '${{ env.tokens_span }}'
- ${{ env.tokens_span || 100 }}
- --batch
- ${{ env.tokens_batch || 5 }}
read:
kind: NdJson
schema:
Expand Down
2 changes: 1 addition & 1 deletion io.codex/image/Makefile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
IMAGE = ghcr.io/kamu-data/fetch-io.codex:0.1.0
IMAGE = ghcr.io/kamu-data/fetch-io.codex:0.2.0

.PHONY: requirements
requirements:
Expand Down
60 changes: 33 additions & 27 deletions io.codex/image/fetch.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,33 +75,7 @@ def tokens_list_resolved():
]


def tokens_bars(args):
t_from = getattr(args, 'from')
if t_from is None:
t_from = os.environ.get('ODF_ETAG')
if t_from is None:
t_from = DATE_MIN

t_to = args.to
if t_to is None:
# Align the end of the requested interval with midnight UTC to
# ideally get the same values upon next ingestion and keep the ledger from skewing
t_to = dt.datetime.now(dt.timezone.utc).replace(hour=0, minute=0, second=0, microsecond=0).isoformat()

t_from = int(dt.datetime.fromisoformat(t_from).timestamp())
t_to = int(dt.datetime.fromisoformat(t_to).timestamp())

est_data_points = (t_to - t_from) / (60 * 60 * 24)
has_more = False

log("Estimated data points:", est_data_points)
if est_data_points > GET_BARS_MAX_DATAPOINTS:
est_data_points = GET_BARS_MAX_DATAPOINTS
t_to = t_from + (60 * 60 * 24) * est_data_points
has_more = True

log(f"Adjusted from: {t_from}, to: {t_to}, est_points: {est_data_points}")

def tokens_bars_impl(t_from, t_to, args):
tokens = tokens_list_resolved()

gql = """
Expand Down Expand Up @@ -207,6 +181,38 @@ def tokens_bars(args):

time.sleep(args.request_interval)


def tokens_bars(args):
t_from = getattr(args, 'from')
if not t_from:
t_from = os.environ.get('ODF_ETAG')
if not t_from:
t_from = DATE_MIN

t_to = args.to
if not t_to:
# Align the end of the requested interval with midnight UTC to
# ideally get the same values upon next ingestion and keep the ledger from skewing
t_to = dt.datetime.now(dt.timezone.utc).replace(hour=0, minute=0, second=0, microsecond=0).isoformat()

t_from = int(dt.datetime.fromisoformat(t_from).timestamp())
t_to = int(dt.datetime.fromisoformat(t_to).timestamp())

est_data_points = (t_to - t_from) / (60 * 60 * 24)
has_more = False

log("Estimated data points:", est_data_points)

if est_data_points > GET_BARS_MAX_DATAPOINTS:
est_data_points = GET_BARS_MAX_DATAPOINTS
t_to = t_from + (60 * 60 * 24) * est_data_points
has_more = True

log(f"Adjusted from: {t_from}, to: {t_to}, est_points: {est_data_points}")

if est_data_points >= 1:
tokens_bars_impl(t_from, t_to, args)

etag = dt.datetime.fromtimestamp(t_to, dt.timezone.utc).isoformat()
log(f"Finished ingest iteration has_more: {has_more}, etag: {etag}")

Expand Down
4 changes: 2 additions & 2 deletions io.codex/tokens.olhcv.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,12 @@ content:
- kind: SetPollingSource
fetch:
kind: Container
image: "ghcr.io/kamu-data/fetch-io.codex:0.1.0"
image: "ghcr.io/kamu-data/fetch-io.codex:0.2.0"
env:
- name: CODEX_API_KEY
args:
- --request-interval
- '${{ env.request_interval }}'
- '${{ env.request_interval || 1 }}'
- tokens
- bars
read:
Expand Down

0 comments on commit 82b61d2

Please sign in to comment.