From 232af0a9c25e9e4b5793c3c3d6252df9106ee492 Mon Sep 17 00:00:00 2001 From: Slava Markeyev Date: Tue, 30 Apr 2024 13:32:31 -0700 Subject: [PATCH] Response byte reuse (#13) * build request from byte args and encode it * fetch slice from pool * return ByteResponse * wip * update cluster tests * update example test * update conn test --- Makefile | 2 +- redis/byteslice/byteslice.go | 80 ++++++++++++++++++++++++++++++++++++ redis/example_test.go | 2 +- redis/reader.go | 28 +++++++++++-- redis/reader_test.go | 14 +++---- rediscluster/bench/go.sum | 4 ++ rediscluster/cluster_test.go | 42 +++++++++++-------- redisconn/bench/go.sum | 4 ++ redisconn/conn.go | 8 ++-- redisconn/conn_test.go | 19 ++++----- redisdumb/conn.go | 8 +++- 11 files changed, 163 insertions(+), 48 deletions(-) create mode 100644 redis/byteslice/byteslice.go diff --git a/Makefile b/Makefile index 1edb9e8..d43e5ef 100644 --- a/Makefile +++ b/Makefile @@ -11,7 +11,7 @@ test: testcluster testconn testredis mv redis-$(REDIS_VERSION)/src/redis-server /tmp/redis-server rm redis-$(REDIS_VERSION) -rf -testredis: /tmp/redis-server/redis-server +testredis: PATH=/tmp/redis-server/:${PATH} go test ./redis testconn: /tmp/redis-server/redis-server diff --git a/redis/byteslice/byteslice.go b/redis/byteslice/byteslice.go new file mode 100644 index 0000000..3fd9ac8 --- /dev/null +++ b/redis/byteslice/byteslice.go @@ -0,0 +1,80 @@ +// Copyright (c) 2021 The Gnet Authors. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package byteslice + +import ( + "math" + "math/bits" + "reflect" + "runtime" + "sync" + "unsafe" +) + +var builtinPool Pool + +// Pool consists of 32 sync.Pool, representing byte slices of length from 0 to 32 in powers of 2. +type Pool struct { + pools [32]sync.Pool +} + +// Get returns a byte slice with given length from the built-in pool. +func Get(size int) []byte { + return builtinPool.Get(size) +} + +// Put returns the byte slice to the built-in pool. +func Put(buf []byte) { + builtinPool.Put(buf) +} + +// Get retrieves a byte slice of the length requested by the caller from pool or allocates a new one. +func (p *Pool) Get(size int) (buf []byte) { + if size <= 0 { + return nil + } + if size > math.MaxInt32 { + return make([]byte, size) + } + idx := index(uint32(size)) + ptr, _ := p.pools[idx].Get().(unsafe.Pointer) + if ptr == nil { + return make([]byte, 1< math.MaxInt32 { + return + } + idx := index(uint32(size)) + if size != 1< // ['\x02' '\x01' "2" "1"] diff --git a/redis/reader.go b/redis/reader.go index d568474..d110a35 100644 --- a/redis/reader.go +++ b/redis/reader.go @@ -3,14 +3,26 @@ package redis import ( "bufio" "bytes" + "github.com/joomcode/redispipe/redis/byteslice" "io" "strings" "github.com/joomcode/errorx" ) +var readerPool byteslice.Pool + +type ByteResponse struct { + Val []byte +} + +// Release puts Val byte slice back to pool for reuse. You must not use Val after this. +func (br *ByteResponse) Release() { + readerPool.Put(br.Val) +} + // ReadResponse reads single RESP answer from bufio.Reader -func ReadResponse(b *bufio.Reader) interface{} { +func ReadResponse(b *bufio.Reader, wrapBytes bool) interface{} { line, isPrefix, err := b.ReadLine() if err != nil { return ErrIO.WrapWithNoMessage(err) @@ -72,14 +84,22 @@ func ReadResponse(b *bufio.Reader) interface{} { if v < 0 { return nil } - buf := make([]byte, v+2, v+2) + + buf := readerPool.Get(int(v + 2)) if _, err = io.ReadFull(b, buf); err != nil { return ErrIO.WrapWithNoMessage(err) } + if buf[v] != '\r' || buf[v+1] != '\n' { + readerPool.Put(buf) return ErrNoFinalRN.NewWithNoMessage() } - return buf[:v:v] + + if wrapBytes { + return ByteResponse{Val: buf[:v]} + } + + return buf[:v] case '*': var rerr *errorx.Error if v, rerr = parseInt(line[1:]); rerr != nil { @@ -90,7 +110,7 @@ func ReadResponse(b *bufio.Reader) interface{} { } result := make([]interface{}, v) for i := int64(0); i < v; i++ { - result[i] = ReadResponse(b) + result[i] = ReadResponse(b, false) if e, ok := result[i].(*errorx.Error); ok && !e.IsOfType(ErrResult) { return e } diff --git a/redis/reader_test.go b/redis/reader_test.go index 0a0c478..a8ec4c9 100644 --- a/redis/reader_test.go +++ b/redis/reader_test.go @@ -18,7 +18,7 @@ func lines2bufio(lines ...string) *bufio.Reader { } func readLines(lines ...string) interface{} { - return ReadResponse(lines2bufio(lines...)) + return ReadResponse(lines2bufio(lines...), true) } func checkErrType(t *testing.T, res interface{}, kind *errorx.Type) bool { @@ -156,21 +156,17 @@ func TestReadResponse_Correct(t *testing.T) { assert.Equal(t, int64(-9223372036854775808), res) res = readLines("$0\r\n", "\r\n") - assert.Equal(t, []byte(""), res) - assert.Equal(t, len(res.([]byte)), cap(res.([]byte))) + assert.Equal(t, ByteResponse{Val: []byte("")}, res) res = readLines("$1\r\n", "a\r\n") - assert.Equal(t, []byte("a"), res) - assert.Equal(t, len(res.([]byte)), cap(res.([]byte))) + assert.Equal(t, ByteResponse{Val: []byte("a")}, res) res = readLines("$4\r\n", "asdf\r\n") - assert.Equal(t, []byte("asdf"), res) - assert.Equal(t, len(res.([]byte)), cap(res.([]byte))) + assert.Equal(t, ByteResponse{Val: []byte("asdf")}, res) big := strings.Repeat("a", 1024*1024) res = readLines(fmt.Sprintf("$%d\r\n", len(big)), big, "\r\n") - assert.Equal(t, []byte(big), res) - assert.Equal(t, len(res.([]byte)), cap(res.([]byte))) + assert.Equal(t, ByteResponse{Val: []byte(big)}, res) res = readLines("*0\r\n") assert.Equal(t, []interface{}{}, res) diff --git a/rediscluster/bench/go.sum b/rediscluster/bench/go.sum index 926296f..b3ba8e8 100644 --- a/rediscluster/bench/go.sum +++ b/rediscluster/bench/go.sum @@ -1,12 +1,16 @@ +github.com/cenkalti/backoff v2.2.1+incompatible/go.mod h1:90ReRw6GdpyfrHakVjL/QHaoyV4aDUVVkXQJJJ3NXXM= +github.com/cockroachdb/circuitbreaker v0.0.0-20210826084326-2045d59d3b5d/go.mod h1:mN5a3LcljXtJdPkmDnkbSCjPEmImXBXR+jmS31mxVwA= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/facebookgo/clock v0.0.0-20150410010913-600d898af40a/go.mod h1:7Ga40egUymuWXxAe151lTNnCv97MddSOVsjpPPkityA= github.com/garyburd/redigo v1.6.2/go.mod h1:NR3MbYisc3/PwhQ00EMzDiPmrwpPxAn5GI05/YaO1SY= github.com/joomcode/errorx v1.0.3 h1:3e1mi0u7/HTPNdg6d6DYyKGBhA5l9XpsfuVE29NxnWw= github.com/joomcode/errorx v1.0.3/go.mod h1:eQzdtdlNyN7etw6YCS4W4+lu442waxZYw5yvz0ULrRo= github.com/mediocregopher/radix.v2 v0.0.0-20181115013041-b67df6e626f9/go.mod h1:fLRUbhbSd5Px2yKUaGYYPltlyxi1guJz1vCmo1RQL50= github.com/mediocregopher/radix/v3 v3.7.0 h1:SM9zJdme5pYGEVvh1HttjBjDmIaNBDKy+oDCv5w81Wo= github.com/mediocregopher/radix/v3 v3.7.0/go.mod h1:8FL3F6UQRXHXIBSPUs5h0RybMF8i4n7wVopoX3x7Bv8= +github.com/peterbourgon/g2s v0.0.0-20170223122336-d4e7ad98afea/go.mod h1:1VcHEd3ro4QMoHfiNl/j7Jkln9+KQuorp0PItHMJYNg= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= diff --git a/rediscluster/cluster_test.go b/rediscluster/cluster_test.go index d8020e3..3257560 100644 --- a/rediscluster/cluster_test.go +++ b/rediscluster/cluster_test.go @@ -145,7 +145,7 @@ func (s *Suite) TestBasicOps() { s.slotnode(i).DoSure("SET", slotkey("basic", key), key+"y") } for _, key := range s.keys { - s.Equal([]byte(key+"y"), scl.Do(s.ctx, "GET", slotkey("basic", key))) + s.Equal(redis.ByteResponse{Val: []byte(key + "y")}, scl.Do(s.ctx, "GET", slotkey("basic", key))) } } @@ -256,7 +256,7 @@ func (s *Suite) TestSendMany() { } ress = scl.SendMany(s.ctx, reqs) for i, res := range ress { - s.Equal([]byte(s.keys[i]+"y"), res) + s.Equal(redis.ByteResponse{Val: []byte(s.keys[i] + "y")}, res) } } @@ -424,7 +424,7 @@ func (s *Suite) TestGetMoved() { s.cl.MoveSlot(10999, 1, 2) - s.Equal([]byte(key), sconn.Do(s.ctx, "GET", key)) + s.Equal(redis.ByteResponse{Val: []byte(key)}, sconn.Do(s.ctx, "GET", key)) s.Contains(DebugEvents(), "moved") s.cl.MoveSlot(10999, 2, 1) @@ -485,7 +485,7 @@ func (s *Suite) TestAsk() { key := slotkey("ask", s.keys[10997]) s.r().Equal("OK", sconn.Do(s.ctx, "SET", key, key)) - s.Equal([]byte(key), sconn.Do(s.ctx, "GET", key)) + s.Equal(redis.ByteResponse{Val: []byte(key)}, sconn.Do(s.ctx, "GET", key)) s.Contains(DebugEvents(), "asking") // recheck that redis responses with correct errors @@ -531,7 +531,7 @@ func (s *Suite) TestAskTransaction() { s.Equal([]interface{}{"OK", "OK"}, res) s.Contains(DebugEvents(), "transaction asking") - s.Equal([]byte("1"), sconn.Do(s.ctx, "GET", key1)) + s.Equal(redis.ByteResponse{Val: []byte("1")}, sconn.Do(s.ctx, "GET", key1)) DebugEventsReset() // if some keys are absent in new shard, then redis returns TRYAGAIN error @@ -558,8 +558,8 @@ func (s *Suite) TestAskTransaction() { }) s.Nil(err) - s.Equal([]byte("1"), sconn.Do(s.ctx, "GET", key2)) - s.Equal([]byte("2"), sconn.Do(s.ctx, "GET", key3)) + s.Equal(redis.ByteResponse{Val: []byte("1")}, sconn.Do(s.ctx, "GET", key2)) + s.Equal(redis.ByteResponse{Val: []byte("2")}, sconn.Do(s.ctx, "GET", key3)) s.Contains(DebugEvents(), "transaction asking") s.Contains(DebugEvents(), "transaction tryagain") @@ -589,8 +589,8 @@ func (s *Suite) TestMovedTransaction() { s.Nil(err) s.Equal([]interface{}{"OK", "OK"}, res) - s.Equal([]byte("2"), sconn.Do(s.ctx, "GET", key1)) - s.Equal([]byte("3"), sconn.Do(s.ctx, "GET", key2)) + s.Equal(redis.ByteResponse{Val: []byte("2")}, sconn.Do(s.ctx, "GET", key1)) + s.Equal(redis.ByteResponse{Val: []byte("3")}, sconn.Do(s.ctx, "GET", key2)) s.Equal([]string{"transaction moved"}, DebugEvents()) } @@ -626,7 +626,7 @@ func (s *Suite) TestAllReturns_Good() { skey := s.keys[(i*N+j)*127%NumSlots] key := slotkey("allgood", skey) res := sconn.Do(s.ctx, "GET", key) - if !s.Equal([]byte(skey), res) { + if !s.Equal(redis.ByteResponse{Val: []byte(skey)}, res) { return } @@ -648,7 +648,7 @@ func (s *Suite) TestAllReturns_Good() { if !s.Equal("OK", ress[0]) { return } - if ress[1] != nil && !s.Equal([]byte(keya), ress[1]) { + if ress[1] != nil && !s.Equal(redis.ByteResponse{Val: []byte(keya)}, ress[1]) { return } } @@ -699,7 +699,7 @@ func (s *Suite) TestAllReturns_GoodMoving() { skey := s.keys[(i*N+j)*127%NumSlots] key := slotkey("allgoodmove", skey) res := sconn.Do(ctx, "GET", key) - if !s.Equal([]byte(skey), res) { + if !s.Equal(redis.ByteResponse{Val: []byte(skey)}, res) { log.Println("Res ", res) atomic.AddUint32(&bad, 1) } @@ -723,7 +723,7 @@ func (s *Suite) TestAllReturns_GoodMoving() { log.Println("Ress[0] ", ress[0]) atomic.AddUint32(&bad, 1) } - if ress[1] != nil && !s.Equal([]byte(keya), ress[1]) { + if ress[1] != nil && !s.Equal(redis.ByteResponse{Val: []byte(keya)}, ress[1]) { log.Println("Ress[1] ", ress[1]) atomic.AddUint32(&bad, 1) } @@ -818,11 +818,15 @@ func (s *Suite) TestAllReturns_Bad() { ress, err = sconn.SendTransaction(s.ctx, reqs) } if check { - ok := s.Equal([]byte(skey), res) + ok := s.Equal(redis.ByteResponse{Val: []byte(skey)}, res) ok = ok && err == nil ok = ok && s.Equal("OK", ress[0]) if ress[1] != nil { - ok = ok && s.Equal([]byte(keya), ress[1]) + if !transact { + ok = ok && s.Equal(redis.ByteResponse{Val: []byte(keya)}, ress[1]) + } else { + ok = ok && s.Equal([]byte(keya), ress[1]) + } } checks <- ok } @@ -1015,11 +1019,15 @@ func (s *Suite) TestAllReturns_Bad_Latency() { ress, err = sconn.SendTransaction(s.ctx, reqs) } if check { - ok := s.Equal([]byte(skey), res) + ok := s.Equal(redis.ByteResponse{Val: []byte(skey)}, res) ok = ok && err == nil ok = ok && s.Equal("OK", ress[0]) if ress[1] != nil { - ok = ok && s.Equal([]byte(keya), ress[1]) + if !transact { + ok = ok && s.Equal(redis.ByteResponse{Val: []byte(keya)}, ress[1]) + } else { + ok = ok && s.Equal([]byte(keya), ress[1]) + } } checks <- ok } diff --git a/redisconn/bench/go.sum b/redisconn/bench/go.sum index c163bb9..7338510 100644 --- a/redisconn/bench/go.sum +++ b/redisconn/bench/go.sum @@ -1,6 +1,9 @@ +github.com/cenkalti/backoff v2.2.1+incompatible/go.mod h1:90ReRw6GdpyfrHakVjL/QHaoyV4aDUVVkXQJJJ3NXXM= +github.com/cockroachdb/circuitbreaker v0.0.0-20210826084326-2045d59d3b5d/go.mod h1:mN5a3LcljXtJdPkmDnkbSCjPEmImXBXR+jmS31mxVwA= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/facebookgo/clock v0.0.0-20150410010913-600d898af40a/go.mod h1:7Ga40egUymuWXxAe151lTNnCv97MddSOVsjpPPkityA= github.com/garyburd/redigo v1.6.2/go.mod h1:NR3MbYisc3/PwhQ00EMzDiPmrwpPxAn5GI05/YaO1SY= github.com/gomodule/redigo v1.8.4 h1:Z5JUg94HMTR1XpwBaSH4vq3+PNSIykBLxMdglbw10gg= github.com/gomodule/redigo v1.8.4/go.mod h1:P9dn9mFrCBvWhGE1wpxx6fgq7BAeLBk+UUUzlpkBYO0= @@ -9,6 +12,7 @@ github.com/joomcode/errorx v1.0.3/go.mod h1:eQzdtdlNyN7etw6YCS4W4+lu442waxZYw5yv github.com/mediocregopher/radix.v2 v0.0.0-20181115013041-b67df6e626f9/go.mod h1:fLRUbhbSd5Px2yKUaGYYPltlyxi1guJz1vCmo1RQL50= github.com/mediocregopher/radix/v3 v3.7.0 h1:SM9zJdme5pYGEVvh1HttjBjDmIaNBDKy+oDCv5w81Wo= github.com/mediocregopher/radix/v3 v3.7.0/go.mod h1:8FL3F6UQRXHXIBSPUs5h0RybMF8i4n7wVopoX3x7Bv8= +github.com/peterbourgon/g2s v0.0.0-20170223122336-d4e7ad98afea/go.mod h1:1VcHEd3ro4QMoHfiNl/j7Jkln9+KQuorp0PItHMJYNg= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= diff --git a/redisconn/conn.go b/redisconn/conn.go index c6886a8..359aa79 100644 --- a/redisconn/conn.go +++ b/redisconn/conn.go @@ -616,7 +616,7 @@ func (conn *Connection) dial() error { var res interface{} // Password response if conn.opts.Password != "" { - res = redis.ReadResponse(r) + res = redis.ReadResponse(r, false) if err := redis.AsErrorx(res); err != nil { connection.Close() if !err.IsOfType(redis.ErrIO) { @@ -626,7 +626,7 @@ func (conn *Connection) dial() error { } } // PING Response - res = redis.ReadResponse(r) + res = redis.ReadResponse(r, false) if err := redis.AsErrorx(res); err != nil { connection.Close() if !err.IsOfType(redis.ErrIO) { @@ -642,7 +642,7 @@ func (conn *Connection) dial() error { } // SELECT DB Response if conn.opts.DB != 0 { - res = redis.ReadResponse(r) + res = redis.ReadResponse(r, false) if err := redis.AsErrorx(res); err != nil { connection.Close() if !err.IsOfType(redis.ErrIO) { @@ -940,7 +940,7 @@ func (conn *Connection) reader(r *bufio.Reader, one *oneconn) { for { // try to read response from buffered socket. // Here is IOTimeout handled as well (through deadlineIO wrapper around socket). - res = redis.ReadResponse(r) + res = redis.ReadResponse(r, true) if rerr := redis.AsErrorx(res); rerr != nil { if !rerr.IsOfType(redis.ErrResult) { // it is not redis-sended error, then close connection diff --git a/redisconn/conn_test.go b/redisconn/conn_test.go index 74ec27f..440a425 100644 --- a/redisconn/conn_test.go +++ b/redisconn/conn_test.go @@ -121,7 +121,7 @@ func (s *Suite) TestConnectsDb() { res := sync1.Do("SET", "db", 0) s.r().NoError(redis.AsError(res)) res = sync1.Do("GET", "db") - s.r().Equal(res, []byte("0")) + s.r().Equal(redis.ByteResponse{Val: []byte("0")}, res) opts2 := defopts opts2.DB = 1 @@ -134,10 +134,9 @@ func (s *Suite) TestConnectsDb() { res = sync2.Do("SET", "db", 1) s.r().NoError(redis.AsError(res)) res = sync2.Do("GET", "db") - s.r().Equal(res, []byte("1")) - + s.r().Equal(redis.ByteResponse{Val: []byte("1")}, res) res = sync1.Do("GET", "db") - s.r().Equal(res, []byte("0")) + s.r().Equal(redis.ByteResponse{Val: []byte("0")}, res) } func (s *Suite) TestFailedWithWrongDB() { @@ -398,17 +397,17 @@ func (s *Suite) TestAllReturns_Good() { for j := 0; j < K; j++ { sij := strconv.Itoa(i*N + j) res := sconn.Do(s.ctx, "PING", sij) - if !s.IsType([]byte{}, res) || !s.Equal(sij, string(res.([]byte))) { + if !s.IsType(redis.ByteResponse{}, res) || !s.Equal(sij, string(res.(redis.ByteResponse).Val)) { return } ress := sconn.SendMany(s.ctx, []redis.Request{ redis.Req("PING", "a"+sij), redis.Req("PING", "b"+sij), }) - if !s.IsType([]byte{}, ress[0]) || !s.Equal("a"+sij, string(ress[0].([]byte))) { + if !s.IsType(redis.ByteResponse{}, ress[0]) || !s.Equal("a"+sij, string(ress[0].(redis.ByteResponse).Val)) { return } - if !s.IsType([]byte{}, ress[1]) || !s.Equal("b"+sij, string(ress[1].([]byte))) { + if !s.IsType(redis.ByteResponse{}, ress[1]) || !s.Equal("b"+sij, string(ress[1].(redis.ByteResponse).Val)) { return } } @@ -465,9 +464,9 @@ func (s *Suite) TestAllReturns_Bad() { redis.Req("PING", "b"+sij), }) if check && good { - ok := s.IsType([]byte{}, res) && s.Equal(sij, string(res.([]byte))) - ok = ok && s.IsType([]byte{}, ress[0]) && s.Equal("a"+sij, string(ress[0].([]byte))) - ok = ok && s.IsType([]byte{}, ress[1]) && s.Equal("b"+sij, string(ress[1].([]byte))) + ok := s.IsType(redis.ByteResponse{}, res) && s.Equal(sij, string(res.(redis.ByteResponse).Val)) + ok = ok && s.IsType(redis.ByteResponse{}, ress[0]) && s.Equal("a"+sij, string(ress[0].(redis.ByteResponse).Val)) + ok = ok && s.IsType(redis.ByteResponse{}, ress[1]) && s.Equal("b"+sij, string(ress[1].(redis.ByteResponse).Val)) checks <- ok } else if check && !good { ok := s.IsType((*errorx.Error)(nil), res) diff --git a/redisdumb/conn.go b/redisdumb/conn.go index ec841e7..68ee966 100644 --- a/redisdumb/conn.go +++ b/redisdumb/conn.go @@ -62,9 +62,13 @@ func (c *Conn) Do(cmd string, args ...interface{}) interface{} { req, err = redis.AppendRequest(nil, redis.Request{cmd, nil, args, nil}) if err == nil { if _, err = c.C.Write(req); err == nil { - res := redis.ReadResponse(c.R) + res := redis.ReadResponse(c.R, true) rerr := redis.AsErrorx(res) if rerr == nil { + if br, ok := res.(redis.ByteResponse); ok { + return br.Val + } + return res } err = rerr @@ -201,6 +205,6 @@ func Do(addr string, cmd string, args ...interface{}) interface{} { if _, err = conn.Write(req); err != nil { return redis.ErrIO.WrapWithNoMessage(err) } - res := redis.ReadResponse(bufio.NewReader(conn)) + res := redis.ReadResponse(bufio.NewReader(conn), false) return res }