Improve threading when calling the Accessy API #585
Conversation
Walkthrough

The pull request modifies how user details are fetched from the Accessy API: user IDs are divided into chunks that are processed by a small, bounded pool of worker threads (at most four), and the retry backoff strategy for rate-limited requests is reworked.
Sequence Diagram

```mermaid
sequenceDiagram
    participant Client
    participant AccessyAPI
    participant ThreadPool
    Client->>ThreadPool: Divide user IDs into chunks
    loop Process Chunks
        ThreadPool->>AccessyAPI: Fetch user details
        AccessyAPI-->>ThreadPool: Return user membership
    end
    ThreadPool-->>Client: Consolidated user data
```
Actionable comments posted: 0
🧹 Nitpick comments (2)
api/src/multiaccessy/accessy.py (2)
60-68: LGTM! Backoff strategy improvements look good. The changes to the backoff strategy are well thought out (a sketch of the pattern follows the list):
- Lower initial backoff allows for quicker retry attempts
- Fixed multiplier provides predictable exponential backoff
- Random factor helps prevent thundering herd problem
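For illustration only, here is a minimal sketch of the retry loop those bullets describe. The constants (initial delay, multiplier, jitter range) and the requests-like `do_request` callable are assumptions, not the PR's exact values:

```python
import random
import time

def request_with_backoff(do_request, max_tries: int = 8):
    backoff = 0.5  # lower initial backoff -> quicker first retry
    for i in range(max_tries):
        response = do_request()
        if response.status_code != 429:
            return response
        # A fixed multiplier gives predictable exponential growth; the
        # random factor spreads retries in time so clients don't all wake
        # at once (the "thundering herd" the review mentions).
        time.sleep(backoff * random.uniform(0.75, 1.25))
        backoff *= 2
    raise RuntimeError(f"still rate limited after {max_tries} tries")
```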
Fix typo in warning message
There's a typo in "reqeusts" in the warning message.
- f"requesting accessy returned 429, too many reqeusts, try {i+1}/{max_tries}, retrying in {backoff}s, {path=}" + f"requesting accessy returned 429, too many requests, try {i+1}/{max_tries}, retrying in {backoff}s, {path=}"
545-561: LGTM! Threading model improvements significantly reduce thread count. The new implementation effectively addresses the rate limiting issues by:
- Limiting maximum thread count to 4
- Chunking user IDs across threads
- Using round-robin distribution for balanced load
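The round-robin distribution in the last bullet comes from stride slicing (`user_ids[i::thread_count]` in the code below). A small standalone example with made-up IDs:

```python
user_ids = ["u0", "u1", "u2", "u3", "u4", "u5", "u6"]
thread_count = 3

# Stride slicing hands out IDs round-robin: chunk i gets every
# thread_count-th ID starting at index i, so chunk sizes differ by at most 1.
chunks = [user_ids[i::thread_count] for i in range(thread_count)]
print(chunks)  # [['u0', 'u3', 'u6'], ['u1', 'u4'], ['u2', 'u5']]
```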
Consider these enhancements to the threading implementation:
- Make the thread count configurable via environment variable
- Add error handling in the worker function
- Consider using ThreadPoolExecutor for better thread management
```diff
+MAX_THREADS = int(config.get("ACCESSY_MAX_THREADS", default="4"))
+
 def _user_ids_to_accessy_members(self, user_ids: Iterable[UUID]) -> list[AccessyMember]:
     threads = []
     user_ids = list(user_ids)
-    thread_count = min(4, len(user_ids))
+    thread_count = min(MAX_THREADS, len(user_ids))
     accessy_members = []
     for i in range(thread_count):
         slice = user_ids[i::thread_count]
         member_slice = [AccessyMember(user_id=uid) for uid in slice]
         accessy_members.extend(member_slice)

         def worker(member_slice: list[AccessyMember]) -> None:
-            for member in member_slice:
-                fill_user_details(member)
-                fill_membership_id(member)
+            try:
+                for member in member_slice:
+                    fill_user_details(member)
+                    fill_membership_id(member)
+            except Exception as e:
+                logger.error(f"Error processing members: {e}")
+                raise

         t = threading.Thread(target=worker, args=(member_slice,))
         threads.append(t)
         t.start()
```

Alternative implementation using ThreadPoolExecutor:
```python
from concurrent.futures import ThreadPoolExecutor, as_completed

def _user_ids_to_accessy_members(self, user_ids: Iterable[UUID]) -> list[AccessyMember]:
    user_ids = list(user_ids)
    if not user_ids:
        return []  # guard: ThreadPoolExecutor rejects max_workers=0
    thread_count = min(MAX_THREADS, len(user_ids))
    accessy_members = []

    def process_chunk(chunk: list[UUID]) -> list[AccessyMember]:
        members = [AccessyMember(user_id=uid) for uid in chunk]
        for member in members:
            fill_user_details(member)
            fill_membership_id(member)
        return members

    # Split user_ids into round-robin chunks
    chunks = [user_ids[i::thread_count] for i in range(thread_count)]

    with ThreadPoolExecutor(max_workers=thread_count) as executor:
        futures = [executor.submit(process_chunk, chunk) for chunk in chunks]
        for future in as_completed(futures):
            try:
                chunk_members = future.result()
                accessy_members.extend(chunk_members)
            except Exception as e:
                logger.error(f"Error processing members: {e}")
                raise

    # Filter out API keys (note: != rather than `is not`, which would
    # compare identity instead of equality)
    accessy_members = [m for m in accessy_members if m.phone != APPLICATION_PHONE_NUMBER]
    return accessy_members
```
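One design note on the executor variant: leaving the `with ThreadPoolExecutor(...)` block joins all workers automatically, and an exception raised inside `process_chunk` is re-raised from `future.result()` in the caller. With bare `threading.Thread`, a worker's uncaught exception is only printed to stderr and never propagates, which is why the explicit try/except was suggested in the diff above.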
Force-pushed from 33e5c40 to 2729ca1
LGTM. Not sure how the backoff affects it, but if it works, it works.
I fixed the build so you can now rebase on main to get the tests running.
```diff
@@ -541,14 +542,23 @@ def fill_membership_id(user: AccessyMember) -> None:
     user.membership_id = data["id"]


+    threads = []
+    user_ids = list(user_ids)
+    thread_count = min(4, len(user_ids))
```
I don't follow this completely. Doesn't this still create one thread per user_id as before?
No, it creates at most 4 threads.
The previous code started one thread per member, which is way too much, and just resulted in a ton of rate limit errors.
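To make the contrast concrete (illustrative numbers only):

```python
user_ids = list(range(1000))  # e.g. 1000 members to sync

# Old behaviour: one thread per member
old_thread_count = len(user_ids)          # 1000 concurrent requests -> 429s

# New behaviour: bounded pool
new_thread_count = min(4, len(user_ids))  # never more than 4
```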
Force-pushed from 2729ca1 to 533c111
The previous code started one thread per member, which is way too much, and just resulted in a ton of rate limit errors.
Summary by CodeRabbit

New Features
- Calls to the Accessy API now divide user IDs into chunks processed by at most four threads, with round-robin distribution for balanced load.

Bug Fixes
- The retry backoff strategy for rate-limited (429) requests was tuned: a lower initial backoff, a fixed multiplier, and a random factor to avoid thundering-herd retries.