Skip to content

Commit

Permalink
Feature #19 Data agreement synchronisation between aries cloudagent a…
Browse files Browse the repository at this point in the history
…nd api backend
  • Loading branch information
albinpa committed Jun 27, 2023
1 parent 7c4c06f commit 8c71457
Show file tree
Hide file tree
Showing 4 changed files with 114 additions and 4 deletions.
46 changes: 46 additions & 0 deletions mydata_did/v1_0/kafka_publisher.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
from confluent_kafka import Producer
import os
import json
from logging import Logger
from enum import Enum
# from mydata_did.v1_0.manager import ADAManager

class DataAgreementOperations(Enum):
DACREATE = "DataAgreementCreate"
DAUPDATE = "DataAgreementUpdate"
DADELETE = "DataAgreementDelete"
DAPUBLISH = "DataAgreementPublish"
DAPERSONALDATAUPDATE = "DataAgreementPersonalDataUpdate"
DAPERSONALDATADELETE = "DataAgreementPersonalDataDelete"

async def publish_event_to_kafka_topic(key: str, message: str, topic: str, logger: Logger):
kafka_server_address = os.environ.get("KAFKA_SERVER_ADDRESS", 'localhost:9092')
# Fetch iGrant.io config
# igrantio_config =await ada_manager.fetch_igrantio_config_from_os_environ()
igrantio_org_id = os.environ.get("IGRANTIO_ORG_ID")
data = json.loads(message)
data['igrantio_org_id'] = igrantio_org_id

message_with_org_id = json.dumps(data)

kafka_producer_configuration = {
'bootstrap.servers': kafka_server_address,
}
kafka_producer = Producer(kafka_producer_configuration)

def kafka_event_delivery_callback_handler(err: str, msg: str):
if err is not None:
log_message = f"Message delivery failed: {err}"
else:
log_message = f'Message delivered to {msg.topic()} [{msg.partition()}] partition'
logger.debug(log_message)

# Publish event to Kafka topic
kafka_producer.produce(topic,key=key, value=message_with_org_id, callback=kafka_event_delivery_callback_handler)

# Wait for the message to be delivered
kafka_producer.flush()

async def publish_event_to_data_agreement_topic(key: str, message: str, logger: Logger):
topic = "data_agreement"
await publish_event_to_kafka_topic(key, message, topic, logger)
16 changes: 14 additions & 2 deletions mydata_did/v1_0/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@
from ..patched_protocols.present_proof.v1_0.message_types import ATTACH_DECO_IDS, PRESENTATION_REQUEST
from ..patched_protocols.present_proof.v1_0.manager import PresentationManager
from mydata_did.v1_0.utils.jsonld import data_agreement
from .kafka_publisher import publish_event_to_data_agreement_topic, DataAgreementOperations


class ADAManagerError(BaseError):
Expand Down Expand Up @@ -1452,6 +1453,7 @@ async def delete_data_agreement_in_wallet(self, data_agreement_id: str):

# Save the data agreement record
await data_agreement_record.save(self.context)

except StorageError as err:
# Raise an error
raise ADAManagerError(
Expand Down Expand Up @@ -4055,7 +4057,15 @@ def serialize_data_agreement_record(self, *, data_agreement_records: typing.List

return data_agreement_record_list if is_list else data_agreement_record_list[0]

async def create_data_agreement_and_personal_data_records(self, *, data_agreement: dict, existing_schema_id: str = None, draft: bool = False, existing_version: int = None, existing_data_agreement_id: str = None, update_ssi_payload: bool = True, existing_data_agreement_record: DataAgreementV1Record = None) -> typing.Tuple[DataAgreementV1Record, dict]:
async def create_data_agreement_and_personal_data_records(self,
*,
data_agreement: dict,
existing_schema_id: str = None,
draft: bool = False,
existing_version: int = None,
existing_data_agreement_id: str = None,
update_ssi_payload: bool = True,
existing_data_agreement_record: DataAgreementV1Record = None) -> typing.Tuple[DataAgreementV1Record, dict]:
"""
Create data agreement and personal data records.
Expand Down Expand Up @@ -4526,7 +4536,7 @@ async def delete_da_personal_data_in_wallet(self, *, personal_data_id: str) -> N

if len(personal_data) != 0:
# Create new data agreement with incremented version
await self.create_data_agreement_and_personal_data_records(
(new_data_agreement_record, new_data_agreement_dict) = await self.create_data_agreement_and_personal_data_records(
data_agreement=data_agreement_dict.serialize(),
draft=data_agreement_record.is_draft,
existing_version=data_agreement_dict.data_agreement_template_version,
Expand All @@ -4538,6 +4548,8 @@ async def delete_da_personal_data_in_wallet(self, *, personal_data_id: str) -> N
data_agreement_record._publish_flag = False
await data_agreement_record.save(self.context)

return data_agreement_record.data_agreement_id, personal_data_record.attribute_name

except StorageError as e:
raise ADAManagerError(
f"Failed to delete data agreement; Reason: {e.roll_up}"
Expand Down
53 changes: 52 additions & 1 deletion mydata_did/v1_0/routes.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
from .utils.util import str_to_bool, bool_to_str, comma_separated_str_to_list, get_slices
from .utils.regex import MYDATA_DID
from .utils.jsonld.data_agreement import sign_data_agreement, verify_data_agreement, verify_data_agreement_with_proof_chain
from .kafka_publisher import publish_event_to_data_agreement_topic, DataAgreementOperations


LOGGER = logging.getLogger(__name__)
Expand Down Expand Up @@ -1572,6 +1573,12 @@ async def create_and_store_data_agreement_in_wallet_v2(request: web.BaseRequest)

if not data_agreement_v2_record:
raise web.HTTPBadRequest(reason="Data agreement not created")

# Notify the iGrant.io backend about the creation of data agreement.

da_record_json_str = json.dumps(data_agreement_v2_record.serialize())
key = DataAgreementOperations.DACREATE.value
await publish_event_to_data_agreement_topic(key, da_record_json_str, LOGGER)

except ADAManagerError as err:
raise web.HTTPBadRequest(reason=err.roll_up) from err
Expand Down Expand Up @@ -1624,6 +1631,14 @@ async def publish_data_agreement_handler(request: web.BaseRequest):

if not data_agreement_v1_record:
raise web.HTTPBadRequest(reason="Data agreement not published")

# Notify the iGrant.io backend about the publish of data agreement.

daObject = {
"data_agreement_id": data_agreement_id
}
key = DataAgreementOperations.DAPUBLISH.value
await publish_event_to_data_agreement_topic(key, json.dumps(daObject), LOGGER)

except ADAManagerError as err:
raise web.HTTPBadRequest(reason=err.roll_up) from err
Expand Down Expand Up @@ -1773,6 +1788,13 @@ async def update_data_agreement_in_wallet_v2(request: web.BaseRequest):
draft=draft
)

# Notify the iGrant.io backend about the updation of data agreement.

da_record_json_str = json.dumps(data_agreement_v2_record.serialize())
key = DataAgreementOperations.DAUPDATE.value
await publish_event_to_data_agreement_topic(key, da_record_json_str, LOGGER)


except ADAManagerError as err:
raise web.HTTPBadRequest(reason=err.roll_up) from err

Expand Down Expand Up @@ -1812,6 +1834,14 @@ async def delete_data_agreement_in_wallet(request: web.BaseRequest):
# Delete data agreement in the wallet
await mydata_did_manager.delete_data_agreement_in_wallet(data_agreement_id=data_agreement_id)

# Notify iGrant.io backend about data agreement deletion

daObject = {
"data_agreement_id": data_agreement_id
}
key = DataAgreementOperations.DADELETE.value
await publish_event_to_data_agreement_topic(key,json.dumps(daObject), LOGGER)

except ADAManagerError as err:
raise web.HTTPBadRequest(reason=err.roll_up) from err

Expand Down Expand Up @@ -2023,6 +2053,12 @@ async def update_da_personal_data_in_wallet(request: web.BaseRequest):
updated_description=attribute_description
)

# Notify the iGrant.io backend about the updation of data agreement personal data

da_personal_data_json_str = json.dumps(personal_data_dict)
key = DataAgreementOperations.DAPERSONALDATAUPDATE.value
await publish_event_to_data_agreement_topic(key, da_personal_data_json_str, LOGGER)

except ADAManagerError as err:

raise web.HTTPBadRequest(reason=err.roll_up) from err
Expand Down Expand Up @@ -2065,9 +2101,18 @@ async def delete_da_personal_data_in_wallet(request: web.BaseRequest):

try:

await ada_mgr.delete_da_personal_data_in_wallet(
data_agreement_id, attribute_name = await ada_mgr.delete_da_personal_data_in_wallet(
personal_data_id=personal_data_id
)
# Notify the iGrant.io backend about the deletion of data agreement personal data

daObject = {
"data_agreement_id": data_agreement_id,
"attribute_name": attribute_name
}

key = DataAgreementOperations.DAPERSONALDATADELETE.value
await publish_event_to_data_agreement_topic(key, json.dumps(daObject), LOGGER)

except ADAManagerError as err:
raise web.HTTPBadRequest(reason=err.roll_up) from err
Expand Down Expand Up @@ -3554,10 +3599,12 @@ async def register(app: web.Application):
"/v1/data-agreements/didcomm/transactions/{da_crud_didcomm_tx_id}",
data_agreement_crud_didcomm_transaction_records_delete_by_id,
),
# TODO: Notify the backend that data agreement is created.
web.post(
"/v1/data-agreements",
create_and_store_data_agreement_in_wallet_v2,
),
# TODO: Notify the backend that data agreement is published.
web.post(
"/v1/data-agreements/{data_agreement_id}/publish",
publish_data_agreement_handler,
Expand All @@ -3567,10 +3614,12 @@ async def register(app: web.Application):
query_data_agreements_in_wallet,
allow_head=False
),
# TODO: Notify the backend that data agreement is updated.
web.put(
"/v1/data-agreements/{data_agreement_id}",
update_data_agreement_in_wallet_v2,
),
# TODO: Notify the backend that data agreement is deleted.
web.delete(
"/v1/data-agreements/{data_agreement_id}",
delete_data_agreement_in_wallet,
Expand All @@ -3585,10 +3634,12 @@ async def register(app: web.Application):
query_da_personal_data_in_wallet,
allow_head=False
),
# TODO: Check if the backend needs to be notified.
web.put(
"/v1/data-agreements/personal-data/{attribute_id}",
update_da_personal_data_in_wallet,
),
# TODO: Check if the backend needs to be notified.
web.delete(
"/v1/data-agreements/personal-data/{attribute_id}",
delete_da_personal_data_in_wallet,
Expand Down
3 changes: 2 additions & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -56,4 +56,5 @@ wrapt==1.12.1
yarl==1.5.1
py-multibase==1.0.3
validators==0.18.2
semver==2.13.0
semver==2.13.0
confluent-kafka =="*"

0 comments on commit 8c71457

Please sign in to comment.