From 2d79d129817d727dd661226ec3686502ee787080 Mon Sep 17 00:00:00 2001 From: lxzan Date: Sat, 17 Aug 2024 15:21:01 +0800 Subject: [PATCH] add write reader method --- .golangci.yaml | 1 + benchmark_test.go | 4 +- bigfile.go | 215 ++++++++++++++++++++++++++++++++ compress.go | 9 +- compress_test.go | 4 + examples/echo/main.go | 10 +- option.go | 6 + reader_test.go | 6 +- writer.go | 18 +-- writer_test.go | 278 ++++++++++++++++++++++++++++++++++++++++++ 10 files changed, 528 insertions(+), 23 deletions(-) create mode 100644 bigfile.go diff --git a/.golangci.yaml b/.golangci.yaml index 8b56cc76..6e5281d6 100644 --- a/.golangci.yaml +++ b/.golangci.yaml @@ -3,6 +3,7 @@ linters: # Disable specific linter # https://golangci-lint.run/usage/linters/#disabled-by-default disable: + - maintidx - mnd - testpackage - nlreturn diff --git a/benchmark_test.go b/benchmark_test.go index d6039687..f2ed3504 100644 --- a/benchmark_test.go +++ b/benchmark_test.go @@ -68,7 +68,7 @@ func BenchmarkConn_ReadMessage(b *testing.B) { conn: &benchConn{}, config: upgrader.option.getConfig(), } - var buf, _ = conn1.genFrame(OpcodeText, internal.Bytes(githubData), false) + var buf, _ = conn1.genFrame(OpcodeText, true, false, internal.Bytes(githubData), false) var reader = bytes.NewBuffer(buf.Bytes()) var conn2 = &Conn{ @@ -98,7 +98,7 @@ func BenchmarkConn_ReadMessage(b *testing.B) { deflater: new(deflater), } conn1.deflater.initialize(false, conn1.pd, config.ReadMaxPayloadSize) - var buf, _ = conn1.genFrame(OpcodeText, internal.Bytes(githubData), false) + var buf, _ = conn1.genFrame(OpcodeText, true, true, internal.Bytes(githubData), false) var reader = bytes.NewBuffer(buf.Bytes()) var conn2 = &Conn{ diff --git a/bigfile.go b/bigfile.go new file mode 100644 index 00000000..64bd945f --- /dev/null +++ b/bigfile.go @@ -0,0 +1,215 @@ +package gws + +import ( + "bytes" + "encoding/binary" + "errors" + "github.com/klauspost/compress/flate" + "github.com/lxzan/gws/internal" + "io" + "math" +) + +const segmentSize = 128 * 1024 + +// 获取大文件压缩器 +func (c *Conn) getBigDeflater() *bigDeflater { + if c.isServer { + return c.config.bdPool.Get() + } + return c.deflater.ToBigDeflater() +} + +// 回收大文件压缩器 +func (c *Conn) putBigDeflater(d *bigDeflater) { + if c.isServer { + c.config.bdPool.Put(d) + } +} + +// 拆分io.Reader为小切片 +func (c *Conn) splitReader(r io.Reader, f func(index int, eof bool, p []byte) error) error { + var buf = binaryPool.Get(segmentSize) + var p = buf.Bytes()[:segmentSize] + var n, index = 0, 0 + var err error + for n, err = r.Read(p); err == nil || errors.Is(err, io.EOF); n, err = r.Read(p) { + eof := errors.Is(err, io.EOF) + if err = f(index, eof, p[:n]); err != nil { + return err + } + index++ + if eof { + break + } + } + return err +} + +// WriteReader 大文件写入 +// 采用分段写入技术, 大大减少内存占用 +func (c *Conn) WriteReader(opcode Opcode, payload io.Reader) error { + err := c.doWriteReader(opcode, payload) + c.emitError(err) + return err +} + +func (c *Conn) doWriteReader(opcode Opcode, payload io.Reader) error { + c.mu.Lock() + defer c.mu.Unlock() + + var cb = func(index int, eof bool, p []byte) error { + op := internal.SelectValue(index == 0, opcode, OpcodeContinuation) + frame, err := c.genFrame(op, eof, false, internal.Bytes(p), false) + if err != nil { + return err + } + if c.pd.Enabled && index == 0 { + frame.Bytes()[0] |= uint8(64) + } + if c.isClosed() { + return ErrConnClosed + } + err = internal.WriteN(c.conn, frame.Bytes()) + binaryPool.Put(frame) + return err + } + + if c.pd.Enabled { + var deflater = c.getBigDeflater() + var fw = &flateWriter{cb: cb} + err := deflater.Compress(payload, fw, c.getCpsDict(false), &c.cpsWindow) + c.putBigDeflater(deflater) + return err + } else { + return c.splitReader(payload, cb) + } +} + +// 大文件压缩器 +type bigDeflater struct { + cpsWriter *flate.Writer +} + +// 初始化大文件压缩器 +// Initialize the bigDeflater +func (c *bigDeflater) initialize(isServer bool, options PermessageDeflate) *bigDeflater { + windowBits := internal.SelectValue(isServer, options.ServerMaxWindowBits, options.ClientMaxWindowBits) + if windowBits == 15 { + c.cpsWriter, _ = flate.NewWriter(nil, options.Level) + } else { + c.cpsWriter, _ = flate.NewWriterWindow(nil, internal.BinaryPow(windowBits)) + } + return c +} + +// Compress 压缩 +func (c *bigDeflater) Compress(src io.Reader, dst *flateWriter, dict []byte, sw *slideWindow) error { + if err := compressTo(c.cpsWriter, &readerWrapper{r: src, sw: sw}, dst, dict); err != nil { + return err + } + return dst.Flush() +} + +// 写入代理 +// 将切片透传给回调函数, 以实现分段写入功能 +type flateWriter struct { + index int + buffers []*bytes.Buffer + cb func(index int, eof bool, p []byte) error +} + +// 是否可以执行回调函数 +func (c *flateWriter) shouldCall() bool { + var n = len(c.buffers) + if n < 2 { + return false + } + var sum = 0 + for i := 1; i < n; i++ { + sum += c.buffers[i].Len() + } + return sum >= 4 +} + +// 聚合写入, 减少syscall.write次数 +func (c *flateWriter) write(p []byte) { + if len(c.buffers) == 0 { + var buf = binaryPool.Get(segmentSize) + c.buffers = append(c.buffers, buf) + } + var n = len(c.buffers) + var tail = c.buffers[n-1] + if tail.Len()+len(p) >= segmentSize { + var buf = binaryPool.Get(segmentSize) + c.buffers = append(c.buffers, buf) + tail = buf + } + tail.Write(p) +} + +func (c *flateWriter) Write(p []byte) (n int, err error) { + c.write(p) + if c.shouldCall() { + err = c.cb(c.index, false, c.buffers[0].Bytes()) + binaryPool.Put(c.buffers[0]) + c.buffers = c.buffers[1:] + c.index++ + } + return n, err +} + +func (c *flateWriter) Flush() error { + var buf = c.buffers[0] + for i := 1; i < len(c.buffers); i++ { + buf.Write(c.buffers[i].Bytes()) + binaryPool.Put(c.buffers[i]) + } + if n := buf.Len(); n >= 4 { + compressedContent := buf.Bytes() + if tail := compressedContent[n-4:]; binary.BigEndian.Uint32(tail) == math.MaxUint16 { + buf.Truncate(n - 4) + } + } + var err = c.cb(c.index, true, buf.Bytes()) + c.index++ + binaryPool.Put(buf) + return err +} + +// 将io.Reader包装为io.WriterTo +type readerWrapper struct { + r io.Reader + sw *slideWindow +} + +// WriteTo 写入内容, 并更新字典 +func (c *readerWrapper) WriteTo(w io.Writer) (int64, error) { + var buf = binaryPool.Get(segmentSize) + defer binaryPool.Put(buf) + + var p = buf.Bytes()[:segmentSize] + var sum, n = 0, 0 + var err error + for n, err = c.r.Read(p); err == nil || errors.Is(err, io.EOF); n, err = c.r.Read(p) { + eof := errors.Is(err, io.EOF) + if _, err = w.Write(p[:n]); err != nil { + return int64(sum), err + } + sum += n + _, _ = c.sw.Write(p[:n]) + if eof { + break + } + } + return int64(sum), err +} + +// 压缩公共函数 +func compressTo(cpsWriter *flate.Writer, r io.WriterTo, w io.Writer, dict []byte) error { + cpsWriter.ResetDict(w, dict) + if _, err := r.WriteTo(cpsWriter); err != nil { + return err + } + return cpsWriter.Flush() +} diff --git a/compress.go b/compress.go index 77ae64bb..3117743b 100644 --- a/compress.go +++ b/compress.go @@ -100,12 +100,7 @@ func (c *deflater) Decompress(src *bytes.Buffer, dict []byte) (*bytes.Buffer, er func (c *deflater) Compress(src internal.Payload, dst *bytes.Buffer, dict []byte) error { c.cpsLocker.Lock() defer c.cpsLocker.Unlock() - - c.cpsWriter.ResetDict(dst, dict) - if _, err := src.WriteTo(c.cpsWriter); err != nil { - return err - } - if err := c.cpsWriter.Flush(); err != nil { + if err := compressTo(c.cpsWriter, src, dst, dict); err != nil { return err } if n := dst.Len(); n >= 4 { @@ -117,6 +112,8 @@ func (c *deflater) Compress(src internal.Payload, dst *bytes.Buffer, dict []byte return nil } +func (c *deflater) ToBigDeflater() *bigDeflater { return &bigDeflater{cpsWriter: c.cpsWriter} } + // 滑动窗口 // Sliding window type slideWindow struct { diff --git a/compress_test.go b/compress_test.go index 97e57335..55d03070 100644 --- a/compress_test.go +++ b/compress_test.go @@ -259,3 +259,7 @@ func (c *writerTo) Len() int { func (c *writerTo) WriteTo(w io.Writer) (n int64, err error) { return 0, errors.New("1") } + +func (c *writerTo) Read(p []byte) (n int, err error) { + return 0, errors.New("1") +} diff --git a/examples/echo/main.go b/examples/echo/main.go index 497210e2..e7374ec7 100644 --- a/examples/echo/main.go +++ b/examples/echo/main.go @@ -1,10 +1,10 @@ package main import ( + "github.com/lxzan/gws" "log" "net/http" - - "github.com/lxzan/gws" + "os" ) func main() { @@ -41,5 +41,9 @@ func (c *Handler) OnPing(socket *gws.Conn, payload []byte) { func (c *Handler) OnMessage(socket *gws.Conn, message *gws.Message) { defer message.Close() - _ = socket.WriteMessage(message.Opcode, message.Bytes()) + //file, _ := os.OpenFile("C:\\msys64\\home\\lxzan\\Open\\gws\\assets\\github.json", os.O_RDONLY, 0644) + file, _ := os.OpenFile("C:\\Users\\lxzan\\Pictures\\mg.png", os.O_RDONLY, 0644) + defer file.Close() + _ = socket.WriteReader(gws.OpcodeBinary, file) + //_ = socket.WriteReader(message.Opcode, message) } diff --git a/option.go b/option.go index 374b5ca4..e3c517f6 100644 --- a/option.go +++ b/option.go @@ -104,6 +104,9 @@ type ( // Memory pool for bufio.Reader brPool *internal.Pool[*bufio.Reader] + // 大文件压缩器 + bdPool *internal.Pool[*bigDeflater] + // 压缩器滑动窗口内存池 // Memory pool for compressor sliding window cswPool *internal.Pool[[]byte] @@ -320,6 +323,9 @@ func initServerOption(c *ServerOption) *ServerOption { } if c.PermessageDeflate.Enabled { + c.config.bdPool = internal.NewPool[*bigDeflater](func() *bigDeflater { + return new(bigDeflater).initialize(true, c.PermessageDeflate) + }) if c.PermessageDeflate.ServerContextTakeover { windowSize := internal.BinaryPow(c.PermessageDeflate.ServerMaxWindowBits) c.config.cswPool = internal.NewPool[[]byte](func() []byte { diff --git a/reader_test.go b/reader_test.go index 5807e894..23e00593 100644 --- a/reader_test.go +++ b/reader_test.go @@ -287,7 +287,7 @@ func TestSegments(t *testing.T) { go client.ReadLoop() go func() { - frame, _ := client.genFrame(OpcodeText, internal.Bytes(testdata), false) + frame, _ := client.genFrame(OpcodeText, true, true, internal.Bytes(testdata), false) data := frame.Bytes() data[20] = 'x' client.conn.Write(data) @@ -366,7 +366,7 @@ func TestConn_ReadMessage(t *testing.T) { var serverHandler = &webSocketMocker{} serverHandler.onOpen = func(socket *Conn) { var p = []byte("123") - frame, _ := socket.genFrame(OpcodePing, internal.Bytes(p), false) + frame, _ := socket.genFrame(OpcodePing, true, socket.pd.Enabled, internal.Bytes(p), false) socket.conn.Write(frame.Bytes()[:2]) socket.conn.Close() } @@ -391,7 +391,7 @@ func TestConn_ReadMessage(t *testing.T) { var serverHandler = &webSocketMocker{} serverHandler.onOpen = func(socket *Conn) { var p = []byte("123") - frame, _ := socket.genFrame(OpcodeText, internal.Bytes(p), false) + frame, _ := socket.genFrame(OpcodeText, true, socket.pd.Enabled, internal.Bytes(p), false) socket.conn.Write(frame.Bytes()[:2]) socket.conn.Close() } diff --git a/writer.go b/writer.go index ce4e83e0..e3a43718 100644 --- a/writer.go +++ b/writer.go @@ -104,7 +104,7 @@ func (c *Conn) doWrite(opcode Opcode, payload internal.Payload) error { return ErrConnClosed } - frame, err := c.genFrame(opcode, payload, false) + frame, err := c.genFrame(opcode, true, c.pd.Enabled, payload, false) if err != nil { return err } @@ -117,8 +117,8 @@ func (c *Conn) doWrite(opcode Opcode, payload internal.Payload) error { // 生成帧数据 // Generates the frame data -func (c *Conn) genFrame(opcode Opcode, payload internal.Payload, isBroadcast bool) (*bytes.Buffer, error) { - if opcode == OpcodeText && !payload.CheckEncoding(c.config.CheckUtf8Enabled, uint8(opcode)) { +func (c *Conn) genFrame(opcode Opcode, fin bool, compress bool, payload internal.Payload, isBroadcast bool) (*bytes.Buffer, error) { + if opcode == OpcodeText && fin && !payload.CheckEncoding(c.config.CheckUtf8Enabled, uint8(opcode)) { return nil, internal.NewError(internal.CloseUnsupportedData, ErrTextEncoding) } @@ -131,12 +131,12 @@ func (c *Conn) genFrame(opcode Opcode, payload internal.Payload, isBroadcast boo var buf = binaryPool.Get(n + frameHeaderSize) buf.Write(framePadding[0:]) - if c.pd.Enabled && opcode.isDataFrame() && n >= c.pd.Threshold { - return c.compressData(buf, opcode, payload, isBroadcast) + if compress && opcode.isDataFrame() && n >= c.pd.Threshold { + return c.compressData(buf, opcode, fin, payload, isBroadcast) } var header = frameHeader{} - headerLength, maskBytes := header.GenerateHeader(c.isServer, true, false, opcode, n) + headerLength, maskBytes := header.GenerateHeader(c.isServer, fin, false, opcode, n) _, _ = payload.WriteTo(buf) var contents = buf.Bytes() if !c.isServer { @@ -150,7 +150,7 @@ func (c *Conn) genFrame(opcode Opcode, payload internal.Payload, isBroadcast boo // 压缩数据并生成帧 // Compresses the data and generates the frame -func (c *Conn) compressData(buf *bytes.Buffer, opcode Opcode, payload internal.Payload, isBroadcast bool) (*bytes.Buffer, error) { +func (c *Conn) compressData(buf *bytes.Buffer, opcode Opcode, fin bool, payload internal.Payload, isBroadcast bool) (*bytes.Buffer, error) { err := c.deflater.Compress(payload, buf, c.getCpsDict(isBroadcast)) if err != nil { return nil, err @@ -158,7 +158,7 @@ func (c *Conn) compressData(buf *bytes.Buffer, opcode Opcode, payload internal.P var contents = buf.Bytes() var payloadSize = buf.Len() - frameHeaderSize var header = frameHeader{} - headerLength, maskBytes := header.GenerateHeader(c.isServer, true, true, opcode, payloadSize) + headerLength, maskBytes := header.GenerateHeader(c.isServer, fin, true, opcode, payloadSize) if !c.isServer { internal.MaskXOR(contents[frameHeaderSize:], maskBytes) } @@ -218,7 +218,7 @@ func (c *Broadcaster) Broadcast(socket *Conn) error { var msg = c.msgs[idx] msg.once.Do(func() { - msg.frame, msg.err = socket.genFrame(c.opcode, internal.Bytes(c.payload), true) + msg.frame, msg.err = socket.genFrame(c.opcode, true, socket.pd.Enabled, internal.Bytes(c.payload), true) }) if msg.err != nil { return msg.err diff --git a/writer_test.go b/writer_test.go index c6b1ff65..4bb1528a 100644 --- a/writer_test.go +++ b/writer_test.go @@ -509,3 +509,281 @@ func TestConn_Async(t *testing.T) { wg.Wait() assert.True(t, internal.IsSameSlice(arr1, arr2)) } + +func TestConn_WriteReader(t *testing.T) { + t.Run("context_take_over 1", func(t *testing.T) { + var pd = PermessageDeflate{ + Enabled: true, + ServerContextTakeover: true, + ClientContextTakeover: true, + Threshold: 1, + } + var serverHandler = new(webSocketMocker) + var clientHandler = new(webSocketMocker) + var serverOption = &ServerOption{ + PermessageDeflate: pd, + } + var clientOption = &ClientOption{ + PermessageDeflate: pd, + } + var wg = &sync.WaitGroup{} + wg.Add(1) + + var content = internal.AlphabetNumeric.Generate(512 * 1024) + clientHandler.onMessage = func(socket *Conn, message *Message) { + if bytes.Equal(message.Bytes(), content) { + wg.Done() + } + } + + server, client := newPeer(serverHandler, serverOption, clientHandler, clientOption) + go server.ReadLoop() + go client.ReadLoop() + + var err = server.WriteReader(OpcodeBinary, bytes.NewReader(content)) + assert.NoError(t, err) + wg.Wait() + }) + + t.Run("context_take_over 2", func(t *testing.T) { + var pd = PermessageDeflate{ + Enabled: true, + ServerContextTakeover: true, + ClientContextTakeover: true, + ServerMaxWindowBits: 15, + ClientMaxWindowBits: 15, + Threshold: 1, + } + var serverHandler = new(webSocketMocker) + var clientHandler = new(webSocketMocker) + var serverOption = &ServerOption{ + PermessageDeflate: pd, + } + var clientOption = &ClientOption{ + PermessageDeflate: pd, + } + var wg = &sync.WaitGroup{} + wg.Add(1) + + var content = internal.AlphabetNumeric.Generate(512 * 1024) + clientHandler.onMessage = func(socket *Conn, message *Message) { + if bytes.Equal(message.Bytes(), content) { + wg.Done() + } + } + + server, client := newPeer(serverHandler, serverOption, clientHandler, clientOption) + go server.ReadLoop() + go client.ReadLoop() + + var err = server.WriteReader(OpcodeBinary, bytes.NewReader(content)) + assert.NoError(t, err) + wg.Wait() + }) + + t.Run("context_take_over 3", func(t *testing.T) { + var pd = PermessageDeflate{ + Enabled: true, + ServerContextTakeover: true, + ClientContextTakeover: true, + ServerMaxWindowBits: 15, + ClientMaxWindowBits: 15, + Threshold: 1, + } + var serverHandler = new(webSocketMocker) + var clientHandler = new(webSocketMocker) + var serverOption = &ServerOption{ + PermessageDeflate: pd, + } + var clientOption = &ClientOption{ + PermessageDeflate: pd, + } + var count = 1000 + var wg = &sync.WaitGroup{} + wg.Add(count) + + clientHandler.onMessage = func(socket *Conn, message *Message) { + wg.Done() + } + + server, client := newPeer(serverHandler, serverOption, clientHandler, clientOption) + go server.ReadLoop() + go client.ReadLoop() + + for i := 0; i < count; i++ { + var length = 128*1024 + internal.AlphabetNumeric.Intn(10) + var content = internal.AlphabetNumeric.Generate(length) + var err = server.WriteReader(OpcodeBinary, bytes.NewReader(content)) + assert.NoError(t, err) + } + wg.Wait() + }) + + t.Run("no_context_take_over", func(t *testing.T) { + var pd = PermessageDeflate{ + Enabled: true, + ServerContextTakeover: false, + ClientContextTakeover: false, + Threshold: 1, + } + var serverHandler = new(webSocketMocker) + var clientHandler = new(webSocketMocker) + var serverOption = &ServerOption{ + PermessageDeflate: pd, + } + var clientOption = &ClientOption{ + PermessageDeflate: pd, + } + var wg = &sync.WaitGroup{} + wg.Add(1) + + var content = internal.AlphabetNumeric.Generate(512 * 1024) + serverHandler.onMessage = func(socket *Conn, message *Message) { + if bytes.Equal(message.Bytes(), content) { + wg.Done() + } + } + + server, client := newPeer(serverHandler, serverOption, clientHandler, clientOption) + go server.ReadLoop() + go client.ReadLoop() + + var err = client.WriteReader(OpcodeBinary, bytes.NewReader(content)) + assert.NoError(t, err) + wg.Wait() + }) + + t.Run("no_compress", func(t *testing.T) { + var pd = PermessageDeflate{ + Enabled: false, + } + var serverHandler = new(webSocketMocker) + var clientHandler = new(webSocketMocker) + var serverOption = &ServerOption{ + PermessageDeflate: pd, + } + var clientOption = &ClientOption{ + PermessageDeflate: pd, + } + var wg = &sync.WaitGroup{} + wg.Add(1) + + var content = internal.AlphabetNumeric.Generate(512 * 1024) + serverHandler.onMessage = func(socket *Conn, message *Message) { + if bytes.Equal(message.Bytes(), content) { + wg.Done() + } + } + + server, client := newPeer(serverHandler, serverOption, clientHandler, clientOption) + go server.ReadLoop() + go client.ReadLoop() + + var err = client.WriteReader(OpcodeBinary, bytes.NewReader(content)) + assert.NoError(t, err) + wg.Wait() + }) + + t.Run("close 1", func(t *testing.T) { + var pd = PermessageDeflate{ + Enabled: false, + } + var serverHandler = new(webSocketMocker) + var clientHandler = new(webSocketMocker) + var serverOption = &ServerOption{ + PermessageDeflate: pd, + } + var clientOption = &ClientOption{ + PermessageDeflate: pd, + } + var wg = &sync.WaitGroup{} + wg.Add(1) + + var content = internal.AlphabetNumeric.Generate(512 * 1024) + serverHandler.onClose = func(socket *Conn, err error) { + if ev, ok := err.(*CloseError); ok && ev.Code == 1000 { + wg.Done() + } + } + + server, client := newPeer(serverHandler, serverOption, clientHandler, clientOption) + go server.ReadLoop() + go client.ReadLoop() + + client.WriteClose(1000, nil) + var err = client.WriteReader(OpcodeBinary, bytes.NewReader(content)) + assert.Error(t, err) + wg.Wait() + }) + + t.Run("msg too big", func(t *testing.T) { + var pd = PermessageDeflate{ + Enabled: false, + } + var serverHandler = new(webSocketMocker) + var clientHandler = new(webSocketMocker) + var serverOption = &ServerOption{ + PermessageDeflate: pd, + } + var clientOption = &ClientOption{ + PermessageDeflate: pd, + WriteMaxPayloadSize: 1024, + } + var wg = &sync.WaitGroup{} + wg.Add(1) + + var content = internal.AlphabetNumeric.Generate(512 * 1024) + clientHandler.onClose = func(socket *Conn, err error) { + wg.Done() + } + + server, client := newPeer(serverHandler, serverOption, clientHandler, clientOption) + go server.ReadLoop() + go client.ReadLoop() + + var err = client.WriteReader(OpcodeBinary, bytes.NewReader(content)) + assert.Error(t, err) + wg.Wait() + }) + + t.Run("", func(t *testing.T) { + deflater := new(bigDeflater).initialize(true, PermessageDeflate{ + Enabled: true, + ServerMaxWindowBits: 12, + ClientMaxWindowBits: 12, + }) + var fw = &flateWriter{cb: func(index int, eof bool, p []byte) error { + return nil + }} + err := deflater.Compress(new(writerTo), fw, nil, new(slideWindow)) + assert.Error(t, err) + }) + + t.Run("", func(t *testing.T) { + deflater := new(bigDeflater).initialize(true, PermessageDeflate{ + Enabled: true, + ServerMaxWindowBits: 12, + ClientMaxWindowBits: 12, + }) + var fw = &flateWriter{cb: func(index int, eof bool, p []byte) error { + return errors.New("2") + }} + src := bytes.NewBufferString("hello") + err := deflater.Compress(src, fw, nil, new(slideWindow)) + assert.Error(t, err) + }) + + t.Run("", func(t *testing.T) { + var fw = &flateWriter{ + cb: func(index int, eof bool, p []byte) error { + return nil + }, + buffers: []*bytes.Buffer{ + bytes.NewBufferString("he"), + bytes.NewBufferString("llo"), + }, + } + var err = fw.Flush() + assert.NoError(t, err) + }) +}