Skip to content

Commit

Permalink
Command Migration of PFADD, PFCOUNT, PFMERGE done DiceDB#1032
Browse files Browse the repository at this point in the history
  • Loading branch information
surya0180 committed Oct 15, 2024
1 parent 97e435c commit 4d5423d
Show file tree
Hide file tree
Showing 6 changed files with 455 additions and 215 deletions.
21 changes: 12 additions & 9 deletions internal/eval/commands.go
Original file line number Diff line number Diff line change
Expand Up @@ -892,25 +892,28 @@ var (
Name: "PFADD",
Info: `PFADD key [element [element ...]]
Adds elements to a HyperLogLog key. Creates the key if it doesn't exist.`,
Eval: evalPFADD,
Arity: -2,
KeySpecs: KeySpecs{BeginIndex: 1},
NewEval: evalPFADD,
IsMigrated: true,
Arity: -2,
KeySpecs: KeySpecs{BeginIndex: 1},
}
pfCountCmdMeta = DiceCmdMeta{
Name: "PFCOUNT",
Info: `PFCOUNT key [key ...]
Returns the approximated cardinality of the set(s) observed by the HyperLogLog key(s).`,
Eval: evalPFCOUNT,
Arity: -2,
KeySpecs: KeySpecs{BeginIndex: 1},
NewEval: evalPFCOUNT,
IsMigrated: true,
Arity: -2,
KeySpecs: KeySpecs{BeginIndex: 1},
}
pfMergeCmdMeta = DiceCmdMeta{
Name: "PFMERGE",
Info: `PFMERGE destkey [sourcekey [sourcekey ...]]
Merges one or more HyperLogLog values into a single key.`,
Eval: evalPFMERGE,
Arity: -2,
KeySpecs: KeySpecs{BeginIndex: 1},
NewEval: evalPFMERGE,
IsMigrated: true,
Arity: -2,
KeySpecs: KeySpecs{BeginIndex: 1},
}
jsonStrlenCmdMeta = DiceCmdMeta{
Name: "JSON.STRLEN",
Expand Down
110 changes: 0 additions & 110 deletions internal/eval/eval.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import (

"github.com/dicedb/dice/internal/sql"

"github.com/axiomhq/hyperloglog"
"github.com/bytedance/sonic"
"github.com/dicedb/dice/config"
"github.com/dicedb/dice/internal/clientio"
Expand Down Expand Up @@ -4074,115 +4073,6 @@ func evalSINTER(args []string, store *dstore.Store) []byte {
return clientio.Encode(members, false)
}

// PFADD Adds all the element arguments to the HyperLogLog data structure stored at the variable
// name specified as first argument.
//
// Returns:
// If the approximated cardinality estimated by the HyperLogLog changed after executing the command,
// returns 1, otherwise 0 is returned.
func evalPFADD(args []string, store *dstore.Store) []byte {
if len(args) < 1 {
return diceerrors.NewErrArity("PFADD")
}

key := args[0]
obj := store.Get(key)

// If key doesn't exist prior initial cardinality changes hence return 1
if obj == nil {
hll := hyperloglog.New()
for _, arg := range args[1:] {
hll.Insert([]byte(arg))
}

obj = store.NewObj(hll, -1, object.ObjTypeString, object.ObjEncodingRaw)

store.Put(key, obj)
return clientio.Encode(1, false)
}

existingHll, ok := obj.Value.(*hyperloglog.Sketch)
if !ok {
return diceerrors.NewErrWithMessage(diceerrors.WrongTypeHllErr)
}
initialCardinality := existingHll.Estimate()
for _, arg := range args[1:] {
existingHll.Insert([]byte(arg))
}

if newCardinality := existingHll.Estimate(); initialCardinality != newCardinality {
return clientio.Encode(1, false)
}

return clientio.Encode(0, false)
}

func evalPFCOUNT(args []string, store *dstore.Store) []byte {
if len(args) < 1 {
return diceerrors.NewErrArity("PFCOUNT")
}

unionHll := hyperloglog.New()

for _, arg := range args {
obj := store.Get(arg)
if obj != nil {
currKeyHll, ok := obj.Value.(*hyperloglog.Sketch)
if !ok {
return diceerrors.NewErrWithMessage(diceerrors.WrongTypeHllErr)
}
err := unionHll.Merge(currKeyHll)
if err != nil {
return diceerrors.NewErrWithMessage(diceerrors.InvalidHllErr)
}
}
}

return clientio.Encode(unionHll.Estimate(), false)
}

func evalPFMERGE(args []string, store *dstore.Store) []byte {
if len(args) < 1 {
return diceerrors.NewErrArity("PFMERGE")
}

var mergedHll *hyperloglog.Sketch
destKey := args[0]
obj := store.Get(destKey)

// If destKey doesn't exist, create a new HLL, else fetch the existing
if obj == nil {
mergedHll = hyperloglog.New()
} else {
var ok bool
mergedHll, ok = obj.Value.(*hyperloglog.Sketch)
if !ok {
return diceerrors.NewErrWithMessage(diceerrors.WrongTypeHllErr)
}
}

for _, arg := range args {
obj := store.Get(arg)
if obj != nil {
currKeyHll, ok := obj.Value.(*hyperloglog.Sketch)
if !ok {
return diceerrors.NewErrWithMessage(diceerrors.WrongTypeHllErr)
}

err := mergedHll.Merge(currKeyHll)
if err != nil {
return diceerrors.NewErrWithMessage(diceerrors.InvalidHllErr)
}
}
}

// Save the mergedHll
obj = store.NewObj(mergedHll, -1, object.ObjTypeString, object.ObjEncodingRaw)
store.Put(destKey, obj)

return clientio.RespOK
}

func evalJSONSTRLEN(args []string, store *dstore.Store) []byte {
if len(args) < 1 {
return diceerrors.NewErrArity("JSON.STRLEN")
Expand Down
Loading

0 comments on commit 4d5423d

Please sign in to comment.