diff --git a/tests/clients/fake_dapr_server.py b/tests/clients/fake_dapr_server.py index 8627ab46..9ae39aa1 100644 --- a/tests/clients/fake_dapr_server.py +++ b/tests/clients/fake_dapr_server.py @@ -179,14 +179,15 @@ def PublishEvent(self, request, context): return empty_pb2.Empty() def SubscribeTopicEventsAlpha1(self, request_iterator, context): - yield api_v1.SubscribeTopicEventsResponseAlpha1( - initial_response=api_v1.SubscribeTopicEventsResponseInitialAlpha1() - ) + for request in request_iterator: + if request.HasField('initial_request'): + yield api_v1.SubscribeTopicEventsResponseAlpha1( + initial_response=api_v1.SubscribeTopicEventsResponseInitialAlpha1() + ) + break extensions = struct_pb2.Struct() - extensions['field1'] = 'value1' - extensions['field2'] = 42 - extensions['field3'] = True + extensions.update({'field1': 'value1', 'field2': 42, 'field3': True}) msg1 = appcallback_v1.TopicEventRequest( id='111', @@ -201,6 +202,10 @@ def SubscribeTopicEventsAlpha1(self, request_iterator, context): ) yield api_v1.SubscribeTopicEventsResponseAlpha1(event_message=msg1) + for request in request_iterator: + if request.HasField('event_processed'): + break + msg2 = appcallback_v1.TopicEventRequest( id='222', topic='TOPIC_A', @@ -214,9 +219,16 @@ def SubscribeTopicEventsAlpha1(self, request_iterator, context): ) yield api_v1.SubscribeTopicEventsResponseAlpha1(event_message=msg2) + for request in request_iterator: + if request.HasField('event_processed'): + break + # On the third message simulate a disconnection - status = status_pb2.Status(code=code_pb2.UNAVAILABLE, message='Simulated disconnection') - context.abort_with_status(rpc_status.to_status(status)) + context.abort_with_status( + rpc_status.to_status( + status_pb2.Status(code=code_pb2.UNAVAILABLE, message='Simulated disconnection') + ) + ) def SaveState(self, request, context): self.check_for_exception(context) diff --git a/tests/clients/test_dapr_grpc_client_async.py b/tests/clients/test_dapr_grpc_client_async.py index 42bbd830..f15a2d1a 100644 --- a/tests/clients/test_dapr_grpc_client_async.py +++ b/tests/clients/test_dapr_grpc_client_async.py @@ -305,18 +305,18 @@ async def test_subscribe_topic(self): # # The client already reconnected and will start reading the messages again # # Since we're working with a fake server, the messages will be the same - # message4 = await subscription.next_message() - # await subscription.respond_success(message4) - # self.assertEqual('111', message4.id()) - # self.assertEqual('app1', message4.source()) - # self.assertEqual('com.example.type2', message4.type()) - # self.assertEqual('1.0', message4.spec_version()) - # self.assertEqual('text/plain', message4.data_content_type()) - # self.assertEqual('TOPIC_A', message4.topic()) - # self.assertEqual('pubsub', message4.pubsub_name()) - # self.assertEqual(b'hello2', message4.raw_data()) - # self.assertEqual('text/plain', message4.data_content_type()) - # self.assertEqual('hello2', message4.data()) + message4 = await subscription.next_message() + await subscription.respond_success(message4) + self.assertEqual('111', message4.id()) + self.assertEqual('app1', message4.source()) + self.assertEqual('com.example.type2', message4.type()) + self.assertEqual('1.0', message4.spec_version()) + self.assertEqual('text/plain', message4.data_content_type()) + self.assertEqual('TOPIC_A', message4.topic()) + self.assertEqual('pubsub', message4.pubsub_name()) + self.assertEqual(b'hello2', message4.raw_data()) + self.assertEqual('text/plain', message4.data_content_type()) + self.assertEqual('hello2', message4.data()) await subscription.close() diff --git a/tox.ini b/tox.ini index 6400e329..78f23086 100644 --- a/tox.ini +++ b/tox.ini @@ -51,6 +51,7 @@ commands = ./validate.sh error_handling ./validate.sh pubsub-simple ./validate.sh pubsub-streaming + ./validate.sh pubsub-streaming-async ./validate.sh state_store ./validate.sh state_store_query ./validate.sh secret_store