Skip to content

Commit

Permalink
update test cases
Browse files Browse the repository at this point in the history
  • Loading branch information
lxzan committed Aug 19, 2024
1 parent 2d79d12 commit b65e852
Show file tree
Hide file tree
Showing 15 changed files with 189 additions and 100 deletions.
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -102,8 +102,9 @@ ok github.com/lxzan/gws 17.231s
- [x] Broadcast
- [x] Dial via Proxy
- [x] Context-Takeover
- [x] Passed Autobahn Test Cases [Server](https://lxzan.github.io/gws/reports/servers/) / [Client](https://lxzan.github.io/gws/reports/clients/)
- [x] Concurrent & Asynchronous Non-Blocking Write
- [x] Segmented Writing of Large Files
- [x] Passed Autobahn Test Cases [Server](https://lxzan.github.io/gws/reports/servers/) / [Client](https://lxzan.github.io/gws/reports/clients/)

### Attention

Expand Down
1 change: 1 addition & 0 deletions README_CN.md
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ ok github.com/lxzan/gws 17.231s
- [x] 广播
- [x] 代理拨号
- [x] 上下文接管
- [x] 大文件分段写入
- [x] 支持并发和异步非阻塞写入
- [x] 通过所有 Autobahn 测试用例 [Server](https://lxzan.github.io/gws/reports/servers/) / [Client](https://lxzan.github.io/gws/reports/clients/)

Expand Down
14 changes: 12 additions & 2 deletions benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,12 @@ func BenchmarkConn_ReadMessage(b *testing.B) {
conn: &benchConn{},
config: upgrader.option.getConfig(),
}
var buf, _ = conn1.genFrame(OpcodeText, true, false, internal.Bytes(githubData), false)
var buf, _ = conn1.genFrame(OpcodeText, internal.Bytes(githubData), frameConfig{
fin: true,
compress: conn1.pd.Enabled,
broadcast: false,
checkEncoding: false,
})

var reader = bytes.NewBuffer(buf.Bytes())
var conn2 = &Conn{
Expand Down Expand Up @@ -98,7 +103,12 @@ func BenchmarkConn_ReadMessage(b *testing.B) {
deflater: new(deflater),
}
conn1.deflater.initialize(false, conn1.pd, config.ReadMaxPayloadSize)
var buf, _ = conn1.genFrame(OpcodeText, true, true, internal.Bytes(githubData), false)
var buf, _ = conn1.genFrame(OpcodeText, internal.Bytes(githubData), frameConfig{
fin: true,
compress: conn1.pd.Enabled,
broadcast: false,
checkEncoding: false,
})

var reader = bytes.NewBuffer(buf.Bytes())
var conn2 = &Conn{
Expand Down
72 changes: 44 additions & 28 deletions bigfile.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,30 +4,34 @@ import (
"bytes"
"encoding/binary"
"errors"
"github.com/klauspost/compress/flate"
"github.com/lxzan/gws/internal"
"io"
"math"

"github.com/klauspost/compress/flate"
"github.com/lxzan/gws/internal"
)

const segmentSize = 128 * 1024

// 获取大文件压缩器
// Get bigDeflater
func (c *Conn) getBigDeflater() *bigDeflater {
if c.isServer {
return c.config.bdPool.Get()
}
return c.deflater.ToBigDeflater()
return (*bigDeflater)(c.deflater.cpsWriter)
}

// 回收大文件压缩器
// Recycle bigDeflater
func (c *Conn) putBigDeflater(d *bigDeflater) {
if c.isServer {
c.config.bdPool.Put(d)
}
}

// 拆分io.Reader为小切片
// Split io.Reader into small slices
func (c *Conn) splitReader(r io.Reader, f func(index int, eof bool, p []byte) error) error {
var buf = binaryPool.Get(segmentSize)
var p = buf.Bytes()[:segmentSize]
Expand All @@ -46,21 +50,29 @@ func (c *Conn) splitReader(r io.Reader, f func(index int, eof bool, p []byte) er
return err
}

// WriteReader 大文件写入
// 采用分段写入技术, 大大减少内存占用
func (c *Conn) WriteReader(opcode Opcode, payload io.Reader) error {
err := c.doWriteReader(opcode, payload)
// WriteFile 大文件写入
// 采用分段写入技术, 减少写入过程中的内存占用
// Segmented write technology to reduce memory usage during write process
func (c *Conn) WriteFile(opcode Opcode, payload io.Reader) error {
err := c.doWriteFile(opcode, payload)
c.emitError(err)
return err
}

func (c *Conn) doWriteReader(opcode Opcode, payload io.Reader) error {
func (c *Conn) doWriteFile(opcode Opcode, payload io.Reader) error {
c.mu.Lock()
defer c.mu.Unlock()

var cb = func(index int, eof bool, p []byte) error {
op := internal.SelectValue(index == 0, opcode, OpcodeContinuation)
frame, err := c.genFrame(op, eof, false, internal.Bytes(p), false)
if index > 0 {
opcode = OpcodeContinuation
}
frame, err := c.genFrame(opcode, internal.Bytes(p), frameConfig{
fin: eof,
compress: false,
broadcast: false,
checkEncoding: false,
})
if err != nil {
return err
}
Expand All @@ -87,39 +99,43 @@ func (c *Conn) doWriteReader(opcode Opcode, payload io.Reader) error {
}

// 大文件压缩器
type bigDeflater struct {
cpsWriter *flate.Writer
}
type bigDeflater flate.Writer

// 初始化大文件压缩器
// Initialize the bigDeflater
func (c *bigDeflater) initialize(isServer bool, options PermessageDeflate) *bigDeflater {
// 创建大文件压缩器
// Create a bigDeflater
func newBigDeflater(isServer bool, options PermessageDeflate) *bigDeflater {
windowBits := internal.SelectValue(isServer, options.ServerMaxWindowBits, options.ClientMaxWindowBits)
if windowBits == 15 {
c.cpsWriter, _ = flate.NewWriter(nil, options.Level)
cpsWriter, _ := flate.NewWriter(nil, options.Level)
return (*bigDeflater)(cpsWriter)
} else {
c.cpsWriter, _ = flate.NewWriterWindow(nil, internal.BinaryPow(windowBits))
cpsWriter, _ := flate.NewWriterWindow(nil, internal.BinaryPow(windowBits))
return (*bigDeflater)(cpsWriter)
}
return c
}

func (c *bigDeflater) FlateWriter() *flate.Writer { return (*flate.Writer)(c) }

// Compress 压缩
func (c *bigDeflater) Compress(src io.Reader, dst *flateWriter, dict []byte, sw *slideWindow) error {
if err := compressTo(c.cpsWriter, &readerWrapper{r: src, sw: sw}, dst, dict); err != nil {
if err := compressTo(c.FlateWriter(), &readerWrapper{r: src, sw: sw}, dst, dict); err != nil {
return err
}
return dst.Flush()
}

// 写入代理
// 将切片透传给回调函数, 以实现分段写入功能
// Write proxy
// Passthrough slices to the callback function for segmented writes.
type flateWriter struct {
index int
buffers []*bytes.Buffer
cb func(index int, eof bool, p []byte) error
}

// 是否可以执行回调函数
// Whether the callback function can be executed
func (c *flateWriter) shouldCall() bool {
var n = len(c.buffers)
if n < 2 {
Expand All @@ -132,18 +148,17 @@ func (c *flateWriter) shouldCall() bool {
return sum >= 4
}

// 聚合写入, 减少syscall.write次数
// 聚合写入, 减少syscall.write调用次数
// Aggregate writes, reducing the number of syscall.write calls
func (c *flateWriter) write(p []byte) {
if len(c.buffers) == 0 {
var buf = binaryPool.Get(segmentSize)
c.buffers = append(c.buffers, buf)
c.buffers = append(c.buffers, binaryPool.Get(segmentSize))
}
var n = len(c.buffers)
var tail = c.buffers[n-1]
if tail.Len()+len(p) >= segmentSize {
var buf = binaryPool.Get(segmentSize)
c.buffers = append(c.buffers, buf)
tail = buf
if tail.Len()+len(p)+frameHeaderSize > tail.Cap() {
tail = binaryPool.Get(segmentSize)
c.buffers = append(c.buffers, tail)
}
tail.Write(p)
}
Expand Down Expand Up @@ -178,12 +193,14 @@ func (c *flateWriter) Flush() error {
}

// 将io.Reader包装为io.WriterTo
// Wrapping io.Reader as io.WriterTo
type readerWrapper struct {
r io.Reader
sw *slideWindow
}

// WriteTo 写入内容, 并更新字典
// Write the contents, and update the dictionary
func (c *readerWrapper) WriteTo(w io.Writer) (int64, error) {
var buf = binaryPool.Get(segmentSize)
defer binaryPool.Put(buf)
Expand All @@ -205,7 +222,6 @@ func (c *readerWrapper) WriteTo(w io.Writer) (int64, error) {
return int64(sum), err
}

// 压缩公共函数
func compressTo(cpsWriter *flate.Writer, r io.WriterTo, w io.Writer, dict []byte) error {
cpsWriter.ResetDict(w, dict)
if _, err := r.WriteTo(cpsWriter); err != nil {
Expand Down
3 changes: 3 additions & 0 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,9 @@ func (c *connector) handshake() (*Conn, *http.Response, error) {
writeQueue: workerQueue{maxConcurrency: 1},
readQueue: make(channel, c.option.ParallelGolimit),
}

// 压缩字典和解压字典内存开销比较大, 故使用懒加载
// Compressing and decompressing dictionaries has a large memory overhead, so use lazy loading.
if pd.Enabled {
socket.deflater.initialize(false, pd, c.option.ReadMaxPayloadSize)
if pd.ServerContextTakeover {
Expand Down
2 changes: 0 additions & 2 deletions compress.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,8 +112,6 @@ func (c *deflater) Compress(src internal.Payload, dst *bytes.Buffer, dict []byte
return nil
}

func (c *deflater) ToBigDeflater() *bigDeflater { return &bigDeflater{cpsWriter: c.cpsWriter} }

// 滑动窗口
// Sliding window
type slideWindow struct {
Expand Down
2 changes: 1 addition & 1 deletion examples/chatroom/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func main() {

func MustLoad[T any](session gws.SessionStorage, key string) (v T) {
if value, exist := session.Load(key); exist {
v = value.(T)
v, _ = value.(T)
}
return
}
Expand Down
10 changes: 3 additions & 7 deletions examples/echo/main.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
package main

import (
"github.com/lxzan/gws"
"log"
"net/http"
"os"

"github.com/lxzan/gws"
)

func main() {
Expand Down Expand Up @@ -41,9 +41,5 @@ func (c *Handler) OnPing(socket *gws.Conn, payload []byte) {

func (c *Handler) OnMessage(socket *gws.Conn, message *gws.Message) {
defer message.Close()
//file, _ := os.OpenFile("C:\\msys64\\home\\lxzan\\Open\\gws\\assets\\github.json", os.O_RDONLY, 0644)
file, _ := os.OpenFile("C:\\Users\\lxzan\\Pictures\\mg.png", os.O_RDONLY, 0644)
defer file.Close()
_ = socket.WriteReader(gws.OpcodeBinary, file)
//_ = socket.WriteReader(message.Opcode, message)
_ = socket.WriteMessage(message.Opcode, message.Bytes())
}
83 changes: 48 additions & 35 deletions examples/push/main.go
Original file line number Diff line number Diff line change
@@ -1,67 +1,80 @@
package main

import (
"bufio"
"fmt"
"log"
"net"
"net/http"

"github.com/lxzan/gws"
)

func main() {
var app = gws.NewServer(new(Handler), nil)
var h = &Handler{conns: gws.NewConcurrentMap[string, *gws.Conn]()}

app.OnRequest = func(conn net.Conn, br *bufio.Reader, r *http.Request) {
socket, err := app.GetUpgrader().UpgradeFromConn(conn, br, r)
var upgrader = gws.NewUpgrader(h, &gws.ServerOption{
PermessageDeflate: gws.PermessageDeflate{
Enabled: true,
ServerContextTakeover: true,
ClientContextTakeover: true,
},
})

http.HandleFunc("/", func(writer http.ResponseWriter, request *http.Request) {
socket, err := upgrader.Upgrade(writer, request)
if err != nil {
log.Print(err.Error())
log.Println(err.Error())
return
}
var channel = make(chan []byte, 8)
var closer = make(chan struct{})
socket.Session().Store("channel", channel)
socket.Session().Store("closer", closer)
go socket.ReadLoop()
websocketKey := request.Header.Get("Sec-WebSocket-Key")
socket.Session().Store("websocketKey", websocketKey)
h.conns.Store(websocketKey, socket)
go func() {
for {
select {
case p := <-channel:
_ = socket.WriteMessage(gws.OpcodeText, p)
case <-closer:
return
}
}
socket.ReadLoop()
}()
})

go func() {
if err := http.ListenAndServe(":8000", nil); err != nil {
return
}
}()

for {
var msg = ""
if _, err := fmt.Scanf("%s\n", &msg); err != nil {
log.Println(err.Error())
return
}
h.Broadcast(msg)
}
}

log.Fatalf("%v", app.Run(":8000"))
func getSession[T any](s gws.SessionStorage, key string) (val T) {
if v, ok := s.Load(key); ok {
val, _ = v.(T)
}
return
}

type Handler struct {
gws.BuiltinEventHandler
conns *gws.ConcurrentMap[string, *gws.Conn]
}

func (c *Handler) getSession(socket *gws.Conn, key string) any {
v, _ := socket.Session().Load(key)
return v
}

func (c *Handler) Send(socket *gws.Conn, payload []byte) {
var channel = c.getSession(socket, "channel").(chan []byte)
select {
case channel <- payload:
default:
return
}
func (c *Handler) Broadcast(msg string) {
var b = gws.NewBroadcaster(gws.OpcodeText, []byte(msg))
c.conns.Range(func(key string, conn *gws.Conn) bool {
_ = b.Broadcast(conn)
return true
})
_ = b.Close()
}

func (c *Handler) OnClose(socket *gws.Conn, err error) {
var closer = c.getSession(socket, "closer").(chan struct{})
closer <- struct{}{}
websocketKey := getSession[string](socket.Session(), "websocketKey")
c.conns.Delete(websocketKey)
}

func (c *Handler) OnMessage(socket *gws.Conn, message *gws.Message) {
defer message.Close()
_ = socket.WriteMessage(message.Opcode, message.Bytes())
}
Loading

0 comments on commit b65e852

Please sign in to comment.