Feat: transitFeedSyncProcessing implementation #819

Merged — 21 commits, Nov 28, 2024

Commits
- `46cca35` feat: Add Transitland feed sync processor — AlfredNwolisa, Nov 12, 2024
- `8f2740f` lint fix — AlfredNwolisa, Nov 13, 2024
- `af81f14` Refactor to use SQLAlchemy models for database operations — AlfredNwolisa, Nov 18, 2024
- `e5306b1` Remove unused freeze_time import from tests — AlfredNwolisa, Nov 19, 2024
- `cfc11cc` Update functions-python/feed_sync_process_transitland/src/main.py — AlfredNwolisa, Nov 19, 2024
- `19cde6c` Refactor FeedProcessor for enhanced logging and error handling — AlfredNwolisa, Nov 19, 2024
- `3818ab9` Update logging and refactor feed processing — AlfredNwolisa, Nov 26, 2024
- `33e5e17` lint fix — AlfredNwolisa, Nov 26, 2024
- `10f5437` added pycountry to requirements.txt — AlfredNwolisa, Nov 26, 2024
- `88e76ea` added additional test cases & included pycountry in requirements.txt — AlfredNwolisa, Nov 27, 2024
- `da195c8` Merge remote-tracking branch 'origin/Tlnd_feed_sync_process' into Tln… — AlfredNwolisa, Nov 27, 2024
- `8146047` added additional test cases & included pycountry in requirements.txt — AlfredNwolisa, Nov 27, 2024
- `8c77e62` fix — AlfredNwolisa, Nov 27, 2024
- `6837add` Add detailed error handling and checks for feed creation — AlfredNwolisa, Nov 28, 2024
- `8a83af4` Refactor mocking of PublisherClient in test setup. — AlfredNwolisa, Nov 28, 2024
- `1fac142` Update requirements: move pycountry to helpers — AlfredNwolisa, Nov 28, 2024
- `28d5f12` Update requirements: pycountry — AlfredNwolisa, Nov 28, 2024
- `bc58aaa` Handle empty country name in get_country_code function — AlfredNwolisa, Nov 28, 2024
- `200d720` Update test log message for empty country code — AlfredNwolisa, Nov 28, 2024
- `ff2a79a` Merge branch 'main' into Tlnd_feed_sync_process — cka-y, Nov 28, 2024
- `8a92ed8` fix: last test — cka-y, Nov 28, 2024
6 changes: 6 additions & 0 deletions functions-python/feed_sync_process_transitland/.env.rename_me
@@ -0,0 +1,6 @@
# Environment variables for the function to run locally. Delete this line after renaming the file.
FEEDS_DATABASE_URL=postgresql://postgres:postgres@localhost:54320/MobilityDatabase
PROJECT_ID=my-project-id
PUBSUB_TOPIC_NAME=my-topic
TRANSITLAND_API_KEY=your-api-key
DATASET_BATCH_TOPIC_NAME=dataset_batch_topic_{env}_
107 changes: 107 additions & 0 deletions functions-python/feed_sync_process_transitland/README.md
@@ -0,0 +1,107 @@
# TLD Feed Sync Process

`feed-sync-process` subscribes to the topic set by the `feed-sync-dispatcher` function and is triggered for each message published. It handles the processing of feed updates, ensuring data consistency and integrity. The function performs the following operations:

1. **Feed Status Check**: Verifies the current state of the feed in the database using `external_id` and `source`.
2. **URL Validation**: Checks whether the feed URL already exists in the database.
3. **Feed Processing**: Based on the current state:
- If no existing feed is found, creates a new feed entry
- If feed exists with a different URL, creates a new feed and deprecates the old one
- If feed exists with the same URL, no action is taken
4. **Batch Processing Trigger**: For non-authenticated feeds, publishes events to the dataset batch topic for further processing.

The function maintains feed history through the `redirectingid` table and ensures proper status tracking with 'active' and 'deprecated' states.
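The branching in step 3 can be sketched as a small decision helper. This is an illustrative sketch only; `decide_action` and `FeedRecord` are hypothetical names, not part of the PR, whose real logic lives in `src/main.py`:

```python
# Hypothetical sketch of the feed-processing decision described above.
# Names (decide_action, FeedRecord) are illustrative, not from the PR.
from dataclasses import dataclass
from typing import Optional


@dataclass
class FeedRecord:
    feed_id: str
    producer_url: str
    status: str  # 'active' or 'deprecated'


def decide_action(existing: Optional[FeedRecord], incoming_url: str) -> str:
    """Return which operation the processor should perform."""
    if existing is None:
        return "create"                    # no feed for this external_id/source
    if existing.producer_url != incoming_url:
        return "create_and_deprecate_old"  # URL changed: new feed, old one deprecated
    return "no_action"                     # same URL: nothing to do
```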

# Message Format
The function expects a Pub/Sub message with the following format:
```json
{
  "message": {
    "data": {
      "external_id": "feed-identifier",
      "feed_id": "unique-feed-id",
      "feed_url": "http://example.com/feed",
      "execution_id": "execution-identifier",
      "spec": "gtfs",
      "auth_info_url": null,
      "auth_param_name": null,
      "type": null,
      "operator_name": "Transit Agency Name",
      "country": "Country Name",
      "state_province": "State/Province",
      "city_name": "City Name",
      "source": "TLD",
      "payload_type": "new|update"
    }
  }
}
```
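In transit, the `data` field arrives base64-encoded (as `main_local_debug.py` encodes it). Decoding the envelope amounts to roughly the following; `parse_feed_event` is an illustrative helper, not the function's actual parsing code:

```python
# Illustrative decoding of the Pub/Sub envelope shown above.
# parse_feed_event is a hypothetical name, not from the PR.
import base64
import json


def parse_feed_event(envelope: dict) -> dict:
    """Extract the feed payload from a Pub/Sub message envelope."""
    encoded = envelope["message"]["data"]
    return json.loads(base64.b64decode(encoded).decode("utf-8"))


payload = {"external_id": "feed-identifier", "feed_url": "http://example.com/feed"}
envelope = {
    "message": {
        "data": base64.b64encode(json.dumps(payload).encode("utf-8")).decode("utf-8")
    }
}
assert parse_feed_event(envelope) == payload
```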

# Function Configuration
The function is configured using the following environment variables:
- `PROJECT_ID`: The Google Cloud project ID
- `DATASET_BATCH_TOPIC_NAME`: The name of the topic for batch processing triggers
- `FEEDS_DATABASE_URL`: The URL of the feeds database
- `ENV`: [Optional] Environment identifier (e.g., 'dev', 'prod')

# Database Schema
The function interacts with the following tables:
1. `feed`: Stores feed information
- Contains fields like id, data_type, feed_name, producer_url, etc.
- Tracks feed status ('active' or 'deprecated')
- Uses CURRENT_TIMESTAMP for created_at

2. `externalid`: Maps external identifiers to feed IDs
- Links external_id and source to feed entries
- Maintains source tracking

3. `redirectingid`: Tracks feed updates
- Maps old feed IDs to new ones
- Maintains update history
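When a URL change forces a replacement, the bookkeeping across `feed` and `redirectingid` amounts to roughly the following. This is an illustrative sketch with hypothetical column names; the function itself performs these updates through SQLAlchemy models:

```python
# Illustrative sketch of the deprecation bookkeeping described above.
# Column names (source_id, target_id) are assumptions, not taken from the PR.
def deprecate_and_redirect(old_feed: dict, new_feed_id: str):
    """Return the updated old feed row and the new redirectingid row."""
    updated_old = {**old_feed, "status": "deprecated"}  # old feed leaves 'active'
    redirect_row = {"source_id": old_feed["id"], "target_id": new_feed_id}
    return updated_old, redirect_row
```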

# Local development
The local development of this function follows the same steps as the other functions.

To install the Google Pub/Sub emulator, refer to the [README.md](../README.md) file for more information.

## Python requirements

- Install the requirements
```bash
pip install -r ./functions-python/feed_sync_process_transitland/requirements.txt
```

## Test locally with Google Cloud Emulators

- Execute the following commands to start the emulators:
```bash
gcloud beta emulators pubsub start --project=test-project --host-port='localhost:8043'
```

- Create a Pub/Sub topic in the emulator:
```bash
curl -X PUT "http://localhost:8043/v1/projects/test-project/topics/feed-sync-transitland"
```

- Start function
```bash
export PUBSUB_EMULATOR_HOST=localhost:8043 && ./scripts/function-python-run.sh --function_name feed_sync_process_transitland
```

- [Optional]: Create a local subscription to print published messages:
```bash
./scripts/pubsub_message_print.sh feed-sync-process-transitland
```

- Execute function
```bash
curl http://localhost:8080
```

- To run or debug from your IDE, use the file `main_local_debug.py`.

# Test
- Run the tests
```bash
./scripts/api-tests.sh --folder functions-python/feed_sync_process_transitland
```
@@ -0,0 +1,19 @@
{
  "name": "feed-sync-process-transitland",
  "description": "Feed Sync process for Transitland feeds",
  "entry_point": "process_feed_event",
  "timeout": 540,
  "memory": "512Mi",
  "trigger_http": true,
  "include_folders": ["database_gen", "helpers"],
  "secret_environment_variables": [
    {
      "key": "FEEDS_DATABASE_URL"
    }
  ],
  "ingress_settings": "ALLOW_INTERNAL_AND_GCLB",
  "max_instance_request_concurrency": 20,
  "max_instance_count": 10,
  "min_instance_count": 0,
  "available_cpu": 1
}
131 changes: 131 additions & 0 deletions functions-python/feed_sync_process_transitland/main_local_debug.py
@@ -0,0 +1,131 @@
# Code to be able to debug locally without affecting the runtime cloud function
#
# Requirements:
# - Google Cloud SDK installed
# - Make sure to have the following environment variables set in your .env.local file
# - Local database in running state
# - Follow the instructions in the README.md file
#
# Usage:
# - python feed_sync_process_transitland/main_local_debug.py

import base64
import json
import logging
from dataclasses import dataclass
from dotenv import load_dotenv
import src.main
from src.main import process_feed_event
from unittest.mock import Mock
from google.cloud import pubsub_v1

# Configure logging
logging.basicConfig(
    level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)

logger = logging.getLogger("feed_processor")
handler = logging.StreamHandler()
handler.setFormatter(
    logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")
)
logger.addHandler(handler)
logger.setLevel(logging.INFO)

src.main.logger = logger

load_dotenv(dotenv_path=".env.local_test")


@dataclass
class CloudEvent:
    attributes: dict
    data: dict


# Mock publisher client
class MockPublisherClient:
    def topic_path(self, project_id, topic_id):
        return f"projects/{project_id}/topics/{topic_id}"

    def publish(self, topic_path, data):
        logger.info(
            f"[LOCAL DEBUG] Would publish to {topic_path}: {data.decode('utf-8')}"
        )
        return Mock()  # Returns a mock future


# Replace the real publisher with the mock
pubsub_v1.PublisherClient = MockPublisherClient


def process_event_safely(cloud_event, description=""):
    """Wrapper to handle event processing with better error handling."""
    try:
        logger.info(f"\nProcessing {description}:")
        logger.info("-" * 50)
        result = process_feed_event(cloud_event)
        logger.info(f"Process result: {result}")
    except Exception as e:
        logger.error(f"Error processing {description}: {str(e)}")
        return False
    return True


if __name__ == "__main__":
    logger.info("Starting local debug session...")

    # Define cloud event attributes
    attributes = {
        "type": "com.google.cloud.pubsub.topic.publish",
        "source": "//pubsub.googleapis.com/projects/sample-project/topics/sample-topic",
    }

    # New feed
    feed_payload = {
        "external_id": "test-feed-1",
        "feed_id": "feed1",
        "feed_url": "https://example.com/test-feed",
        "execution_id": "local-debug-123",
        "spec": "gtfs",
        "auth_info_url": None,
        "auth_param_name": None,
        "type": None,
        "operator_name": "Test Operator",
        "country": "USA",
        "state_province": "CA",
        "city_name": "Test City",
        "source": "TLD",
        "payload_type": "new",
    }

    data = {
        "message": {
            "data": base64.b64encode(json.dumps(feed_payload).encode("utf-8")).decode(
                "utf-8"
            )
        }
    }

    # Process new feed event
    cloud_event = CloudEvent(attributes, data)
    new_feed_success = process_event_safely(cloud_event, "new feed event")

    # Update feed (only if the new feed was processed successfully)
    if new_feed_success:
        update_payload = feed_payload.copy()
        update_payload["feed_url"] = "http://example.com/test-feed-updated"
        update_payload["payload_type"] = "update"

        update_data = {
            "message": {
                "data": base64.b64encode(
                    json.dumps(update_payload).encode("utf-8")
                ).decode("utf-8")
            }
        }

        cloud_event_update = CloudEvent(attributes, update_data)
        process_event_safely(cloud_event_update, "update feed event")

    logger.info("Local debug session completed.")
22 changes: 22 additions & 0 deletions functions-python/feed_sync_process_transitland/requirements.txt
@@ -0,0 +1,22 @@
# Common packages
functions-framework==3.*
google-cloud-logging
psycopg2-binary==2.9.6
aiohttp~=3.10.5
asyncio~=3.4.3
urllib3~=2.2.2
requests~=2.32.3
attrs~=23.1.0
pluggy~=1.3.0
certifi~=2024.8.30

# SQL Alchemy and Geo Alchemy
SQLAlchemy==2.0.23
geoalchemy2==0.14.7

# Google specific packages for this function
google-cloud-pubsub
cloudevents~=1.10.1

# Additional packages for this function
pandas
@@ -0,0 +1,2 @@
Faker
pytest~=7.4.3
Empty file.