Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat: transitFeedSyncProcessing implementation #819

Merged
merged 21 commits into from
Nov 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
46cca35
feat: Add Transitland feed sync processor
AlfredNwolisa Nov 12, 2024
8f2740f
lint fix
AlfredNwolisa Nov 13, 2024
af81f14
Refactor to use SQLAlchemy models for database operations
AlfredNwolisa Nov 18, 2024
e5306b1
Remove unused freeze_time import from tests
AlfredNwolisa Nov 19, 2024
cfc11cc
Update functions-python/feed_sync_process_transitland/src/main.py
AlfredNwolisa Nov 19, 2024
19cde6c
Refactor FeedProcessor for enhanced logging and error handling
AlfredNwolisa Nov 19, 2024
3818ab9
Update logging and refactor feed processing
AlfredNwolisa Nov 26, 2024
33e5e17
lint fix
AlfredNwolisa Nov 26, 2024
10f5437
added pycountry to requirements.txt
AlfredNwolisa Nov 26, 2024
88e76ea
added additional test cases & included pycountry in requirements.txt
AlfredNwolisa Nov 27, 2024
da195c8
Merge remote-tracking branch 'origin/Tlnd_feed_sync_process' into Tln…
AlfredNwolisa Nov 27, 2024
8146047
added additional test cases & included pycountry in requirements.txt
AlfredNwolisa Nov 27, 2024
8c77e62
fix
AlfredNwolisa Nov 27, 2024
6837add
Add detailed error handling and checks for feed creation
AlfredNwolisa Nov 28, 2024
8a83af4
Refactor mocking of PublisherClient in test setup.
AlfredNwolisa Nov 28, 2024
1fac142
Update requirements: move pycountry to helpers
AlfredNwolisa Nov 28, 2024
28d5f12
Update requirements: pycountry
AlfredNwolisa Nov 28, 2024
bc58aaa
Handle empty country name in get_country_code function
AlfredNwolisa Nov 28, 2024
200d720
Update test log message for empty country code
AlfredNwolisa Nov 28, 2024
ff2a79a
Merge branch 'main' into Tlnd_feed_sync_process
cka-y Nov 28, 2024
8a92ed8
fix: last test
cka-y Nov 28, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion functions-python/batch_process_dataset/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -21,4 +21,4 @@ google-api-core
google-cloud-firestore
google-cloud-datastore
google-cloud-bigquery
cloudevents~=1.10.1
cloudevents~=1.10.1
9 changes: 9 additions & 0 deletions functions-python/feed_sync_process_transitland/.coveragerc
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
[run]
omit =
*/test*/*
*/dataset_service/*
*/helpers/*

[report]
exclude_lines =
if __name__ == .__main__.:
5 changes: 5 additions & 0 deletions functions-python/feed_sync_process_transitland/.env.rename_me
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
# Environment variables for tokens function to run locally. Delete this line after rename the file.
FEEDS_DATABASE_URL=postgresql://postgres:postgres@localhost:54320/MobilityDatabase
PROJECT_ID=mobility-feeds-dev
PUBSUB_TOPIC_NAME=my-topic
DATASET_BATCH_TOPIC_NAME=dataset_batch_topic_{env}_
107 changes: 107 additions & 0 deletions functions-python/feed_sync_process_transitland/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
# TLD Feed Sync Process

Subscribed to the topic set in the `feed-sync-dispatcher` function, `feed-sync-process` is triggered for each message published. It handles the processing of feed updates, ensuring data consistency and integrity. The function performs the following operations:

1. **Feed Status Check**: It verifies the current state of the feed in the database using external_id and source.
2. **URL Validation**: Checks if the feed URL already exists in the database.
3. **Feed Processing**: Based on the current state:
- If no existing feed is found, creates a new feed entry
- If feed exists with a different URL, creates a new feed and deprecates the old one
- If feed exists with the same URL, no action is taken
4. **Batch Processing Trigger**: For non-authenticated feeds, publishes events to the dataset batch topic for further processing.

The function maintains feed history through the `redirectingid` table and ensures proper status tracking with 'active' and 'deprecated' states.

# Message Format
The function expects a Pub/Sub message with the following format:
```json
{
"message": {
"data": {
"external_id": "feed-identifier",
"feed_id": "unique-feed-id",
"feed_url": "http://example.com/feed",
"execution_id": "execution-identifier",
"spec": "gtfs",
"auth_info_url": null,
"auth_param_name": null,
"type": null,
"operator_name": "Transit Agency Name",
"country": "Country Name",
"state_province": "State/Province",
"city_name": "City Name",
"source": "TLD",
"payload_type": "new|update"
}
}
}
```

# Function Configuration
The function is configured using the following environment variables:
- `PROJECT_ID`: The Google Cloud project ID
- `DATASET_BATCH_TOPIC_NAME`: The name of the topic for batch processing triggers
- `FEEDS_DATABASE_URL`: The URL of the feeds database
- `ENV`: [Optional] Environment identifier (e.g., 'dev', 'prod')

# Database Schema
The function interacts with the following tables:
1. `feed`: Stores feed information
- Contains fields like id, data_type, feed_name, producer_url, etc.
- Tracks feed status ('active' or 'deprecated')
- Uses CURRENT_TIMESTAMP for created_at

2. `externalid`: Maps external identifiers to feed IDs
- Links external_id and source to feed entries
- Maintains source tracking

3. `redirectingid`: Tracks feed updates
- Maps old feed IDs to new ones
- Maintains update history

# Local development
The local development of this function follows the same steps as the other functions.

Install Google Pub/Sub emulator, please refer to the [README.md](../README.md) file for more information.

## Python requirements

- Install the requirements
```bash
pip install -r ./functions-python/feed_sync_process_transitland/requirements.txt
```

## Test locally with Google Cloud Emulators

- Execute the following commands to start the emulators:
```bash
gcloud beta emulators pubsub start --project=test-project --host-port='localhost:8043'
```

- Create a Pub/Sub topic in the emulator:
```bash
curl -X PUT "http://localhost:8043/v1/projects/test-project/topics/feed-sync-transitland"
```

- Start function
```bash
export PUBSUB_EMULATOR_HOST=localhost:8043 && ./scripts/function-python-run.sh --function_name feed_sync_process_transitland
```

- [Optional]: Create a local subscription to print published messages:
```bash
./scripts/pubsub_message_print.sh feed-sync-process-transitland
```

- Execute function
```bash
curl http://localhost:8080
```

- To run/debug from your IDE use the file `main_local_debug.py`

# Test
- Run the tests
```bash
./scripts/api-tests.sh --folder functions-python/feed_sync_dispatcher_transitland
```
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
{
"name": "feed-sync-process-transitland",
"description": "Feed Sync process for Transitland feeds",
"entry_point": "process_feed_event",
"timeout": 540,
"memory": "512Mi",
"trigger_http": true,
"include_folders": ["database_gen", "helpers"],
"secret_environment_variables": [
{
"key": "FEEDS_DATABASE_URL"
}
],
"ingress_settings": "ALLOW_INTERNAL_AND_GCLB",
"max_instance_request_concurrency": 20,
"max_instance_count": 10,
"min_instance_count": 0,
"available_cpu": 1
}
173 changes: 173 additions & 0 deletions functions-python/feed_sync_process_transitland/main_local_debug.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,173 @@
"""
Code to be able to debug locally without affecting the runtime cloud function.

Requirements:
- Google Cloud SDK installed
- Make sure to have the following environment variables set in your .env.local file:
- PROJECT_ID
- DATASET_BATCH_TOPIC_NAME
- FEEDS_DATABASE_URL
- Local database in running state

Usage:
- python feed_sync_process_transitland/main_local_debug.py
"""

import base64
import json
import os
from unittest.mock import MagicMock, patch
import logging
import sys

import pytest
from dotenv import load_dotenv

# Configure local logging first
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
stream=sys.stdout,
)

logger = logging.getLogger("feed_processor")

# Mock the Google Cloud Logger


class MockLogger:

"""Mock logger class"""

@staticmethod
def init_logger():
return MagicMock()

def __init__(self, name):
self.name = name

def get_logger(self):
return logger

def addFilter(self, filter):
pass


with patch("helpers.logger.Logger", MockLogger):
from feed_sync_process_transitland.src.main import process_feed_event

# Load environment variables
load_dotenv(dotenv_path=".env.rename_me")


class CloudEvent:
"""Cloud Event data structure."""

def __init__(self, attributes: dict, data: dict):
self.attributes = attributes
self.data = data


@pytest.fixture
def mock_pubsub():
"""Fixture to mock PubSub client"""
with patch("google.cloud.pubsub_v1.PublisherClient") as mock_publisher:
publisher_instance = MagicMock()

def mock_topic_path(project_id, topic_id):
return f"projects/{project_id}/topics/{topic_id}"

def mock_publish(topic_path, data):
logger.info(
f"[LOCAL DEBUG] Would publish to {topic_path}: {data.decode('utf-8')}"
)
future = MagicMock()
future.result.return_value = "message_id"
return future

publisher_instance.topic_path.side_effect = mock_topic_path
publisher_instance.publish.side_effect = mock_publish
mock_publisher.return_value = publisher_instance

yield mock_publisher


def process_event_safely(cloud_event, description=""):
"""Process event with error handling."""
try:
logger.info(f"\nProcessing {description}:")
logger.info("-" * 50)
result = process_feed_event(cloud_event)
logger.info(f"Process result: {result}")
return True
except Exception as e:
logger.error(f"Error processing {description}: {str(e)}")
return False


def main():
"""Main function to run local debug tests"""
logger.info("Starting local debug session...")

# Define test event data
test_payload = {
"external_id": "test-feed-1",
"feed_id": "feed1",
"feed_url": "https://example.com/test-feed-2",
"execution_id": "local-debug-123",
"spec": "gtfs",
"auth_info_url": None,
"auth_param_name": None,
"type": None,
"operator_name": "Test Operator",
"country": "USA",
"state_province": "CA",
"city_name": "Test City",
"source": "TLD",
"payload_type": "new",
}

# Create cloud event
cloud_event = CloudEvent(
attributes={
"type": "com.google.cloud.pubsub.topic.publish",
"source": f"//pubsub.googleapis.com/projects/{os.getenv('PROJECT_ID')}/topics/test-topic",
},
data={
"message": {
"data": base64.b64encode(
json.dumps(test_payload).encode("utf-8")
).decode("utf-8")
}
},
)

# Set up mocks
with patch(
"google.cloud.pubsub_v1.PublisherClient", new_callable=MagicMock
) as mock_publisher, patch("google.cloud.logging.Client", MagicMock()):
publisher_instance = MagicMock()

def mock_topic_path(project_id, topic_id):
return f"projects/{project_id}/topics/{topic_id}"

def mock_publish(topic_path, data):
logger.info(
f"[LOCAL DEBUG] Would publish to {topic_path}: {data.decode('utf-8')}"
)
future = MagicMock()
future.result.return_value = "message_id"
return future

publisher_instance.topic_path.side_effect = mock_topic_path
publisher_instance.publish.side_effect = mock_publish
mock_publisher.return_value = publisher_instance

# Process test event
process_event_safely(cloud_event, "test feed event")

logger.info("Local debug session completed.")


if __name__ == "__main__":
main()
23 changes: 23 additions & 0 deletions functions-python/feed_sync_process_transitland/requirements.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
# Common packages
functions-framework==3.*
google-cloud-logging
psycopg2-binary==2.9.6
aiohttp~=3.10.5
asyncio~=3.4.3
urllib3~=2.2.2
requests~=2.32.3
attrs~=23.1.0
pluggy~=1.3.0
certifi~=2024.8.30

# SQL Alchemy and Geo Alchemy
SQLAlchemy==2.0.23
geoalchemy2==0.14.7

# Google specific packages for this function
google-cloud-pubsub
cloudevents~=1.10.1

# Additional packages for this function
pandas
pycountry
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
Faker
pytest~=7.4.3
Empty file.
Loading
Loading