Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

opt: eliminate the inuse eventloop.cache for idle connections #660

Merged
merged 4 commits into from
Nov 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 20 additions & 25 deletions connection_unix.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
pollAttachment netpoll.PollAttachment // connection attachment for poller
inboundBuffer elastic.RingBuffer // buffer for leftover data from the remote
buffer []byte // buffer for the latest bytes
cache []byte // temporary cache for the inbound data
isDatagram bool // UDP protocol
opened bool // connection opened event fired
isEOF bool // whether the connection has reached EOF
Expand Down Expand Up @@ -290,6 +291,7 @@
func (c *conn) resetBuffer() {
c.buffer = c.buffer[:0]
c.inboundBuffer.Reset()
c.inboundBuffer.Done()
}

func (c *conn) Read(p []byte) (n int, err error) {
Expand Down Expand Up @@ -325,22 +327,9 @@
return
}

head, tail := c.inboundBuffer.Peek(n)
defer c.inboundBuffer.Discard(n) //nolint:errcheck
c.loop.cache.Reset()
c.loop.cache.Write(head)
if len(head) == n {
return c.loop.cache.Bytes(), err
}
c.loop.cache.Write(tail)
if inBufferLen >= n {
return c.loop.cache.Bytes(), err
}

remaining := n - inBufferLen
c.loop.cache.Write(c.buffer[:remaining])
c.buffer = c.buffer[remaining:]
return c.loop.cache.Bytes(), err
buf = bsPool.Get(n)
_, err = c.Read(buf)
return

Check warning on line 332 in connection_unix.go

View check run for this annotation

Codecov / codecov/patch

connection_unix.go#L330-L332

Added lines #L330 - L332 were not covered by tests
}

func (c *conn) Peek(n int) (buf []byte, err error) {
Expand All @@ -359,25 +348,31 @@
if len(head) == n {
return head, err
}
c.loop.cache.Reset()
c.loop.cache.Write(head)
c.loop.cache.Write(tail)
buf = bsPool.Get(n)[:0]
buf = append(buf, head...)
buf = append(buf, tail...)
if inBufferLen >= n {
return c.loop.cache.Bytes(), err
return

Check warning on line 355 in connection_unix.go

View check run for this annotation

Codecov / codecov/patch

connection_unix.go#L355

Added line #L355 was not covered by tests
}

remaining := n - inBufferLen
c.loop.cache.Write(c.buffer[:remaining])
return c.loop.cache.Bytes(), err
buf = append(buf, c.buffer[:remaining]...)
c.cache = buf
return
}

func (c *conn) Discard(n int) (int, error) {
if len(c.cache) > 0 {
bsPool.Put(c.cache)
c.cache = nil
}

inBufferLen := c.inboundBuffer.Buffered()
tempBufferLen := len(c.buffer)
if inBufferLen+tempBufferLen < n || n <= 0 {
if totalLen := inBufferLen + len(c.buffer); n >= totalLen || n <= 0 {
c.resetBuffer()
return inBufferLen + tempBufferLen, nil
return totalLen, nil
}

if c.inboundBuffer.IsEmpty() {
c.buffer = c.buffer[n:]
return n, nil
Expand Down
45 changes: 21 additions & 24 deletions connection_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
"github.com/panjf2000/gnet/v2/pkg/buffer/elastic"
errorx "github.com/panjf2000/gnet/v2/pkg/errors"
bbPool "github.com/panjf2000/gnet/v2/pkg/pool/bytebuffer"
bsPool "github.com/panjf2000/gnet/v2/pkg/pool/byteslice"
goPool "github.com/panjf2000/gnet/v2/pkg/pool/goroutine"
)

Expand Down Expand Up @@ -54,6 +55,7 @@
ctx any // user-defined context
loop *eventloop // owner event-loop
buffer *bbPool.ByteBuffer // reuse memory of inbound data as a temporary buffer
cache []byte // temporary cache for the inbound data
rawConn net.Conn // original connection
localAddr net.Addr // local server addr
remoteAddr net.Addr // remote addr
Expand Down Expand Up @@ -116,6 +118,7 @@
func (c *conn) resetBuffer() {
c.buffer.Reset()
c.inboundBuffer.Reset()
c.inboundBuffer.Done()
}

func (c *conn) Read(p []byte) (n int, err error) {
Expand Down Expand Up @@ -149,22 +152,10 @@
c.buffer.B = c.buffer.B[n:]
return
}
head, tail := c.inboundBuffer.Peek(n)
defer c.inboundBuffer.Discard(n) //nolint:errcheck
c.loop.cache.Reset()
c.loop.cache.Write(head)
if len(head) == n {
return c.loop.cache.Bytes(), err
}
c.loop.cache.Write(tail)
if inBufferLen >= n {
return c.loop.cache.Bytes(), err
}

remaining := n - inBufferLen
c.loop.cache.Write(c.buffer.B[:remaining])
c.buffer.B = c.buffer.B[remaining:]
return c.loop.cache.Bytes(), err
buf = bsPool.Get(n)
_, err = c.Read(buf)
return

Check warning on line 158 in connection_windows.go

View check run for this annotation

Codecov / codecov/patch

connection_windows.go#L156-L158

Added lines #L156 - L158 were not covered by tests
}

func (c *conn) Peek(n int) (buf []byte, err error) {
Expand All @@ -181,25 +172,31 @@
if len(head) == n {
return head, err
}
c.loop.cache.Reset()
c.loop.cache.Write(head)
c.loop.cache.Write(tail)
buf = bsPool.Get(n)[:0]
buf = append(buf, head...)
buf = append(buf, tail...)
if inBufferLen >= n {
return c.loop.cache.Bytes(), err
return

Check warning on line 179 in connection_windows.go

View check run for this annotation

Codecov / codecov/patch

connection_windows.go#L179

Added line #L179 was not covered by tests
}

remaining := n - inBufferLen
c.loop.cache.Write(c.buffer.B[:remaining])
return c.loop.cache.Bytes(), err
buf = append(buf, c.buffer.B[:remaining]...)
c.cache = buf
return
}

func (c *conn) Discard(n int) (int, error) {
if len(c.cache) > 0 {
bsPool.Put(c.cache)
c.cache = nil
}

inBufferLen := c.inboundBuffer.Buffered()
tempBufferLen := c.buffer.Len()
if inBufferLen+tempBufferLen < n || n <= 0 {
if totalLen := inBufferLen + c.buffer.Len(); n >= totalLen || n <= 0 {
c.resetBuffer()
return inBufferLen + tempBufferLen, nil
return totalLen, nil
}

if c.inboundBuffer.IsEmpty() {
c.buffer.B = c.buffer.B[n:]
return n, nil
Expand Down
2 changes: 0 additions & 2 deletions eventloop_unix.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
package gnet

import (
"bytes"
"context"
"errors"
"fmt"
Expand All @@ -39,7 +38,6 @@ import (
type eventloop struct {
listeners map[int]*listener // listeners
idx int // loop index in the engine loops list
cache bytes.Buffer // temporary buffer for scattered bytes
engine *engine // engine in loop
poller *netpoll.Poller // epoll or kqueue
buffer []byte // read packet buffer whose capacity is set by user, default value is 64KB
Expand Down
2 changes: 0 additions & 2 deletions eventloop_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
package gnet

import (
"bytes"
"context"
"errors"
"fmt"
Expand All @@ -31,7 +30,6 @@ type eventloop struct {
ch chan any // channel for event-loop
idx int // index of event-loop in event-loops
eng *engine // engine in loop
cache bytes.Buffer // temporary buffer for scattered bytes
connCount int32 // number of active connections in event-loop
connections map[*conn]struct{} // TCP connection map: fd -> conn
eventHandler EventHandler // user eventHandler
Expand Down
18 changes: 8 additions & 10 deletions pkg/buffer/linkedlist/linked_list_buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ func (b *node) len() int {

// Buffer is a linked list of node.
type Buffer struct {
bs [][]byte
head *node
tail *node
size int
Expand Down Expand Up @@ -123,19 +122,19 @@ func (llb *Buffer) Peek(maxBytes int) ([][]byte, error) {
} else if maxBytes > llb.Buffered() {
return nil, io.ErrShortBuffer
}
llb.bs = llb.bs[:0]
var bs [][]byte
var cum int
for iter := llb.head; iter != nil; iter = iter.next {
offset := iter.len()
if cum+offset > maxBytes {
offset = maxBytes - cum
}
llb.bs = append(llb.bs, iter.buf[:offset])
bs = append(bs, iter.buf[:offset])
if cum += offset; cum == maxBytes {
break
}
}
return llb.bs, nil
return bs, nil
}

// PeekWithBytes is like Peek but accepts [][]byte and puts them onto head.
Expand All @@ -145,17 +144,17 @@ func (llb *Buffer) PeekWithBytes(maxBytes int, bs ...[]byte) ([][]byte, error) {
} else if maxBytes > llb.Buffered() {
return nil, io.ErrShortBuffer
}
llb.bs = llb.bs[:0]
var bss [][]byte
var cum int
for _, b := range bs {
if n := len(b); n > 0 {
offset := n
if cum+offset > maxBytes {
offset = maxBytes - cum
}
llb.bs = append(llb.bs, b[:offset])
bss = append(bss, b[:offset])
if cum += offset; cum == maxBytes {
return llb.bs, nil
return bss, nil
}
}
}
Expand All @@ -164,12 +163,12 @@ func (llb *Buffer) PeekWithBytes(maxBytes int, bs ...[]byte) ([][]byte, error) {
if cum+offset > maxBytes {
offset = maxBytes - cum
}
llb.bs = append(llb.bs, iter.buf[:offset])
bss = append(bss, iter.buf[:offset])
if cum += offset; cum == maxBytes {
break
}
}
return llb.bs, nil
return bss, nil
}

// Discard removes some nodes based on n bytes.
Expand Down Expand Up @@ -266,7 +265,6 @@ func (llb *Buffer) Reset() {
llb.tail = nil
llb.size = 0
llb.bytes = 0
llb.bs = nil
}

// pop returns and removes the head of l. If l is empty, it returns nil.
Expand Down
Loading