Skip to content

Commit

Permalink
Update PFMERGE to be supported as multishard command (#1388)
Browse files Browse the repository at this point in the history
  • Loading branch information
lucifercr07 authored Dec 23, 2024
1 parent f6c3028 commit 1f27c40
Show file tree
Hide file tree
Showing 12 changed files with 314 additions and 103 deletions.
4 changes: 4 additions & 0 deletions integration_tests/commands/http/hyperloglog_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,10 @@
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.

//go:build ignore
// +build ignore

// Ignored as multishard commands not supported by HTTP
package http

import (
Expand Down
4 changes: 4 additions & 0 deletions integration_tests/commands/websocket/hyperloglog_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,10 @@
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.

//go:build ignore
// +build ignore

// Ignored as multishard commands not supported by WS
package websocket

import (
Expand Down
4 changes: 2 additions & 2 deletions internal/cmd/cmds.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,12 +45,12 @@ type DiceDBCmd struct {
// This slice allows flexible support for commands with variable arguments.
Args []string

// InternalObj is a pointer to an InternalObj, representing an optional data structure
// InternalObjs is a pointer to list of InternalObjs, representing an optional data structure
// associated with the command. This contains pointer to the underlying simple
// types such as int, string or even complex types
// like hashes, sets, or sorted sets, which are stored and manipulated as objects.
// WARN: This parameter should be used with caution
InternalObj *object.InternalObj
InternalObjs []*object.InternalObj
}

type RedisCmds struct {
Expand Down
20 changes: 10 additions & 10 deletions internal/eval/commands.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,14 +127,22 @@ var (
// their implementation for HTTP and WebSocket protocols is still pending.
// As a result, their Eval functions remained intact.
var (
//TODO: supports only http protocol, needs to be removed once http is migrated to multishard
objectCopyCmdMeta = DiceCmdMeta{
Name: "OBJECTCOPY",
Info: `COPY command copies the value stored at the source key to the destination key.`,
StoreObjectEval: evalCOPYObject,
IsMigrated: true,
Arity: -2,
}
pfMergeCmdMeta = DiceCmdMeta{
Name: "PFMERGE",
Info: `PFMERGE destkey [sourcekey [sourcekey ...]]
Merges one or more HyperLogLog values into a single key.`,
IsMigrated: true,
Arity: -2,
KeySpecs: KeySpecs{BeginIndex: 1},
StoreObjectEval: evalPFMERGE,
}
)

// Single Shard command
Expand Down Expand Up @@ -976,15 +984,6 @@ var (
Arity: -2,
KeySpecs: KeySpecs{BeginIndex: 1},
}
pfMergeCmdMeta = DiceCmdMeta{
Name: "PFMERGE",
Info: `PFMERGE destkey [sourcekey [sourcekey ...]]
Merges one or more HyperLogLog values into a single key.`,
NewEval: evalPFMERGE,
IsMigrated: true,
Arity: -2,
KeySpecs: KeySpecs{BeginIndex: 1},
}
jsonStrlenCmdMeta = DiceCmdMeta{
Name: "JSON.STRLEN",
Info: `JSON.STRLEN key [path]
Expand Down Expand Up @@ -1361,6 +1360,7 @@ var (
func init() {
PreProcessing["COPY"] = evalGetObject
PreProcessing["RENAME"] = evalGET
PreProcessing["GETOBJECT"] = evalGetObject

DiceCmds["ABORT"] = abortCmdMeta
DiceCmds["APPEND"] = appendCmdMeta
Expand Down
134 changes: 112 additions & 22 deletions internal/eval/eval_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"encoding/base64"
"errors"
"fmt"
"github.com/dicedb/dice/internal/cmd"
"math"
"reflect"
"strconv"
Expand Down Expand Up @@ -51,6 +52,14 @@ type evalTestCase struct {
migratedOutput EvalResponse
}

type evalMultiShardTestCase struct {
name string
setup func()
input *cmd.DiceDBCmd
validator func(output interface{})
output EvalResponse
}

func setupTest(store *dstore.Store) *dstore.Store {
dstore.ResetStore(store)
return store
Expand Down Expand Up @@ -101,7 +110,6 @@ func TestEval(t *testing.T) {
testEvalHEXISTS(t, store)
testEvalHDEL(t, store)
testEvalHSCAN(t, store)
testEvalPFMERGE(t, store)
testEvalJSONSTRLEN(t, store)
testEvalJSONOBJLEN(t, store)
testEvalHLEN(t, store)
Expand Down Expand Up @@ -3012,21 +3020,26 @@ func testEvalPFCOUNT(t *testing.T, store *dstore.Store) {
}

func testEvalPFMERGE(t *testing.T, store *dstore.Store) {
tests := map[string]evalTestCase{
tests := map[string]evalMultiShardTestCase{
"PFMERGE nil value": {
name: "PFMERGE nil value",
setup: func() {},
input: nil,
migratedOutput: EvalResponse{
input: &cmd.DiceDBCmd{
Cmd: "PFMERGE",
},
output: EvalResponse{
Result: nil,
Error: diceerrors.ErrWrongArgumentCount("PFMERGE"),
},
},
"PFMERGE empty array": {
name: "PFMERGE empty array",
setup: func() {},
input: []string{},
migratedOutput: EvalResponse{
input: &cmd.DiceDBCmd{
Cmd: "PFMERGE",
Args: []string{},
},
output: EvalResponse{
Result: nil,
Error: diceerrors.ErrWrongArgumentCount("PFMERGE"),
},
Expand All @@ -3042,26 +3055,44 @@ func testEvalPFMERGE(t *testing.T, store *dstore.Store) {
}
store.Put(key, obj)
},
input: []string{"INVALID_OBJ_DEST_KEY"},
migratedOutput: EvalResponse{
input: &cmd.DiceDBCmd{
Cmd: "PFMERGE",
Args: []string{"INVALID_OBJ_DEST_KEY"},
},
output: EvalResponse{
Result: nil,
Error: diceerrors.ErrInvalidHyperLogLogKey,
},
},
"PFMERGE destKey doesn't exist": {
name: "PFMERGE destKey doesn't exist",
setup: func() {},
input: []string{"NON_EXISTING_DEST_KEY"},
migratedOutput: EvalResponse{
input: &cmd.DiceDBCmd{
Cmd: "PFMERGE",
Args: []string{"NON_EXISTING_DEST_KEY"},
},
output: EvalResponse{
Result: clientio.OK,
Error: nil,
},
},
"PFMERGE destKey exist": {
name: "PFMERGE destKey exist",
setup: func() {},
input: []string{"NON_EXISTING_DEST_KEY"},
migratedOutput: EvalResponse{
name: "PFMERGE destKey exist",
setup: func() {
key := "EXISTING_DEST_KEY"
value := hyperloglog.New()
value.Insert([]byte("VALUE"))
obj := &object.Obj{
Value: value,
LastAccessedAt: uint32(time.Now().Unix()),
}
store.Put(key, obj)
},
input: &cmd.DiceDBCmd{
Cmd: "PFMERGE",
Args: []string{"EXISTING_DEST_KEY"},
},
output: EvalResponse{
Result: clientio.OK,
Error: nil,
},
Expand All @@ -3078,8 +3109,11 @@ func testEvalPFMERGE(t *testing.T, store *dstore.Store) {
}
store.Put(key, obj)
},
input: []string{"EXISTING_DEST_KEY", "NON_EXISTING_SRC_KEY"},
migratedOutput: EvalResponse{
input: &cmd.DiceDBCmd{
Cmd: "PFMERGE",
Args: []string{"EXISTING_DEST_KEY", "NON_EXISTING_SRC_KEY"},
},
output: EvalResponse{
Result: clientio.OK,
Error: nil,
},
Expand All @@ -3096,8 +3130,19 @@ func testEvalPFMERGE(t *testing.T, store *dstore.Store) {
}
store.Put(key, obj)
},
input: []string{"EXISTING_DEST_KEY", "NON_EXISTING_SRC_KEY"},
migratedOutput: EvalResponse{
input: &cmd.DiceDBCmd{
Cmd: "PFMERGE",
Args: []string{"EXISTING_DEST_KEY", "EXISTING_SRC_KEY"},
InternalObjs: []*object.InternalObj{
{
Obj: &object.Obj{
Value: hyperloglog.New(),
Type: object.ObjTypeHLL,
},
},
},
},
output: EvalResponse{
Result: clientio.OK,
Error: nil,
},
Expand All @@ -3113,24 +3158,49 @@ func testEvalPFMERGE(t *testing.T, store *dstore.Store) {
LastAccessedAt: uint32(time.Now().Unix()),
}
store.Put(key, obj)
srcKey := "EXISTING_SRC_KEY"
srcKey := "EXISTING_SRC_KEY1"
srcValue := hyperloglog.New()
value.Insert([]byte("SRC_VALUE"))
srcKeyObj := &object.Obj{
Value: srcValue,
LastAccessedAt: uint32(time.Now().Unix()),
}
store.Put(srcKey, srcKeyObj)
srcKey2 := "EXISTING_SRC_KEY2"
srcValue2 := hyperloglog.New()
value.Insert([]byte("SRC_VALUE"))
srcKeyObj2 := &object.Obj{
Value: srcValue2,
LastAccessedAt: uint32(time.Now().Unix()),
}
store.Put(srcKey2, srcKeyObj2)
},
input: &cmd.DiceDBCmd{
Cmd: "PFMERGE",
Args: []string{"EXISTING_DEST_KEY", "EXISTING_SRC_KEY1", "EXISTING_SRC_KEY2"},
InternalObjs: []*object.InternalObj{
{
Obj: &object.Obj{
Value: hyperloglog.New(),
Type: object.ObjTypeHLL,
},
},
{
Obj: &object.Obj{
Value: hyperloglog.New(),
Type: object.ObjTypeHLL,
},
},
},
},
input: []string{"EXISTING_DEST_KEY", "EXISTING_SRC_KEY"},
migratedOutput: EvalResponse{
output: EvalResponse{
Result: clientio.OK,
Error: nil,
},
},
}

runMigratedEvalTests(t, tests, evalPFMERGE, store)
runEvalTestsMultiShard(t, tests, evalPFMERGE, store)
}

func testEvalHGET(t *testing.T, store *dstore.Store) {
Expand Down Expand Up @@ -4352,6 +4422,26 @@ func runMigratedEvalTests(t *testing.T, tests map[string]evalTestCase, evalFunc
}
}

func runEvalTestsMultiShard(t *testing.T, tests map[string]evalMultiShardTestCase, evalFunc func(*cmd.DiceDBCmd, *dstore.Store) *EvalResponse, store *dstore.Store) {
for name, tc := range tests {
t.Run(name, func(t *testing.T) {
store = setupTest(store)
if tc.setup != nil {
tc.setup()
}

output := evalFunc(tc.input, store)
if tc.output.Error != nil {
assert.Equal(t, tc.output.Error, output.Error)
}

if tc.output.Result != nil {
assert.Equal(t, tc.output.Result, output.Result)
}
})
}
}

func BenchmarkEvalHSET(b *testing.B) {
store := dstore.NewStore(nil, nil)
for i := 0; i < b.N; i++ {
Expand Down
4 changes: 2 additions & 2 deletions internal/eval/execute.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,9 +69,9 @@ func (e *Eval) ExecuteCommand() *EvalResponse {
// ===============================================================================
// dealing with store object is not recommended for all commands
// These operations are specialised for the commands which requires
// transferring data across multiple shards. e.g COPY, RENAME
// transferring data across multiple shards. e.g. COPY, RENAME, PFMERGE
// ===============================================================================
if e.cmd.InternalObj != nil {
if e.cmd.InternalObjs != nil {
// This involves handling object at store level, evaluating it, modifying it, and then storing it back.
return diceCmd.StoreObjectEval(e.cmd, e.store)
}
Expand Down
Loading

0 comments on commit 1f27c40

Please sign in to comment.