-
Notifications
You must be signed in to change notification settings - Fork 2
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
sweep: fix error in _invoke_callback(). In broadcast_service/core.py
, _invoice_callback() function uses the result method of the future object, which can cause thread blocking and thus fail to achieve asynchronous effects. We should sovle it, so that when multiple callback functions listen to a topic at the same time, they will be triggered simultaneously instead of serially.
#18
Comments
Here's the PR! #21. See Sweep's process at dashboard.⚡ Sweep Basic Tier: I'm using GPT-4. You have 4 GPT-4 tickets left for the month and 2 for the day. (tracking ID:
c7fdbe6b6a )For more GPT-4 tickets, visit our payment portal. For a one week free trial, try Sweep Pro (unlimited GPT-4 tickets). Actions (click)
Sandbox Execution ✓Here are the sandbox execution logs prior to making any changes: Sandbox logs for
|
broadcast_service.config( | |
callback=handle_publisher_callback, | |
).publish("topic") | |
if __name__ == '__main__': | |
main() | |
``` | |
It should be noted that the order of the three elements `args[0]`, `args[1]`, and `args[2]` in the above example is not uniquely determined, which depends on the execution time of the subscriber's callback function. However, in most cases, we cannot judge which subscriber callback function ends first, so `broadcast-service` development specifications recommend that when using this function, let the return value types of the subscriber callback functions be consistent as much as possible to reduce the cost of additional data judgment. | |
## Passing different parameters when publishing a topic multiple times | |
If you want to pass different parameters when publishing a topic multiple times. The following example show how to do. | |
```python |
broadcast-service/docs/publisher_dispatch.md
Lines 30 to 45 in 1db4a59
if __name__ == '__main__': | |
main() | |
``` | |
**output:** | |
```text | |
handle_subscriber_callback 1 | |
handle_subscriber_callback 2 | |
handle_publisher_callback | |
``` | |
## Publisher Multiple Executions | |
`broadcast-service` also supports publishing multiple topics at the same time. The following example shows how to publish multiple topics at the same time. |
broadcast-service/docs/publisher_dispatch.md
Lines 45 to 60 in 1db4a59
`broadcast-service` also supports publishing multiple topics at the same time. The following example shows how to publish multiple topics at the same time. | |
```python | |
from broadcast_service import broadcast_service | |
@broadcast_service.on_listen("topic") | |
def handle_subscriber_callback(): | |
print("handle_subscriber_callback") | |
def main(): | |
broadcast_service.config( | |
num_of_executions=5, | |
).publish("topic") | |
if __name__ == '__main__': | |
main() |
broadcast-service/docs/publisher_dispatch.md
Lines 120 to 135 in 1db4a59
```text | |
handle_subscriber_callback | |
handle_publisher_callback | |
handle_subscriber_callback | |
handle_publisher_callback | |
handle_subscriber_callback | |
handle_publisher_callback | |
``` | |
It can be seen that the topic was published three times, and the publisher callback function was executed three times. If you want the publisher callback function to be executed only once after all topics are published and all subscriber callback functions are executed, you can use the parameter `enable_final_return=False` to achieve this goal. | |
```python | |
from broadcast_service import broadcast_service | |
broadcast-service/broadcast_service/_core.py
Lines 320 to 344 in 1db4a59
) -> 'BroadcastService': | |
"""Provide more complex topic publish mode | |
Args: | |
num_of_executions: default is 1, indicating the number of times the same topic is published at once | |
callback: default is None. You can get callback and the parameters of subscriber | |
after all subscribers' callback functions have been completed. | |
enable_final_return: default is False, it means you can get callback after you publish | |
n times topic. In this case, finish_callback params is store in *args rather than **kwargs. | |
interval: publish interval. Unit seconds. | |
split_parameters: If you initiate multiple calls and want to pass different parameters to the subscriber | |
in each call, you can use this parameter for parameter passing. Additionally, when you use this | |
parameter, you do not need to pass any parameters in the broadcast() function. | |
Returns: | |
Returns current object, which is used to call broadcast with configuration. | |
""" | |
self.enable_config = True | |
self.publish_dispatch_config_manager.create_publisher_callback( | |
num_of_executions=num_of_executions, | |
callback=callback, | |
enable_final_return=enable_final_return, | |
interval=interval, | |
status=PUBLISHER_CALLBACK_STATUS['RUNNING'], | |
split_parameters=split_parameters | |
) |
broadcast-service/docs/publisher_dispatch.md
Lines 195 to 210 in 1db4a59
Note that if the publisher's callback function needs to receive parameters, you must use `*args` to receive parameters. Therefore, if multiple subscriber callback functions return information, the publisher's callback cannot be set. Therefore, `*args` is used as a parameter pool to receive data returned from the subscriber's callback function. `*args` is a tuple, which can store any type of data, as long as you can successfully obtain the parameter information of `args`. | |
The following example shows a complex scenario where multiple subscriber callback functions return information. | |
```python | |
from broadcast_service import broadcast_service | |
@broadcast_service.on_listen("topic") | |
def handle_subscriber_callback1(): | |
return "handle_subscriber_callback 1" | |
@broadcast_service.on_listen("topic") | |
def handle_subscriber_callback2(): | |
return [1, 2, 3, 4, 5] |
broadcast-service/broadcast_service/_core.py
Lines 113 to 143 in 1db4a59
def listen_all(self, callback: Callable): | |
""" | |
'__all__' is a special topic. It can receive any topic message. | |
""" | |
self._invoke_listen_topic('__all__', callback) | |
def broadcast(self, topics: str or List[str], *args, **kwargs): | |
""" | |
Launch broadcast on the specify topic. If all subscribe callback finish, it will call finish_callback. | |
""" | |
self.logger.debug(f"[broadcast-service] broadcast topic <{topics}>") | |
if type(topics) == str: | |
self._invoke_broadcast_topic(topics, *args, **kwargs) | |
elif type(topics) == list: | |
for topic in topics: | |
self._invoke_broadcast_topic(topic, *args, **kwargs) | |
else: | |
raise ValueError("Unknown broadcast-service error, please submit " | |
"issue to https://github.com/Undertone0809/broadcast-service/issues") | |
def broadcast_all(self, *args, **kwargs): | |
""" | |
All topics listened on will be called back. | |
Attention: Not all callback function will be called. If your publisher callback | |
and your subscriber callback takes different arguments, your callback function | |
will not be executed. | |
""" | |
for topic in self.pubsub_channels.keys(): | |
self._invoke_broadcast_topic(topic, *args, **kwargs) | |
Step 2: ⌨️ Coding
Modify broadcast_service/_core.py with contents:
• Locate the `_invoke_callback()` function in the `broadcast_service/_core.py` file.
• Currently, this function uses the `result()` method of the `Future` object, which can cause thread blocking. We need to change this to prevent blocking and allow for asynchronous execution.
• Instead of using `result()`, use the `add_done_callback()` method of the `Future` object. This method adds a function to be called at some point in the future when the `Future` is completed. The callback function will be called with the `Future` as its only argument.
• The callback function should handle the result of the `Future` and any exceptions that may have occurred during its execution. This can be done by using the `result()` method within the callback function, which will not block because it is called after the `Future` is completed.
• Here is an example of how to use `add_done_callback()`: ```python def handle_future(future): try: result = future.result() # Handle result except Exception as e: # Handle exceptionfuture.add_done_callback(handle_future)
```
• Replace the current usage ofresult()
in `_invoke_callback()` with a similar structure to the above example.--- +++ @@ -39,9 +39,15 @@ ) -> Any: if enable_async: future_result = thread_pool.submit(callback, *args, **kwargs) - if future_result.result() is not None: - logger.debug(f"[broadcast-service invoke_callback result] {future_result.result()}") - return future_result.result() + def handle_future(future): + try: + result = future.result() + if result is not None: + logger.debug(f"[broadcast-service invoke_callback result] {result}") + return result + except Exception as e: + logger.error(f"[broadcast-service invoke_callback error] {str(e)}") + future_result.add_done_callback(handle_future) else: return callback(*args, **kwargs)
- Running GitHub Actions for
broadcast_service/_core.py
✓ Edit
Check broadcast_service/_core.py with contents:Ran GitHub Actions for 8282112afe43a774129985809a79429751877737:
Step 3: 🔁 Code Review
I have finished reviewing the code for completeness. I did not find errors for sweep/fix-thread-blocking
.
🎉 Latest improvements to Sweep:
- We just released a dashboard to track Sweep's progress on your issue in real-time, showing every stage of the process – from search to planning and coding.
- Sweep uses OpenAI's latest Assistant API to plan code changes and modify code! This is 3x faster and significantly more reliable as it allows Sweep to edit code and validate the changes in tight iterations, the same way as a human would.
💡 To recreate the pull request edit the issue title or description. To tweak the pull request, leave a comment on the pull request.
Join Our Discord
In
broadcast_service/core.py
, _invoke_callback() function uses the result method of the future object, which can cause thread blocking and thus fail to achieve asynchronous effects. We should sovle it, so that when multiple callback functions listen to a topic at the same time, they will be triggered simultaneously instead of serially.Checklist
broadcast_service/_core.py
✓ 8282112 Editbroadcast_service/_core.py
✓ EditThe text was updated successfully, but these errors were encountered: