Skip to content

Commit

Permalink
Split up topic names between tests
Browse files Browse the repository at this point in the history
Signed-off-by: Elena Kolevska <[email protected]>
  • Loading branch information
elena-kolevska committed Oct 21, 2024
1 parent df2fa9f commit fc0c5fa
Show file tree
Hide file tree
Showing 4 changed files with 40 additions and 19 deletions.
28 changes: 14 additions & 14 deletions examples/pubsub-streaming/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,11 @@ Run the following command in a terminal/command prompt:
<!-- STEP
name: Run subscriber
expected_stdout_lines:
- "== APP == Processing message: {'id': 1, 'message': 'hello world'} from TOPIC_A..."
- "== APP == Processing message: {'id': 2, 'message': 'hello world'} from TOPIC_A..."
- "== APP == Processing message: {'id': 3, 'message': 'hello world'} from TOPIC_A..."
- "== APP == Processing message: {'id': 4, 'message': 'hello world'} from TOPIC_A..."
- "== APP == Processing message: {'id': 5, 'message': 'hello world'} from TOPIC_A..."
- "== APP == Processing message: {'id': 1, 'message': 'hello world'} from TOPIC_A1..."
- "== APP == Processing message: {'id': 2, 'message': 'hello world'} from TOPIC_A1..."
- "== APP == Processing message: {'id': 3, 'message': 'hello world'} from TOPIC_A1..."
- "== APP == Processing message: {'id': 4, 'message': 'hello world'} from TOPIC_A1..."
- "== APP == Processing message: {'id': 5, 'message': 'hello world'} from TOPIC_A1..."
- "== APP == Closing subscription..."
output_match_mode: substring
background: true
Expand All @@ -41,7 +41,7 @@ sleep: 3

```bash
# 1. Start Subscriber
dapr run --app-id python-subscriber --app-protocol grpc python3 subscriber.py
dapr run --app-id python-subscriber --app-protocol grpc -- python3 subscriber.py --topic=TOPIC_A1
```

<!-- END_STEP -->
Expand All @@ -63,7 +63,7 @@ sleep: 15

```bash
# 2. Start Publisher
dapr run --app-id python-publisher --app-protocol grpc --dapr-grpc-port=3500 --enable-app-health-check python3 publisher.py
dapr run --app-id python-publisher --app-protocol grpc --dapr-grpc-port=3500 --enable-app-health-check -- python3 publisher.py --topic=TOPIC_A1
```

<!-- END_STEP -->
Expand All @@ -75,11 +75,11 @@ Run the following command in a terminal/command prompt:
<!-- STEP
name: Run subscriber
expected_stdout_lines:
- "== APP == Processing message: {'id': 1, 'message': 'hello world'} from TOPIC_A..."
- "== APP == Processing message: {'id': 2, 'message': 'hello world'} from TOPIC_A..."
- "== APP == Processing message: {'id': 3, 'message': 'hello world'} from TOPIC_A..."
- "== APP == Processing message: {'id': 4, 'message': 'hello world'} from TOPIC_A..."
- "== APP == Processing message: {'id': 5, 'message': 'hello world'} from TOPIC_A..."
- "== APP == Processing message: {'id': 1, 'message': 'hello world'} from TOPIC_A2..."
- "== APP == Processing message: {'id': 2, 'message': 'hello world'} from TOPIC_A2..."
- "== APP == Processing message: {'id': 3, 'message': 'hello world'} from TOPIC_A2..."
- "== APP == Processing message: {'id': 4, 'message': 'hello world'} from TOPIC_A2..."
- "== APP == Processing message: {'id': 5, 'message': 'hello world'} from TOPIC_A2..."
- "== APP == Closing subscription..."
output_match_mode: substring
background: true
Expand All @@ -89,7 +89,7 @@ sleep: 3

```bash
# 1. Start Subscriber
dapr run --app-id python-subscriber --app-protocol grpc python3 subscriber-handler.py
dapr run --app-id python-subscriber --app-protocol grpc -- python3 subscriber-handler.py --topic=TOPIC_A2
```

<!-- END_STEP -->
Expand All @@ -111,7 +111,7 @@ sleep: 15

```bash
# 2. Start Publisher
dapr run --app-id python-publisher --app-protocol grpc --dapr-grpc-port=3500 --enable-app-health-check python3 publisher.py
dapr run --app-id python-publisher --app-protocol grpc --dapr-grpc-port=3500 --enable-app-health-check -- python3 publisher.py --topic=TOPIC_A2
```

<!-- END_STEP -->
Expand Down
10 changes: 8 additions & 2 deletions examples/pubsub-streaming/publisher.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,18 @@
# See the License for the specific language governing permissions and
# limitations under the License.
# ------------------------------------------------------------

import argparse
import json
import time

from dapr.clients import DaprClient

parser = argparse.ArgumentParser(description='Publish events to a Dapr pub/sub topic.')
parser.add_argument('--topic', type=str, required=True, help='The topic name to publish to.')
args = parser.parse_args()

topic_name = args.topic

with DaprClient() as d:
id = 0
while id < 5:
Expand All @@ -25,7 +31,7 @@
# Create a typed message with content type and body
resp = d.publish_event(
pubsub_name='pubsub',
topic_name='TOPIC_A',
topic_name=topic_name,
data=json.dumps(req_data),
data_content_type='application/json',
publish_metadata={'ttlInSeconds': '100', 'rawPayload': 'false'},
Expand Down
11 changes: 9 additions & 2 deletions examples/pubsub-streaming/subscriber-handler.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,17 @@
import argparse
import time

from dapr.clients import DaprClient
from dapr.clients.grpc._response import TopicEventResponse

counter = 0

parser = argparse.ArgumentParser(description='Publish events to a Dapr pub/sub topic.')
parser.add_argument('--topic', type=str, required=True, help='The topic name to publish to.')
args = parser.parse_args()

topic_name = args.topic
dlq_topic_name = topic_name + '_DEAD'

def process_message(message):
# Process the message here
Expand All @@ -20,9 +27,9 @@ def main():
# and process them in the `process_message` function
close_fn = client.subscribe_with_handler(
pubsub_name='pubsub',
topic='TOPIC_A',
topic=topic_name,
handler_fn=process_message,
dead_letter_topic='TOPIC_A_DEAD',
dead_letter_topic=dlq_topic_name,
)

while counter < 5:
Expand Down
10 changes: 9 additions & 1 deletion examples/pubsub-streaming/subscriber.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,18 @@
import argparse
import time

from dapr.clients import DaprClient
from dapr.clients.grpc.subscription import StreamInactiveError

counter = 0

parser = argparse.ArgumentParser(description='Publish events to a Dapr pub/sub topic.')
parser.add_argument('--topic', type=str, required=True, help='The topic name to publish to.')
args = parser.parse_args()

topic_name = args.topic
dlq_topic_name = topic_name + '_DEAD'


def process_message(message):
global counter
Expand All @@ -20,7 +28,7 @@ def main():

try:
subscription = client.subscribe(
pubsub_name='pubsub', topic='TOPIC_A', dead_letter_topic='TOPIC_A_DEAD'
pubsub_name='pubsub', topic=topic_name, dead_letter_topic=dlq_topic_name
)
except Exception as e:
print(f'Error occurred: {e}')
Expand Down

0 comments on commit fc0c5fa

Please sign in to comment.