Skip to content

Commit

Permalink
updates fake server to wait for confirmation message before sending n…
Browse files Browse the repository at this point in the history
…ew message

Signed-off-by: Elena Kolevska <[email protected]>
  • Loading branch information
elena-kolevska committed Oct 21, 2024
1 parent da09a54 commit 8c9ce85
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 20 deletions.
28 changes: 20 additions & 8 deletions tests/clients/fake_dapr_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -179,14 +179,15 @@ def PublishEvent(self, request, context):
return empty_pb2.Empty()

def SubscribeTopicEventsAlpha1(self, request_iterator, context):
yield api_v1.SubscribeTopicEventsResponseAlpha1(
initial_response=api_v1.SubscribeTopicEventsResponseInitialAlpha1()
)
for request in request_iterator:
if request.HasField('initial_request'):
yield api_v1.SubscribeTopicEventsResponseAlpha1(
initial_response=api_v1.SubscribeTopicEventsResponseInitialAlpha1()
)
break

extensions = struct_pb2.Struct()
extensions['field1'] = 'value1'
extensions['field2'] = 42
extensions['field3'] = True
extensions.update({'field1': 'value1', 'field2': 42, 'field3': True})

msg1 = appcallback_v1.TopicEventRequest(
id='111',
Expand All @@ -201,6 +202,10 @@ def SubscribeTopicEventsAlpha1(self, request_iterator, context):
)
yield api_v1.SubscribeTopicEventsResponseAlpha1(event_message=msg1)

for request in request_iterator:
if request.HasField('event_processed'):
break

msg2 = appcallback_v1.TopicEventRequest(
id='222',
topic='TOPIC_A',
Expand All @@ -214,9 +219,16 @@ def SubscribeTopicEventsAlpha1(self, request_iterator, context):
)
yield api_v1.SubscribeTopicEventsResponseAlpha1(event_message=msg2)

for request in request_iterator:
if request.HasField('event_processed'):
break

# On the third message simulate a disconnection
status = status_pb2.Status(code=code_pb2.UNAVAILABLE, message='Simulated disconnection')
context.abort_with_status(rpc_status.to_status(status))
context.abort_with_status(
rpc_status.to_status(
status_pb2.Status(code=code_pb2.UNAVAILABLE, message='Simulated disconnection')
)
)

def SaveState(self, request, context):
self.check_for_exception(context)
Expand Down
24 changes: 12 additions & 12 deletions tests/clients/test_dapr_grpc_client_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -305,18 +305,18 @@ async def test_subscribe_topic(self):

# # The client already reconnected and will start reading the messages again
# # Since we're working with a fake server, the messages will be the same
# message4 = await subscription.next_message()
# await subscription.respond_success(message4)
# self.assertEqual('111', message4.id())
# self.assertEqual('app1', message4.source())
# self.assertEqual('com.example.type2', message4.type())
# self.assertEqual('1.0', message4.spec_version())
# self.assertEqual('text/plain', message4.data_content_type())
# self.assertEqual('TOPIC_A', message4.topic())
# self.assertEqual('pubsub', message4.pubsub_name())
# self.assertEqual(b'hello2', message4.raw_data())
# self.assertEqual('text/plain', message4.data_content_type())
# self.assertEqual('hello2', message4.data())
message4 = await subscription.next_message()
await subscription.respond_success(message4)
self.assertEqual('111', message4.id())
self.assertEqual('app1', message4.source())
self.assertEqual('com.example.type2', message4.type())
self.assertEqual('1.0', message4.spec_version())
self.assertEqual('text/plain', message4.data_content_type())
self.assertEqual('TOPIC_A', message4.topic())
self.assertEqual('pubsub', message4.pubsub_name())
self.assertEqual(b'hello2', message4.raw_data())
self.assertEqual('text/plain', message4.data_content_type())
self.assertEqual('hello2', message4.data())

await subscription.close()

Expand Down
1 change: 1 addition & 0 deletions tox.ini
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ commands =
./validate.sh error_handling
./validate.sh pubsub-simple
./validate.sh pubsub-streaming
./validate.sh pubsub-streaming-async
./validate.sh state_store
./validate.sh state_store_query
./validate.sh secret_store
Expand Down

0 comments on commit 8c9ce85

Please sign in to comment.