Skip to content

Commit

Permalink
feat: added reverse geolocation
Browse files Browse the repository at this point in the history
  • Loading branch information
cka-y committed Jul 29, 2024
1 parent e8f4ce7 commit fa54c0d
Show file tree
Hide file tree
Showing 3 changed files with 254 additions and 1 deletion.
169 changes: 169 additions & 0 deletions functions-python/extract_location/src/location_extractor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,169 @@
import requests
import logging
from typing import Tuple, Optional, List, NamedTuple
from collections import Counter
from sqlalchemy.orm import Session

from database_gen.sqlacodegen_models import Gtfsdataset, Location

# Base URL of the Nominatim reverse-geocoding endpoint. The query string asks
# for JSON output at zoom level 13 with the address breakdown included;
# callers append the ``lat``/``lon`` parameters.
NOMINATIM_ENDPOINT = (
    "https://nominatim.openstreetmap.org/reverse"
    "?format=json&zoom=13&addressdetails=1"
)

# User-Agent value shared by both header sets below.
_USER_AGENT = (
    "Mozilla/5.0 (Linux; Android 6.0; Nexus 5 Build/MRA58N) "
    "AppleWebKit/537.36 (KHTML, like Gecko) "
    "Chrome/126.0.0.0 Mobile Safari/537.36"
)

# Headers used when no response language is requested.
DEFAULT_HEADERS = {"User-Agent": _USER_AGENT}

# Same headers plus an explicit request for English-language results.
EN_LANG_HEADER = {"User-Agent": _USER_AGENT, "Accept-Language": "en"}

logging.basicConfig(level=logging.INFO)


class LocationInfo(NamedTuple):
    """
    Aggregated reverse-geocoding result for a collection of points.

    As built by ``reverse_coords`` in this module, the two list fields are
    parallel (index ``i`` of each refers to the same point), while the two
    optional fields hold a single dominant value or ``None``.
    """

    # Upper-cased country codes, one entry per point that resolved to a country.
    country_codes: List[str]
    # Country names, parallel to ``country_codes``.
    countries: List[str]
    # Most frequent subdivision (state/province) name, or None when no value
    # met the decision threshold.
    most_common_subdivision_name: Optional[str]
    # Most frequent municipality (city/town) name, or None when no value met
    # the decision threshold.
    most_common_municipality: Optional[str]


def reverse_coord(
    lat: float,
    lon: float,
    include_lang_header: bool = False,
    timeout: float = 10.0,
) -> Tuple[Optional[str], Optional[str], Optional[str], Optional[str]]:
    """
    Retrieves location details for a given latitude and longitude using the Nominatim API.

    Any request failure (connection error, timeout, HTTP error status) or
    undecodable response body is logged and reported as a tuple of ``None``
    values rather than raised.

    :param lat: Latitude of the location.
    :param lon: Longitude of the location.
    :param include_lang_header: If True, include English language header in the request.
    :param timeout: Seconds to wait for the server before abandoning the request.
    :return: A tuple ``(country_code, country, subdivision_name, municipality)``.
        ``country_code`` is upper-cased; each element is None when the
        response does not provide it or when the request failed.
    """
    request_url = f"{NOMINATIM_ENDPOINT}&lat={lat}&lon={lon}"
    headers = EN_LANG_HEADER if include_lang_header else DEFAULT_HEADERS

    try:
        # Without an explicit timeout, requests may block indefinitely on an
        # unresponsive server.
        response = requests.get(request_url, headers=headers, timeout=timeout)
        response.raise_for_status()
        payload = response.json()
    except (requests.exceptions.RequestException, ValueError) as e:
        # ValueError covers a body that is not valid JSON on requests versions
        # whose JSON decode error is not a RequestException subclass.
        logging.error(f"Error occurred while requesting location data: {e}")
        return None, None, None, None

    # The "address" key may be missing (e.g. an error payload) or null.
    address = payload.get("address") if isinstance(payload, dict) else None
    address = address or {}

    raw_country_code = address.get("country_code")
    country_code = raw_country_code.upper() if raw_country_code else None
    country = address.get("country")
    # Fall back to the secondary key only when the primary one is absent.
    municipality = address.get("city", address.get("town"))
    subdivision_name = address.get("state", address.get("province"))

    return country_code, country, subdivision_name, municipality


def reverse_coords(
    points: List[Tuple[float, float]],
    include_lang_header: bool = False,
    decision_threshold: float = 0.5,
) -> LocationInfo:
    """
    Retrieves location details for multiple latitude and longitude points.

    Each point is reverse-geocoded individually; points for which no country
    code could be resolved are ignored. An empty ``points`` list yields a
    ``LocationInfo`` with empty lists and ``None`` for both dominant values.

    :param points: A list of tuples, each containing latitude and longitude.
    :param include_lang_header: If True, include English language header in the request.
    :param decision_threshold: Minimum share of ``len(points)`` (failed lookups
        included in the denominator) that the most frequent subdivision or
        municipality must reach to be reported.
    :return: A LocationInfo object containing lists of country codes and countries
        (one entry per resolved point), and the most common subdivision name and
        municipality if above the threshold, otherwise None.
    """
    municipalities: List[str] = []
    subdivisions: List[str] = []
    countries = []
    country_codes: List[str] = []

    for lat, lon in points:
        country_code, country, subdivision_name, municipality = reverse_coord(
            lat, lon, include_lang_header
        )
        if country_code is None:
            # Lookup failed or the point is outside any country; skip it.
            continue
        country_codes.append(country_code)
        countries.append(country)
        if municipality:
            municipalities.append(municipality)
        if subdivision_name:
            subdivisions.append(subdivision_name)

    total_points = len(points)
    return LocationInfo(
        country_codes=country_codes,
        countries=countries,
        most_common_subdivision_name=_most_common_above_threshold(
            subdivisions, total_points, decision_threshold
        ),
        most_common_municipality=_most_common_above_threshold(
            municipalities, total_points, decision_threshold
        ),
    )


def _most_common_above_threshold(
    values: List[str], total: int, threshold: float
) -> Optional[str]:
    """
    Return the most frequent entry of ``values`` if its share of ``total`` reaches ``threshold``.

    :param values: Observed values (may be empty).
    :param total: Denominator used to compute the share of the top value.
    :param threshold: Minimum share (inclusive) required to report the value.
    :return: The most frequent value, or None when ``values`` is empty,
        ``total`` is zero, or the share is below ``threshold``.
    """
    # Guarding on ``total`` avoids a ZeroDivisionError for an empty point list.
    if not values or total <= 0:
        return None
    value, count = Counter(values).most_common(1)[0]
    return value if count / total >= threshold else None


def update_location(location_info: LocationInfo, dataset_id: str, session: Session):
    """
    Update the location details of a dataset in the database.

    One ``Location`` is created per distinct (country code, country) pair in
    ``location_info``; every location carries the same most-common subdivision
    and municipality. The resulting list is assigned to both the dataset and
    its related feed, and the session is committed.

    :param location_info: A LocationInfo object containing location details.
    :param dataset_id: The ID of the dataset (matched against ``Gtfsdataset.stable_id``).
    :param session: The database session.
    :raises Exception: If no dataset has the given stable ID, or if
        ``location_info`` contains no countries.
    """
    dataset: Gtfsdataset | None = (
        session.query(Gtfsdataset)
        .filter(Gtfsdataset.stable_id == dataset_id)
        .one_or_none()
    )
    if dataset is None:
        raise Exception(f"Dataset {dataset_id} does not exist in the database.")

    # ``location_info`` carries one country entry per sampled point, so the
    # same country usually repeats many times. dict.fromkeys drops the repeats
    # while keeping first-seen order, so each country is stored only once.
    unique_countries = dict.fromkeys(
        zip(location_info.country_codes, location_info.countries)
    )
    locations = [
        Location(
            country_code=country_code,
            country=country,
            subdivision_name=location_info.most_common_subdivision_name,
            municipality=location_info.most_common_municipality,
        )
        for country_code, country in unique_countries
    ]
    if not locations:
        raise Exception("No locations found for the dataset.")
    dataset.locations = locations

    # Update the location of the related feed as well
    dataset.feed.locations = locations

    session.add(dataset)
    session.commit()
4 changes: 3 additions & 1 deletion functions-python/extract_location/src/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
create_polygon_wkt_element,
update_dataset_bounding_box,
)
from .location_extractor import update_location, reverse_coords
from .stops_utils import get_gtfs_feed_bounds_and_points

logging.basicConfig(level=logging.INFO)
Expand Down Expand Up @@ -114,7 +115,7 @@ def extract_location_pubsub(cloud_event: CloudEvent):
try:
logging.info(f"[{dataset_id}] accessing url: {url}")
try:
bounds, _ = get_gtfs_feed_bounds_and_points(
bounds, location_geo_points = get_gtfs_feed_bounds_and_points(
url, dataset_id, location_extraction_n_points
)
except Exception as e:
Expand All @@ -128,6 +129,7 @@ def extract_location_pubsub(cloud_event: CloudEvent):
try:
session = start_db_session(os.getenv("FEEDS_DATABASE_URL"))
update_dataset_bounding_box(session, dataset_id, geometry_polygon)
update_location(reverse_coords(location_geo_points), dataset_id, session)
except Exception as e:
error = f"Error updating bounding box in database: {e}"
logging.error(f"[{dataset_id}] Error while processing: {e}")
Expand Down
82 changes: 82 additions & 0 deletions functions-python/extract_location/tests/test_extract_location.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,19 @@
from cloudevents.http import CloudEvent
from faker import Faker
from geoalchemy2 import WKTElement
from sqlalchemy.orm import Session

from database_gen.sqlacodegen_models import Gtfsdataset
from extract_location.src.bounding_box_extractor import (
create_polygon_wkt_element,
update_dataset_bounding_box,
)
from extract_location.src.location_extractor import (
reverse_coord,
reverse_coords,
LocationInfo,
update_location,
)
from extract_location.src.main import (
extract_location,
extract_location_pubsub,
Expand All @@ -28,6 +35,81 @@


class TestExtractBoundingBox(unittest.TestCase):
@patch("requests.get")
def test_reverse_coord(self, mock_get):
    """reverse_coord maps a Nominatim address payload to the expected tuple."""
    # Mock the HTTP call so the test does not depend on network access or on
    # the live service's current data.
    mock_response = MagicMock()
    mock_response.json.return_value = {
        "address": {
            "country_code": "us",
            "country": "United States",
            "state": "California",
            "city": "Los Angeles",
        }
    }
    mock_response.status_code = 200
    mock_get.return_value = mock_response

    lat, lon = 34.0522, -118.2437  # Coordinates for Los Angeles, California, USA
    result = reverse_coord(lat, lon)

    self.assertEqual(result, ("US", "United States", "California", "Los Angeles"))

    # The requested URL must carry the coordinates that were passed in.
    mock_get.assert_called_once()
    requested_url = mock_get.call_args[0][0]
    self.assertIn(f"lat={lat}", requested_url)
    self.assertIn(f"lon={lon}", requested_url)

@patch("requests.get")
def test_reverse_coords(self, mock_get):
    """reverse_coords aggregates per-point results and picks the majority values."""
    # Mocking the response from the API for multiple calls: one payload is
    # consumed per point, in order.
    mock_response = MagicMock()
    mock_response.json.side_effect = [
        {
            "address": {
                "country_code": "us",
                "country": "United States",
                "state": "California",
                "city": "Los Angeles",
            }
        },
        {
            "address": {
                "country_code": "us",
                "country": "United States",
                "state": "California",
                "city": "San Francisco",
            }
        },
        {
            "address": {
                "country_code": "us",
                "country": "United States",
                "state": "California",
                "city": "Los Angeles",
            }
        },
    ]
    mock_response.status_code = 200
    mock_get.return_value = mock_response

    # Three points so that every queued response is consumed and
    # "Los Angeles" is a genuine 2-of-3 majority rather than the winner of a
    # 1-1 tie-break.
    points = [
        (34.0522, -118.2437),
        (37.7749, -122.4194),
        (34.0522, -118.2437),
    ]
    location_info = reverse_coords(points)

    self.assertEqual(mock_get.call_count, len(points))
    self.assertEqual(location_info.country_codes, ["US", "US", "US"])
    self.assertEqual(
        location_info.countries,
        ["United States", "United States", "United States"],
    )
    self.assertEqual(location_info.most_common_subdivision_name, "California")
    self.assertEqual(location_info.most_common_municipality, "Los Angeles")

def test_update_location(self):
    """update_location stores the new locations on the dataset and its feed, then commits."""
    dataset_id = "123"

    # Stand-in for the dataset row the query is expected to return.
    dataset_stub = MagicMock()
    dataset_stub.stable_id = "123"
    dataset_stub.feed = MagicMock()

    # Session whose query(...).filter(...).one_or_none() yields the stub.
    session_stub = MagicMock(spec=Session)
    filtered_query = session_stub.query.return_value.filter.return_value
    filtered_query.one_or_none.return_value = dataset_stub

    info = LocationInfo(
        country_codes=["us"],
        countries=["United States"],
        most_common_subdivision_name="California",
        most_common_municipality="Los Angeles",
    )

    update_location(info, dataset_id, session_stub)

    # The dataset must have been added to the session and committed once.
    session_stub.add.assert_called_once_with(dataset_stub)
    session_stub.commit.assert_called_once()

    # Both the dataset and its feed must carry the new location.
    for owner in (dataset_stub, dataset_stub.feed):
        self.assertEqual(owner.locations[0].country, "United States")

def test_create_polygon_wkt_element(self):
bounds = np.array(
[faker.longitude(), faker.latitude(), faker.longitude(), faker.latitude()]
Expand Down

0 comments on commit fa54c0d

Please sign in to comment.