diff --git a/dapr/clients/grpc/subscription.py b/dapr/clients/grpc/subscription.py index 1cdf1ef5..5ca30119 100644 --- a/dapr/clients/grpc/subscription.py +++ b/dapr/clients/grpc/subscription.py @@ -1,12 +1,13 @@ import json -import grpc +from grpc import StreamStreamMultiCallable, RpcError, StatusCode # type: ignore from dapr.clients.exceptions import StreamInactiveError from dapr.clients.grpc._response import TopicEventResponse from dapr.proto import api_v1, appcallback_v1 import queue import threading +from typing import Optional def success(): @@ -28,11 +29,11 @@ def __init__(self, stub, pubsub_name, topic, metadata=None, dead_letter_topic=No self.topic = topic self.metadata = metadata or {} self.dead_letter_topic = dead_letter_topic or '' - self._stream = None - self._response_thread = None - self._send_queue = queue.Queue() - self._receive_queue = queue.Queue() - self._stream_active = False + self._stream: Optional[StreamStreamMultiCallable] = None # Type annotation for gRPC stream + self._response_thread: Optional[threading.Thread] = None # Type for thread + self._send_queue: queue.Queue = queue.Queue() # Type annotation for send queue + self._receive_queue: queue.Queue = queue.Queue() # Type annotation for receive queue + self._stream_active: bool = False self._stream_lock = threading.Lock() # Protects _stream_active def start(self): @@ -55,9 +56,8 @@ def outgoing_request_iterator(): # Start sending back acknowledgement messages from the send queue while self._is_stream_active(): try: - response = self._send_queue.get() - # The above blocks until a message is available or the stream is closed - # so that's why we need to check again if the stream is still active + response = self._send_queue.get(timeout=1) + # Check again if the stream is still active if not self._is_stream_active(): break yield response @@ -76,17 +76,19 @@ def outgoing_request_iterator(): def _handle_incoming_messages(self): try: - # The first message dapr sends on the stream is for signalling only, so discard it - next(self._stream) - - # Read messages from the stream and put them in the receive queue - for message in self._stream: - if self._is_stream_active(): - self._receive_queue.put(message.event_message) - else: - break - except grpc.RpcError as e: - if e.code() != grpc.StatusCode.CANCELLED: + # Check if the stream is not None + if self._stream is not None: + # The first message dapr sends on the stream is for signalling only, so discard it + next(self._stream) + + # Read messages from the stream and put them in the receive queue + for message in self._stream: + if self._is_stream_active(): + self._receive_queue.put(message.event_message) + else: + break + except RpcError as e: + if e.code() != StatusCode.CANCELLED: print(f'gRPC error in stream: {e.details()}, Status Code: {e.code()}') except Exception as e: raise Exception(f'Error while handling responses: {e}') @@ -157,8 +159,8 @@ def close(self): if self._stream: try: self._stream.cancel() - except grpc.RpcError as e: - if e.code() != grpc.StatusCode.CANCELLED: + except RpcError as e: + if e.code() != StatusCode.CANCELLED: raise Exception(f'Error while closing stream: {e}') except Exception as e: raise Exception(f'Error while closing stream: {e}') diff --git a/examples/pubsub-streaming/README.md b/examples/pubsub-streaming/README.md index 5d80cf0b..f0fe0d93 100644 --- a/examples/pubsub-streaming/README.md +++ b/examples/pubsub-streaming/README.md @@ -27,16 +27,11 @@ Run the following command in a terminal/command prompt: diff --git a/examples/pubsub-streaming/subscriber.py b/examples/pubsub-streaming/subscriber.py index f6f9078a..701f5775 100644 --- a/examples/pubsub-streaming/subscriber.py +++ b/examples/pubsub-streaming/subscriber.py @@ -17,25 +17,20 @@ def main(): try: for i in range(5): - try: - message = subscription.next_message() - if message is None: - print('No message received within timeout period.') - continue - - # Process the message - response_status = process_message(message) - - if response_status == 'success': - subscription.respond_success(message) - elif response_status == 'retry': - subscription.respond_retry(message) - elif response_status == 'drop': - subscription.respond_drop(message) - - except Exception as e: - print(f'Error getting message: {e}') - break + message = subscription.next_message() + if message is None: + print('No message received within timeout period.') + continue + + # Process the message + response_status = process_message(message) + + if response_status == 'success': + subscription.respond_success(message) + elif response_status == 'retry': + subscription.respond_retry(message) + elif response_status == 'drop': + subscription.respond_drop(message) finally: subscription.close()