Skip to content

Commit

Permalink
Merge pull request #53 from lxzan/testing
Browse files Browse the repository at this point in the history
v1.6.12
  • Loading branch information
lxzan authored Sep 23, 2023
2 parents 26f29d4 + 20a757a commit 2a1e621
Show file tree
Hide file tree
Showing 14 changed files with 325 additions and 93 deletions.
32 changes: 17 additions & 15 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# gws
# GWS

### Event-Driven Go WebSocket Server & Client

Expand Down Expand Up @@ -28,7 +28,7 @@

[12]: https://github.com/avelino/awesome-go#networking

- [gws](#gws)
- [GWS](#gws)
- [Feature](#feature)
- [Attention](#attention)
- [Install](#install)
Expand All @@ -41,6 +41,8 @@
- [Broadcast](#broadcast)
- [Autobahn Test](#autobahn-test)
- [Benchmark](#benchmark)
- [IOPS (Echo Server)](#iops-echo-server)
- [GoBench](#gobench)
- [Communication](#communication)
- [Acknowledgments](#acknowledgments)

Expand All @@ -51,7 +53,7 @@
- [x] Dial via Proxy
- [x] IO Multiplexing
- [x] Concurrent Write
- [x] Zero Allocs Read/Write (Compression Disabled)
- [x] Zero Allocs Read/Write
- [x] Passes WebSocket [autobahn-testsuite](https://lxzan.github.io/gws/reports/servers/)

### Attention
Expand Down Expand Up @@ -230,7 +232,7 @@ func main() {
```go
func Broadcast(conns []*gws.Conn, opcode gws.Opcode, payload []byte) {
var b = gws.NewBroadcaster(opcode, payload)
defer b.Release()
defer b.Close()
for _, item := range conns {
_ = b.Broadcast(item)
}
Expand All @@ -251,22 +253,22 @@ docker run -it --rm \

### Benchmark

- GOMAXPROCS = 4
- Connection = 1000
- Compress Disabled
#### IOPS (Echo Server)
GOMAXPROCS=4, Connection=1000, CompressEnabled=false

![performance](assets/performance-compress-disabled.png)

#### GoBench
```go
$ go test -benchmem -run=^$ -bench ^(BenchmarkConn_WriteMessage|BenchmarkConn_ReadMessage)$ github.com/lxzan/gws

goos: darwin
goarch: arm64
goos: linux
goarch: amd64
pkg: github.com/lxzan/gws
BenchmarkConn_WriteMessage/compress_disabled-8 8713082 138.0 ns/op 0 B/op 0 allocs/op
BenchmarkConn_WriteMessage/compress_enabled-8 144266 8066 ns/op 235 B/op 0 allocs/op
BenchmarkConn_ReadMessage/compress_disabled-8 11608689 102.8 ns/op 12 B/op 0 allocs/op
BenchmarkConn_ReadMessage/compress_enabled-8 435176 2498 ns/op 98 B/op 1 allocs/op
cpu: AMD Ryzen 5 PRO 4650G with Radeon Graphics
BenchmarkConn_WriteMessage/compress_disabled-8 7252513 165.4 ns/op 0 B/op 0 allocs/op
BenchmarkConn_WriteMessage/compress_enabled-8 97394 10391 ns/op 349 B/op 0 allocs/op
BenchmarkConn_ReadMessage/compress_disabled-8 7812108 152.3 ns/op 16 B/op 0 allocs/op
BenchmarkConn_ReadMessage/compress_enabled-8 368712 3248 ns/op 108 B/op 0 allocs/op
PASS
```

### Communication
Expand Down
9 changes: 8 additions & 1 deletion conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,10 @@ import (
)

type Conn struct {
SessionStorage SessionStorage // 会话
// 已废弃, 请使用Session()方法替代
// Deprecated: please use Session() method instead
SessionStorage SessionStorage // 会话

err atomic.Value // 错误
isServer bool // 是否为服务器
subprotocol string // 子协议
Expand Down Expand Up @@ -206,3 +209,7 @@ func (c *Conn) SetNoDelay(noDelay bool) error {
// SubProtocol 获取协商的子协议
// Get negotiated sub-protocols
func (c *Conn) SubProtocol() string { return c.subprotocol }

// Session 获取会话存储
// get session storage
func (c *Conn) Session() SessionStorage { return c.SessionStorage }
61 changes: 61 additions & 0 deletions conn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package gws

import (
"bytes"
"errors"
"net"
"sync"
"sync/atomic"
Expand Down Expand Up @@ -87,3 +88,63 @@ func TestConn_Close(t *testing.T) {
assert.Error(t, socket.SetReadDeadline(time.Time{}))
assert.Error(t, socket.SetWriteDeadline(time.Time{}))
}

func TestConn_SubProtocol(t *testing.T) {
conn := new(Conn)
conn.SubProtocol()
}

func TestConn_EmitClose(t *testing.T) {
t.Run("", func(t *testing.T) {
var serverHandler = new(webSocketMocker)
var clientHandler = new(webSocketMocker)
var serverOption = &ServerOption{CheckUtf8Enabled: true}
var clientOption = &ClientOption{}
var wg = &sync.WaitGroup{}
wg.Add(1)
clientHandler.onClose = func(socket *Conn, err error) {
if err.(*CloseError).Code == internal.CloseProtocolError.Uint16() {
wg.Done()
}
}
server, client := newPeer(serverHandler, serverOption, clientHandler, clientOption)
go client.ReadLoop()
server.emitClose(bytes.NewBuffer(internal.StatusCode(500).Bytes()))
wg.Wait()
})

t.Run("", func(t *testing.T) {
var serverHandler = new(webSocketMocker)
var clientHandler = new(webSocketMocker)
var serverOption = &ServerOption{CheckUtf8Enabled: true}
var clientOption = &ClientOption{}
var wg = &sync.WaitGroup{}
wg.Add(1)
clientHandler.onClose = func(socket *Conn, err error) {
if err.(*CloseError).Code == 4000 {
wg.Done()
}
}
server, client := newPeer(serverHandler, serverOption, clientHandler, clientOption)
go client.ReadLoop()
server.emitClose(bytes.NewBuffer(internal.StatusCode(4000).Bytes()))
wg.Wait()
})
}

func TestConn_EmitError(t *testing.T) {
var serverHandler = new(webSocketMocker)
var clientHandler = new(webSocketMocker)
var serverOption = &ServerOption{CheckUtf8Enabled: true}
var clientOption = &ClientOption{}
var wg = &sync.WaitGroup{}
wg.Add(1)
clientHandler.onClose = func(socket *Conn, err error) {
wg.Done()
}
server, client := newPeer(serverHandler, serverOption, clientHandler, clientOption)
go client.ReadLoop()
err := errors.New(string(internal.AlphabetNumeric.Generate(500)))
server.emitError(err)
wg.Wait()
}
19 changes: 12 additions & 7 deletions examples/chatroom/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,10 @@ import (
"time"
)

const PingInterval = 15 * time.Second // 客户端心跳间隔
const (
PingInterval = 10 * time.Second // 客户端心跳间隔
HeartbeatWaitTimeout = 5 * time.Second // 心跳等待超时时间
)

//go:embed index.html
var html []byte
Expand Down Expand Up @@ -60,12 +63,12 @@ type WebSocket struct {
}

func (c *WebSocket) getName(socket *gws.Conn) string {
name, _ := socket.SessionStorage.Load("name")
name, _ := socket.Session().Load("name")
return name.(string)
}

func (c *WebSocket) getKey(socket *gws.Conn) string {
name, _ := socket.SessionStorage.Load("key")
name, _ := socket.Session().Load("key")
return name.(string)
}

Expand All @@ -74,7 +77,7 @@ func (c *WebSocket) OnOpen(socket *gws.Conn) {
if conn, ok := c.sessions.Load(name); ok {
conn.WriteClose(1000, []byte("connection replaced"))
}
socket.SetDeadline(time.Now().Add(3 * PingInterval))
socket.SetDeadline(time.Now().Add(PingInterval + HeartbeatWaitTimeout))
c.sessions.Store(name, socket)
log.Printf("%s connected\n", name)
}
Expand All @@ -90,7 +93,10 @@ func (c *WebSocket) OnClose(socket *gws.Conn, err error) {
log.Printf("onerror, name=%s, msg=%s\n", name, err.Error())
}

func (c *WebSocket) OnPing(socket *gws.Conn, payload []byte) {}
func (c *WebSocket) OnPing(socket *gws.Conn, payload []byte) {
_ = socket.SetDeadline(time.Now().Add(PingInterval + HeartbeatWaitTimeout))
_ = socket.WriteString("pong")
}

func (c *WebSocket) OnPong(socket *gws.Conn, payload []byte) {}

Expand All @@ -104,8 +110,7 @@ func (c *WebSocket) OnMessage(socket *gws.Conn, message *gws.Message) {

// chrome websocket不支持ping方法, 所以在text frame里面模拟ping
if b := message.Data.Bytes(); len(b) == 4 && string(b) == "ping" {
socket.WriteMessage(gws.OpcodeText, []byte("pong"))
socket.SetDeadline(time.Now().Add(3 * PingInterval))
c.OnPing(socket, nil)
return
}

Expand Down
16 changes: 3 additions & 13 deletions internal/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,16 +199,6 @@ func ToBinaryNumber[T Integer](n T) T {
return x
}

type Buffer struct {
buf []byte
off int
lastRead int8
}

//go:nosplit
func BufferReset(b *bytes.Buffer, p []byte) {
buffer := (*Buffer)(unsafe.Pointer(b))
buffer.buf = p
buffer.off = 0
buffer.lastRead = 0
}
// BufferReset 重置buffer底层切片
// 修改后面的属性一定要加偏移量!!!
func BufferReset(b *bytes.Buffer, p []byte) { *(*[]byte)(unsafe.Pointer(b)) = p }
26 changes: 19 additions & 7 deletions internal/utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,14 @@ import (
"bytes"
"encoding/binary"
"encoding/hex"
"github.com/stretchr/testify/assert"
"hash/fnv"
"io"
"net/http"
"reflect"
"strings"
"testing"

"github.com/stretchr/testify/assert"
"unsafe"
)

func TestStringToBytes(t *testing.T) {
Expand Down Expand Up @@ -212,9 +213,20 @@ func TestGetIntersectionElem(t *testing.T) {
}

func TestResetBuffer(t *testing.T) {
var buf = bytes.NewBufferString("hello")
var p = buf.Bytes()
p = append(p, "world"...)
BufferReset(buf, p)
assert.Equal(t, "helloworld", buf.String())
{
var buffer = bytes.NewBufferString("hello")
var name = reflect.TypeOf(buffer).Elem().Field(0).Name
assert.Equal(t, "buf", name)
}

{
var buf = bytes.NewBufferString("")
BufferReset(buf, []byte("hello"))
assert.Equal(t, "hello", buf.String())

var p = buf.Bytes()
var sh1 = (*reflect.SliceHeader)(unsafe.Pointer(&p))
var sh2 = (*reflect.SliceHeader)(unsafe.Pointer(buf))
assert.Equal(t, sh1.Data, sh2.Data)
}
}
13 changes: 11 additions & 2 deletions option.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,9 +66,9 @@ type (
CompressThreshold int

// CompressorNum 压缩器数量
// 数值越大竞争的概率越小, 但是会耗费大量内存, 注意取舍
// 数值越大竞争的概率越小, 但是会耗费大量内存
// Number of compressors
// The higher the value the lower the probability of competition, but it will consume a lot of memory, so be careful about the trade-off
// The higher the value the lower the probability of competition, but it will consume a lot of memory
CompressorNum int

// 是否检查文本utf8编码, 关闭性能会好点
Expand Down Expand Up @@ -117,6 +117,14 @@ type (
}
)

func (c *ServerOption) deleteProtectedHeaders() {
c.ResponseHeader.Del(internal.Upgrade.Key)
c.ResponseHeader.Del(internal.Connection.Key)
c.ResponseHeader.Del(internal.SecWebSocketAccept.Key)
c.ResponseHeader.Del(internal.SecWebSocketExtensions.Key)
c.ResponseHeader.Del(internal.SecWebSocketProtocol.Key)
}

func initServerOption(c *ServerOption) *ServerOption {
if c == nil {
c = new(ServerOption)
Expand Down Expand Up @@ -158,6 +166,7 @@ func initServerOption(c *ServerOption) *ServerOption {
c.HandshakeTimeout = defaultHandshakeTimeout
}
c.CompressorNum = internal.ToBinaryNumber(c.CompressorNum)
c.deleteProtectedHeaders()

c.config = &Config{
readerPool: internal.NewPool(func() *bufio.Reader { return bufio.NewReaderSize(nil, c.ReadBufferSize) }),
Expand Down
10 changes: 9 additions & 1 deletion option_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package gws

import (
"compress/flate"
"net/http"
"testing"
"time"

Expand Down Expand Up @@ -49,7 +50,12 @@ func validateClientOption(as *assert.Assertions, option *ClientOption) {
// 检查默认配置
func TestDefaultUpgrader(t *testing.T) {
var as = assert.New(t)
var updrader = NewUpgrader(new(BuiltinEventHandler), nil)
var updrader = NewUpgrader(new(BuiltinEventHandler), &ServerOption{
ResponseHeader: http.Header{
"Sec-Websocket-Extensions": []string{"chat"},
"X-Server": []string{"gws"},
},
})
var config = updrader.option.getConfig()
as.Equal(false, config.CompressEnabled)
as.Equal(false, config.ReadAsyncEnabled)
Expand All @@ -66,6 +72,8 @@ func TestDefaultUpgrader(t *testing.T) {
as.NotNil(updrader.option.Authorize)
as.NotNil(updrader.option.NewSessionStorage)
as.Nil(updrader.option.SubProtocols)
as.Equal("", updrader.option.ResponseHeader.Get("Sec-Websocket-Extensions"))
as.Equal("gws", updrader.option.ResponseHeader.Get("X-Server"))
validateServerOption(as, updrader)
}

Expand Down
6 changes: 3 additions & 3 deletions reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"bytes"
"fmt"
"github.com/lxzan/gws/internal"
"unsafe"
)

func (c *Conn) checkMask(enabled bool) error {
Expand Down Expand Up @@ -93,8 +94,7 @@ func (c *Conn) readMessage() error {

var fin = c.fh.GetFIN()
var buf, index = binaryPool.Get(contentLength)
var p = buf.Bytes()
p = p[:contentLength]
var p = buf.Bytes()[:contentLength]
if err := internal.ReadN(c.br, p); err != nil {
return err
}
Expand All @@ -107,7 +107,7 @@ func (c *Conn) readMessage() error {
}

if fin && opcode != OpcodeContinuation {
internal.BufferReset(buf, p)
*(*[]byte)(unsafe.Pointer(buf)) = p
return c.emitMessage(&Message{index: index, Opcode: opcode, Data: buf, compressed: compressed})
}

Expand Down
Loading

0 comments on commit 2a1e621

Please sign in to comment.