Skip to content

Commit

Permalink
fix: python error
Browse files Browse the repository at this point in the history
  • Loading branch information
cka-y committed Oct 7, 2023
1 parent ff10648 commit 5b0b011
Showing 1 changed file with 15 additions and 8 deletions.
23 changes: 15 additions & 8 deletions infra/batch/datasets/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ class Status(Enum):
UPDATED = 0
NOT_UPDATED = 1
FAILED = 2
DO_NOT_RETRY = 3


def upload_dataset(url, bucket_name, stable_id, latest_hash):
Expand Down Expand Up @@ -175,19 +176,20 @@ def update_feed_status(status, stable_id, bucket_name):
def retrieve_feed_status(stable_id, bucket_name):
"""
Retrieves status from GCS
:return tuple where the first element is the status and the second element check if it was updated today
"""
storage_client = storage.Client()
bucket = storage_client.get_bucket(bucket_name)

blob = bucket.get_blob(f"states/{stable_id}.json")
if blob is None:
return None
return None, False

feed_state_json = blob.download_as_text()
feed_state = json.loads(feed_state_json)
if 'status' not in feed_state:
return None
return feed_state['status']
return None, False
return feed_state['status'], feed_state_json['last_updated'] == datetime.now().strftime('%Y%m%d')


def handle_error(bucket_name, e, errors, stable_id):
Expand Down Expand Up @@ -269,6 +271,11 @@ def process_dataset(cloud_event: CloudEvent):
error_return_message = f'ERROR - Unsuccessful processing of dataset with stable id {stable_id}.'
bucket_name = os.getenv("BUCKET_NAME")
date = datetime.now().strftime('%Y%m%d')

# Allow raised exception to trigger the retry process until a connection is available
engine = get_db_engine()
connection = engine.connect()

try:
# Extract data from message
data = base64.b64decode(cloud_event.data["message"]["data"]).decode()
Expand All @@ -278,10 +285,12 @@ def process_dataset(cloud_event: CloudEvent):

print(f"[{stable_id} INFO] JSON Payload:", json_payload)

update_feed_status(Status.DO_NOT_RETRY, stable_id, bucket_name)

# Validate that the feed wasn't previously processed
feed_status = retrieve_feed_status(stable_id, bucket_name)
feed_status, updated_today = retrieve_feed_status(stable_id, bucket_name)
print(f"[{stable_id} INFO] Feed status is {feed_status}")
if feed_status is not None:
if updated_today:
print(f"[{stable_id} INFO] Feed was already processed")
return 'Completed.'

Expand Down Expand Up @@ -310,9 +319,7 @@ def process_dataset(cloud_event: CloudEvent):
return error_return_message

if hosted_url is not None:
# Allow raised exception to trigger the retry process until a connection is available
engine = get_db_engine()
validate_dataset_version(engine.connect(), json_payload, bucket_name, sha256_file_hash, hosted_url)
validate_dataset_version(connection, json_payload, bucket_name, sha256_file_hash, hosted_url)

logging.getLogger(bucket_name).debug("Feed processing status", extra={
"stable_id": stable_id,
Expand Down

0 comments on commit 5b0b011

Please sign in to comment.