fix: gh action
cka-y committed Oct 7, 2023
1 parent cdc87dc commit bf3d6f2
Showing 3 changed files with 43 additions and 38 deletions.
8 changes: 1 addition & 7 deletions .github/workflows/datasets-batch-deployer.yml
@@ -146,9 +146,6 @@ jobs:

       - name: Google Cloud Setup
         uses: google-github-actions/setup-gcloud@v1
-        # with:
-        #   project_id: ${{ inputs.PROJECT_ID }}
-        #   service_account_key: ${{ secrets.GCP_MOBILITY_FEEDS_SA_KEY }}

       - name: Set Variables
         run: |
@@ -221,7 +218,4 @@ jobs:
           cd infra/batch
           terraform apply -auto-approve tf.plan
         env:
-          GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
-
-      - name: Create Firestore Database
-        run: gcloud firestore databases create --location=${{ inputs.REGION }}
+          GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
67 changes: 42 additions & 25 deletions infra/batch/datasets/main.py
@@ -11,7 +11,7 @@
 import logging
 
 import requests
-from google.cloud import storage, firestore
+from google.cloud import storage
 from datetime import datetime
 from hashlib import sha256
 from cloudevents.http import CloudEvent
@@ -153,22 +153,41 @@ def validate_dataset_version(connection, json_payload, bucket_name, sha256_file_
         connection.close()
 
 
-def update_feed_status(status, stable_id):
+def update_feed_status(status, stable_id, bucket_name):
     """
-    Update status in Firestore to disable further retries
+    Update status in GCS
     """
-    date = datetime.now().strftime('%Y%m%d')
-    db = firestore.Client()
-    doc_ref = db.collection('feeds').document(stable_id)
-    doc = doc_ref.get()
-    doc_content = {
+    storage_client = storage.Client()
+    bucket = storage_client.get_bucket(bucket_name)
+
+    feed_state = {
         'stable_id': stable_id,
         'status': status,
-        'last_updated': date
+        'last_updated': datetime.now().strftime('%Y%m%d')
     }
-    if doc.exists:
-        doc_ref.update(doc_content)
-    else:
-        doc_ref.set(doc_content)
+    feed_state_json = json.dumps(feed_state)
+
+    # Create a new blob or update an existing one
+    blob = bucket.blob(f"states/{stable_id}.json")
+    blob.upload_from_string(feed_state_json)
+
+
+def retrieve_feed_status(stable_id, bucket_name):
+    """
+    Retrieves status from GCS
+    """
+    storage_client = storage.Client()
+    bucket = storage_client.get_bucket(bucket_name)
+
+    blob = bucket.get_blob(f"states/{stable_id}.json")
+    if not blob.exists():
+        return None
+
+    feed_state_json = blob.download_as_text()
+    feed_state = json.loads(feed_state_json)
+    if 'status' not in feed_state:
+        return None
+    return feed_state['status']
 
 
 def handle_error(bucket_name, e, errors, stable_id):
@@ -201,13 +220,13 @@ def handle_error(bucket_name, e, errors, stable_id):
     # Upload error logs to GCP
     blob = bucket.blob(f"errors/{datetime.now().strftime('%Y%m%d')}/{error_type}/{stable_id}.log")
     blob.upload_from_string(errors)
-    logging.getLogger(bucket_name).info("Feed processing status", extra={
+    logging.getLogger(bucket_name).debug("Feed processing status", extra={
         "stable_id": stable_id,
         "status": Status.FAILED,
         "timestamp": date
     })
 
-    update_feed_status(Status.FAILED, stable_id)
+    update_feed_status(Status.FAILED, stable_id, bucket_name)
 
 
 def create_bucket(bucket_name):
@@ -260,17 +279,15 @@ def process_dataset(cloud_event: CloudEvent):
print(f"[{stable_id} INFO] JSON Payload:", json_payload)

# Validate that the feed wasn't previously processed
db = firestore.Client()
doc_ref = db.collection('feeds').document(stable_id)
doc = doc_ref.get()
if doc.exists:
if doc.to_dict()['last_updated'] == date:
print(f"[{stable_id} INFO] Feed was already processed")
return 'Completed.'
feed_status = retrieve_feed_status(stable_id, bucket_name)
print(f"[{stable_id} INFO] Feed status is {feed_status}")
if feed_status is not None:
print(f"[{stable_id} INFO] Feed was already processed")
return 'Completed.'

if dataset_id is None:
print(f"[{stable_id} INTERNAL ERROR] Couldn't find latest dataset related to feed_id.\n")
logging.getLogger(bucket_name).info("Feed processing status", extra={
logging.getLogger(bucket_name).debug("Feed processing status", extra={
"stable_id": stable_id,
"status": Status.FAILED,
"timestamp": date
@@ -281,7 +298,7 @@

     if hosted_url is None:
         print(f'[{stable_id} INFO] Process completed. No database update required.')
-        logging.getLogger(bucket_name).info("Feed processing status", extra={
+        logging.getLogger(bucket_name).debug("Feed processing status", extra={
             "stable_id": stable_id,
             "status": Status.NOT_UPDATED,
             "timestamp": date
@@ -297,7 +314,7 @@
     engine = get_db_engine()
     validate_dataset_version(engine.connect(), json_payload, bucket_name, sha256_file_hash, hosted_url)
 
-    logging.getLogger(bucket_name).info("Feed processing status", extra={
+    logging.getLogger(bucket_name).debug("Feed processing status", extra={
         "stable_id": stable_id,
         "status": Status.UPDATED,
         "timestamp": date
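Note: the net effect of the Python change is to replace Firestore-backed feed state with one JSON object per feed under states/<stable_id>.json in GCS. Below is a minimal sketch of that round trip, assuming google-cloud-storage is installed and the bucket already exists; the function names and the bucket/feed values in the usage comment are placeholders, not part of the commit. One caveat: Bucket.get_blob() returns None for a missing object, so the sketch guards on None rather than calling .exists() on the result as retrieve_feed_status above does.

import json
from datetime import datetime

from google.cloud import storage


def write_feed_state(bucket_name, stable_id, status):
    # Serialize the feed state and create or overwrite states/<stable_id>.json.
    bucket = storage.Client().get_bucket(bucket_name)
    blob = bucket.blob(f"states/{stable_id}.json")
    blob.upload_from_string(json.dumps({
        'stable_id': stable_id,
        'status': status,
        'last_updated': datetime.now().strftime('%Y%m%d'),
    }))


def read_feed_status(bucket_name, stable_id):
    # get_blob() returns None when the object does not exist,
    # so guard on None instead of calling .exists() on the result.
    bucket = storage.Client().get_bucket(bucket_name)
    blob = bucket.get_blob(f"states/{stable_id}.json")
    if blob is None:
        return None
    return json.loads(blob.download_as_text()).get('status')


# Example usage (placeholder names):
# write_feed_state("my-datasets-bucket", "mdb-123", "UPDATED")
# assert read_feed_status("my-datasets-bucket", "mdb-123") == "UPDATED"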
6 changes: 0 additions & 6 deletions infra/batch/main.tf
@@ -21,12 +21,6 @@ data "google_service_account" "ci_impersonator_service_account" {
   project = var.project_id
 }
 
-resource "google_project_iam_member" "ci_binding_cloudsql_admin" {
-  project = var.project_id
-  role    = "roles/datastore.databases.owner"
-  member  = "serviceAccount:${data.google_service_account.ci_impersonator_service_account.email}"
-}
-
 resource "google_project_service" "services" {
   for_each = toset(local.services)
   service  = each.value
