feat: automate gtfs & gbfs data preprocessing
cka-y committed Aug 23, 2024
1 parent e1823aa commit cd6f160
Showing 16 changed files with 1,525 additions and 1 deletion.
9 changes: 9 additions & 0 deletions functions-python/preprocessed_analytics/.coveragerc
@@ -0,0 +1,9 @@
[run]
omit =
    */test*/*
    */helpers/*
    */database_gen/*

[report]
exclude_lines =
    if __name__ == .__main__.:
177 changes: 177 additions & 0 deletions functions-python/preprocessed_analytics/README.md
@@ -0,0 +1,177 @@
# GTFS & GBFS Analytics Processor

This directory contains Google Cloud Functions that automate the retrieval, processing, and analytics generation for GTFS and GBFS datasets. The resulting analytics and metrics files are stored as JSON in a Google Cloud Storage bucket.

## Overview

### `process_analytics_gtfs`

This HTTP-triggered Cloud Function processes GTFS datasets by performing the following steps:

1. **Retrieving Data**: Fetches the latest GTFS dataset per feed from the database.
2. **Processing Data**: Analyzes the dataset, extracting metrics related to validation notices, features, and geographical locations.
3. **Storing Analytics**: Saves the processed data as JSON files in the Google Cloud Storage bucket, updating metrics and analytics files.
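
The function accepts an optional `compute_date` (format `YYYYMMDD`) in the JSON request body; when it is missing or unparsable, the current date is used (see `src/main.py`). Below is a minimal invocation sketch — the endpoint URL is a placeholder, not the deployed address — and the GBFS function described later is invoked the same way:

```python
import requests

# Placeholder URL; replace with the deployed Cloud Function endpoint.
URL = "https://<region>-<project>.cloudfunctions.net/process_analytics_gtfs"

# Optional "compute_date" in YYYYMMDD format; send an empty body to use today.
response = requests.post(URL, json={"compute_date": "20240801"})
print(response.status_code, response.text)
```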

#### Files Modified/Created:
- **`analytics_YYYY_MM.json`**: Contains the GTFS analytics data for a given month in JSON format.
**Format:**
```json
{
  "feed_id": "string",
  "dataset_id": "string",
  "notices": {
    "errors": ["string"],
    "warnings": ["string"],
    "infos": ["string"]
  },
  "features": ["string"],
  "created_on": "datetime",
  "last_modified": "datetime",
  "provider": "string",
  "locations": [
    {
      "country_code": "string",
      "country": "string",
      "municipality": "string",
      "subdivision_name": "string"
    }
  ]
}
```

- **`feed_metrics.json`**: Stores aggregated feed-level metrics, including error, warning, and info counts.
**Format:**
```json
{
  "feed_id": "string",
  "computed_on": ["datetime"],
  "errors_count": ["int"],
  "warnings_count": ["int"],
  "infos_count": ["int"]
}
```

- **`features_metrics.json`**: Tracks feature usage across feeds, showing the number of feeds using specific features.
**Format:**
```json
{
  "feature": "string",
  "computed_on": ["datetime"],
  "feeds_count": ["int"]
}
```

- **`notices_metrics.json`**: Records notice metrics by severity level (error, warning, info).
**Format:**
```json
{
  "notice": "string",
  "severity": "string",
  "computed_on": ["datetime"],
  "feeds_count": ["int"]
}
```

- **`analytics_files.json`**: Index of all `analytics_YYYY_MM.json` files stored in the bucket.
**Format:**
```json
{
  "file_name": "string",
  "created_on": "datetime"
}
```

### `process_analytics_gbfs`

This HTTP-triggered Cloud Function processes GBFS datasets by performing the following steps:

1. **Retrieving Data**: Fetches the latest GBFS snapshot per feed from the database.
2. **Processing Data**: Analyzes the snapshot, extracting metrics related to validation notices, versions, and geographical locations.
3. **Storing Analytics**: Saves the processed data as JSON files in the Google Cloud Storage bucket, updating metrics and analytics files.

#### Files Modified/Created:
- **`analytics_YYYY_MM.json`**: Contains the GBFS analytics data for a given month in JSON format.
**Format:**
```json
{
  "feed_id": "string",
  "snapshot_id": "string",
  "notices": [
    {
      "keyword": "string",
      "gbfs_file": "string",
      "schema_path": "string"
    }
  ],
  "created_on": "datetime",
  "operator": "string",
  "locations": [
    {
      "country_code": "string",
      "country": "string",
      "municipality": "string",
      "subdivision_name": "string"
    }
  ]
}
```

- **`feed_metrics.json`**: Stores aggregated feed-level metrics, including error counts.
**Format:**
```json
{
  "feed_id": "string",
  "computed_on": ["datetime"],
  "errors_count": ["int"]
}
```

- **`versions_metrics.json`**: Tracks the usage of different GBFS versions across feeds.
**Format:**
```json
{
  "version": "string",
  "computed_on": ["datetime"],
  "feeds_count": ["int"]
}
```

- **`notices_metrics.json`**: Records notice metrics specific to GBFS, categorized by keyword, file, and schema path.
**Format:**
```json
{
  "keyword": "string",
  "gbfs_file": "string",
  "schema_path": "string",
  "computed_on": ["datetime"],
  "feeds_count": ["int"]
}
```

- **`analytics_files.json`**: Index of all `analytics_YYYY_MM.json` files stored in the bucket.
**Format:**
```json
{
  "file_name": "string",
  "created_on": "datetime"
}
```

## Project Structure

- **`main.py`**: Defines the HTTP-triggered Cloud Functions that initiate the GTFS and GBFS data analytics processes.
- **`processors/base_analytics_processor.py`**: Contains the base class for analytics processing, providing common logic for GTFS and GBFS processors.
- **`processors/gtfs_analytics_processor.py`**: Implements GTFS-specific data retrieval and processing logic.
- **`processors/gbfs_analytics_processor.py`**: Implements GBFS-specific data retrieval and processing logic.
- **`tests/`**: Unit tests for all modules and functions.
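
The processors share a template-method structure: `main.py` constructs a processor with the compute date and calls `run()`. The sketch below shows how that base/subclass split might look; `run()` and the single-argument constructor come from `main.py`, while the other method names are illustrative assumptions, not the actual `base_analytics_processor.py` API:

```python
from datetime import datetime


class BaseAnalyticsProcessor:
    """Illustrative base class; real method names may differ."""

    def __init__(self, run_date: datetime):
        # The compute date passed in from main.py.
        self.run_date = run_date

    def process_feed_data(self) -> None:
        # Subclasses retrieve the latest dataset/snapshot per feed
        # and extract the metrics described above. (Assumed name.)
        raise NotImplementedError

    def save(self) -> None:
        # Subclasses write analytics_YYYY_MM.json and the metrics
        # files to the analytics bucket. (Assumed name.)
        raise NotImplementedError

    def run(self) -> None:
        # Shared entry point invoked from main.py.
        self.process_feed_data()
        self.save()


class GTFSAnalyticsProcessor(BaseAnalyticsProcessor):
    def process_feed_data(self) -> None:
        ...  # GTFS-specific retrieval and metric extraction

    def save(self) -> None:
        ...  # GTFS-specific persistence to Cloud Storage
```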

## Project Configuration

The following environment variables need to be set:

- `FEEDS_DATABASE_URL`: The URL for the database containing GTFS and GBFS feeds.
- `ANALYTICS_BUCKET`: The name of the Google Cloud Storage bucket where analytics results are stored.
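
For local testing, both variables can be exported in the shell or loaded before the functions are invoked. A minimal pre-flight check, shown only as a sketch (it is not part of the function code):

```python
import os

# Both variables must be set before running the functions locally.
feeds_database_url = os.getenv("FEEDS_DATABASE_URL")
analytics_bucket = os.getenv("ANALYTICS_BUCKET")

if not feeds_database_url or not analytics_bucket:
    raise RuntimeError("FEEDS_DATABASE_URL and ANALYTICS_BUCKET must be set")
```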

## Local Development

Refer to the main [README.md](../README.md) for general setup instructions for the development environment.
20 changes: 20 additions & 0 deletions functions-python/preprocessed_analytics/function_config.json
@@ -0,0 +1,20 @@
{
  "name": "process-analytics",
  "description": "Process analytics",
  "entry_point": "process_analytics",
  "timeout": 540,
  "memory": "2Gi",
  "trigger_http": false,
  "include_folders": ["database_gen", "helpers"],
  "environment_variables": [],
  "secret_environment_variables": [
    {
      "key": "FEEDS_DATABASE_URL"
    }
  ],
  "ingress_settings": "ALLOW_ALL",
  "max_instance_request_concurrency": 1,
  "max_instance_count": 5,
  "min_instance_count": 0,
  "available_cpu": 1
}
15 changes: 15 additions & 0 deletions functions-python/preprocessed_analytics/requirements.txt
@@ -0,0 +1,15 @@
functions-framework==3.*
google-cloud-logging
google-cloud-bigquery
google-cloud-storage
psycopg2-binary==2.9.6
aiohttp~=3.8.6
asyncio~=3.4.3
urllib3~=2.1.0
SQLAlchemy==2.0.23
geoalchemy2==0.14.7
requests~=2.31.0
attrs~=23.1.0
pluggy~=1.3.0
certifi~=2023.7.22
pandas
4 changes: 4 additions & 0 deletions functions-python/preprocessed_analytics/requirements_dev.txt
@@ -0,0 +1,4 @@
Faker
pytest~=7.4.3
urllib3-mock
requests-mock
Empty file.
58 changes: 58 additions & 0 deletions functions-python/preprocessed_analytics/src/main.py
@@ -0,0 +1,58 @@
import logging
from datetime import datetime

import flask
import functions_framework
from flask import Response

from helpers.logger import Logger
from .processors.gbfs_analytics_processor import GBFSAnalyticsProcessor
from .processors.gtfs_analytics_processor import GTFSAnalyticsProcessor

logging.basicConfig(level=logging.INFO)


def get_compute_date(request: flask.Request) -> datetime:
    """
    Get the compute date from the request JSON.
    """
    try:
        json_request = request.get_json()
        # Optional "compute_date" field, expected in YYYYMMDD format.
        compute_date_str = json_request.get("compute_date", None)
        if compute_date_str:
            return datetime.strptime(compute_date_str, "%Y%m%d")
    except Exception as e:
        logging.error(f"Error getting compute date: {e}")
    # Fall back to the current datetime when the field is missing or invalid.
    return datetime.now()


def process_analytics(request: flask.Request, processor_class) -> Response:
    """
    Common logic to process analytics using the given processor class.
    """
    Logger.init_logger()
    logging.info(f"{processor_class.__name__} Function triggered")
    compute_date = get_compute_date(request)
    logging.info(f"Compute date: {compute_date}")
    try:
        processor = processor_class(compute_date)
        processor.run()
    except Exception as e:
        logging.error(f"Error processing {processor_class.__name__} analytics: {e}")
        return Response(
            f"Error processing analytics for date {compute_date}: {e}", status=500
        )

    return Response(
        f"Successfully processed analytics for date: {compute_date}", status=200
    )


@functions_framework.http
def process_analytics_gtfs(request: flask.Request) -> Response:
    return process_analytics(request, GTFSAnalyticsProcessor)


@functions_framework.http
def process_analytics_gbfs(request: flask.Request) -> Response:
    return process_analytics(request, GBFSAnalyticsProcessor)