Skip to content

Commit

Permalink
example fix
Browse files Browse the repository at this point in the history
Signed-off-by: Elena Kolevska <[email protected]>

fixes typing

Signed-off-by: Elena Kolevska <[email protected]>

more readable example

Signed-off-by: Elena Kolevska <[email protected]>

linter

Signed-off-by: Elena Kolevska <[email protected]>
  • Loading branch information
elena-kolevska committed Sep 23, 2024
1 parent c325a2a commit fbd12a7
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 51 deletions.
46 changes: 24 additions & 22 deletions dapr/clients/grpc/subscription.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
import json

import grpc
from grpc import StreamStreamMultiCallable, RpcError, StatusCode # type: ignore

from dapr.clients.exceptions import StreamInactiveError
from dapr.clients.grpc._response import TopicEventResponse
from dapr.proto import api_v1, appcallback_v1
import queue
import threading
from typing import Optional


def success():
Expand All @@ -28,11 +29,11 @@ def __init__(self, stub, pubsub_name, topic, metadata=None, dead_letter_topic=No
self.topic = topic
self.metadata = metadata or {}
self.dead_letter_topic = dead_letter_topic or ''
self._stream = None
self._response_thread = None
self._send_queue = queue.Queue()
self._receive_queue = queue.Queue()
self._stream_active = False
self._stream: Optional[StreamStreamMultiCallable] = None # Type annotation for gRPC stream
self._response_thread: Optional[threading.Thread] = None # Type for thread
self._send_queue: queue.Queue = queue.Queue() # Type annotation for send queue
self._receive_queue: queue.Queue = queue.Queue() # Type annotation for receive queue
self._stream_active: bool = False
self._stream_lock = threading.Lock() # Protects _stream_active

def start(self):
Expand All @@ -55,9 +56,8 @@ def outgoing_request_iterator():
# Start sending back acknowledgement messages from the send queue
while self._is_stream_active():
try:
response = self._send_queue.get()
# The above blocks until a message is available or the stream is closed
# so that's why we need to check again if the stream is still active
response = self._send_queue.get(timeout=1)
# Check again if the stream is still active
if not self._is_stream_active():
break
yield response
Expand All @@ -76,17 +76,19 @@ def outgoing_request_iterator():

def _handle_incoming_messages(self):
try:
# The first message dapr sends on the stream is for signalling only, so discard it
next(self._stream)

# Read messages from the stream and put them in the receive queue
for message in self._stream:
if self._is_stream_active():
self._receive_queue.put(message.event_message)
else:
break
except grpc.RpcError as e:
if e.code() != grpc.StatusCode.CANCELLED:
# Check if the stream is not None
if self._stream is not None:
# The first message dapr sends on the stream is for signalling only, so discard it
next(self._stream)

# Read messages from the stream and put them in the receive queue
for message in self._stream:
if self._is_stream_active():
self._receive_queue.put(message.event_message)
else:
break

Check warning on line 89 in dapr/clients/grpc/subscription.py

View check run for this annotation

Codecov / codecov/patch

dapr/clients/grpc/subscription.py#L89

Added line #L89 was not covered by tests
except RpcError as e:
if e.code() != StatusCode.CANCELLED:
print(f'gRPC error in stream: {e.details()}, Status Code: {e.code()}')
except Exception as e:
raise Exception(f'Error while handling responses: {e}')

Check warning on line 94 in dapr/clients/grpc/subscription.py

View check run for this annotation

Codecov / codecov/patch

dapr/clients/grpc/subscription.py#L92-L94

Added lines #L92 - L94 were not covered by tests
Expand Down Expand Up @@ -157,8 +159,8 @@ def close(self):
if self._stream:
try:
self._stream.cancel()
except grpc.RpcError as e:
if e.code() != grpc.StatusCode.CANCELLED:
except RpcError as e:
if e.code() != StatusCode.CANCELLED:
raise Exception(f'Error while closing stream: {e}')
except Exception as e:
raise Exception(f'Error while closing stream: {e}')

Check warning on line 166 in dapr/clients/grpc/subscription.py

View check run for this annotation

Codecov / codecov/patch

dapr/clients/grpc/subscription.py#L162-L166

Added lines #L162 - L166 were not covered by tests
Expand Down
16 changes: 6 additions & 10 deletions examples/pubsub-streaming/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,16 +27,11 @@ Run the following command in a terminal/command prompt:
<!-- STEP
name: Run subscriber
expected_stdout_lines:
- '== APP == Subscriber received: id=1, message="hello world", content_type="application/json"'
- 'RETRY status returned from app while processing pub/sub event'
- '== APP == Subscriber received: id=2, message="hello world", content_type="application/json"'
- '== APP == Subscriber received: id=3, message="hello world", content_type="application/json"'
- '== APP == Wildcard-Subscriber received: id=4, message="hello world", content_type="application/json"'
- '== APP == Wildcard-Subscriber received: id=5, message="hello world", content_type="application/json"'
- '== APP == Wildcard-Subscriber received: id=6, message="hello world", content_type="application/json"'
- '== APP == Dead-Letter Subscriber received: id=7, message="hello world", content_type="application/json"'
- '== APP == Dead-Letter Subscriber. Received via deadletter topic: TOPIC_D_DEAD'
- '== APP == Dead-Letter Subscriber. Originally intended topic: TOPIC_D'
- "== APP == Processing message: {'id': 1, 'message': 'hello world'} from TOPIC_A"
- "== APP == Processing message: {'id': 2, 'message': 'hello world'} from TOPIC_A"
- "== APP == Processing message: {'id': 3, 'message': 'hello world'} from TOPIC_A"
- "== APP == Processing message: {'id': 4, 'message': 'hello world'} from TOPIC_A"
- "== APP == Processing message: {'id': 5, 'message': 'hello world'} from TOPIC_A"
output_match_mode: substring
background: true
match_order: none
Expand All @@ -61,6 +56,7 @@ expected_stdout_lines:
- "== APP == {'id': 4, 'message': 'hello world'}"
- "== APP == {'id': 5, 'message': 'hello world'}"
background: true
output_match_mode: substring
sleep: 15
-->

Expand Down
33 changes: 14 additions & 19 deletions examples/pubsub-streaming/subscriber.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,25 +17,20 @@ def main():

try:
for i in range(5):
try:
message = subscription.next_message()
if message is None:
print('No message received within timeout period.')
continue

# Process the message
response_status = process_message(message)

if response_status == 'success':
subscription.respond_success(message)
elif response_status == 'retry':
subscription.respond_retry(message)
elif response_status == 'drop':
subscription.respond_drop(message)

except Exception as e:
print(f'Error getting message: {e}')
break
message = subscription.next_message()
if message is None:
print('No message received within timeout period.')
continue

# Process the message
response_status = process_message(message)

if response_status == 'success':
subscription.respond_success(message)
elif response_status == 'retry':
subscription.respond_retry(message)
elif response_status == 'drop':
subscription.respond_drop(message)

finally:
subscription.close()
Expand Down

0 comments on commit fbd12a7

Please sign in to comment.