Skip to content

Commit

Permalink
feat: endpoint to create dataset and bulk upload entities (#276)
Browse files Browse the repository at this point in the history
* feat: added endpoint to create dataset and bulk upload entities

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* refactor: rename OdkEntity class --> OdkDataset for clarity

BREAKING CHANGE to OdkCentral usage

* build: update test odk central --> v2024.1.0

* build: ignore server_name for test proxy (else warnings)

* refactor: remove entity registration form to archived xlsforms

* docs: add note about latest state of entity creation apis

* fix: large refactor to OdkCentralAsync, raise errors when required

* test: update entity tests to support latest bulk entity upload api

---------

Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
Co-authored-by: spwoodcock <[email protected]>
  • Loading branch information
3 people authored Jul 29, 2024
1 parent 7da0954 commit 091019c
Show file tree
Hide file tree
Showing 13 changed files with 285 additions and 136 deletions.
2 changes: 1 addition & 1 deletion docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ services:
restart: "unless-stopped"

central:
image: "ghcr.io/hotosm/fmtm/odkcentral:v2023.5.0"
image: "ghcr.io/hotosm/fmtm/odkcentral:v2024.1.0"
depends_on:
central-db:
condition: service_healthy
Expand Down
3 changes: 3 additions & 0 deletions docs/about/odk-entities.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@ use the Entities to achieve the above goals.

### Workflow Using Entities

> **UPDATE 29/07/2023** ODK Central now supports creating the Entity List
> / Dataset via API instead of registration form.
The basic workflow would probably resemble:

- Create an Entity registration form.
Expand Down
5 changes: 5 additions & 0 deletions docs/api/OdkCentral.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,3 +24,8 @@ heading_level: 3
options:
show_source: false
heading_level: 3

::: osm_fieldwork.OdkCentral.OdkDataset
options:
show_source: false
heading_level: 3
2 changes: 1 addition & 1 deletion docs/api/OdkCentralAsync.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ options:
show_source: false
heading_level: 3

::: osm_fieldwork.OdkCentralAsync.OdkEntity
::: osm_fieldwork.OdkCentralAsync.OdkDataset
options:
show_source: false
heading_level: 3
Expand Down
2 changes: 1 addition & 1 deletion nginx/odk.conf
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ server {

server {
listen 443 ssl;
server_name odk.fmtm.localhost;
server_name _;

ssl_certificate /etc/nginx/central-fullchain.crt;
ssl_certificate_key /etc/nginx/central.key;
Expand Down
6 changes: 4 additions & 2 deletions osm_fieldwork/OdkCentral.py
Original file line number Diff line number Diff line change
Expand Up @@ -1358,7 +1358,7 @@ def createQRCode(
return self.qrcode


class OdkEntity(OdkCentral):
class OdkDataset(OdkCentral):
"""Class to manipulate a Entity on an ODK Central server."""

def __init__(
Expand All @@ -1373,7 +1373,7 @@ def __init__(
passwd (str): The user's account password on ODK Central.
Returns:
(OdkEntity): An instance of this object.
(OdkDataset): An instance of this object.
"""
super().__init__(url, user, passwd)
self.name = None
Expand Down Expand Up @@ -1404,6 +1404,8 @@ def listDatasets(
result = self.session.get(url, verify=self.verify)
return result.json()

# TODO add createDataset

def listEntities(
self,
projectId: int,
Expand Down
206 changes: 163 additions & 43 deletions osm_fieldwork/OdkCentralAsync.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,24 @@
import logging
import os
from asyncio import gather
from typing import Optional
from typing import Any, Optional, TypedDict
from uuid import uuid4

import aiohttp

log = logging.getLogger(__name__)


class EntityIn(TypedDict):
"""Required format for Entity uploads to ODK Central."""

label: str
data: dict[str, Any]


class OdkCentral(object):
"""Helper methods for ODK Central API."""

def __init__(
self,
url: Optional[str] = None,
Expand Down Expand Up @@ -141,8 +150,9 @@ async def listForms(self, projectId: int, metadata: bool = False):
self.forms = await response.json()
return self.forms
except aiohttp.ClientError as e:
log.error(f"Error fetching forms: {e}")
return []
msg = f"Error fetching forms: {e}"
log.error(msg)
raise aiohttp.ClientError(msg) from e

async def listSubmissions(self, projectId: int, xform: str, filters: dict = None):
"""Fetch a list of submission instances for a given form.
Expand All @@ -167,8 +177,9 @@ async def listSubmissions(self, projectId: int, xform: str, filters: dict = None
async with self.session.get(url, params=filters, ssl=self.verify) as response:
return await response.json()
except aiohttp.ClientError as e:
log.error(f"Error fetching submissions: {e}")
return {}
msg = f"Error fetching submissions: {e}"
log.error(msg)
raise aiohttp.ClientError(msg) from e

async def getAllProjectSubmissions(self, projectId: int, xforms: list = None, filters: dict = None):
"""Fetch a list of submissions in a project on an ODK Central server.
Expand Down Expand Up @@ -196,7 +207,7 @@ async def getAllProjectSubmissions(self, projectId: int, xforms: list = None, fi
return submission_data


class OdkEntity(OdkCentral):
class OdkDataset(OdkCentral):
"""Class to manipulate a Entity on an ODK Central server."""

def __init__(
Expand All @@ -211,7 +222,7 @@ def __init__(
passwd (str): The user's account password on ODK Central.
Returns:
(OdkEntity): An instance of this object.
(OdkDataset): An instance of this object.
"""
super().__init__(url, user, passwd)

Expand Down Expand Up @@ -242,8 +253,108 @@ async def listDatasets(
async with self.session.get(url, ssl=self.verify) as response:
return await response.json()
except aiohttp.ClientError as e:
log.error(f"Error fetching datasets: {e}")
return []
msg = f"Error fetching datasets: {e}"
log.error(msg)
raise aiohttp.ClientError(msg) from e

async def createDataset(
self,
projectId: int,
datasetName: Optional[str] = "features",
properties: Optional[list[str]] = [],
):
"""Creates a dataset for a given project.
Args:
projectId (int): The ID of the project to create the dataset for.
datasetName (str): The name of the dataset to be created.
properties (list[str]): List of property names to create.
Alternatively call createDatasetProperty for each property manually.
Returns:
dict: The JSON response containing information about the created dataset.
Raises:
aiohttp.ClientError: If an error occurs during the dataset creation process.
"""
# Create the dataset
url = f"{self.base}projects/{projectId}/datasets"
payload = {"name": datasetName}
try:
log.info(f"Creating dataset ({datasetName}) for project ({projectId})")
async with self.session.post(
url,
ssl=self.verify,
json=payload,
) as response:
if response.status not in (200, 201):
error_message = await response.text()
log.error(f"Failed to create Dataset: {error_message}")
log.info(f"Successfully created Dataset {datasetName}")
dataset = await response.json()
except aiohttp.ClientError as e:
msg = f"Failed to create Dataset: {e}"
log.error(msg)
raise aiohttp.ClientError(msg) from e

if not properties:
return dataset

# Add the properties, if specified
# FIXME this is a bit of a hack until ODK Central has better support
# FIXME for adding dataset properties in bulk
try:
log.debug(f"Creating properties for dataset ({datasetName}): {properties}")
properties_tasks = [self.createDatasetProperty(projectId, field, datasetName) for field in properties]
success = await gather(*properties_tasks, return_exceptions=True) # type: ignore
if not success:
log.warning(f"No properties were uploaded for ODK project ({projectId}) dataset name ({datasetName})")
log.info(f"Successfully created properties for dataset ({datasetName})")
except aiohttp.ClientError as e:
msg = f"Failed to create properties: {e}"
log.error(msg)
raise aiohttp.ClientError(msg) from e

# Manually append to prevent another API call
dataset["properties"] = properties
return dataset

async def createDatasetProperty(
self,
projectId: int,
field_name: str,
datasetName: Optional[str] = "features",
):
"""Create a property for a dataset.
Args:
projectId (int): The ID of the project.
datasetName (str): The name of the dataset.
field (dict): A dictionary containing the field information.
Returns:
dict: The response data from the API.
Raises:
aiohttp.ClientError: If an error occurs during the API request.
"""
url = f"{self.base}projects/{projectId}/datasets/{datasetName}/properties"
payload = {
"name": field_name,
}

try:
log.debug(f"Creating property of dataset {datasetName}")
async with self.session.post(url, ssl=self.verify, json=payload) as response:
response_data = await response.json()
if response.status not in (200, 201):
log.debug(f"Failed to create properties: {response.status}, message='{response_data}'")
log.debug(f"Successfully created properties for dataset {datasetName}")
return response_data
except aiohttp.ClientError as e:
msg = f"Failed to create properties: {e}"
log.error(msg)
raise aiohttp.ClientError(msg) from e

async def listEntities(
self,
Expand Down Expand Up @@ -288,8 +399,9 @@ async def listEntities(
async with self.session.get(url, ssl=self.verify) as response:
return await response.json()
except aiohttp.ClientError as e:
log.error(f"Error fetching entities: {e}")
return []
msg = f"Error fetching entities: {e}"
log.error(msg)
raise aiohttp.ClientError(msg) from e

async def getEntity(
self,
Expand Down Expand Up @@ -341,6 +453,7 @@ async def getEntity(
async with self.session.get(url, ssl=self.verify) as response:
return await response.json()
except aiohttp.ClientError as e:
# NOTE skip raising exception on HTTP 404 (not found)
log.error(f"Error fetching entity: {e}")
return {}

Expand Down Expand Up @@ -424,44 +537,47 @@ async def createEntity(
) as response:
return await response.json()
except aiohttp.ClientError as e:
log.error(f"Failed to create Entity: {e}")
return {}
msg = f"Failed to create Entity: {e}"
log.error(msg)
raise aiohttp.ClientError(msg) from e

async def createEntities(
self,
projectId: int,
datasetName: str,
labelDataDict: dict,
) -> list:
entities: list[EntityIn],
) -> dict:
"""Bulk create Entities in a project dataset (entity list).
NOTE this endpoint will be redundant after Central 2024.01 release.
Args:
projectId (int): The ID of the project on ODK Central.
datasetName (int): The name of a dataset, specific to a project.
labelDataDict (dict): Mapping of Entity label:data (str:dict) to insert.
entities (list[EntityIn]): A list of Entities to insert.
Format: {"label": "John Doe", "data": {"firstName": "John", "age": "22"}}
Returns:
list: A list of Entity detail JSONs.
The 'uuid' field includes the unique entity identifier.
# list: A list of Entity detail JSONs.
# The 'uuid' field includes the unique entity identifier.
dict: {'success': true}
When creating bulk entities ODK Central return this for now.
"""
log.info(f"Bulk uploading Entities for project ({projectId}) dataset ({datasetName})")
entity_data = []

entity_tasks = [self.createEntity(projectId, datasetName, label, data) for label, data in labelDataDict.items()]
log.info(f"Creating ({len(entity_tasks)}) entities for project " f"({projectId}) dataset ({datasetName})")
entities = await gather(*entity_tasks, return_exceptions=True)
# Validation
if not isinstance(entities, list):
raise ValueError("Entities must be a list")

for entity in entities:
if not entity or isinstance(entity, Exception):
continue
entity_data.append(entity)

if not entities:
log.warning(f"No entities were uploaded for ODK project ({projectId}) dataset name ({datasetName})")
log.info(f"Bulk uploading ({len(entities)}) Entities for project ({projectId}) dataset ({datasetName})")
url = f"{self.base}projects/{projectId}/datasets/{datasetName}/entities"
payload = {"entities": entities, "source": {"name": "features.csv"}}

return entity_data
try:
async with self.session.post(url, ssl=self.verify, json=payload) as response:
response.raise_for_status()
log.info(f"Successfully created entities for project ({projectId}) in dataset ({datasetName})")
return await response.json()
except aiohttp.ClientError as e:
msg = f"Failed to create Entities: {e}"
log.error(msg)
raise aiohttp.ClientError(msg) from e

async def updateEntity(
self,
Expand Down Expand Up @@ -554,8 +670,9 @@ async def updateEntity(
) as response:
return await response.json()
except aiohttp.ClientError as e:
log.error(f"Failed to update Entity: {e}")
return {}
msg = f"Failed to update Entity: {e}"
log.error(msg)
raise aiohttp.ClientError(msg) from e

async def deleteEntity(
self,
Expand Down Expand Up @@ -585,8 +702,9 @@ async def deleteEntity(
log.debug(f"Server returned deletion unsuccessful: {response_msg}")
return success
except aiohttp.ClientError as e:
log.error(f"Failed to delete Entity: {e}")
return False
msg = f"Failed to delete Entity: {e}"
log.error(msg)
raise aiohttp.ClientError(msg) from e

async def getEntityCount(
self,
Expand All @@ -608,11 +726,12 @@ async def getEntityCount(
async with self.session.get(url, ssl=self.verify) as response:
count = (await response.json()).get("@odata.count", None)
except aiohttp.ClientError as e:
log.error(f"Failed to get Entity count for project ({projectId}): {e}")
return 0
msg = f"Failed to get Entity count for project ({projectId}): {e}"
log.error(msg)
raise aiohttp.ClientError(msg) from e

if count is None:
log.debug(f"Failed to get Entity count for project ({projectId}) " f"dataset ({datasetName})")
log.debug(f"Project ({projectId}) has no Entities in dataset ({datasetName})")
return 0

return count
Expand Down Expand Up @@ -715,5 +834,6 @@ async def getEntityData(
return response_json.get("value", [])
return response_json
except aiohttp.ClientError as e:
log.error(f"Failed to get Entity data: {e}")
return {}
msg = f"Failed to get Entity data for project ({projectId}): {e}"
log.error(msg)
raise aiohttp.ClientError(msg) from e
Loading

0 comments on commit 091019c

Please sign in to comment.