diff --git a/functions-python/helpers/logger.py b/functions-python/helpers/logger.py
index 30725d222..887b8accf 100644
--- a/functions-python/helpers/logger.py
+++ b/functions-python/helpers/logger.py
@@ -13,6 +13,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #
+import os
 import google.cloud.logging
 from google.cloud.logging_v2 import Client
@@ -45,11 +46,24 @@ def __init__(self, name):
     @staticmethod
     def init_logger() -> Client:
         """
-        Initializes the logger
+        Initializes the logger for both local debugging and GCP environments.
         """
-        client = google.cloud.logging.Client()
-        client.get_default_handler()
-        client.setup_logging()
+        try:
+            # Check whether we are running locally (ENV=local) or in a GCP environment
+            if os.getenv("ENV") == "local":
+                # Local environment: use standard logging
+                logging.basicConfig(level=logging.DEBUG)
+                logging.info("Local logger initialized (standard logging).")
+                client = None  # Return None since the cloud client is not used
+            else:
+                client = google.cloud.logging.Client()
+                client.setup_logging()
+                logging.info("Google Cloud Logging initialized.")
+        except Exception as e:
+            logging.error(f"Failed to initialize logging: {e}")
+            logging.basicConfig(level=logging.DEBUG)
+            logging.info("Falling back to standard local logging.")
+            client = None
         return client
 
     def get_logger(self) -> Client:
diff --git a/functions-python/reverse_geolocation/.coveragerc b/functions-python/reverse_geolocation/.coveragerc
new file mode 100644
index 000000000..d3ef5cbc8
--- /dev/null
+++ b/functions-python/reverse_geolocation/.coveragerc
@@ -0,0 +1,9 @@
+[run]
+omit =
+    */test*/*
+    */helpers/*
+    */database_gen/*
+
+[report]
+exclude_lines =
+    if __name__ == .__main__.:
\ No newline at end of file
diff --git a/functions-python/reverse_geolocation/.env.rename_me b/functions-python/reverse_geolocation/.env.rename_me
new file mode 100644
index 000000000..49776e896
--- /dev/null
+++ b/functions-python/reverse_geolocation/.env.rename_me
@@ -0,0 +1,5 @@
+# Environment variables for the reverse_geolocation functions
+FEEDS_DATABASE_URL=${{FEEDS_DATABASE_URL}}
+BUCKET_NAME=${{BUCKET_NAME}}
+MAX_RETRY=${{MAX_RETRY}}
+# TODO: add the other env variables here
diff --git a/functions-python/reverse_geolocation/README.md b/functions-python/reverse_geolocation/README.md
new file mode 100644
index 000000000..471e5fa2e
--- /dev/null
+++ b/functions-python/reverse_geolocation/README.md
@@ -0,0 +1,11 @@
+# Reverse Geolocation
+
+## Function Workflow
+
+The workflow is split across three functions:
+
+1. `reverse-geolocation`: triggered when a new dataset zip is uploaded to the datasets bucket. It reads the GTFS stops, splits them into batches of 1,000 points, and enqueues one Cloud Task per batch plus a final aggregation task.
+2. `reverse-geolocation-process`: HTTP function invoked by Cloud Tasks. It matches each batch of points against the geopolygons in the database and writes the grouped results as JSON to the results bucket.
+3. `reverse-geolocation-aggregate`: HTTP function invoked by Cloud Tasks. It retries until all batch results are available, merges them into a single summary, and publishes a GeoJSON file next to the dataset.
+
+## Function Configuration
+
+The functions read the `FEEDS_DATABASE_URL` secret and environment variables such as `BUCKET_NAME`, `DATASETS_BUCKET_NAME`, `MAX_RETRY`, and the Cloud Tasks queue and endpoint settings defined in the Terraform configuration.
+
+## Local Development
+
+Local development of these functions should follow standard practices for GCP serverless functions. For general instructions on setting up the development environment, refer to the main [README.md](../README.md) file.
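+
+Once the shared `helpers`/`database_gen` packages are importable and the processor is served locally (for example with `functions-framework --target reverse_geolocation_process --port 8080 --debug` run from `src/`, with `ENV=local`, `FEEDS_DATABASE_URL`, and `BUCKET_NAME` set), a minimal request sketch could look like the following. The port and the sample coordinates are illustrative only:
+
+```python
+# Illustrative local smoke test: POST a small batch of [lon, lat] points
+import requests
+
+payload = {
+    "execution_id": "local-test",
+    "points": [[-73.5673, 45.5017], [-73.5540, 45.5088]],
+}
+response = requests.post("http://localhost:8080", json=payload, timeout=60)
+print(response.status_code, response.json())
+```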
\ No newline at end of file
diff --git a/functions-python/reverse_geolocation/function_config.json b/functions-python/reverse_geolocation/function_config.json
new file mode 100644
index 000000000..39822dae2
--- /dev/null
+++ b/functions-python/reverse_geolocation/function_config.json
@@ -0,0 +1,20 @@
+{
+  "name": "reverse-geolocation",
+  "description": "Function to reverse geolocate coordinates",
+  "entry_point": "reverse_geolocation",
+  "timeout": 3600,
+  "trigger_http": false,
+  "include_folders": ["database_gen", "helpers"],
+  "environment_variables": [],
+  "secret_environment_variables": [
+    {
+      "key": "FEEDS_DATABASE_URL"
+    }
+  ],
+  "ingress_settings": "ALLOW_ALL",
+  "max_instance_request_concurrency": 1,
+  "max_instance_count": 10,
+  "min_instance_count": 0,
+  "available_cpu": 1,
+  "available_memory": "4Gi"
+}
diff --git a/functions-python/reverse_geolocation/requirements.txt b/functions-python/reverse_geolocation/requirements.txt
new file mode 100644
index 000000000..ebb843f3b
--- /dev/null
+++ b/functions-python/reverse_geolocation/requirements.txt
@@ -0,0 +1,29 @@
+# Common packages
+functions-framework==3.*
+google-cloud-logging
+psycopg2-binary==2.9.6
+aiohttp~=3.10.5
+asyncio~=3.4.3
+urllib3~=2.2.2
+requests~=2.32.3
+attrs~=23.1.0
+pluggy~=1.3.0
+certifi~=2024.7.4
+
+# SQL Alchemy and Geo Alchemy
+SQLAlchemy==2.0.23
+geoalchemy2==0.14.7
+
+# Google specific packages for this function
+google-cloud-pubsub
+google-cloud-datastore
+google-cloud-storage
+google-cloud-bigquery
+cloudevents~=1.10.1
+google-cloud-tasks
+
+# Additional packages for this function
+pycountry
+shapely
+matplotlib
+gtfs-kit
\ No newline at end of file
diff --git a/functions-python/reverse_geolocation/requirements_dev.txt b/functions-python/reverse_geolocation/requirements_dev.txt
new file mode 100644
index 000000000..800a4ac11
--- /dev/null
+++ b/functions-python/reverse_geolocation/requirements_dev.txt
@@ -0,0 +1,4 @@
+Faker
+pytest~=7.4.3
+urllib3-mock
+requests-mock
\ No newline at end of file
diff --git a/functions-python/reverse_geolocation/src/__init__.py b/functions-python/reverse_geolocation/src/__init__.py
new file mode 100644
index 000000000..e69de29bb
diff --git a/functions-python/reverse_geolocation/src/common.py b/functions-python/reverse_geolocation/src/common.py
new file mode 100644
index 000000000..802b3f7c2
--- /dev/null
+++ b/functions-python/reverse_geolocation/src/common.py
@@ -0,0 +1 @@
+ERROR_STATUS_CODE = 299  # Custom status code to stop Cloud Tasks retries
diff --git a/functions-python/reverse_geolocation/src/main.py b/functions-python/reverse_geolocation/src/main.py
new file mode 100644
index 000000000..b102a3582
--- /dev/null
+++ b/functions-python/reverse_geolocation/src/main.py
@@ -0,0 +1,39 @@
+import logging
+from typing import Dict, Tuple
+
+import flask
+import functions_framework
+from cloudevents.http import CloudEvent
+
+from reverse_geolocation_aggregator import reverse_geolocation_aggregate as aggregator
+from reverse_geolocation_processor import reverse_geolocation_process as processor
+from reverse_geolocator import reverse_geolocation as geolocator
+
+# Initialize logging
+logging.basicConfig(level=logging.INFO)
+
+
+@functions_framework.cloud_event
+def reverse_geolocation(cloud_event: CloudEvent):
+    """Function that is triggered when a new dataset is uploaded to extract the location information."""
+    return geolocator(cloud_event)
+
+
+@functions_framework.http
+def reverse_geolocation_process(
+    request: flask.Request,
+) -> Tuple[str, int] | Tuple[Dict,
int]: + """ + Main function to handle reverse geolocation population. + """ + return processor(request) + + +@functions_framework.http +def reverse_geolocation_aggregate( + request: flask.Request, +) -> Tuple[str, int] | Tuple[Dict, int]: + """ + Main function to handle reverse geolocation population. + """ + return aggregator(request) diff --git a/functions-python/reverse_geolocation/src/reverse_geolocation_aggregator.py b/functions-python/reverse_geolocation/src/reverse_geolocation_aggregator.py new file mode 100644 index 000000000..73ae4e862 --- /dev/null +++ b/functions-python/reverse_geolocation/src/reverse_geolocation_aggregator.py @@ -0,0 +1,339 @@ +import json +import logging +import os +from collections import defaultdict +from typing import Dict, Tuple, List, Optional +import pycountry + +import flask +from google.cloud import storage +from shapely.geometry import Polygon, mapping +from database_gen.sqlacodegen_models import Geopolygon +from helpers.database import start_db_session +from helpers.logger import Logger +from geoalchemy2.shape import to_shape +from reverse_geolocation.src.common import ERROR_STATUS_CODE +from shapely.validation import make_valid +import matplotlib.pyplot as plt + +# Initialize logging +logging.basicConfig(level=logging.INFO) + +def generate_color(points_match, max_match, colormap_name="BuPu"): + """ + Generate a color based on the points_match value using a matplotlib colormap. + """ + colormap = plt.get_cmap(colormap_name) + normalized_value = points_match / max_match + rgba = colormap(normalized_value) # Returns RGBA + return f"rgba({int(rgba[0]*255)}, {int(rgba[1]*255)}, {int(rgba[2]*255)}, {rgba[3]})" + + +class ReverseGeolocation: + def __init__( + self, + osm_id: int, + iso_3166_1: Optional[str], + iso_3166_2: Optional[str], + name: str, + admin_level: int, + points_match: int, + ): + self.osm_id = osm_id + self.iso_3166_1 = iso_3166_1 + self.iso_3166_2 = iso_3166_2 + self.name = name + self.admin_level = admin_level + self.points_match = points_match + self.children: List[ReverseGeolocation] = [] + self.parent: Optional[ReverseGeolocation] = None + self.geometry: Optional[Polygon] = None + + def __str__(self) -> str: + return f"{self.osm_id} - {self.name} - {self.points_match}" + + def set_geometry(self, geopolygon: Geopolygon) -> None: + shape = to_shape(geopolygon.geometry) + if shape.is_valid: + self.geometry = shape + else: + self.geometry = make_valid(shape) + + @staticmethod + def from_dict(data: dict, parent: Optional["ReverseGeolocation"] = None) -> List["ReverseGeolocation"]: + nodes = [] + locations = data if isinstance(data, list) else data.get("grouped_matches", []) + for location in locations: + node = ReverseGeolocation( + osm_id=location["osm_id"], + iso_3166_1=location.get("iso_3166_1"), + iso_3166_2=location.get("iso_3166_2"), + name=location["name"], + admin_level=location["admin_level"], + points_match=location["points_match"], + ) + if parent: + node.parent = parent + nodes.append(node) + if "sub_levels" in location: + node.children = ReverseGeolocation.from_dict(location["sub_levels"], parent=node) + return nodes + + def to_dict(self) -> dict: + return { + "osm_id": self.osm_id, + "iso_3166_1": self.iso_3166_1, + "iso_3166_2": self.iso_3166_2, + "name": self.name, + "admin_level": self.admin_level, + "points_match": self.points_match, + "sub_levels": [child.to_dict() for child in self.children], + } + + def get_level(self, target_level: int, current_level: int = 0) -> List["ReverseGeolocation"]: + if current_level == 
target_level: + return [self] + results = [] + for child in self.children: + results.extend(child.get_level(target_level, current_level + 1)) + return results + + def get_leaves(self) -> List["ReverseGeolocation"]: + if not self.children: + return [self] + leaves = [] + for child in self.children: + leaves.extend(child.get_leaves()) + return leaves + + def get_country_code(self) -> str: + if self.iso_3166_1: + return self.iso_3166_1 + if self.parent: + return self.parent.get_country_code() + return "" + + def get_display_name(self) -> str: + display_name = self.name + if self.iso_3166_1: + try: + flag = pycountry.countries.get(alpha_2=self.iso_3166_1).flag + display_name = f"{flag} {display_name}" + except AttributeError: + pass + if self.parent: + display_name = f"{self.parent.get_display_name()}, {display_name}" + return display_name + + def get_geojson_feature(self, max_leaves_points) -> Dict: + if not self.geometry: + return {} + return { + "type": "Feature", + "properties": { + "osm_id": self.osm_id, + "country_code": self.get_country_code(), + "display_name": self.get_display_name(), + "points_match": self.points_match, + "color": generate_color(self.points_match, max_leaves_points), + }, + "geometry": mapping(self.geometry), + } + + +def parse_request_parameters(request: flask.Request) -> Tuple[str, int, int, str]: + """ + Parse the request parameters and return the execution ID, number of batches, and retry count. + """ + logging.info("Parsing request parameters.") + try: + retry_count = int(request.headers.get("X-CloudTasks-TaskRetryCount", 0)) + except ValueError: + logging.error( + f"Error parsing retry count: {request.headers.get('X-CloudTasks-TaskRetryCount')}. Defaulting to 0." + ) + retry_count = 0 + + request_json = request.get_json(silent=True) + if (not request_json + or "execution_id" not in request_json + or "n_batches" not in request_json + or "stable_id" not in request_json): + raise ValueError("Missing required 'execution_id', 'stable_id' or 'n_batches' in the request.") + + return ( + request_json["execution_id"], + int(request_json["n_batches"]), + retry_count, + request_json["stable_id"] + ) + + +def list_blobs(bucket_name: str, prefix: str = "") -> List[storage.Blob]: + """ + List all JSON files in a GCP bucket with the given prefix. + """ + storage_client = storage.Client() + blobs = list(storage_client.list_blobs(bucket_name, prefix=prefix)) + return [blob for blob in blobs if blob.name.endswith(".json")] + + +def merge_reverse_geolocations(locations: List[ReverseGeolocation]) -> List[ReverseGeolocation]: + """ + Recursively merge a list of ReverseGeolocation objects. + """ + if not locations: + return [] + + # Group by osm_id + per_osm_id = defaultdict(list) + for location in locations: + per_osm_id[location.osm_id].append(location) + + merged_results = [] + for osm_id, grouped_locations in per_osm_id.items(): + # Aggregate points_match and merge children + total_points_match = sum(loc.points_match for loc in grouped_locations) + chosen_location = grouped_locations[0] + chosen_location.points_match = total_points_match + + # Merge children recursively + all_children = [child for loc in grouped_locations for child in loc.children] + chosen_location.children = merge_reverse_geolocations(all_children) + + merged_results.append(chosen_location) + + return merged_results + + +def reverse_geolocation_aggregate( + request: flask.Request, +) -> Tuple[str, int] | Tuple[Dict, int]: + """ + Handle reverse geolocation aggregation by merging JSON data into a single result. 
+ """ + Logger.init_logger() + + source_bucket = os.getenv("BUCKET_NAME") + max_retry = int(os.getenv("MAX_RETRY", 10)) + + if not source_bucket: + logging.error("Source bucket name not set.") + return "Source bucket name not set.", ERROR_STATUS_CODE + + try: + execution_id, n_batches, retry_count, stable_id = parse_request_parameters(request) + logging.info(f"Execution ID: {execution_id}, Number of batches: {n_batches}, Retry Count: {retry_count}") + except ValueError as e: + return handle_error("Error parsing request parameters", e, ERROR_STATUS_CODE) + + try: + files = validate_files_ready(source_bucket, execution_id, n_batches, retry_count, max_retry) + except ValueError as e: + return handle_error("Validation error", e) + + try: + aggregated_data, total_points, geojson_data = aggregate_data_from_files(files) + logging.info(f"Aggregated {total_points} points from {len(files)} files.") + except Exception as e: + return handle_error("Error aggregating data", e, ERROR_STATUS_CODE) + + try: + save_aggregated_data(source_bucket, execution_id, aggregated_data, total_points) + save_geojson(os.getenv("DATASETS_BUCKET_NAME"), stable_id, geojson_data) + except Exception as e: + return handle_error("Error saving aggregated data", e, ERROR_STATUS_CODE) + + return "Done", 200 + + +def validate_files_ready( + bucket_name: str, prefix: str, n_batches: int, retry_count: int, max_retry: int +) -> List[storage.Blob]: + """ + Validate that the required number of files is available in the bucket. + """ + files = list_blobs(bucket_name, prefix) + logging.info(f"Found {len(files)} files in the bucket.") + + if len(files) < n_batches: + if retry_count < max_retry: + logging.warning("Files are not ready yet. Retrying...") + raise ValueError("Not yet ready to process") + logging.error("Maximum retries exceeded. Aborting.") + raise ValueError("Maximum retries exceeded.") + return files + + +def aggregate_data_from_files(files: List[storage.Blob]) -> Tuple[List[Dict], int, Dict]: + """ + Aggregate data from the given list of files. + """ + results: List[ReverseGeolocation] = [] + total_points = 0 + + for file in files: + if file.name.endswith("output.json"): + continue + json_data = json.loads(file.download_as_string()) + results.extend(ReverseGeolocation.from_dict(json_data)) + total_points += json_data.get("summary", {}).get("total_points", 0) + + root_nodes = merge_reverse_geolocations(results) + leaves = [leaf for node in root_nodes for leaf in node.get_leaves()] + max_leaves_points = max(leaf.points_match for leaf in leaves) + osm_ids = [leaf.osm_id for leaf in leaves] + + session = start_db_session(os.getenv("FEEDS_DATABASE_URL"), echo=False) + leaves_geopolygons = ( + session.query(Geopolygon).filter(Geopolygon.osm_id.in_(osm_ids)).all() + ) + geopolygons_map = {geopolygon.osm_id: geopolygon for geopolygon in leaves_geopolygons} + for leaf in leaves: + leaf.set_geometry(geopolygons_map[leaf.osm_id]) + geojson_map = { + "type": "FeatureCollection", + "features": [node.get_geojson_feature(max_leaves_points) for node in leaves], + } + return [node.to_dict() for node in root_nodes], total_points, geojson_map + +def save_geojson(bucket_name: str, stable_id: str, geojson: Dict) -> None: + """ + Save the GeoJSON data as a JSON file in the specified bucket. 
+ """ + storage_client = storage.Client() + bucket = storage_client.bucket(bucket_name) + blob = bucket.blob(f"{stable_id}/geolocation.geojson") + blob.upload_from_string(json.dumps(geojson)) + blob.make_public() + logging.info(f"GeoJSON data saved to {blob.name}") + +def save_aggregated_data(bucket_name: str, + execution_id: str, + aggregated_data: List[Dict], + total_points: int) -> None: + """ + Save the aggregated data as a JSON file in the specified bucket. + """ + storage_client = storage.Client() + bucket = storage_client.bucket(bucket_name) + blob = bucket.blob(f"{execution_id}/output.json") + matched_points = sum(node["points_match"] for node in aggregated_data) + output = { + "summary" : { + "total_points": total_points, + "matched_points": matched_points, + "unmatched_points": total_points - matched_points, + }, + "locations": aggregated_data, + } + blob.upload_from_string(json.dumps(output, indent=2)) + logging.info(f"Aggregated data saved to {blob.name}") + + +def handle_error(message: str, exception: Exception, error_code: int = 400) -> Tuple[str, int]: + """ + Log and handle an error, returning an appropriate response. + """ + logging.error(f"{message}: {exception}") + return str(exception), error_code diff --git a/functions-python/reverse_geolocation/src/reverse_geolocation_processor.py b/functions-python/reverse_geolocation/src/reverse_geolocation_processor.py new file mode 100644 index 000000000..a3a2dc439 --- /dev/null +++ b/functions-python/reverse_geolocation/src/reverse_geolocation_processor.py @@ -0,0 +1,251 @@ +import json +import logging +import os +import uuid +from collections import defaultdict +from typing import List, Dict, Tuple + +import flask +from geoalchemy2 import WKTElement +from geoalchemy2.shape import to_shape +from shapely.validation import make_valid +from google.cloud import storage + +from database_gen.sqlacodegen_models import Geopolygon +from helpers.database import start_db_session +from helpers.logger import Logger +from reverse_geolocation.src.common import ERROR_STATUS_CODE + +# Initialize logging +logging.basicConfig(level=logging.INFO) + + +def build_response( + proper_match_geopolygons: Dict[str, Dict], total_points: int, unmatched_points: int +) -> Dict: + """Build a structured response from the matched geopolygons.""" + + # Helper function to merge and build hierarchical groups + def merge_hierarchy(root: Dict, geopolygons: List[Geopolygon], count: int) -> None: + if not geopolygons: + return + + # Process the current geopolygon + current = geopolygons[0] + osm_id = current.osm_id + + # Check if the current node already exists in the root + if osm_id not in root: + root[osm_id] = { + "osm_id": current.osm_id, + "iso_3166_1": current.iso_3166_1_code, + "iso_3166_2": current.iso_3166_2_code, + "name": current.name, + "admin_level": current.admin_level, + "points_match": 0, + "sub_levels": defaultdict(dict), + } + + # Increment points_match for the current node + root[osm_id]["points_match"] += count + + # Recursively process the sub-levels + merge_hierarchy(root[osm_id]["sub_levels"], geopolygons[1:], count) + + # Build the hierarchical response + grouped_matches = defaultdict(dict) + + for match_data in proper_match_geopolygons.values(): + geopolygons = match_data["geopolys"] + count = match_data["count"] + + # Merge into the top-level hierarchy + merge_hierarchy(grouped_matches, geopolygons, count) + + # Recursive function to convert defaultdict to a regular dict and clean sub-levels + def clean_hierarchy(root: Dict) -> List[Dict]: + 
return [ + { + "osm_id": node["osm_id"], + "iso_3166_1": node["iso_3166_1"], + "iso_3166_2": node["iso_3166_2"], + "name": node["name"], + "admin_level": node["admin_level"], + "points_match": node["points_match"], + "sub_levels": clean_hierarchy(node["sub_levels"]) + if node["sub_levels"] + else [], + } + for node in root.values() + ] + + # Construct the final response + response = { + "summary": { + "total_points": total_points, + "matched_points": total_points - unmatched_points, + "unmatched_points": unmatched_points, + }, + "grouped_matches": clean_hierarchy(grouped_matches), + } + + return response + + +def parse_request_parameters( + request: flask.Request, +) -> Tuple[List[WKTElement], WKTElement, str]: + """ + Parse the request parameters and return a list of WKT points and a bounding box. + """ + logging.info("Parsing request parameters.") + request_json = request.get_json(silent=True) + logging.info(f"Request JSON: {request_json}") + + if ( + not request_json + or "points" not in request_json + or "execution_id" not in request_json + ): + raise ValueError( + "Invalid request: missing 'points' or 'execution_id' parameter." + ) + + execution_id = str(request_json["execution_id"]) + points = request_json["points"] + if not points: + raise ValueError("Invalid request: 'points' parameter is empty.") + if not isinstance(points, list): + raise ValueError("Invalid request: 'points' parameter must be a list.") + if not all(isinstance(lat_lon, list) and len(lat_lon) == 2 for lat_lon in points): + raise ValueError( + "Invalid request: 'points' must be a list of lists with two elements each " + "representing latitude and longitude." + ) + + # Create WKT elements for each point + wkt_points = [ + WKTElement(f"POINT({point[0]} {point[1]})", srid=4326) for point in points + ] + + # Generate bounding box + lons, lats = [point[0] for point in points], [point[1] for point in points] + bounding_box_coords = [ + (min(lons), min(lats)), + (max(lons), min(lats)), + (max(lons), max(lats)), + (min(lons), max(lats)), + (min(lons), min(lats)), + ] + bounding_box = WKTElement( + f"POLYGON(({', '.join([f'{lon} {lat}' for lon, lat in bounding_box_coords])}))", + srid=4326, + ) + + return wkt_points, bounding_box, execution_id + + +def reverse_geolocation_process( + request: flask.Request, +) -> Tuple[str, int] | Tuple[Dict, int]: + """ + Main function to handle reverse geolocation population. 
+ """ + Logger.init_logger() + bucket_name = os.getenv("BUCKET_NAME") + + # Parse request parameters + try: + wkt_points, bounding_box, execution_id = parse_request_parameters(request) + except ValueError as e: + logging.error(f"Error parsing request parameters: {e}") + return str(e), ERROR_STATUS_CODE + + # Start the database session + try: + session = start_db_session(os.getenv("FEEDS_DATABASE_URL"), echo=False) + except Exception as e: + logging.error(f"Error connecting to the database: {e}") + return str(e), 500 + + # Fetch geopolygons within the bounding box + try: + geopolygons = ( + session.query(Geopolygon) + .filter(Geopolygon.geometry.ST_Intersects(bounding_box)) + .all() + ) + geopolygons_ids = [geopolygon.osm_id for geopolygon in geopolygons] + except Exception as e: + logging.error(f"Error fetching geopolygons: {e}") + return str(e), ERROR_STATUS_CODE + + try: + logging.info(f"Found {len(geopolygons)} geopolygons within the bounding box.") + logging.info(f"The osm_ids of the geopolygons are: {geopolygons_ids}") + + # Map geopolygons into shapes + wkb_geopolygons = { + geopolygon.osm_id: { + "polygon": to_shape(geopolygon.geometry), + "object": geopolygon, + } + for geopolygon in geopolygons + } + + # Ensure geometries are valid + for geopolygon in wkb_geopolygons.values(): + if not geopolygon["polygon"].is_valid: + geopolygon["polygon"] = make_valid(geopolygon["polygon"]) + + points = [to_shape(point) for point in wkt_points] + points_match = {} + + # Match points to geopolygons + for point in points: + for osm_id, geopolygon in wkb_geopolygons.items(): + if geopolygon["polygon"].contains(point): + point_id = str(point) + if point_id not in points_match: + points_match[point_id] = [] + points_match[point_id].append(geopolygon["object"]) + + # Clean up duplicate admin levels + proper_match_geopolygons = {} + for point, geopolygons in points_match.items(): + if len(geopolygons) > 1: + admin_levels = {g.admin_level for g in geopolygons} + if len(admin_levels) == len(geopolygons): + valid_iso_3166_1 = any(g.iso_3166_1_code for g in geopolygons) + valid_iso_3166_2 = any(g.iso_3166_2_code for g in geopolygons) + if not valid_iso_3166_1 or not valid_iso_3166_2: + logging.info(f"Invalid ISO codes for point: {point}") + continue + geopolygons.sort(key=lambda x: x.admin_level) + geopolygon_as_string = ", ".join([str(g.osm_id) for g in geopolygons]) + if geopolygon_as_string not in proper_match_geopolygons: + proper_match_geopolygons[geopolygon_as_string] = { + "geopolys": geopolygons, + "count": 0, + } + proper_match_geopolygons[geopolygon_as_string]["count"] += 1 + + unmatched_points = len(wkt_points) - sum( + item["count"] for item in proper_match_geopolygons.values() + ) + logging.info(f"Unmatched points: {unmatched_points}") + response = build_response( + proper_match_geopolygons, len(wkt_points), unmatched_points + ) + logging.info(f"Response: {response}") + storage_client = storage.Client() + bucket = storage_client.bucket(bucket_name) + blob = bucket.blob(f"{execution_id}/{uuid.uuid4()}.json") + blob.upload_from_string(json.dumps(response, indent=2)) + blob.make_public() + + # Save the response to GCP bucket + return response, 200 + except Exception as e: + logging.error(f"Error processing geopolygons: {e}") + return str(e), ERROR_STATUS_CODE diff --git a/functions-python/reverse_geolocation/src/reverse_geolocator.py b/functions-python/reverse_geolocation/src/reverse_geolocator.py new file mode 100644 index 000000000..04371e081 --- /dev/null +++ 
b/functions-python/reverse_geolocation/src/reverse_geolocator.py @@ -0,0 +1,143 @@ +import hashlib +import json +import logging +import os +import uuid + +import gtfs_kit +import pandas as pd +from cloudevents.http import CloudEvent +from google.cloud import tasks_v2 + +from helpers.logger import Logger + +# Initialize logging +logging.basicConfig(level=logging.INFO) + + +def parse_resource_data(data: dict) -> tuple: + """ + Parse the cloud event data to extract resource information. + @:param data (dict): The data part of the CloudEvent. + @:return tuple: A tuple containing stable_id, dataset_id, and the resource URL. + """ + resource_name = data["protoPayload"]["resourceName"] + stable_id = resource_name.split("/")[-3] + dataset_id = resource_name.split("/")[-2] + file_name = resource_name.split("/")[-1] + bucket_name = data["resource"]["labels"]["bucket_name"] + url = f"https://storage.googleapis.com/{bucket_name}/{stable_id}/{dataset_id}/{file_name}" + return stable_id, dataset_id, url + +def create_http_task( + url: str, + payload: dict, + service_account_email: str, + project: str, + location: str, + queue: str +): + """ + Create an HTTP task in Cloud Tasks. + @param url (str): The URL of the task. + @param payload (dict): The payload to send in the task. + @param endpoint (str): The endpoint to send the task to. + """ + client = tasks_v2.CloudTasksClient() + task = tasks_v2.Task( + http_request=tasks_v2.HttpRequest( + http_method=tasks_v2.HttpMethod.POST, + url=url, + headers={"Content-type": "application/json"}, + oidc_token=tasks_v2.OidcToken( + service_account_email=service_account_email, + audience=url, + ), + body=json.dumps(payload).encode(), + ) + ) + return client.create_task( + tasks_v2.CreateTaskRequest( + parent=client.queue_path(project, location, queue), + task=task, + ) + ) + + +def reverse_geolocation(cloud_event: CloudEvent): + """Function that is triggered when a new dataset is uploaded to extract the location information.""" + Logger.init_logger() + logging.info(f"Cloud event data: {cloud_event.data}") + processing_function_endpoint = os.getenv("PROCESSING_FUNCTION_ENDPOINT") + aggregator_function_endpoint = os.getenv("AGGREGATOR_FUNCTION_ENDPOINT") + service_account_email = os.getenv("SERVICE_ACCOUNT_EMAIL") + project_id = os.getenv("PROJECT_ID") + location = os.getenv("LOCATION") + processing_queue = os.getenv("PROCESSING_QUEUE") + aggregator_queue = os.getenv("AGGREGATOR_QUEUE") + + try: + stable_id, dataset_id, url = parse_resource_data(cloud_event.data) + logging.info( + f"Processing dataset {dataset_id} and stable ID {stable_id} from {url}" + ) + except KeyError as e: + logging.error(f"Error parsing resource data: {e}") + return "Invalid Pub/Sub message data." + + try: + gtfs_feed = gtfs_kit.read_feed(url, dist_units="km") + stops = gtfs_feed.stops[["stop_lon", "stop_lat"]].copy() + stops = stops.sort_values(by=["stop_lon", "stop_lat"]) + stops_hash = hashlib.sha256( + pd.util.hash_pandas_object(stops, index=True).values + ).hexdigest() + # TODO: Check if stops hash has changed + logging.info(f"Stops hash: {stops_hash}") + except Exception as e: + logging.error(f"Error processing GTFS feed from {url}: {e}") + return "Error processing GTFS feed." 
+ + # 9 - Create one GCP task per batch of 1000 remaining stops to extract the location + execution_id = str(uuid.uuid4()) + logging.info(f"Execution ID: {execution_id}") + # process batches of 1000 stops + # TODO fix logic in case of less than 1000 stops + batch_size = 1000 + n_batches = (len(stops) + batch_size - 1) // batch_size # Calculate total number of batches + logging.info(f"Number of batches: {n_batches}") + + for i in range(n_batches): + batch_stops = stops.iloc[i * batch_size: (i + 1) * batch_size] + stops_list = [[stop['stop_lon'], stop['stop_lat']] for stop in batch_stops.to_dict(orient="records")] + payload = { + "execution_id": execution_id, + "points": stops_list, + } + logging.info(f"Processing batch {i + 1} with {len(stops_list)} stops.") + logging.info(f"Payload: {payload}") + response = create_http_task( + url=processing_function_endpoint, + payload=payload, + service_account_email=service_account_email, + project=project_id, + location=location, + queue=processing_queue + ) + logging.info(f"Task created for batch {i + 1}: {response.name}") + + # 10 - Crete a GCP task to aggregate the results + create_http_task( + url=aggregator_function_endpoint, + payload={ + "execution_id": execution_id, + "n_batches": n_batches, + "stable_id": stable_id, + }, + service_account_email=service_account_email, + project=project_id, + location=location, + queue=aggregator_queue + ) + # 11 - Save Execution trace in the database + return "Success" diff --git a/functions-python/reverse_geolocation/tests/test_reverse_geolocation.py b/functions-python/reverse_geolocation/tests/test_reverse_geolocation.py new file mode 100644 index 000000000..b7307e186 --- /dev/null +++ b/functions-python/reverse_geolocation/tests/test_reverse_geolocation.py @@ -0,0 +1,9 @@ +import unittest + +from faker import Faker + +faker = Faker() + + +class TestReverseGeolocation(unittest.TestCase): + pass diff --git a/infra/functions-python/main.tf b/infra/functions-python/main.tf index 226632f6c..d19a05af1 100644 --- a/infra/functions-python/main.tf +++ b/infra/functions-python/main.tf @@ -53,6 +53,9 @@ locals { function_operations_api_config = jsondecode(file("${path.module}/../../functions-python/operations_api/function_config.json")) function_operations_api_zip = "${path.module}/../../functions-python/operations_api/.dist/operations_api.zip" + + function_reverse_geolocation_zip = "${path.module}/../../functions-python/reverse_geolocation/.dist/reverse_geolocation.zip" + function_reverse_geolocation_config = jsondecode(file("${path.module}/../../functions-python/reverse_geolocation/function_config.json")) } locals { @@ -158,6 +161,13 @@ resource "google_storage_bucket_object" "reverse_geolocation_populate_zip" { source = local.function_reverse_geolocation_populate_zip } +# 10. Reverse geolocation +resource "google_storage_bucket_object" "reverse_geolocation_zip" { + bucket = google_storage_bucket.functions_bucket.name + name = "reverse-geolocation-${substr(filebase64sha256(local.function_reverse_geolocation_zip), 0, 10)}.zip" + source = local.function_reverse_geolocation_zip +} + # Secrets access resource "google_secret_manager_secret_iam_member" "secret_iam_member" { for_each = local.unique_secret_keys @@ -787,6 +797,209 @@ resource "google_cloudfunctions2_function" "reverse_geolocation_populate" { } } +# 10. 
functions/reverse_geolocation cloud function +# Bucket to store reverse geolocation results +resource "google_storage_bucket" "reverse_geolocation_results_bucket" { + name = "moblitydata-reverse-geolocation-results-${var.environment}" + location = var.gcp_region + project = var.project_id + # Delete objects after 30 days + lifecycle_rule { + action { + type = "Delete" + } + condition { + age = 30 + } + } +} +# Task queue 1 to process the stops +resource "google_cloud_tasks_queue" "reverse_geolocation_process_queue" { + name = "reverse-geolocation-process-queue-${var.environment}" + project = var.project_id + location = var.gcp_region + rate_limits { + max_dispatches_per_second = 1 + max_concurrent_dispatches = 5 + } + retry_config { + max_attempts = 100 + max_retry_duration = "3600s" + min_backoff = "10s" + max_backoff = "600s" + } +} +# Task queue 2 to aggregate the results +resource "google_cloud_tasks_queue" "reverse_geolocation_aggregate_queue" { + name = "reverse-geolocation-aggregate-queue-${var.environment}" + project = var.project_id + location = var.gcp_region + rate_limits { + max_dispatches_per_second = 1 + max_concurrent_dispatches = 5 + } + retry_config { + max_attempts = 100 + max_retry_duration = "3600s" + min_backoff = "10s" + max_backoff = "600s" + } +} +# 10.1 Cloud event triggered function for reverse geolocation (when a new dataset is uploaded) +resource "google_cloudfunctions2_function" "reverse_geolocation" { + depends_on = [google_secret_manager_secret_iam_member.secret_iam_member, + google_storage_bucket.reverse_geolocation_results_bucket, + google_cloud_tasks_queue.reverse_geolocation_process_queue, + google_cloud_tasks_queue.reverse_geolocation_aggregate_queue, + google_project_iam_member.event-receiving] + name = local.function_reverse_geolocation_config.name + description = local.function_reverse_geolocation_config.description + location = var.gcp_region + event_trigger { + event_type = "google.cloud.audit.log.v1.written" + service_account_email = google_service_account.functions_service_account.email + event_filters { + attribute = "serviceName" + value = "storage.googleapis.com" + } + event_filters { + attribute = "methodName" + value = "storage.objects.create" + } + event_filters { + attribute = "resourceName" + value = "projects/_/buckets/mobilitydata-datasets-${var.environment}/objects/*/*/*.zip" + operator = "match-path-pattern" + } + } + build_config { + runtime = var.python_runtime + entry_point = local.function_reverse_geolocation_config.entry_point + source { + storage_source { + bucket = google_storage_bucket.functions_bucket.name + object = google_storage_bucket_object.reverse_geolocation_zip.name + } + } + } + service_config { + available_memory = local.function_reverse_geolocation_config.available_memory + timeout_seconds = 540 + available_cpu = local.function_reverse_geolocation_config.available_cpu + max_instance_request_concurrency = local.function_reverse_geolocation_config.max_instance_request_concurrency + max_instance_count = local.function_reverse_geolocation_config.max_instance_count + min_instance_count = local.function_reverse_geolocation_config.min_instance_count + service_account_email = google_service_account.functions_service_account.email + ingress_settings = local.function_reverse_geolocation_config.ingress_settings + vpc_connector = data.google_vpc_access_connector.vpc_connector.id + vpc_connector_egress_settings = "PRIVATE_RANGES_ONLY" + environment_variables = { + PROCESSING_FUNCTION_ENDPOINT = 
google_cloudfunctions2_function.reverse_geolocation_process.url + AGGREGATOR_FUNCTION_ENDPOINT = google_cloudfunctions2_function.reverse_geolocation_aggregate.url + PROJECT_ID = var.project_id + SERVICE_ACCOUNT_EMAIL = google_service_account.functions_service_account.email + PROCESSING_QUEUE = google_cloud_tasks_queue.reverse_geolocation_process_queue.name + AGGREGATOR_QUEUE = google_cloud_tasks_queue.reverse_geolocation_aggregate_queue.name + LOCATION = var.gcp_region + } + dynamic "secret_environment_variables" { + for_each = local.function_reverse_geolocation_config.secret_environment_variables + content { + key = secret_environment_variables.value["key"] + project_id = var.project_id + secret = "${upper(var.environment)}_${secret_environment_variables.value["key"]}" + version = "latest" + } + } + } +} + +# 10.2 Http triggered function for reverse geolocation +resource "google_cloudfunctions2_function" "reverse_geolocation_process" { + name = "${local.function_reverse_geolocation_config.name}-process" + description = local.function_reverse_geolocation_config.description + location = var.gcp_region + depends_on = [google_secret_manager_secret_iam_member.secret_iam_member] + build_config { + runtime = var.python_runtime + entry_point = "${local.function_reverse_geolocation_config.entry_point}_process" + source { + storage_source { + bucket = google_storage_bucket.functions_bucket.name + object = google_storage_bucket_object.reverse_geolocation_zip.name + } + } + } + service_config { + available_memory = local.function_reverse_geolocation_config.available_memory + timeout_seconds = local.function_reverse_geolocation_config.timeout + available_cpu = local.function_reverse_geolocation_config.available_cpu + max_instance_request_concurrency = local.function_reverse_geolocation_config.max_instance_request_concurrency + max_instance_count = local.function_reverse_geolocation_config.max_instance_count + min_instance_count = local.function_reverse_geolocation_config.min_instance_count + service_account_email = google_service_account.functions_service_account.email + ingress_settings = local.function_reverse_geolocation_config.ingress_settings + vpc_connector = data.google_vpc_access_connector.vpc_connector.id + vpc_connector_egress_settings = "PRIVATE_RANGES_ONLY" + environment_variables = { + BUCKET_NAME = google_storage_bucket.reverse_geolocation_results_bucket.name + } + dynamic "secret_environment_variables" { + for_each = local.function_reverse_geolocation_config.secret_environment_variables + content { + key = secret_environment_variables.value["key"] + project_id = var.project_id + secret = "${upper(var.environment)}_${secret_environment_variables.value["key"]}" + version = "latest" + } + } + } +} + +# 10.3 Http triggered function for aggregating reverse geolocation results +resource "google_cloudfunctions2_function" "reverse_geolocation_aggregate" { + name = "${local.function_reverse_geolocation_config.name}-aggregate" + description = local.function_reverse_geolocation_config.description + location = var.gcp_region + depends_on = [google_secret_manager_secret_iam_member.secret_iam_member, + google_storage_bucket.reverse_geolocation_results_bucket] + build_config { + runtime = var.python_runtime + entry_point = "${local.function_reverse_geolocation_config.entry_point}_aggregate" + source { + storage_source { + bucket = google_storage_bucket.functions_bucket.name + object = google_storage_bucket_object.reverse_geolocation_zip.name + } + } + } + service_config { + available_memory = 
local.function_reverse_geolocation_config.available_memory + timeout_seconds = local.function_reverse_geolocation_config.timeout + available_cpu = local.function_reverse_geolocation_config.available_cpu + max_instance_request_concurrency = local.function_reverse_geolocation_config.max_instance_request_concurrency + max_instance_count = local.function_reverse_geolocation_config.max_instance_count + min_instance_count = local.function_reverse_geolocation_config.min_instance_count + service_account_email = google_service_account.functions_service_account.email + ingress_settings = local.function_reverse_geolocation_config.ingress_settings + vpc_connector = data.google_vpc_access_connector.vpc_connector.id + vpc_connector_egress_settings = "PRIVATE_RANGES_ONLY" + environment_variables = { + BUCKET_NAME = google_storage_bucket.reverse_geolocation_results_bucket.name + DATASETS_BUCKET_NAME = "${var.datasets_bucket_name}-${var.environment}" + } + dynamic "secret_environment_variables" { + for_each = local.function_reverse_geolocation_config.secret_environment_variables + content { + key = secret_environment_variables.value["key"] + project_id = var.project_id + secret = "${upper(var.environment)}_${secret_environment_variables.value["key"]}" + version = "latest" + } + } + } +} + # IAM entry for all users to invoke the function resource "google_cloudfunctions2_function_iam_member" "tokens_invoker" { @@ -840,6 +1053,7 @@ resource "google_project_iam_member" "event-receiving" { resource "google_storage_bucket_iam_binding" "bucket_object_viewer" { for_each = { datasets_bucket = "${var.datasets_bucket_name}-${var.environment}" + reverse_geolocation_results_bucket = google_storage_bucket.reverse_geolocation_results_bucket.name } bucket = each.value depends_on = [] @@ -853,8 +1067,9 @@ resource "google_storage_bucket_iam_binding" "bucket_object_viewer" { resource "google_storage_bucket_iam_binding" "bucket_object_creator" { for_each = { gbfs_snapshots_bucket = google_storage_bucket.gbfs_snapshots_bucket.name + reverse_geolocation_results_bucket = google_storage_bucket.reverse_geolocation_results_bucket.name } - depends_on = [google_storage_bucket.gbfs_snapshots_bucket] + depends_on = [google_storage_bucket.gbfs_snapshots_bucket, google_storage_bucket.reverse_geolocation_results_bucket] bucket = each.value role = "roles/storage.objectCreator" members = [ @@ -862,6 +1077,22 @@ resource "google_storage_bucket_iam_binding" "bucket_object_creator" { ] } +# Grant the service account the ability to enqueue tasks in the task queue +resource "google_cloud_tasks_queue_iam_member" "enqueue_tasks" { + depends_on = [ + google_cloud_tasks_queue.reverse_geolocation_process_queue, + google_cloud_tasks_queue.reverse_geolocation_aggregate_queue + ] + for_each = { + reverse_geolocation_process_queue = google_cloud_tasks_queue.reverse_geolocation_process_queue.name + reverse_geolocation_aggregate_queue = google_cloud_tasks_queue.reverse_geolocation_aggregate_queue.name + } + location = var.gcp_region + member = "serviceAccount:${google_service_account.functions_service_account.email}" + name = each.value + role = "roles/cloudtasks.enqueuer" +} + # Grant the service account the ability to invoke the workflows resource "google_project_iam_member" "workflows_invoker" { project = var.project_id @@ -883,14 +1114,31 @@ resource "google_project_iam_audit_config" "all-services" { } } +# Grant permission to creat oidc tokens +resource "google_service_account_iam_member" "oidc_token_creator" { + service_account_id = 
google_service_account.functions_service_account.name
+  role               = "roles/iam.serviceAccountUser"
+  member             = "serviceAccount:${google_service_account.functions_service_account.email}"
+}
+
 output "function_tokens_name" {
   value = google_cloudfunctions2_function.tokens.name
 }
 
-resource "google_cloudfunctions2_function_iam_member" "extract_location_invoker" {
+resource "google_cloudfunctions2_function_iam_member" "cloud_function_invoker" {
+  depends_on = [
+    google_cloudfunctions2_function.extract_location,
+    google_cloudfunctions2_function.reverse_geolocation_process,
+    google_cloudfunctions2_function.reverse_geolocation_aggregate
+  ]
+  for_each = {
+    extract_location              = google_cloudfunctions2_function.extract_location.name
+    reverse_geolocation_process   = google_cloudfunctions2_function.reverse_geolocation_process.name
+    reverse_geolocation_aggregate = google_cloudfunctions2_function.reverse_geolocation_aggregate.name
+  }
   project        = var.project_id
   location       = var.gcp_region
-  cloud_function = google_cloudfunctions2_function.extract_location.name
+  cloud_function = each.value
   role           = "roles/cloudfunctions.invoker"
   member         = "serviceAccount:${google_service_account.functions_service_account.email}"
 }
diff --git a/infra/terraform-init/main.tf b/infra/terraform-init/main.tf
index abca9f033..1d3737358 100644
--- a/infra/terraform-init/main.tf
+++ b/infra/terraform-init/main.tf
@@ -235,6 +235,12 @@ resource "google_project_iam_member" "cloud_tasks_admin" {
   member  = "serviceAccount:${google_service_account.ci_service_account.email}"
 }
 
+resource "google_project_iam_member" "cloudtasks_viewer" {
+  project = var.project_id
+  role    = "roles/cloudtasks.viewer"
+  member  = "serviceAccount:${google_service_account.ci_service_account.email}"
+}
+
 resource "google_storage_bucket" "tf_state_bucket" {
   name          = "${var.terraform_state_bucket_name_prefix}-${var.environment}"
   force_destroy = false
diff --git a/web-app/src/app/components/MapGeoJSON.tsx b/web-app/src/app/components/MapGeoJSON.tsx
new file mode 100644
index 000000000..dd88d574d
--- /dev/null
+++ b/web-app/src/app/components/MapGeoJSON.tsx
@@ -0,0 +1,96 @@
+import * as React from 'react';
+import 'leaflet/dist/leaflet.css';
+import { MapContainer, TileLayer, GeoJSON } from 'react-leaflet';
+import { type LatLngBoundsExpression } from 'leaflet';
+
+export interface MapProps {
+  latest_dataset_url: string; // The base URL to construct the GeoJSON file path
+}
+
+export const MapGeoJSON = (
+  props: React.PropsWithChildren<MapProps>,
+): JSX.Element => {
+  const [geoJsonData, setGeoJsonData] = React.useState<any>(null);
+
+  // Construct the GeoJSON URL based on the latest_dataset_url
+  const geoJsonUrl = props.latest_dataset_url
+    .split('/')
+    .slice(0, -2)
+    .concat('geolocation.geojson')
+    .join('/');
+  console.log('geoJsonUrl = ', geoJsonUrl);
+
+  React.useEffect(() => {
+    const fetchGeoJson = async (): Promise<void> => {
+      try {
+        const response = await fetch(geoJsonUrl);
+        if (!response.ok) {
+          throw new Error(`Failed to fetch GeoJSON: ${response.statusText}`);
+        }
+        const data = await response.json();
+        setGeoJsonData(data);
+      } catch (error) {
+        console.error(error);
+      }
+    };
+
+    fetchGeoJson().then(
+      () => {
+        console.log('GeoJSON fetched successfully');
+      },
+      (error) => {
+        console.error('Failed to fetch GeoJSON: ', error);
+      },
+    );
+  }, [geoJsonUrl]);
+
+  const getBoundsFromGeoJson = (
+    geoJson: any,
+  ): LatLngBoundsExpression | undefined => {
+    if (!geoJson?.features) return undefined;
+
+    const coordinates = geoJson.features.flatMap((feature: any) =>
+      feature.geometry.coordinates.flat(),
+    );
+    const lats = coordinates.map((coord: [number, number]) => coord[1]);
+    const lngs = coordinates.map((coord: [number, number]) => coord[0]);
+
+    const southWest = [Math.min(...lats), Math.min(...lngs)] as [
+      number,
+      number,
+    ];
+    const northEast = [Math.max(...lats), Math.max(...lngs)] as [
+      number,
+      number,
+    ];
+    return [southWest, northEast] as LatLngBoundsExpression;
+  };
+
+  const bounds = geoJsonData ? getBoundsFromGeoJson(geoJsonData) : undefined;
+
+  return (
+    <MapContainer bounds={bounds} style={{ minHeight: '400px', height: '100%' }}>
+      <TileLayer
+        url="https://{s}.tile.openstreetmap.org/{z}/{x}/{y}.png"
+        attribution="&copy; OpenStreetMap contributors"
+      />
+      {geoJsonData && (
+        <GeoJSON
+          data={geoJsonData}
+          style={(feature) => ({
+            fillColor: feature?.properties.color || '#3388ff', // Default to blue if no color is specified
+            weight: 2,
+            opacity: 1,
+            color: 'black', // Border color
+            fillOpacity: 0.7,
+          })}
+        />
+      )}
+    </MapContainer>
+  );
+};
diff --git a/web-app/src/app/interface/RemoteConfig.ts b/web-app/src/app/interface/RemoteConfig.ts
index 025394986..9cdfa3f87 100644
--- a/web-app/src/app/interface/RemoteConfig.ts
+++ b/web-app/src/app/interface/RemoteConfig.ts
@@ -29,6 +29,7 @@ export interface RemoteConfigValues extends FirebaseDefaultConfig {
   gbfsMetricsBucketEndpoint: string;
   featureFlagBypass: string;
   enableFeatureFilterSearch: boolean;
+  enableGeoJSONLocationExtractionMap: boolean;
 }
 
 const featureByPassDefault: BypassConfig = {
@@ -48,6 +49,7 @@ export const defaultRemoteConfigValues: RemoteConfigValues = {
     'https://storage.googleapis.com/mobilitydata-gbfs-analytics-dev',
   featureFlagBypass: JSON.stringify(featureByPassDefault),
   enableFeatureFilterSearch: false,
+  enableGeoJSONLocationExtractionMap: true, // TODO: change before merge
 };
 
 remoteConfig.defaultConfig = defaultRemoteConfigValues;
diff --git a/web-app/src/app/screens/Feed/index.tsx b/web-app/src/app/screens/Feed/index.tsx
index a125c1239..988772bac 100644
--- a/web-app/src/app/screens/Feed/index.tsx
+++ b/web-app/src/app/screens/Feed/index.tsx
@@ -37,7 +37,6 @@ import {
   selectDatasetsLoadingStatus,
   selectLatestDatasetsData,
 } from '../../store/dataset-selectors';
-import { Map } from '../../components/Map';
 import PreviousDatasets from './PreviousDatasets';
 import FeedSummary from './FeedSummary';
 import DataQualitySummary from './DataQualitySummary';
@@ -51,6 +50,7 @@ import { Trans, useTranslation } from 'react-i18next';
 import { type TFunction } from 'i18next';
 import { theme } from '../../Theme';
 import { Helmet, HelmetProvider } from 'react-helmet-async';
+import { MapGeoJSON } from '../../components/MapGeoJSON';
 
 export function formatProvidersSorted(provider: string): string[] {
   const providers = provider.split(',').filter((n) => n);
@@ -503,7 +503,12 @@ export default function Feed(): React.ReactElement {
             )}
             {boundingBox !== undefined && (
-              <Map polygon={boundingBox} />
+              {/* <Map polygon={boundingBox} /> */}
+              {latestDataset?.hosted_url !== undefined && (
+                <MapGeoJSON latest_dataset_url={latestDataset.hosted_url} />
+              )}
             )}