Skip to content

Commit

Permalink
Sync bidi streaming and tests
Browse files Browse the repository at this point in the history
Signed-off-by: Elena Kolevska <[email protected]>
  • Loading branch information
elena-kolevska committed Sep 23, 2024
1 parent 2add93f commit c325a2a
Show file tree
Hide file tree
Showing 9 changed files with 279 additions and 105 deletions.
23 changes: 17 additions & 6 deletions dapr/clients/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,15 +27,22 @@
class DaprInternalError(Exception):
"""DaprInternalError encapsulates all Dapr exceptions"""

def __init__(self, message: Optional[str], error_code: Optional[str] = ERROR_CODE_UNKNOWN,
raw_response_bytes: Optional[bytes] = None, ):
def __init__(
self,
message: Optional[str],
error_code: Optional[str] = ERROR_CODE_UNKNOWN,
raw_response_bytes: Optional[bytes] = None,
):
self._message = message
self._error_code = error_code
self._raw_response_bytes = raw_response_bytes

def as_dict(self):
return {'message': self._message, 'errorCode': self._error_code,
'raw_response_bytes': self._raw_response_bytes, }
return {
'message': self._message,
'errorCode': self._error_code,
'raw_response_bytes': self._raw_response_bytes,
}


class StatusDetails:
Expand Down Expand Up @@ -112,8 +119,12 @@ def get_grpc_status(self):
return self._grpc_status

def json(self):
error_details = {'status_code': self.code().name, 'message': self.details(),
'error_code': self.error_code(), 'details': self._details.as_dict(), }
error_details = {
'status_code': self.code().name,
'message': self.details(),
'error_code': self.error_code(),
'details': self._details.as_dict(),
}
return json.dumps(error_details)


Expand Down
12 changes: 1 addition & 11 deletions dapr/clients/grpc/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@
from dapr.clients.health import DaprHealth
from dapr.clients.retry import RetryPolicy
from dapr.conf import settings
from dapr.proto import api_v1, api_service_v1, common_v1, appcallback_v1
from dapr.proto import api_v1, api_service_v1, common_v1
from dapr.proto.runtime.v1.dapr_pb2 import UnsubscribeConfigurationResponse
from dapr.version import __version__

Expand Down Expand Up @@ -482,16 +482,6 @@ def publish_event(

return DaprResponse(call.initial_metadata())

# def subscribe(self, pubsub_name, topic, metadata=None, dead_letter_topic=None):
# stream = self._stub.SubscribeTopicEventsAlpha1()
#
# # Send InitialRequest
# initial_request = api_v1.SubscribeTopicEventsInitialRequestAlpha1(pubsub_name=pubsub_name, topic=topic, metadata=metadata, dead_letter_topic=dead_letter_topic)
# request = api_v1.SubscribeTopicEventsRequestAlpha1(initial_request=initial_request)
# stream.write(request)
#
# return stream

def subscribe(self, pubsub_name, topic, metadata=None, dead_letter_topic=None):
subscription = Subscription(self._stub, pubsub_name, topic, metadata, dead_letter_topic)
subscription.start()
Expand Down
138 changes: 113 additions & 25 deletions dapr/clients/grpc/subscription.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import json

import grpc

from dapr.clients.exceptions import StreamInactiveError
Expand Down Expand Up @@ -34,32 +36,45 @@ def __init__(self, stub, pubsub_name, topic, metadata=None, dead_letter_topic=No
self._stream_lock = threading.Lock() # Protects _stream_active

def start(self):
def request_iterator():
def outgoing_request_iterator():
"""
Generator function to create the request iterator for the stream
"""
try:
# Send InitialRequest needed to establish the stream
initial_request = api_v1.SubscribeTopicEventsRequestAlpha1(
initial_request=api_v1.SubscribeTopicEventsRequestInitialAlpha1(
pubsub_name=self.pubsub_name, topic=self.topic, metadata=self.metadata or {},
dead_letter_topic=self.dead_letter_topic or ''))
pubsub_name=self.pubsub_name,
topic=self.topic,
metadata=self.metadata or {},
dead_letter_topic=self.dead_letter_topic or '',
)
)
yield initial_request

# Start sending back acknowledgement messages from the send queue
while self._is_stream_active():
try:
yield self._send_queue.get() # TODO Should I add a timeout?
response = self._send_queue.get()
# The above blocks until a message is available or the stream is closed
# so that's why we need to check again if the stream is still active
if not self._is_stream_active():
break
yield response
except queue.Empty:
continue
except Exception as e:
raise Exception(f"Error in request iterator: {e}")
raise Exception(f'Error in request iterator: {e}')

# Create the bidirectional stream
self._stream = self._stub.SubscribeTopicEventsAlpha1(request_iterator())
self._stream = self._stub.SubscribeTopicEventsAlpha1(outgoing_request_iterator())
self._set_stream_active()

# Start a thread to handle incoming messages
self._response_thread = threading.Thread(target=self._handle_responses, daemon=True)
self._response_thread = threading.Thread(target=self._handle_incoming_messages, daemon=True)
self._response_thread.start()

def _handle_responses(self):
def _handle_incoming_messages(self):
try:
# The first message dapr sends on the stream is for signalling only, so discard it
next(self._stream)
Expand All @@ -72,30 +87,31 @@ def _handle_responses(self):
break
except grpc.RpcError as e:
if e.code() != grpc.StatusCode.CANCELLED:
print(f"gRPC error in stream: {e.details()}, Status Code: {e.code()}")
print(f'gRPC error in stream: {e.details()}, Status Code: {e.code()}')
except Exception as e:
raise Exception(f"Error while handling responses: {e}")
raise Exception(f'Error while handling responses: {e}')
finally:
self._set_stream_inactive()

def next_message(self, timeout=1):
"""
Gets the next message from the receive queue
@param timeout: Timeout in seconds
@return: The next message
"""
return self.read_message_from_queue(self._receive_queue, timeout=timeout)
def next_message(self, timeout=None):
msg = self.read_message_from_queue(self._receive_queue, timeout=timeout)

if msg is None:
return None

return SubscriptionMessage(msg)

def _respond(self, message, status):
try:
status = appcallback_v1.TopicEventResponse(status=status.value)
response = api_v1.SubscribeTopicEventsRequestProcessedAlpha1(id=message.id,
status=status)
response = api_v1.SubscribeTopicEventsRequestProcessedAlpha1(
id=message.id(), status=status
)
msg = api_v1.SubscribeTopicEventsRequestAlpha1(event_processed=response)

self.send_message_to_queue(self._send_queue, msg)
except Exception as e:
print(f"Exception in send_message: {e}")
print(f'Exception in send_message: {e}')

def respond_success(self, message):
self._respond(message, TopicEventResponse('success').status)
Expand All @@ -108,12 +124,12 @@ def respond_drop(self, message):

def send_message_to_queue(self, q, message):
if not self._is_stream_active():
raise StreamInactiveError("Stream is not active")
raise StreamInactiveError('Stream is not active')
q.put(message)

def read_message_from_queue(self, q, timeout):
def read_message_from_queue(self, q, timeout=None):
if not self._is_stream_active():
raise StreamInactiveError("Stream is not active")
raise StreamInactiveError('Stream is not active')
try:
return q.get(timeout=timeout)
except queue.Empty:
Expand Down Expand Up @@ -143,12 +159,84 @@ def close(self):
self._stream.cancel()
except grpc.RpcError as e:
if e.code() != grpc.StatusCode.CANCELLED:
raise Exception(f"Error while closing stream: {e}")
raise Exception(f'Error while closing stream: {e}')
except Exception as e:
raise Exception(f"Error while closing stream: {e}")
raise Exception(f'Error while closing stream: {e}')

# Join the response-handling thread to ensure it has finished
if self._response_thread:
self._response_thread.join()
self._response_thread = None


class SubscriptionMessage:
    """A single message received on a pub/sub streaming subscription.

    Copies the fields of the incoming stream message into private attributes
    and eagerly parses the payload according to its declared content type.
    Accessor methods mirror the underlying message fields; `data()` returns
    the parsed payload (or None if the payload could not be parsed), while
    `raw_data()` always returns the original bytes.
    """

    def __init__(self, msg):
        # Copy protobuf fields so the message stays usable independently of
        # the stream / protobuf object it came from.
        self._id = msg.id
        self._source = msg.source
        self._type = msg.type
        self._spec_version = msg.spec_version
        self._data_content_type = msg.data_content_type
        self._topic = msg.topic
        self._pubsub_name = msg.pubsub_name
        self._raw_data = msg.data
        self._extensions = msg.extensions
        self._data = None  # Parsed payload; remains None when parsing fails.

        # Parse the payload based on its media type (best effort: `_data`
        # stays None on failure and callers can fall back to `raw_data()`).
        if self._raw_data:
            self._parse_data_content()

    def id(self):
        """Return the message id."""
        return self._id

    def source(self):
        """Return the message source."""
        return self._source

    def type(self):
        """Return the message type."""
        return self._type

    def spec_version(self):
        """Return the CloudEvent spec version."""
        return self._spec_version

    def data_content_type(self):
        """Return the declared media type of the payload."""
        return self._data_content_type

    def topic(self):
        """Return the topic this message was delivered on."""
        return self._topic

    def pubsub_name(self):
        """Return the name of the pub/sub component that delivered it."""
        return self._pubsub_name

    def raw_data(self):
        """Return the unparsed payload bytes."""
        return self._raw_data

    def extensions(self):
        """Return the message extension attributes."""
        return self._extensions

    def data(self):
        """Return the parsed payload, or None if it could not be parsed."""
        return self._data

    def _parse_data_content(self):
        """Best-effort decode of `_raw_data` into `_data` by media type.

        Handles JSON media types (`application/json` and custom
        `application/*+json` variants, e.g. application/vnd.api+json) and
        UTF-8 `text/plain`. Any parse failure leaves `_data` as None.
        """
        content_type = self._data_content_type
        try:
            # `application/json` and `application/*+json` share one code path.
            is_json = content_type == 'application/json' or (
                content_type.startswith('application/') and content_type.endswith('+json')
            )
            if is_json:
                try:
                    self._data = json.loads(self._raw_data)
                except json.JSONDecodeError:
                    pass  # If JSON parsing fails, keep `data` as None
            elif content_type == 'text/plain':
                # Assume UTF-8 encoding
                try:
                    self._data = self._raw_data.decode('utf-8')
                except UnicodeDecodeError:
                    pass
        except Exception as e:
            # Parsing is best-effort; never let a payload error break delivery.
            print(f'Error parsing media type: {e}')
76 changes: 76 additions & 0 deletions examples/pubsub-streaming/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
# Example - Publish and subscribe to messages

This example utilizes a publisher and a subscriber to show the bidirectional pubsub pattern.
It creates a publisher and calls the `publish_event` method in the `DaprClient`.
In the s`subscriber.py` file it creates a subscriber object that can call the `next_message` method to get new messages from the stream. After processing the new message, it returns a status to the stream.


> **Note:** Make sure to use the latest proto bindings
## Pre-requisites

- [Dapr CLI and initialized environment](https://docs.dapr.io/getting-started)
- [Install Python 3.8+](https://www.python.org/downloads/)

## Install Dapr python-SDK

<!-- Our CI/CD pipeline automatically installs the correct version, so we can skip this step in the automation -->

```bash
pip3 install dapr
```

## Run the example

Run the following command in a terminal/command prompt:

<!-- STEP
name: Run subscriber
expected_stdout_lines:
- '== APP == Subscriber received: id=1, message="hello world", content_type="application/json"'
- 'RETRY status returned from app while processing pub/sub event'
- '== APP == Subscriber received: id=2, message="hello world", content_type="application/json"'
- '== APP == Subscriber received: id=3, message="hello world", content_type="application/json"'
- '== APP == Wildcard-Subscriber received: id=4, message="hello world", content_type="application/json"'
- '== APP == Wildcard-Subscriber received: id=5, message="hello world", content_type="application/json"'
- '== APP == Wildcard-Subscriber received: id=6, message="hello world", content_type="application/json"'
- '== APP == Dead-Letter Subscriber received: id=7, message="hello world", content_type="application/json"'
- '== APP == Dead-Letter Subscriber. Received via deadletter topic: TOPIC_D_DEAD'
- '== APP == Dead-Letter Subscriber. Originally intended topic: TOPIC_D'
output_match_mode: substring
background: true
match_order: none
sleep: 3
-->

```bash
# 1. Start Subscriber
dapr run --app-id python-subscriber --app-protocol grpc python3 subscriber.py
```

<!-- END_STEP -->

In another terminal/command prompt run:

<!-- STEP
name: Run publisher
expected_stdout_lines:
- "== APP == {'id': 1, 'message': 'hello world'}"
- "== APP == {'id': 2, 'message': 'hello world'}"
- "== APP == {'id': 3, 'message': 'hello world'}"
- "== APP == {'id': 4, 'message': 'hello world'}"
- "== APP == {'id': 5, 'message': 'hello world'}"
background: true
sleep: 15
-->

```bash
# 2. Start Publisher
dapr run --app-id python-publisher --app-protocol grpc --dapr-grpc-port=3500 --enable-app-health-check python3 publisher.py
```

<!-- END_STEP -->

## Cleanup


Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@

with DaprClient() as d:
id = 0
while id < 3:
while id < 5:
id += 1
req_data = {'id': time.time(), 'message': 'hello world'}
req_data = {'id': id, 'message': 'hello world'}

# Create a typed message with content type and body
resp = d.publish_event(
Expand All @@ -34,35 +34,3 @@
print(req_data, flush=True)

time.sleep(1)

# we can publish events to different topics but handle them with the same method
# by disabling topic validation in the subscriber
#
# id = 3
# while id < 6:
# id += 1
# req_data = {'id': id, 'message': 'hello world'}
# resp = d.publish_event(
# pubsub_name='pubsub',
# topic_name=f'topic/{id}',
# data=json.dumps(req_data),
# data_content_type='application/json',
# )
#
# # Print the request
# print(req_data, flush=True)
#
# time.sleep(0.5)
#
# # This topic will fail - initiate a retry which gets routed to the dead letter topic
# req_data['id'] = 7
# resp = d.publish_event(
# pubsub_name='pubsub',
# topic_name='TOPIC_D',
# data=json.dumps(req_data),
# data_content_type='application/json',
# publish_metadata={'custommeta': 'somevalue'},
# )
#
# # Print the request
# print(req_data, flush=True)
Loading

0 comments on commit c325a2a

Please sign in to comment.