Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: redesign controller-replica communication #1246

Open
wants to merge 1 commit into
base: master
Choose a base branch
from

Conversation

Kampadais
Copy link

@Kampadais Kampadais commented Oct 3, 2024

Which issue(s) this PR fixes: longhorn/longhorn#6600

What this PR does / why we need it:

This PR is implementing a new communication structure between the controller and the replicas. The main focus of this restructure is the remove of the loop function and replace it with a more scalable approach.

Based on the current implementation, frontent issuing a new operation goes into a request channel and wait for the loop function to serve it. The same approach is followed with the replies from the replicas. The main fault of the loop function is that there is only on thread running and it both serves requests and replies. This is because Messages map can be R/W by only one thread. So even if there are a bunch of incoming requests or replies the systems is capped on serving these operations one at a time. This solution is not scalable.

The new approach scales the operation process removing the loop function and replacing it with immediate call of handleRequest and handleResponse. In order to make this possible there is also a need to prevent concurrent R/W in the Messages map. I replaced it with a Messages array and a Seq channel that ensures that every index of the array will be accessed by only one thread. What the new structs do :

  • Messages array : It stores the data equivalant with the Messages map but has a fixed length equal to the number of the inflight I/O we want to cap it. It's index is the id/Seq of the message.

  • Seq channel : It stores all the available Seqs that a new operation can acquire. Go channels ensure that only one thread will get the next available Seq. When we init the Client the Seq channel gets populated with numbers from 0 to Size(Messages). When an operation is completed the client stores the , now available , index back to the Seq channel

With this approach when a reply come from the replica the client is available to identify the message using the id as index in the Messages array ensuring the integrity of the Messages array.

Basic data-path flow :

  1. Frontend issues a new operation
  2. Client accepts it with one of the WriteAt/UnmapAt/ReadAt
  3. func operation is called , formats the message and calls handleRequest()
  4. handleRequest give the message a Seq from the Seq channel and forwards it to the send channel
  5. This part is not changed and follows the flow of the current version
  6. When the read function reads a reply on the wire it immediately calls handleResponse()
  7. handleResponse identify the operation replied using the Seq as an index on the Message array and after it manipulates it , it notifies the Complete channel of the message to complete the request
  8. After the request is completed the operation function reinserts the Seq used in the Seq channel in order to be used by the next operation

This approach uses multiple concurrent threads that each serves a different request and is scalable up to the frontend's and network's capabilities

Benchmarks

As mentioned before this approach is scalable up to the frontend's capabilities. Since tgt is a far from optimal solution using it as a frontend shows minimal to zero performance boost. In order to see the potential of this approach i used the Ublk frontend i have implemented in here .

Specs :
CPU : Intel(R) Xeon(R) CPU E5-2620 v2 @ 2.10GHz
RAM: 128GB
Disk: Samsung 860 Evo SSD 250GB
Network : 10Gbps

One replica , one controller , 2 machines in a local cluster
On the ublk+optimizations :
      Ublk frontend queues : 6
      Controller-Replica connections : 6

The fio command used for testing IOPS:
sudo fio --name=read_iops --filename=/dev/ublkb0 --size=5G --numjobs=12 --time_based --runtime=30s --ramp_time=2s --ioengine=libaio --direct=1 --verify=0 --bs=4K --iodepth=64 --rw=randwrite --group_reporting=1
fio command used for testing Bandwidth:

sudo fio --name=bandwidth --filename=/dev/ublkb0 --size=5G --numjobs=12--time_based --runtime=30s --ramp_time=2s --ioengine=libaio --direct=1 --verify=0 --bs=1M --iodepth=64 --rw=read --group_reporting=1

Additional documentation or context

image

*Replica R/W disabled means that the replica code is altered in order to answer dummy replies and not do the actual R/W in the Linux sparse files.

Note that with this approach we manage to scale the system to the Network's speed disabling the Replica R/W ( which is another bottleneck which need investigation given that the system up to this part can perform in these numbers)

Special notes for your reviewer:

I know i have removed some of the client's functionality regarding Journal but i find those part of the bottleneck . I am open in conversation in order to put any necessary part of it back

Summary by CodeRabbit

  • New Features

    • Introduced a new channel for handling incoming messages in the server, allowing for concurrent processing.
    • Enhanced message handling by decoupling reading from processing, improving server responsiveness.
  • Improvements

    • Streamlined message handling logic in both client and server structures.
    • Updated client to manage sequence numbers more effectively.
  • Bug Fixes

    • Simplified error handling in message processing to improve reliability.

@Kampadais Kampadais changed the title Improved controller-replica communication feat: redesign controller-replica communication Oct 9, 2024
@liyimeng
Copy link

any cpu usage number available?

@Kampadais
Copy link
Author

any cpu usage number available?

Sorry , i forgot to include this. When actual R/W is disabled i get around 25-30% using tgt-blockdev and 75-80% with ublk as frontend.

@liyimeng
Copy link

any cpu usage number available?

Sorry , i forgot to include this. When actual R/W is disabled i get around 25-30% using tgt-blockdev and 75-80% with ublk as frontend.

Thanks @Kampadais! So it is higher than tgt, but lower than v2 engine?

@Kampadais
Copy link
Author

Sorry for the late reply. Yes , it is lower but given the complexity and requirements of V2 engine and longhorn/longhorn#6600 there is also the need to improve V1 performance.

Copy link

This PR is stale because it has been open for 45 days with no activity. Remove stale label or comment or this will be closed in 10 days.

@github-actions github-actions bot added the stale label Dec 28, 2024
Copy link

coderabbitai bot commented Dec 28, 2024

Walkthrough

The pull request introduces significant architectural changes to the data connection client and server components. In the client, message handling is refactored from a map-based to a fixed-size array approach, with a new sequence number channel managing message tracking. The server implementation shifts towards a more concurrent design, introducing a dedicated requests channel and launching multiple goroutines to process incoming messages in parallel, improving the system's responsiveness and message handling efficiency.

Changes

File Changes
pkg/dataconn/client.go - Replaced messages map with fixed-size array
- Added SeqChan for sequence number management
- Removed loop method
- Simplified handleRequest and handleResponse methods
pkg/dataconn/server.go - Added requests channel to Server struct
- Introduced threadCount constant (256)
- Modified NewServer to initialize requests channel
- Launched concurrent goroutines for message processing
- Simplified readFromWire method

Sequence Diagram

sequenceDiagram
    participant Client
    participant Server
    participant RequestHandler
    
    Client->>Server: Send Message
    Server->>Server: Receive Message
    Server->>requests: Enqueue Message
    RequestHandler->>requests: Dequeue Message
    RequestHandler->>RequestHandler: Process Message
Loading

Poem

🐰 Hoppity hop, through channels we leap,
Concurrent connections, no more to keep!
Messages flow with grace and might,
Goroutines dancing, oh what a sight!
Code refactored, performance so bright! 🚀


Thank you for using CodeRabbit. We offer it for free to the OSS community and would appreciate your support in helping us grow. If you find it useful, would you consider giving us a shout-out on your favorite social media?

❤️ Share
🪧 Tips

Chat

There are 3 ways to chat with CodeRabbit:

  • Review comments: Directly reply to a review comment made by CodeRabbit. Example:
    • I pushed a fix in commit <commit_id>, please review it.
    • Generate unit testing code for this file.
    • Open a follow-up GitHub issue for this discussion.
  • Files and specific lines of code (under the "Files changed" tab): Tag @coderabbitai in a new review comment at the desired location with your query. Examples:
    • @coderabbitai generate unit testing code for this file.
    • @coderabbitai modularize this function.
  • PR comments: Tag @coderabbitai in a new PR comment to ask questions about the PR branch. For the best results, please provide a very specific query, as very limited context is provided in this mode. Examples:
    • @coderabbitai gather interesting stats about this repository and render them as a table. Additionally, render a pie chart showing the language distribution in the codebase.
    • @coderabbitai read src/utils.ts and generate unit testing code.
    • @coderabbitai read the files in the src/scheduler package and generate a class diagram using mermaid and a README in the markdown format.
    • @coderabbitai help me debug CodeRabbit configuration file.

Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments.

CodeRabbit Commands (Invoked using PR comments)

  • @coderabbitai pause to pause the reviews on a PR.
  • @coderabbitai resume to resume the paused reviews.
  • @coderabbitai review to trigger an incremental review. This is useful when automatic reviews are disabled for the repository.
  • @coderabbitai full review to do a full review from scratch and review all the files again.
  • @coderabbitai summary to regenerate the summary of the PR.
  • @coderabbitai generate docstrings to generate docstrings for this PR. (Beta)
  • @coderabbitai resolve resolve all the CodeRabbit review comments.
  • @coderabbitai configuration to show the current CodeRabbit configuration for the repository.
  • @coderabbitai help to get help.

Other keywords and placeholders

  • Add @coderabbitai ignore anywhere in the PR description to prevent this PR from being reviewed.
  • Add @coderabbitai summary to generate the high-level summary at a specific location in the PR description.
  • Add @coderabbitai anywhere in the PR title to generate the title automatically.

CodeRabbit Configuration File (.coderabbit.yaml)

  • You can programmatically configure CodeRabbit by adding a .coderabbit.yaml file to the root of your repository.
  • Please see the configuration documentation for more information.
  • If your editor has YAML language server enabled, you can add the path at the top of this file to enable auto-completion and validation: # yaml-language-server: $schema=https://coderabbit.ai/integrations/schema.v2.json

Documentation and Community

  • Visit our Documentation for detailed information on how to use CodeRabbit.
  • Join our Discord Community to get help, request features, and share feedback.
  • Follow us on X/Twitter for updates and announcements.

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

🧹 Nitpick comments (5)
pkg/dataconn/server.go (2)

12-14: Consider making threadCount configurable
Hardcoding threadCount = 256 might be a practical default but may not be optimal in all deployments. In high-CPU environments, 256 goroutines could cause oversubscription, while smaller systems might need fewer threads. Consider allowing this value to be set via configuration or command-line arguments.


25-28: Validate channel buffer size
You have requests: make(chan *Message, 1024), which might consume a significant amount of memory if messages are large and the channel backs up. Consider monitoring the usage to ensure this buffer size is appropriate.

pkg/dataconn/client.go (3)

11-12: queueLength set to 4196
While this allows a large number of in-flight operations, it’s neither a trivially recognizable power of two (e.g., 4096) nor an explicitly documented constant. Consider providing comments or rationale for this specific number and whether it should be user-configurable.


35-35: Initialization of messages array and SeqChan
This design effectively reuses sequence numbers and ensures no two in-flight messages share the same index. Watch for potential high concurrency usage: ensuring you never exceed queueLength in-flight messages is essential to avoid blocking on <-c.SeqChan.

Also applies to: 43-44, 47-49


184-184: Read goroutine concurrency
Reading responses in multiple goroutines is beneficial for high-throughput scenarios. Ensure each wire handles potential reconnection or error states gracefully, especially if the server is under load or if partial reads occur.

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between eedf985 and ca399f3.

📒 Files selected for processing (2)
  • pkg/dataconn/client.go (7 hunks)
  • pkg/dataconn/server.go (2 hunks)
🔇 Additional comments (7)
pkg/dataconn/server.go (2)

18-18: Channel-based concurrency pattern
Adding requests chan *Message improves concurrency by offloading message processing to dedicated goroutines. Ensure all code paths that enqueue requests also handle potential backpressure if the channel is full.


71-71: Push to s.requests
This line properly sends an incoming message for concurrent handling. Just verify that, under high load, writing here does not block indefinitely if no goroutine is available to read.

pkg/dataconn/client.go (5)

5-6: Imports for types and logging
These imports look fine and remain consistent with common logging and types usage patterns within the Longhorn codebase.


22-23: Replace map with a fixed-size array
Storing messages in [queueLength]*Message eliminates the need for locks on a map, but keep in mind the memory usage. Each slot references a *Message even if it remains unused for a long duration.


101-101: Directly calling handleRequest
Removing the loop-based approach and calling c.handleRequest(&msg) directly is cleaner. Ensure upstream code keeps track of error handling if there’s any transport or logic failure in handleRequest.


115-116: Return sequence numbers to the pool
Placing msg.Seq back into c.SeqChan after completion is an efficient way to recycle sequence slots. Ensure all error paths in operation eventually perform this return to avoid leaks in busy scenarios.


141-142: Assignment of a unique sequence
req.Seq = <-c.SeqChan fetches one unused slot from the channel. This helps avoid collisions but relies on all completed messages returning their sequence, so watch out for concurrency or panic scenarios that skip returning the sequence.

Comment on lines +33 to +49
for i := 0; i < threadCount; i++ {
go func(s *Server) {
for {
msg := <-s.requests
switch msg.Type {
case TypeRead:
s.handleRead(msg)
case TypeWrite:
s.handleWrite(msg)
case TypeUnmap:
s.handleUnmap(msg)
case TypePing:
s.handlePing(msg)
}
}
}(server)
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Add graceful shutdown mechanism for goroutines
Spawning 256 goroutines without a select statement in the loop risks indefinite blocking if requests is never closed. Consider adding a select on a done channel so these goroutines can gracefully exit when the server shuts down.

 for i := 0; i < threadCount; i++ {
     go func(s *Server) {
         for {
-            msg := <-s.requests
+            select {
+            case msg, ok := <-s.requests:
+                if !ok {
+                    return
+                }
                 switch msg.Type {
                     ...
                 }
+            case <-s.done:
+                return
+            }
         }
     }(server)
 }

Committable suggestion skipped: line range outside the PR's diff.

Comment on lines +149 to +155
req := c.messages[resp.Seq]

req.Type = resp.Type
req.Size = resp.Size
req.Data = resp.Data
req.Complete <- struct{}{}

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Possible memory retention
Since c.messages[resp.Seq] is never set to nil after req.Complete, stale requests may remain in memory. If large objects are attached, memory usage could grow unnecessarily. Consider clearing this reference:

 func (c *Client) handleResponse(resp *Message) {
     req := c.messages[resp.Seq]
     req.Type = resp.Type
     req.Size = resp.Size
     req.Data = resp.Data
     req.Complete <- struct{}{}
+    c.messages[resp.Seq] = nil
 }

Committable suggestion skipped: line range outside the PR's diff.

@derekbit derekbit removed the stale label Dec 28, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

[EPIC] v1 data path performance enhancement
3 participants