This repository has been archived by the owner on Sep 26, 2023. It is now read-only.

Add paginated read of s3 objects and iteration of discovery to avoid lambda constraints
moradology committed Nov 3, 2022
1 parent 0f1ae67 commit 45066ac
Showing 2 changed files with 58 additions and 44 deletions.
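
The change works in two parts: the s3-discovery Lambda now pages through ListObjectsV2 results and stops accumulating once the serialized output nears the 256 KB Step Functions payload limit, reporting the last key it saw as start_after, and the state machine loops back into discovery until no start_after is returned. Below is a minimal sketch of the Lambda-side pattern only; the function name paged_discovery, the bucket/prefix placeholders, and the simplified record shape are illustrative, not the real handler shown later in this commit.

import json

import boto3

# Step Functions payloads are capped at 256 KB, so stop well short of that
# and report where to resume (placeholder budget mirrors the handler's 230000).
PAYLOAD_BUDGET_BYTES = 230000


def paged_discovery(bucket, prefix, start_after=""):
    s3 = boto3.client("s3")
    paginator = s3.get_paginator("list_objects_v2")
    objects, size, last_key = [], 0, None
    for page in paginator.paginate(Bucket=bucket, Prefix=prefix, StartAfter=start_after):
        for obj in page.get("Contents", []):
            if size > PAYLOAD_BUDGET_BYTES:
                # Too big for one execution payload: tell the caller where to resume
                return {"objects": objects, "start_after": last_key}
            record = {"s3_filename": f"s3://{bucket}/{obj['Key']}"}
            objects.append(record)
            size += len(json.dumps(record).encode("utf8"))
            last_key = obj["Key"]
    # No start_after key means the listing completed within one invocation
    return {"objects": objects}
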
16 changes: 14 additions & 2 deletions deploy/cdk/step_function_stack.py
@@ -1,3 +1,4 @@
from re import S
from typing import TYPE_CHECKING
from aws_cdk import (
core,
@@ -77,6 +78,15 @@ def _discovery_workflow(
queue=queue_stack.stac_ready_queue,
)

maybe_next_discovery = (
stepfunctions.Choice(self, "NextDiscovery?")
.when(
stepfunctions.Condition.is_present("$.Payload.start_after"),
s3_discovery_task
)
.otherwise(stepfunctions.Succeed(self, "Successful Ingest"))
)

maybe_cogify = (
stepfunctions.Choice(self, "Cogify?")
.when(
@@ -86,6 +96,7 @@ def _discovery_workflow(
"Run concurrent queueing to cogify queue",
max_concurrency=10,
items_path=stepfunctions.JsonPath.string_at("$.Payload.objects"),
result_path=stepfunctions.JsonPath.DISCARD
).iterator(enqueue_cogify_task),
)
.otherwise(
@@ -94,6 +105,7 @@ def _discovery_workflow(
"Run concurrent queueing to stac ready queue",
max_concurrency=10,
items_path=stepfunctions.JsonPath.string_at("$.Payload.objects"),
result_path=stepfunctions.JsonPath.DISCARD
).iterator(enqueue_ready_task)
)
)
@@ -102,11 +114,11 @@ def _discovery_workflow(
stepfunctions.Choice(self, "Discovery Choice (CMR or S3)")
.when(
stepfunctions.Condition.string_equals("$.discovery", "s3"),
s3_discovery_task.next(maybe_cogify),
s3_discovery_task.next(maybe_cogify).next(maybe_next_discovery),
)
.when(
stepfunctions.Condition.string_equals("$.discovery", "cmr"),
cmr_discovery_task.next(maybe_cogify),
cmr_discovery_task.next(maybe_cogify).next(maybe_next_discovery),
)
.otherwise(stepfunctions.Fail(self, "Discovery Type not supported"))
)
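The NextDiscovery? Choice state added above re-enters s3_discovery_task whenever the payload still carries start_after, so a large prefix is walked across several Lambda invocations. The same iteration can be pictured as a plain loop; this is a hedged local sketch, where handler is the Lambda defined in the next file and enqueue_batch is a stand-in for the queueing Map states, not part of the actual workflow code.

from handler import handler  # assumes lambdas/s3-discovery is on the path; illustrative only


def run_discovery(event, enqueue_batch):
    """Mimic the state machine's loop: re-invoke discovery until no start_after remains."""
    event = dict(event)
    while True:
        payload = handler(event, context=None)
        enqueue_batch(payload["objects"])  # cogify / stac-ready queueing in the real workflow
        if "start_after" not in payload:  # mirrors Condition.is_present("$.Payload.start_after")
            return
        event["start_after"] = payload["start_after"]  # resume listing after the last key seen
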
86 changes: 44 additions & 42 deletions lambdas/s3-discovery/handler.py
@@ -1,3 +1,4 @@
import json
import os
import re

@@ -13,58 +14,59 @@ def assume_role(role_arn, session_name):
return creds["Credentials"]


def list_bucket(bucket, prefix, filename_regex):
kwargs = {}
def handler(event, context):
bucket = event.get("bucket")
prefix = event.get("prefix", "")
filename_regex = event.get("filename_regex", None)
collection = event.get("collection", prefix.rstrip("/"))
properties = event.get("properties", {})
cogify = event.pop("cogify", False)


if role_arn := os.environ.get("EXTERNAL_ROLE_ARN"):
creds = assume_role(role_arn, "veda-data-pipelines_s3-discovery")
kwargs = {
"aws_access_key_id": creds["AccessKeyId"],
"aws_secret_access_key": creds["SecretAccessKey"],
"aws_session_token": creds["SessionToken"],
}
s3 = boto3.resource("s3", **kwargs)
try:
files = []
bucket = s3.Bucket(bucket)
for obj in bucket.objects.filter(Prefix=prefix):
if filename_regex:
if re.match(filename_regex, obj.key):
files.append(obj.key)
else:
files.append(obj.key)
return files

except:
print("Failed during s3 item/asset discovery")
raise

s3client = boto3.client("s3", **kwargs)
s3paginator = s3client.get_paginator("list_objects_v2")
start_after = event.get("start_after", "")  # "" starts the listing from the beginning of the prefix
pages = s3paginator.paginate(Bucket=bucket, Prefix=prefix, StartAfter=start_after)

def handler(event, context):
bucket = event.pop("bucket")
prefix = event.pop("prefix", "")

filenames = list_bucket(
bucket=bucket, prefix=prefix, filename_regex=event.pop("filename_regex", None)
)

files_objs = []
cogify = event.pop("cogify", False)
collection = event.get("collection", prefix.rstrip("/"))
properties = event.get("properties", {})
for filename in filenames:
files_objs.append(
{
**event,
"collection": collection,
"s3_filename": f"s3://{bucket}/{filename}",
"upload": event.get("upload", False),
"properties": properties,
}
)
return {
file_objs_size = 0
payload = {
**event,
"cogify": cogify,
"objects": files_objs,
"objects": []
}
for page in pages:
    for obj in page.get("Contents", []):
        # The limit is advertised at 256000 bytes, but we'll preserve some breathing room
        if file_objs_size > 230000:
            payload["start_after"] = start_after
            break
        filename = obj["Key"]
        # Skip keys that do not match the optional filename regex
        if filename_regex and not re.match(filename_regex, filename):
            continue
        file_obj = {
            "collection": collection,
            "s3_filename": f"s3://{bucket}/{filename}",
            "upload": event.get("upload", False),
            "properties": properties,
        }
        payload["objects"].append(file_obj)
        # Track how large the accumulated objects will be once serialized
        file_obj_size = len(json.dumps(file_obj, ensure_ascii=False).encode("utf8"))
        file_objs_size += file_obj_size
        start_after = filename
return payload


if __name__ == "__main__":
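Taken together, the handler's input and output now form a small contract: the event supplies the listing parameters (plus start_after on later iterations), and the returned payload echoes those keys and adds the discovered objects, with start_after included only when the byte budget was reached. An illustrative example with placeholder bucket, prefix, and collection values:

# Example input event for the s3-discovery Lambda (all values are placeholders)
event = {
    "discovery": "s3",
    "bucket": "my-data-bucket",
    "prefix": "my-collection/",
    "filename_regex": r"^.*\.tif$",
    "collection": "my-collection",
    "properties": {},
    "upload": False,
    "cogify": False,
    # Present only on the second and later discovery iterations:
    # "start_after": "my-collection/granule-0042.tif",
}

# Shape of the returned payload: the event keys are echoed back, plus the
# discovered objects and, only when the ~230000-byte budget was hit before
# the prefix was exhausted, the key to resume from.
payload = {
    **event,
    "objects": [
        {
            "collection": "my-collection",
            "s3_filename": "s3://my-data-bucket/my-collection/granule-0001.tif",
            "upload": False,
            "properties": {},
        },
        # ...more objects, up to roughly 230000 serialized bytes
    ],
    # "start_after": "my-collection/granule-0042.tif",
}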
