-
Notifications
You must be signed in to change notification settings - Fork 150
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: redesign controller-replica communication #1246
base: master
Are you sure you want to change the base?
Conversation
Signed-off-by: kampadais <[email protected]>
f3cb007
to
ca399f3
Compare
any cpu usage number available? |
Sorry , i forgot to include this. When actual R/W is disabled i get around 25-30% using tgt-blockdev and 75-80% with ublk as frontend. |
Thanks @Kampadais! So it is higher than tgt, but lower than v2 engine? |
Sorry for the late reply. Yes , it is lower but given the complexity and requirements of V2 engine and longhorn/longhorn#6600 there is also the need to improve V1 performance. |
This PR is stale because it has been open for 45 days with no activity. Remove stale label or comment or this will be closed in 10 days. |
WalkthroughThe pull request introduces significant architectural changes to the data connection client and server components. In the client, message handling is refactored from a map-based to a fixed-size array approach, with a new sequence number channel managing message tracking. The server implementation shifts towards a more concurrent design, introducing a dedicated requests channel and launching multiple goroutines to process incoming messages in parallel, improving the system's responsiveness and message handling efficiency. Changes
Sequence DiagramsequenceDiagram
participant Client
participant Server
participant RequestHandler
Client->>Server: Send Message
Server->>Server: Receive Message
Server->>requests: Enqueue Message
RequestHandler->>requests: Dequeue Message
RequestHandler->>RequestHandler: Process Message
Poem
Thank you for using CodeRabbit. We offer it for free to the OSS community and would appreciate your support in helping us grow. If you find it useful, would you consider giving us a shout-out on your favorite social media? 🪧 TipsChatThere are 3 ways to chat with CodeRabbit:
Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments. CodeRabbit Commands (Invoked using PR comments)
Other keywords and placeholders
CodeRabbit Configuration File (
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 2
🧹 Nitpick comments (5)
pkg/dataconn/server.go (2)
12-14
: Consider makingthreadCount
configurable
HardcodingthreadCount = 256
might be a practical default but may not be optimal in all deployments. In high-CPU environments, 256 goroutines could cause oversubscription, while smaller systems might need fewer threads. Consider allowing this value to be set via configuration or command-line arguments.
25-28
: Validate channel buffer size
You haverequests: make(chan *Message, 1024)
, which might consume a significant amount of memory if messages are large and the channel backs up. Consider monitoring the usage to ensure this buffer size is appropriate.pkg/dataconn/client.go (3)
11-12
:queueLength
set to 4196
While this allows a large number of in-flight operations, it’s neither a trivially recognizable power of two (e.g., 4096) nor an explicitly documented constant. Consider providing comments or rationale for this specific number and whether it should be user-configurable.
35-35
: Initialization ofmessages
array andSeqChan
This design effectively reuses sequence numbers and ensures no two in-flight messages share the same index. Watch for potential high concurrency usage: ensuring you never exceedqueueLength
in-flight messages is essential to avoid blocking on<-c.SeqChan
.Also applies to: 43-44, 47-49
184-184
: Read goroutine concurrency
Reading responses in multiple goroutines is beneficial for high-throughput scenarios. Ensure each wire handles potential reconnection or error states gracefully, especially if the server is under load or if partial reads occur.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (2)
pkg/dataconn/client.go
(7 hunks)pkg/dataconn/server.go
(2 hunks)
🔇 Additional comments (7)
pkg/dataconn/server.go (2)
18-18
: Channel-based concurrency pattern
Adding requests chan *Message
improves concurrency by offloading message processing to dedicated goroutines. Ensure all code paths that enqueue requests also handle potential backpressure if the channel is full.
71-71
: Push to s.requests
This line properly sends an incoming message for concurrent handling. Just verify that, under high load, writing here does not block indefinitely if no goroutine is available to read.
pkg/dataconn/client.go (5)
5-6
: Imports for types and logging
These imports look fine and remain consistent with common logging and types usage patterns within the Longhorn codebase.
22-23
: Replace map with a fixed-size array
Storing messages in [queueLength]*Message
eliminates the need for locks on a map, but keep in mind the memory usage. Each slot references a *Message
even if it remains unused for a long duration.
101-101
: Directly calling handleRequest
Removing the loop-based approach and calling c.handleRequest(&msg)
directly is cleaner. Ensure upstream code keeps track of error handling if there’s any transport or logic failure in handleRequest
.
115-116
: Return sequence numbers to the pool
Placing msg.Seq
back into c.SeqChan
after completion is an efficient way to recycle sequence slots. Ensure all error paths in operation
eventually perform this return to avoid leaks in busy scenarios.
141-142
: Assignment of a unique sequence
req.Seq = <-c.SeqChan
fetches one unused slot from the channel. This helps avoid collisions but relies on all completed messages returning their sequence, so watch out for concurrency or panic scenarios that skip returning the sequence.
for i := 0; i < threadCount; i++ { | ||
go func(s *Server) { | ||
for { | ||
msg := <-s.requests | ||
switch msg.Type { | ||
case TypeRead: | ||
s.handleRead(msg) | ||
case TypeWrite: | ||
s.handleWrite(msg) | ||
case TypeUnmap: | ||
s.handleUnmap(msg) | ||
case TypePing: | ||
s.handlePing(msg) | ||
} | ||
} | ||
}(server) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Add graceful shutdown mechanism for goroutines
Spawning 256 goroutines without a select
statement in the loop risks indefinite blocking if requests
is never closed. Consider adding a select
on a done
channel so these goroutines can gracefully exit when the server shuts down.
for i := 0; i < threadCount; i++ {
go func(s *Server) {
for {
- msg := <-s.requests
+ select {
+ case msg, ok := <-s.requests:
+ if !ok {
+ return
+ }
switch msg.Type {
...
}
+ case <-s.done:
+ return
+ }
}
}(server)
}
Committable suggestion skipped: line range outside the PR's diff.
req := c.messages[resp.Seq] | ||
|
||
req.Type = resp.Type | ||
req.Size = resp.Size | ||
req.Data = resp.Data | ||
req.Complete <- struct{}{} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Possible memory retention
Since c.messages[resp.Seq]
is never set to nil
after req.Complete
, stale requests may remain in memory. If large objects are attached, memory usage could grow unnecessarily. Consider clearing this reference:
func (c *Client) handleResponse(resp *Message) {
req := c.messages[resp.Seq]
req.Type = resp.Type
req.Size = resp.Size
req.Data = resp.Data
req.Complete <- struct{}{}
+ c.messages[resp.Seq] = nil
}
Committable suggestion skipped: line range outside the PR's diff.
Which issue(s) this PR fixes: longhorn/longhorn#6600
What this PR does / why we need it:
This PR is implementing a new communication structure between the controller and the replicas. The main focus of this restructure is the remove of the loop function and replace it with a more scalable approach.
Based on the current implementation, frontent issuing a new operation goes into a request channel and wait for the loop function to serve it. The same approach is followed with the replies from the replicas. The main fault of the loop function is that there is only on thread running and it both serves requests and replies. This is because Messages map can be R/W by only one thread. So even if there are a bunch of incoming requests or replies the systems is capped on serving these operations one at a time. This solution is not scalable.
The new approach scales the operation process removing the loop function and replacing it with immediate call of handleRequest and handleResponse. In order to make this possible there is also a need to prevent concurrent R/W in the Messages map. I replaced it with a Messages array and a Seq channel that ensures that every index of the array will be accessed by only one thread. What the new structs do :
Messages array : It stores the data equivalant with the Messages map but has a fixed length equal to the number of the inflight I/O we want to cap it. It's index is the id/Seq of the message.
Seq channel : It stores all the available Seqs that a new operation can acquire. Go channels ensure that only one thread will get the next available Seq. When we init the Client the Seq channel gets populated with numbers from 0 to Size(Messages). When an operation is completed the client stores the , now available , index back to the Seq channel
With this approach when a reply come from the replica the client is available to identify the message using the id as index in the Messages array ensuring the integrity of the Messages array.
Basic data-path flow :
This approach uses multiple concurrent threads that each serves a different request and is scalable up to the frontend's and network's capabilities
Benchmarks
As mentioned before this approach is scalable up to the frontend's capabilities. Since tgt is a far from optimal solution using it as a frontend shows minimal to zero performance boost. In order to see the potential of this approach i used the Ublk frontend i have implemented in here .
Specs :
CPU : Intel(R) Xeon(R) CPU E5-2620 v2 @ 2.10GHz
RAM: 128GB
Disk: Samsung 860 Evo SSD 250GB
Network : 10Gbps
One replica , one controller , 2 machines in a local cluster
On the ublk+optimizations :
Ublk frontend queues : 6
Controller-Replica connections : 6
The fio command used for testing IOPS:
sudo fio --name=read_iops --filename=/dev/ublkb0 --size=5G --numjobs=12 --time_based --runtime=30s --ramp_time=2s --ioengine=libaio --direct=1 --verify=0 --bs=4K --iodepth=64 --rw=randwrite --group_reporting=1
fio command used for testing Bandwidth:
sudo fio --name=bandwidth --filename=/dev/ublkb0 --size=5G --numjobs=12--time_based --runtime=30s --ramp_time=2s --ioengine=libaio --direct=1 --verify=0 --bs=1M --iodepth=64 --rw=read --group_reporting=1
Additional documentation or context
*Replica R/W disabled means that the replica code is altered in order to answer dummy replies and not do the actual R/W in the Linux sparse files.
Note that with this approach we manage to scale the system to the Network's speed disabling the Replica R/W ( which is another bottleneck which need investigation given that the system up to this part can perform in these numbers)
Special notes for your reviewer:
I know i have removed some of the client's functionality regarding Journal but i find those part of the bottleneck . I am open in conversation in order to put any necessary part of it back
Summary by CodeRabbit
New Features
Improvements
Bug Fixes