feat: added snapshot storage to gcp
cka-y committed Aug 15, 2024
1 parent 379a312 commit fce3464
Showing 4 changed files with 118 additions and 7 deletions.
101 changes: 96 additions & 5 deletions functions-python/gbfs_validator/src/main.py
@@ -2,19 +2,25 @@
import logging
import os
import uuid
from datetime import datetime

import functions_framework
from cloudevents.http import CloudEvent
from google.cloud import pubsub_v1
from sqlalchemy.orm import joinedload

import requests
from database_gen.sqlacodegen_models import Gbfsfeed
from helpers.database import start_db_session
from helpers.logger import Logger
from helpers.parser import jsonify_pubsub
from helpers.utils import create_bucket
from google.cloud import storage

logging.basicConfig(level=logging.INFO)

BUCKET_NAME_PREFIX = os.getenv("BUCKET_NAME", "mobilitydata-gbfs-snapshots")
ENV = os.getenv("ENV", "dev")


def get_all_gbfs_feeds():
"""
@@ -36,6 +42,50 @@ def get_all_gbfs_feeds():
session.close()


def fetch_gbfs_files(url):
"""Fetch the GBFS files from the autodiscovery URL."""
response = requests.get(url)
response.raise_for_status()
return response.json()


def store_gbfs_file_in_bucket(bucket, file_url, destination_blob_name):
"""Store a GBFS file in a Cloud Storage bucket."""
response = requests.get(file_url)
response.raise_for_status()
blob = bucket.blob(destination_blob_name)
blob.upload_from_string(response.content)
blob.make_public()
return blob.public_url


def generate_new_gbfs_json(bucket, gbfs_data, stable_id):
"""Generate a new gbfs.json with paths pointing to Cloud Storage."""
new_gbfs_data = gbfs_data.copy()
today = datetime.now().strftime("%Y-%m-%d")

for feed_key, feed in new_gbfs_data["data"].items():
if isinstance(feed["feeds"], dict):
# Case when 'feeds' is a dictionary keyed by language
for feed_language, feed_info in feed["feeds"].items():
old_url = feed_info["url"]
blob_name = f"{stable_id}/{stable_id}-{today}/{feed_info['name']}_{feed_language}.json"
new_url = store_gbfs_file_in_bucket(bucket, old_url, blob_name)
feed_info["url"] = new_url
elif isinstance(feed["feeds"], list):
# Case when 'feeds' is a list without language codes
for feed_info in feed["feeds"]:
old_url = feed_info["url"]
blob_name = f"{stable_id}/{stable_id}-{today}/{feed_info['name']}.json"
new_url = store_gbfs_file_in_bucket(bucket, old_url, blob_name)
feed_info["url"] = new_url
else:
logging.warning(f"Unexpected format in feed: {feed_key}")

return new_gbfs_data
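
For context, a GBFS autodiscovery file nests its feeds under `data`, either keyed by language or as a flat `feeds` list, and the function above swaps every feed URL for its Cloud Storage snapshot copy. Below is a minimal, self-contained sketch of that URL-rewriting step with the upload stubbed out; `rewrite_feed_urls`, `fake_store`, the sample feed, and the bucket URL are illustrative and not part of this commit:

```python
from datetime import datetime


def rewrite_feed_urls(gbfs_data, stable_id, store):
    """Return a copy of gbfs_data whose feed URLs point at store(url, blob_name)."""
    new_data = gbfs_data.copy()
    today = datetime.now().strftime("%Y-%m-%d")
    for feed in new_data["data"].values():
        feeds = feed["feeds"]
        # Handle both shapes: dict keyed by language, or a plain list of feeds.
        items = feeds.items() if isinstance(feeds, dict) else ((None, f) for f in feeds)
        for language, feed_info in items:
            suffix = f"_{language}" if language else ""
            blob_name = f"{stable_id}/{stable_id}-{today}/{feed_info['name']}{suffix}.json"
            feed_info["url"] = store(feed_info["url"], blob_name)
    return new_data


def fake_store(url, blob_name):
    # Stand-in for store_gbfs_file_in_bucket: echoes the would-be public URL without uploading.
    return f"https://storage.googleapis.com/example-bucket/{blob_name}"


sample = {
    "data": {
        "en": {
            "feeds": [
                {"name": "station_information", "url": "https://example.com/station_information.json"}
            ]
        }
    }
}
print(rewrite_feed_urls(sample, "gbfs-example", fake_store))
```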


@functions_framework.cloud_event
def gbfs_validator_pubsub(cloud_event: CloudEvent):
"""
@@ -51,17 +101,58 @@ def gbfs_validator_pubsub(cloud_event: CloudEvent):
maximum_executions = 1
logging.info(f"Maximum allowed executions: {maximum_executions}")

-    message_json = jsonify_pubsub(cloud_event)
+    message_json = jsonify_pubsub(data)
if message_json is None:
return "Invalid Pub/Sub message data."
logging.info(f"Parsed message data: {message_json}")
try:
execution_id, stable_id, url, latest_version = (
message_json["execution_id"],
message_json["stable_id"],
message_json["url"],
message_json["latest_version"],
)
except KeyError:
return (
"Invalid Pub/Sub message data. "
"Missing required field(s) execution_id, stable_id, url, or latest_version."
)
logging.info(f"Execution ID: {execution_id}")
logging.info(f"Stable ID: {stable_id}")
logging.info(f"URL: {url}")
logging.info(f"Latest version: {latest_version}")

bucket_name = f"{BUCKET_NAME_PREFIX}-{ENV}"
logging.info(f"Bucket name: {bucket_name}")
create_bucket(bucket_name)

# Step 2: Store all gbfs files and generate new gbfs.json
storage_client = storage.Client()
bucket = storage_client.bucket(bucket_name)
try:
gbfs_data = fetch_gbfs_files(url)
except Exception as e:
logging.error(f"Error fetching data from autodiscovery URL: {e}")
return "Error fetching data from autodiscovery URL."
try:
new_gbfs_json = generate_new_gbfs_json(bucket, gbfs_data, stable_id)
except Exception as e:
logging.error(f"Error generating new gbfs.json: {e}")
return "Error generating new gbfs.json."

# Store the new gbfs.json in the bucket
today = datetime.now().strftime("%Y-%m-%d")
new_gbfs_blob = bucket.blob(f"{stable_id}/{stable_id}-{today}/gbfs.json")
new_gbfs_blob.upload_from_string(
json.dumps(new_gbfs_json), content_type="application/json"
)
logging.info(f"Stored new gbfs.json at {new_gbfs_blob.public_url}")

# TODO: 1. Parse the CloudEvent data to extract the feed information
    # TODO: 2. Store all GBFS files, generate a new gbfs.json, and store it as well
# TODO: 2.5. Store gbfs snapshot information in the database
# TODO: 3. Validate the feed's version otherwise add a version to the feed
# TODO: 4. Validate the feed (summary) and store the results in the database
-    return
+    return "GBFS files processed and stored successfully.", 200


@functions_framework.http
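For reference, `gbfs_validator_pubsub` expects the Pub/Sub message body to decode to a JSON object with `execution_id`, `stable_id`, `url` (the autodiscovery URL), and `latest_version`. A minimal sketch of publishing such a message for manual testing; the project ID, topic name, and field values below are assumptions, not taken from this repository:

```python
import json
import uuid

from google.cloud import pubsub_v1

# Illustrative payload; in production these values come from the upstream pipeline.
message = {
    "execution_id": str(uuid.uuid4()),
    "stable_id": "gbfs-example_feed",
    "url": "https://example.com/gbfs/gbfs.json",
    "latest_version": "2.3",
}

publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path("my-project-id", "gbfs-validation-topic")  # hypothetical names
future = publisher.publish(topic_path, json.dumps(message).encode("utf-8"))
print(f"Published message: {future.result()}")
```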
5 changes: 3 additions & 2 deletions functions-python/helpers/utils.py
@@ -15,6 +15,7 @@
#

import hashlib
import logging
import os

import requests
@@ -34,9 +35,9 @@ def create_bucket(bucket_name):
bucket = storage_client.lookup_bucket(bucket_name)
if bucket is None:
bucket = storage_client.create_bucket(bucket_name)
print(f"Bucket {bucket} created.")
logging.info(f"Bucket {bucket} created.")
else:
print(f"Bucket {bucket_name} already exists.")
logging.info(f"Bucket {bucket_name} already exists.")


def download_url_content(url, with_retry=False):
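`create_bucket` is idempotent: it looks the bucket up first and only creates it when missing, so the validator function can safely call it on every invocation. A minimal usage sketch, mirroring how main.py composes the bucket name from `BUCKET_NAME` and `ENV`:

```python
import os

from helpers.utils import create_bucket

# Same naming scheme as main.py: "<BUCKET_NAME>-<ENV>".
bucket_name = f"{os.getenv('BUCKET_NAME', 'mobilitydata-gbfs-snapshots')}-{os.getenv('ENV', 'dev')}"
create_bucket(bucket_name)  # no-op (aside from a log line) if the bucket already exists
```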
13 changes: 13 additions & 0 deletions infra/functions-python/main.tf
@@ -477,6 +477,10 @@ resource "google_cloudfunctions2_function" "gbfs_validator_pubsub" {
ingress_settings = "ALLOW_ALL"
vpc_connector = data.google_vpc_access_connector.vpc_connector.id
vpc_connector_egress_settings = "PRIVATE_RANGES_ONLY"
environment_variables = {
ENV = var.environment
BUCKET_NAME = "${var.gbfs_bucket_name}-${var.environment}"
}
dynamic "secret_environment_variables" {
for_each = local.function_gbfs_validation_report_config.secret_environment_variables
content {
@@ -529,6 +533,15 @@ resource "google_storage_bucket_iam_binding" "bucket_object_viewer" {
]
}

# Grant write access to the gbfs bucket for the service account
resource "google_storage_bucket_iam_binding" "gbfs_bucket_object_creator" {
bucket = "${var.gbfs_bucket_name}-${var.environment}"
role = "roles/storage.objectCreator"
members = [
"serviceAccount:${google_service_account.functions_service_account.email}"
]
}

# Grant the service account the ability to invoke the workflows
resource "google_project_iam_member" "workflows_invoker" {
project = var.project_id
6 changes: 6 additions & 0 deletions infra/functions-python/vars.tf
@@ -52,3 +52,9 @@ variable "web_validator_url" {
description = "URL of the web validator"
default = "https://stg-gtfs-validator-web-mbzoxaljzq-ue.a.run.app"
}

variable "gbfs_bucket_name" {
type = string
description = "Name of the bucket where the GBFS feeds are stored"
default = "mobilitydata-gbfs-snapshots"
}
