From 3ca4c51ba1d2670c9a8ea5135c2f95b508365ef8 Mon Sep 17 00:00:00 2001 From: psr Date: Mon, 14 Oct 2024 17:07:26 +0530 Subject: [PATCH 01/25] updated parsing logic to support qwatch and qunwatch commands --- internal/server/utils/redisCmdAdapter.go | 38 +++++- internal/server/utils/redisCmdAdapter_test.go | 114 ++++++++++++++++++ 2 files changed, 146 insertions(+), 6 deletions(-) diff --git a/internal/server/utils/redisCmdAdapter.go b/internal/server/utils/redisCmdAdapter.go index 6edb7d945..4bc3d63bf 100644 --- a/internal/server/utils/redisCmdAdapter.go +++ b/internal/server/utils/redisCmdAdapter.go @@ -6,6 +6,7 @@ import ( "fmt" "io" "net/http" + "strconv" "strings" "github.com/dicedb/dice/internal/cmd" @@ -33,6 +34,9 @@ const ( JSON = "json" ) +const QWatch string = "QWATCH" +const QUnwatch string = "QUNWATCH" + func ParseHTTPRequest(r *http.Request) (*cmd.DiceDBCmd, error) { commandParts := strings.Split(strings.TrimPrefix(r.URL.Path, "/"), "/") if len(commandParts) == 0 { @@ -117,16 +121,38 @@ func ParseHTTPRequest(r *http.Request) (*cmd.DiceDBCmd, error) { } func ParseWebsocketMessage(msg []byte) (*cmd.DiceDBCmd, error) { - cmdStr := string(msg) - cmdStr = strings.TrimSpace(cmdStr) - + cmdStr := strings.TrimSpace(string(msg)) if cmdStr == "" { return nil, diceerrors.ErrEmptyCommand } - cmdArr := strings.Split(cmdStr, " ") - command := strings.ToUpper(cmdArr[0]) - cmdArr = cmdArr[1:] // args + var command string + idx := strings.Index(cmdStr, " ") + // handle commands with no args + if idx == -1 { + command = strings.ToUpper(cmdStr) + return &cmd.DiceDBCmd{ + Cmd: command, + Args: nil, + }, nil + } else { + command = strings.ToUpper(cmdStr[:idx]) + cmdStr = cmdStr[idx+1:] + } + + cmdArr := []string{} // args + // handle qwatch and qunwatch commands + if command == QWatch || command == QUnwatch { + // remove quotes from query string + if cmdStr, err := strconv.Unquote(cmdStr); err != nil { + return nil, fmt.Errorf("error parsing qwatch query: %v", err) + } else { + cmdArr = []string{cmdStr} + } + } else { + // handle other commands + cmdArr = strings.Split(cmdStr, " ") + } // if key prefix is empty for JSON.INGEST command // add "" to cmdArr diff --git a/internal/server/utils/redisCmdAdapter_test.go b/internal/server/utils/redisCmdAdapter_test.go index d85bcacfe..abba19564 100644 --- a/internal/server/utils/redisCmdAdapter_test.go +++ b/internal/server/utils/redisCmdAdapter_test.go @@ -248,12 +248,114 @@ func TestParseWebsocketMessage(t *testing.T) { expectedCmd: "SET", expectedArgs: []string{"k1", "v1", "nx"}, }, + { + name: "Test SET command with value as a map", + message: `set k0 {"k1":"v1"} nx`, + expectedCmd: "SET", + expectedArgs: []string{"k0", `{"k1":"v1"}`, "nx"}, + }, + { + name: "Test SET command with value as an array", + message: `set k1 ["v1","v2","v3"] nx`, + expectedCmd: "SET", + expectedArgs: []string{"k1", `["v1","v2","v3"]`, "nx"}, + }, + { + name: "Test SET command with value as a map containing an array", + message: `set k1 {"k2":["v1","v2"]} nx`, + expectedCmd: "SET", + expectedArgs: []string{"k1", `{"k2":["v1","v2"]}`, "nx"}, + }, + { + name: "Test SET command with value as a deeply nested map", + message: `set k1 {"k2":{"k3":{"k4":"value"}}} nx`, + expectedCmd: "SET", + expectedArgs: []string{"k1", `{"k2":{"k3":{"k4":"value"}}}`, "nx"}, + }, + { + name: "Test SET command with value as an array of maps", + message: `set k0 [{"k1":"v1"},{"k2":"v2"}] nx`, + expectedCmd: "SET", + expectedArgs: []string{"k0", `[{"k1":"v1"},{"k2":"v2"}]`, "nx"}, + }, { name: "Test GET command", message: "get k1", expectedCmd: "GET", expectedArgs: []string{"k1"}, }, + { + name: "Test DEL command", + message: "del k1", + expectedCmd: "DEL", + expectedArgs: []string{"k1"}, + }, + { + name: "Test DEL command with multiple keys", + message: `del k1 k2 k3`, + expectedCmd: "DEL", + expectedArgs: []string{"k1", "k2", "k3"}, + }, + { + name: "Test KEYS command", + message: "keys *", + expectedCmd: "KEYS", + expectedArgs: []string{"*"}, + }, + { + name: "Test MSET command", + message: "mset k1 v1 k2 v2", + expectedCmd: "MSET", + expectedArgs: []string{"k1", "v1", "k2", "v2"}, + }, + { + name: "Test MSET command with options", + message: "mset k1 v1 k2 v2 nx", + expectedCmd: "MSET", + expectedArgs: []string{"k1", "v1", "k2", "v2", "nx"}, + }, + { + name: "Test SLEEP command", + message: "sleep 1", + expectedCmd: "SLEEP", + expectedArgs: []string{"1"}, + }, + { + name: "Test PING command", + message: "ping", + expectedCmd: "PING", + expectedArgs: nil, + }, + { + name: "Test EXPIRE command", + message: "expire k1 1", + expectedCmd: "EXPIRE", + expectedArgs: []string{"k1", "1"}, + }, + { + name: "Test AUTH command", + message: "auth user password", + expectedCmd: "AUTH", + expectedArgs: []string{"user", "password"}, + }, + { + name: "Test LPUSH command", + message: "lpush k1 v1", + expectedCmd: "LPUSH", + expectedArgs: []string{"k1", "v1"}, + }, + { + name: "Test LPUSH command with multiple items", + message: `lpush k1 v1 v2 v3`, + expectedCmd: "LPUSH", + expectedArgs: []string{"k1", "v1", "v2", "v3"}, + }, + { + name: "Test JSON.ARRPOP command", + message: "json.arrpop k1 $ 1", + expectedCmd: "JSON.ARRPOP", + expectedArgs: []string{"k1", "$", "1"}, + }, { name: "Test JSON.SET command", message: `json.set k1 . {"field":"value"}`, @@ -284,6 +386,18 @@ func TestParseWebsocketMessage(t *testing.T) { expectedCmd: "JSON.INGEST", expectedArgs: []string{"", "$..field", `{"field":"value"}`}, }, + { + name: "Test simple QWATCH command", + message: "qwatch \"select $key, $value where $key like 'k?'\"", + expectedCmd: "QWATCH", + expectedArgs: []string{"select $key, $value where $key like 'k?'"}, + }, + { + name: "Test complex QWATCH command", + message: "qwatch \"SELECT $key, $value WHERE $key LIKE 'player:*' AND '$value.score' > 10 ORDER BY $value.score DESC LIMIT 5\"", + expectedCmd: "QWATCH", + expectedArgs: []string{"SELECT $key, $value WHERE $key LIKE 'player:*' AND '$value.score' > 10 ORDER BY $value.score DESC LIMIT 5"}, + }, } for _, tc := range commands { From 0feea23b795f277fa4972ee6e4c16c7d15b6cd05 Mon Sep 17 00:00:00 2001 From: psr Date: Mon, 14 Oct 2024 17:12:09 +0530 Subject: [PATCH 02/25] update websocket server logic to support qwatch and qunwatch --- internal/eval/eval.go | 8 +- internal/eval/execute.go | 13 +- internal/server/websocketServer.go | 191 ++++++++++++++++------------- main.go | 2 +- 4 files changed, 114 insertions(+), 100 deletions(-) diff --git a/internal/eval/eval.go b/internal/eval/eval.go index ebcf891a5..28cceee2c 100644 --- a/internal/eval/eval.go +++ b/internal/eval/eval.go @@ -2109,7 +2109,7 @@ func evalMULTI(args []string, store *dstore.Store) []byte { // Every time a key in the watch list is modified, the client will be sent a response // containing the new value of the key along with the operation that was performed on it. // Contains only one argument, the query to be watched. -func EvalQWATCH(args []string, httpOp bool, client *comm.Client, store *dstore.Store) []byte { +func EvalQWATCH(args []string, httpOp, websocketOp bool, client *comm.Client, store *dstore.Store) []byte { if len(args) != 1 { return diceerrors.NewErrArity("QWATCH") } @@ -2128,7 +2128,7 @@ func EvalQWATCH(args []string, httpOp bool, client *comm.Client, store *dstore.S }) var watchSubscription querymanager.QuerySubscription - if httpOp { + if httpOp || websocketOp { watchSubscription = querymanager.QuerySubscription{ Subscribe: true, Query: query, @@ -2165,7 +2165,7 @@ func EvalQWATCH(args []string, httpOp bool, client *comm.Client, store *dstore.S } // EvalQUNWATCH removes the specified key from the watch list for the caller client. -func EvalQUNWATCH(args []string, httpOp bool, client *comm.Client) []byte { +func EvalQUNWATCH(args []string, httpOp, websocketOp bool, client *comm.Client) []byte { if len(args) != 1 { return diceerrors.NewErrArity("QUNWATCH") } @@ -2174,7 +2174,7 @@ func EvalQUNWATCH(args []string, httpOp bool, client *comm.Client) []byte { return clientio.Encode(e, false) } - if httpOp { + if httpOp || websocketOp { querymanager.QuerySubscriptionChan <- querymanager.QuerySubscription{ Subscribe: false, Query: query, diff --git a/internal/eval/execute.go b/internal/eval/execute.go index 2c00fa570..458e76c2a 100644 --- a/internal/eval/execute.go +++ b/internal/eval/execute.go @@ -17,15 +17,6 @@ func ExecuteCommand(c *cmd.DiceDBCmd, client *comm.Client, store *dstore.Store, return &EvalResponse{Result: diceerrors.NewErrWithFormattedMessage("unknown command '%s', with args beginning with: %s", c.Cmd, strings.Join(c.Args, " ")), Error: nil} } - // Till the time we refactor to handle QWATCH differently for websocket - if websocketOp { - if diceCmd.IsMigrated { - return diceCmd.NewEval(c.Args, store) - } - - return &EvalResponse{Result: diceCmd.Eval(c.Args, store), Error: nil} - } - // Temporary logic till we move all commands to new eval logic. // MigratedDiceCmds map contains refactored eval commands // For any command we will first check in the existing map @@ -40,9 +31,9 @@ func ExecuteCommand(c *cmd.DiceDBCmd, client *comm.Client, store *dstore.Store, // Old implementation kept as it is, but we will be moving // to the new implementation soon for all commands case "SUBSCRIBE", "QWATCH": - return &EvalResponse{Result: EvalQWATCH(c.Args, httpOp, client, store), Error: nil} + return &EvalResponse{Result: EvalQWATCH(c.Args, httpOp, websocketOp, client, store), Error: nil} case "UNSUBSCRIBE", "QUNWATCH": - return &EvalResponse{Result: EvalQUNWATCH(c.Args, httpOp, client), Error: nil} + return &EvalResponse{Result: EvalQUNWATCH(c.Args, httpOp, websocketOp, client), Error: nil} case auth.Cmd: return &EvalResponse{Result: EvalAUTH(c.Args, client), Error: nil} case "ABORT": diff --git a/internal/server/websocketServer.go b/internal/server/websocketServer.go index dd8ef8805..43872765a 100644 --- a/internal/server/websocketServer.go +++ b/internal/server/websocketServer.go @@ -13,37 +13,29 @@ import ( "github.com/dicedb/dice/config" "github.com/dicedb/dice/internal/clientio" + "github.com/dicedb/dice/internal/cmd" + "github.com/dicedb/dice/internal/comm" diceerrors "github.com/dicedb/dice/internal/errors" "github.com/dicedb/dice/internal/ops" - "github.com/dicedb/dice/internal/querymanager" "github.com/dicedb/dice/internal/server/utils" "github.com/dicedb/dice/internal/shard" - dstore "github.com/dicedb/dice/internal/store" "github.com/gorilla/websocket" ) -const Qwatch = "QWATCH" -const Qunwatch = "QUNWATCH" -const Subscribe = "SUBSCRIBE" - -var unimplementedCommandsWebsocket = map[string]bool{ - Qwatch: true, - Qunwatch: true, - Subscribe: true, -} +const QWatch = "QWATCH" +const QUnwatch = "QUNWATCH" type WebsocketServer struct { - querymanager *querymanager.Manager - shardManager *shard.ShardManager - ioChan chan *ops.StoreResponse - watchChan chan dstore.QueryWatchEvent - websocketServer *http.Server - upgrader websocket.Upgrader - logger *slog.Logger - shutdownChan chan struct{} + shardManager *shard.ShardManager + ioChan chan *ops.StoreResponse + websocketServer *http.Server + upgrader websocket.Upgrader + qwatchResponseChan chan comm.QwatchResponse + shutdownChan chan struct{} + logger *slog.Logger } -func NewWebSocketServer(shardManager *shard.ShardManager, watchChan chan dstore.QueryWatchEvent, logger *slog.Logger) *WebsocketServer { +func NewWebSocketServer(shardManager *shard.ShardManager, logger *slog.Logger) *WebsocketServer { mux := http.NewServeMux() srv := &http.Server{ Addr: fmt.Sprintf(":%d", config.WebsocketPort), @@ -56,23 +48,16 @@ func NewWebSocketServer(shardManager *shard.ShardManager, watchChan chan dstore. } websocketServer := &WebsocketServer{ - shardManager: shardManager, - querymanager: querymanager.NewQueryManager(logger), - ioChan: make(chan *ops.StoreResponse, 1000), - watchChan: watchChan, - websocketServer: srv, - upgrader: upgrader, - logger: logger, - shutdownChan: make(chan struct{}), + shardManager: shardManager, + ioChan: make(chan *ops.StoreResponse, 1000), + websocketServer: srv, + upgrader: upgrader, + qwatchResponseChan: make(chan comm.QwatchResponse), + shutdownChan: make(chan struct{}), + logger: logger, } mux.HandleFunc("/", websocketServer.WebsocketHandler) - mux.HandleFunc("/health", func(w http.ResponseWriter, r *http.Request) { - _, err := w.Write([]byte("OK")) - if err != nil { - return - } - }) return websocketServer } @@ -148,76 +133,114 @@ func (s *WebsocketServer) WebsocketHandler(w http.ResponseWriter, r *http.Reques break } - if unimplementedCommandsWebsocket[diceDBCmd.Cmd] { - writeResponse(conn, []byte("Command is not implemented with Websocket")) - continue - } - - // send request to Shard Manager - s.shardManager.GetShard(0).ReqChan <- &ops.StoreOp{ + // create request + sp := &ops.StoreOp{ Cmd: diceDBCmd, WorkerID: "wsServer", ShardID: 0, WebsocketOp: true, } - // Wait for response - resp := <-s.ioChan + // handle qwatch and qunwatch commands + if diceDBCmd.Cmd == QWatch || diceDBCmd.Cmd == QUnwatch { + clientIdentifierID := generateUniqueInt32(r) + sp.Client = comm.NewHTTPQwatchClient(s.qwatchResponseChan, clientIdentifierID) - _, ok := WorkerCmdsMeta[diceDBCmd.Cmd] - respArr := []string{ - "(nil)", // Represents a RESP Nil Bulk String, which indicates a null value. - "OK", // Represents a RESP Simple String with value "OK". - "QUEUED", // Represents a Simple String indicating that a command has been queued. - "0", // Represents a RESP Integer with value 0. - "1", // Represents a RESP Integer with value 1. - "-1", // Represents a RESP Integer with value -1. - "-2", // Represents a RESP Integer with value -2. - "*0", // Represents an empty RESP Array. + // start a goroutine for subsequent updates + go s.processQwatchUpdates(clientIdentifierID, conn, diceDBCmd) } - var rp *clientio.RESPParser - var responseValue interface{} - // TODO: Remove this conditional check and if (true) condition when all commands are migrated - if !ok { - var err error - if resp.EvalResponse.Error != nil { - rp = clientio.NewRESPParser(bytes.NewBuffer([]byte(resp.EvalResponse.Error.Error()))) - } else { - rp = clientio.NewRESPParser(bytes.NewBuffer(resp.EvalResponse.Result.([]byte))) - } + s.shardManager.GetShard(0).ReqChan <- sp + resp := <-s.ioChan + s.processResponse(conn, diceDBCmd, resp) + } +} - responseValue, err = rp.DecodeOne() - if err != nil { - s.logger.Error("Error decoding response", "error", err) - writeResponse(conn, []byte("error: Internal Server Error")) - return - } - } else { - if resp.EvalResponse.Error != nil { - responseValue = resp.EvalResponse.Error.Error() - } else { - responseValue = resp.EvalResponse.Result +func (s *WebsocketServer) processQwatchUpdates(clientIdentifierID uint32, conn *websocket.Conn, dicDBCmd *cmd.DiceDBCmd) { + for { + select { + case resp := <-s.qwatchResponseChan: + if resp.ClientIdentifierID == clientIdentifierID { + s.processResponse(conn, dicDBCmd, resp) } + case <-s.shutdownChan: + return } - if val, ok := responseValue.(clientio.RespType); ok { - responseValue = respArr[val] - } + } +} - if bt, ok := responseValue.([]byte); ok { - responseValue = string(bt) +func (s *WebsocketServer) processResponse(conn *websocket.Conn, diceDBCmd *cmd.DiceDBCmd, response interface{}) { + var result interface{} + var err error + + // check response type + switch resp := response.(type) { + case comm.QwatchResponse: + result = resp.Result + err = resp.Error + case *ops.StoreResponse: + result = resp.EvalResponse.Result + err = resp.EvalResponse.Error + default: + s.logger.Error("Unsupported response type") + writeResponse(conn, []byte("error: 500 Internal Server Error")) + return + } + + _, ok := WorkerCmdsMeta[diceDBCmd.Cmd] + respArr := []string{ + "(nil)", // Represents a RESP Nil Bulk String, which indicates a null value. + "OK", // Represents a RESP Simple String with value "OK". + "QUEUED", // Represents a Simple String indicating that a command has been queued. + "0", // Represents a RESP Integer with value 0. + "1", // Represents a RESP Integer with value 1. + "-1", // Represents a RESP Integer with value -1. + "-2", // Represents a RESP Integer with value -2. + "*0", // Represents an empty RESP Array. + } + + var responseValue interface{} + // TODO: Remove this conditional check and if (true) condition when all commands are migrated + if !ok { + var rp *clientio.RESPParser + if err != nil { + rp = clientio.NewRESPParser(bytes.NewBuffer([]byte(err.Error()))) + } else { + rp = clientio.NewRESPParser(bytes.NewBuffer(result.([]byte))) } - respBytes, err := json.Marshal(responseValue) + responseValue, err = rp.DecodeOne() if err != nil { - writeResponse(conn, []byte("error: marshaling json response")) - continue + s.logger.Error("Error decoding response", "error", err) + writeResponse(conn, []byte("error: 500 Internal Server Error")) + return } + } else { + if err != nil { + responseValue = err.Error() + } else { + responseValue = result + } + } + + if val, ok := responseValue.(clientio.RespType); ok { + responseValue = respArr[val] + } - // Write response - writeResponse(conn, respBytes) + if bt, ok := responseValue.([]byte); ok { + responseValue = string(bt) } + + respBytes, err := json.Marshal(responseValue) + if err != nil { + s.logger.Error("Error marshaling json", "error", err) + writeResponse(conn, []byte("error: marshaling json")) + return + } + + // success + writeResponse(conn, respBytes) } func writeResponse(conn *websocket.Conn, text []byte) { diff --git a/main.go b/main.go index fc19d8e78..679b9f84d 100644 --- a/main.go +++ b/main.go @@ -193,7 +193,7 @@ func main() { }() } - websocketServer := server.NewWebSocketServer(shardManager, queryWatchChan, logr) + websocketServer := server.NewWebSocketServer(shardManager, logr) serverWg.Add(1) go func() { defer serverWg.Done() From 53bf950f0b558c39e0af78c7cf67568852557c9e Mon Sep 17 00:00:00 2001 From: psr Date: Mon, 14 Oct 2024 17:12:54 +0530 Subject: [PATCH 03/25] added integration tests --- .../commands/websocket/qunwatch_test.go | 46 +++++++++++++++++ .../commands/websocket/qwatch_test.go | 49 +++++++++++++++++++ integration_tests/commands/websocket/setup.go | 18 +++++-- 3 files changed, 109 insertions(+), 4 deletions(-) create mode 100644 integration_tests/commands/websocket/qunwatch_test.go create mode 100644 integration_tests/commands/websocket/qwatch_test.go diff --git a/integration_tests/commands/websocket/qunwatch_test.go b/integration_tests/commands/websocket/qunwatch_test.go new file mode 100644 index 000000000..f3a4502f9 --- /dev/null +++ b/integration_tests/commands/websocket/qunwatch_test.go @@ -0,0 +1,46 @@ +package websocket + +import ( + "testing" + + testifyAssert "github.com/stretchr/testify/assert" +) + +func TestQUnwatch(t *testing.T) { + exec := NewWebsocketCommandExecutor() + conn := exec.ConnectToServer() + + testCases := []struct { + name string + cmds []string + expect interface{} + }{ + { + name: "Wrong number of arguments", + cmds: []string{"QUNWATCH "}, + expect: "ERR wrong number of arguments for 'qunwatch' command", + }, + { + name: "Invalid query", + cmds: []string{"QUNWATCH \"SELECT \""}, + expect: "error parsing SQL statement: syntax error at position 8", + }, + { + name: "Successful unregister", + cmds: []string{`QUNWATCH "SELECT $key, $value WHERE $key like 'k?'"`}, + expect: "OK", + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + for _, cmd := range tc.cmds { + result := exec.FireCommand(conn, cmd) + if _, ok := tc.expect.(string); ok { + // compare strings + testifyAssert.Equal(t, tc.expect, result, "Value mismatch for cmd %s", cmd) + } + } + }) + } +} diff --git a/integration_tests/commands/websocket/qwatch_test.go b/integration_tests/commands/websocket/qwatch_test.go new file mode 100644 index 000000000..c8e5c3b62 --- /dev/null +++ b/integration_tests/commands/websocket/qwatch_test.go @@ -0,0 +1,49 @@ +package websocket + +import ( + "testing" + + testifyAssert "github.com/stretchr/testify/assert" +) + +func TestQWatch(t *testing.T) { + exec := NewWebsocketCommandExecutor() + conn := exec.ConnectToServer() + + testCases := []struct { + name string + cmds []string + expect interface{} + }{ + { + name: "Wrong number of arguments", + cmds: []string{"QWATCH "}, + expect: "ERR wrong number of arguments for 'qwatch' command", + }, + { + name: "Invalid query", + cmds: []string{"QWATCH \"SELECT \""}, + expect: "error parsing SQL statement: syntax error at position 8", + }, + { + name: "Successful register", + cmds: []string{`QWATCH "SELECT $key, $value WHERE $key like 'k?'"`}, + expect: []interface{}{"qwatch", "SELECT $key, $value WHERE $key like 'k?'", []interface{}{}}, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + for _, cmd := range tc.cmds { + result := exec.FireCommand(conn, cmd) + if _, ok := tc.expect.(string); ok { + // compare strings + testifyAssert.Equal(t, tc.expect, result, "Value mismatch for cmd %s", cmd) + } else { + // compare lists + testifyAssert.ElementsMatch(t, tc.expect, result, "Value mismatch for cmd %s", cmd) + } + } + }) + } +} diff --git a/integration_tests/commands/websocket/setup.go b/integration_tests/commands/websocket/setup.go index 4d7a720c2..0aad5fd4e 100644 --- a/integration_tests/commands/websocket/setup.go +++ b/integration_tests/commands/websocket/setup.go @@ -12,6 +12,7 @@ import ( "github.com/dicedb/dice/config" derrors "github.com/dicedb/dice/internal/errors" + "github.com/dicedb/dice/internal/querymanager" "github.com/dicedb/dice/internal/server" "github.com/dicedb/dice/internal/shard" dstore "github.com/dicedb/dice/internal/store" @@ -96,21 +97,30 @@ func RunWebsocketServer(ctx context.Context, wg *sync.WaitGroup, opt TestServerO config.DiceConfig.Network.IOBufferLength = 16 config.DiceConfig.Persistence.WriteAOFOnCleanup = false - // Initialize the WebsocketServer + // Initialize WebsocketServer globalErrChannel := make(chan error) watchChan := make(chan dstore.QueryWatchEvent, config.DiceConfig.Performance.WatchChanBufSize) shardManager := shard.NewShardManager(1, watchChan, nil, globalErrChannel, opt.Logger) + queryWatcherLocal := querymanager.NewQueryManager(opt.Logger) config.WebsocketPort = opt.Port - testServer := server.NewWebSocketServer(shardManager, watchChan, opt.Logger) - + testServer := server.NewWebSocketServer(shardManager, opt.Logger) shardManagerCtx, cancelShardManager := context.WithCancel(ctx) + + // run shard manager wg.Add(1) go func() { defer wg.Done() shardManager.Run(shardManagerCtx) }() - // Start the server in a goroutine + // run query manager + wg.Add(1) + go func() { + defer wg.Done() + queryWatcherLocal.Run(ctx, watchChan) + }() + + // start websocket server wg.Add(1) go func() { defer wg.Done() From a8c99fe99df7e89d7f12b82a371bd262468c72ee Mon Sep 17 00:00:00 2001 From: psr Date: Mon, 14 Oct 2024 17:26:51 +0530 Subject: [PATCH 04/25] add qunwatch unit test --- internal/server/utils/redisCmdAdapter_test.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/internal/server/utils/redisCmdAdapter_test.go b/internal/server/utils/redisCmdAdapter_test.go index abba19564..12ad28486 100644 --- a/internal/server/utils/redisCmdAdapter_test.go +++ b/internal/server/utils/redisCmdAdapter_test.go @@ -398,6 +398,12 @@ func TestParseWebsocketMessage(t *testing.T) { expectedCmd: "QWATCH", expectedArgs: []string{"SELECT $key, $value WHERE $key LIKE 'player:*' AND '$value.score' > 10 ORDER BY $value.score DESC LIMIT 5"}, }, + { + name: "Test simple QUNWATCH command", + message: "qunwatch \"select $key, $value where $key like 'k?'\"", + expectedCmd: "QUNWATCH", + expectedArgs: []string{"select $key, $value where $key like 'k?'"}, + }, } for _, tc := range commands { From b833052935aa88c2e3d9a52ba6800ddc0bd224fa Mon Sep 17 00:00:00 2001 From: psr Date: Mon, 14 Oct 2024 17:33:16 +0530 Subject: [PATCH 05/25] fixed linter warnings --- internal/server/utils/redisCmdAdapter.go | 16 +++++++++------- internal/server/websocketServer.go | 1 - 2 files changed, 9 insertions(+), 8 deletions(-) diff --git a/internal/server/utils/redisCmdAdapter.go b/internal/server/utils/redisCmdAdapter.go index 4bc3d63bf..d734f2a23 100644 --- a/internal/server/utils/redisCmdAdapter.go +++ b/internal/server/utils/redisCmdAdapter.go @@ -135,20 +135,22 @@ func ParseWebsocketMessage(msg []byte) (*cmd.DiceDBCmd, error) { Cmd: command, Args: nil, }, nil - } else { - command = strings.ToUpper(cmdStr[:idx]) - cmdStr = cmdStr[idx+1:] } - cmdArr := []string{} // args + // handle commands with args + command = strings.ToUpper(cmdStr[:idx]) + cmdStr = cmdStr[idx+1:] + + var cmdArr []string // args + // cmdArr := []string{} // args // handle qwatch and qunwatch commands if command == QWatch || command == QUnwatch { // remove quotes from query string - if cmdStr, err := strconv.Unquote(cmdStr); err != nil { + cmdStr, err := strconv.Unquote(cmdStr) + if err != nil { return nil, fmt.Errorf("error parsing qwatch query: %v", err) - } else { - cmdArr = []string{cmdStr} } + cmdArr = []string{cmdStr} } else { // handle other commands cmdArr = strings.Split(cmdStr, " ") diff --git a/internal/server/websocketServer.go b/internal/server/websocketServer.go index 43872765a..8e97097cb 100644 --- a/internal/server/websocketServer.go +++ b/internal/server/websocketServer.go @@ -166,7 +166,6 @@ func (s *WebsocketServer) processQwatchUpdates(clientIdentifierID uint32, conn * case <-s.shutdownChan: return } - } } From be7132a83e7017dbf4346d4439c7345d18e98171 Mon Sep 17 00:00:00 2001 From: psr Date: Mon, 14 Oct 2024 19:59:39 +0530 Subject: [PATCH 06/25] remove qunwatch code --- .../commands/websocket/qunwatch_test.go | 46 ------------------- internal/eval/eval.go | 4 +- internal/eval/execute.go | 2 +- internal/server/utils/redisCmdAdapter.go | 6 +-- internal/server/utils/redisCmdAdapter_test.go | 6 --- internal/server/websocketServer.go | 4 +- 6 files changed, 7 insertions(+), 61 deletions(-) delete mode 100644 integration_tests/commands/websocket/qunwatch_test.go diff --git a/integration_tests/commands/websocket/qunwatch_test.go b/integration_tests/commands/websocket/qunwatch_test.go deleted file mode 100644 index f3a4502f9..000000000 --- a/integration_tests/commands/websocket/qunwatch_test.go +++ /dev/null @@ -1,46 +0,0 @@ -package websocket - -import ( - "testing" - - testifyAssert "github.com/stretchr/testify/assert" -) - -func TestQUnwatch(t *testing.T) { - exec := NewWebsocketCommandExecutor() - conn := exec.ConnectToServer() - - testCases := []struct { - name string - cmds []string - expect interface{} - }{ - { - name: "Wrong number of arguments", - cmds: []string{"QUNWATCH "}, - expect: "ERR wrong number of arguments for 'qunwatch' command", - }, - { - name: "Invalid query", - cmds: []string{"QUNWATCH \"SELECT \""}, - expect: "error parsing SQL statement: syntax error at position 8", - }, - { - name: "Successful unregister", - cmds: []string{`QUNWATCH "SELECT $key, $value WHERE $key like 'k?'"`}, - expect: "OK", - }, - } - - for _, tc := range testCases { - t.Run(tc.name, func(t *testing.T) { - for _, cmd := range tc.cmds { - result := exec.FireCommand(conn, cmd) - if _, ok := tc.expect.(string); ok { - // compare strings - testifyAssert.Equal(t, tc.expect, result, "Value mismatch for cmd %s", cmd) - } - } - }) - } -} diff --git a/internal/eval/eval.go b/internal/eval/eval.go index 28cceee2c..04806ca36 100644 --- a/internal/eval/eval.go +++ b/internal/eval/eval.go @@ -2165,7 +2165,7 @@ func EvalQWATCH(args []string, httpOp, websocketOp bool, client *comm.Client, st } // EvalQUNWATCH removes the specified key from the watch list for the caller client. -func EvalQUNWATCH(args []string, httpOp, websocketOp bool, client *comm.Client) []byte { +func EvalQUNWATCH(args []string, httpOp bool, client *comm.Client) []byte { if len(args) != 1 { return diceerrors.NewErrArity("QUNWATCH") } @@ -2174,7 +2174,7 @@ func EvalQUNWATCH(args []string, httpOp, websocketOp bool, client *comm.Client) return clientio.Encode(e, false) } - if httpOp || websocketOp { + if httpOp { querymanager.QuerySubscriptionChan <- querymanager.QuerySubscription{ Subscribe: false, Query: query, diff --git a/internal/eval/execute.go b/internal/eval/execute.go index 458e76c2a..af7235679 100644 --- a/internal/eval/execute.go +++ b/internal/eval/execute.go @@ -33,7 +33,7 @@ func ExecuteCommand(c *cmd.DiceDBCmd, client *comm.Client, store *dstore.Store, case "SUBSCRIBE", "QWATCH": return &EvalResponse{Result: EvalQWATCH(c.Args, httpOp, websocketOp, client, store), Error: nil} case "UNSUBSCRIBE", "QUNWATCH": - return &EvalResponse{Result: EvalQUNWATCH(c.Args, httpOp, websocketOp, client), Error: nil} + return &EvalResponse{Result: EvalQUNWATCH(c.Args, httpOp, client), Error: nil} case auth.Cmd: return &EvalResponse{Result: EvalAUTH(c.Args, client), Error: nil} case "ABORT": diff --git a/internal/server/utils/redisCmdAdapter.go b/internal/server/utils/redisCmdAdapter.go index d734f2a23..18f281dd2 100644 --- a/internal/server/utils/redisCmdAdapter.go +++ b/internal/server/utils/redisCmdAdapter.go @@ -35,7 +35,6 @@ const ( ) const QWatch string = "QWATCH" -const QUnwatch string = "QUNWATCH" func ParseHTTPRequest(r *http.Request) (*cmd.DiceDBCmd, error) { commandParts := strings.Split(strings.TrimPrefix(r.URL.Path, "/"), "/") @@ -142,9 +141,8 @@ func ParseWebsocketMessage(msg []byte) (*cmd.DiceDBCmd, error) { cmdStr = cmdStr[idx+1:] var cmdArr []string // args - // cmdArr := []string{} // args - // handle qwatch and qunwatch commands - if command == QWatch || command == QUnwatch { + // handle qwatch commands + if command == QWatch { // remove quotes from query string cmdStr, err := strconv.Unquote(cmdStr) if err != nil { diff --git a/internal/server/utils/redisCmdAdapter_test.go b/internal/server/utils/redisCmdAdapter_test.go index 12ad28486..abba19564 100644 --- a/internal/server/utils/redisCmdAdapter_test.go +++ b/internal/server/utils/redisCmdAdapter_test.go @@ -398,12 +398,6 @@ func TestParseWebsocketMessage(t *testing.T) { expectedCmd: "QWATCH", expectedArgs: []string{"SELECT $key, $value WHERE $key LIKE 'player:*' AND '$value.score' > 10 ORDER BY $value.score DESC LIMIT 5"}, }, - { - name: "Test simple QUNWATCH command", - message: "qunwatch \"select $key, $value where $key like 'k?'\"", - expectedCmd: "QUNWATCH", - expectedArgs: []string{"select $key, $value where $key like 'k?'"}, - }, } for _, tc := range commands { diff --git a/internal/server/websocketServer.go b/internal/server/websocketServer.go index 8e97097cb..c2e22b089 100644 --- a/internal/server/websocketServer.go +++ b/internal/server/websocketServer.go @@ -23,7 +23,7 @@ import ( ) const QWatch = "QWATCH" -const QUnwatch = "QUNWATCH" +const Subscribe = "SUBSCRIBE" type WebsocketServer struct { shardManager *shard.ShardManager @@ -142,7 +142,7 @@ func (s *WebsocketServer) WebsocketHandler(w http.ResponseWriter, r *http.Reques } // handle qwatch and qunwatch commands - if diceDBCmd.Cmd == QWatch || diceDBCmd.Cmd == QUnwatch { + if diceDBCmd.Cmd == QWatch || diceDBCmd.Cmd == Subscribe { clientIdentifierID := generateUniqueInt32(r) sp.Client = comm.NewHTTPQwatchClient(s.qwatchResponseChan, clientIdentifierID) From b0ee3a616e3bb751166484becff902ec199db0fe Mon Sep 17 00:00:00 2001 From: psr Date: Mon, 14 Oct 2024 20:02:08 +0530 Subject: [PATCH 07/25] keep qunwatch as unimplemented command --- internal/server/websocketServer.go | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/internal/server/websocketServer.go b/internal/server/websocketServer.go index c2e22b089..b44d80fed 100644 --- a/internal/server/websocketServer.go +++ b/internal/server/websocketServer.go @@ -24,6 +24,11 @@ import ( const QWatch = "QWATCH" const Subscribe = "SUBSCRIBE" +const Qunwatch = "QUNWATCH" + +var unimplementedCommandsWebsocket = map[string]bool{ + Qunwatch: true, +} type WebsocketServer struct { shardManager *shard.ShardManager @@ -133,6 +138,11 @@ func (s *WebsocketServer) WebsocketHandler(w http.ResponseWriter, r *http.Reques break } + if unimplementedCommandsWebsocket[diceDBCmd.Cmd] { + writeResponse(conn, []byte("Command is not implemented with Websocket")) + continue + } + // create request sp := &ops.StoreOp{ Cmd: diceDBCmd, From edc7203faac926e28c5091c4bbeca4ce3985d433 Mon Sep 17 00:00:00 2001 From: psr Date: Mon, 14 Oct 2024 20:18:41 +0530 Subject: [PATCH 08/25] fixed comment --- internal/server/websocketServer.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/server/websocketServer.go b/internal/server/websocketServer.go index b55d32e37..373ef2824 100644 --- a/internal/server/websocketServer.go +++ b/internal/server/websocketServer.go @@ -155,7 +155,7 @@ func (s *WebsocketServer) WebsocketHandler(w http.ResponseWriter, r *http.Reques WebsocketOp: true, } - // handle qwatch and qunwatch commands + // handle qwatch commands if diceDBCmd.Cmd == QWatch || diceDBCmd.Cmd == Subscribe { clientIdentifierID := generateUniqueInt32(r) sp.Client = comm.NewHTTPQwatchClient(s.qwatchResponseChan, clientIdentifierID) From e39c9a0022b7386d0cf0cc63fa09c26df291a287 Mon Sep 17 00:00:00 2001 From: psr Date: Tue, 15 Oct 2024 12:29:56 +0530 Subject: [PATCH 09/25] debugging --- integration_tests/commands/websocket/setup.go | 1 + internal/server/websocketServer.go | 1 + 2 files changed, 2 insertions(+) diff --git a/integration_tests/commands/websocket/setup.go b/integration_tests/commands/websocket/setup.go index a15acc793..dacd10523 100644 --- a/integration_tests/commands/websocket/setup.go +++ b/integration_tests/commands/websocket/setup.go @@ -86,6 +86,7 @@ func (e *WebsocketCommandExecutor) FireCommand(conn *websocket.Conn, cmd string) } func (e *WebsocketCommandExecutor) DisconnectServer(conn *websocket.Conn) { + slog.Info("Disconnecting server") conn.Close() } diff --git a/internal/server/websocketServer.go b/internal/server/websocketServer.go index 373ef2824..a6dffaec3 100644 --- a/internal/server/websocketServer.go +++ b/internal/server/websocketServer.go @@ -332,5 +332,6 @@ func writeResponse(conn *websocket.Conn, text []byte) { err := conn.WriteMessage(websocket.TextMessage, text) if err != nil { slog.Error(fmt.Sprintf("Error writing response: %v", err)) + slog.String("data: ", string(text)) } } From 4e6dbe9d3ed6c3571541a2460f76e18a33a1e129 Mon Sep 17 00:00:00 2001 From: psr Date: Tue, 15 Oct 2024 13:01:21 +0530 Subject: [PATCH 10/25] minor fixes --- .../commands/websocket/main_test.go | 10 +++++++--- internal/server/websocketServer.go | 16 +++++++++------- 2 files changed, 16 insertions(+), 10 deletions(-) diff --git a/integration_tests/commands/websocket/main_test.go b/integration_tests/commands/websocket/main_test.go index a85925541..0090fe0b8 100644 --- a/integration_tests/commands/websocket/main_test.go +++ b/integration_tests/commands/websocket/main_test.go @@ -35,9 +35,13 @@ func TestMain(m *testing.M) { exitCode := m.Run() // abort - conn := executor.ConnectToServer() - executor.FireCommand(conn, "abort") - executor.DisconnectServer(conn) + wg.Add(1) + go func() { + defer wg.Done() + conn := executor.ConnectToServer() + executor.FireCommand(conn, "abort") + executor.DisconnectServer(conn) + }() wg.Wait() os.Exit(exitCode) diff --git a/internal/server/websocketServer.go b/internal/server/websocketServer.go index a6dffaec3..886c5b0a9 100644 --- a/internal/server/websocketServer.go +++ b/internal/server/websocketServer.go @@ -120,11 +120,12 @@ func (s *WebsocketServer) WebsocketHandler(w http.ResponseWriter, r *http.Reques conn.Close() }() + maxRetries := config.DiceConfig.WebSocket.MaxWriteResponseRetries for { // read incoming message _, msg, err := conn.ReadMessage() if err != nil { - writeResponse(conn, []byte("error: command reading failed")) + WriteResponseWithRetries(conn, []byte("error: command reading failed"), maxRetries) continue } @@ -133,7 +134,7 @@ func (s *WebsocketServer) WebsocketHandler(w http.ResponseWriter, r *http.Reques if errors.Is(err, diceerrors.ErrEmptyCommand) { continue } else if err != nil { - writeResponse(conn, []byte("error: parsing failed")) + WriteResponseWithRetries(conn, []byte("error: parsing failed"), maxRetries) continue } @@ -143,7 +144,7 @@ func (s *WebsocketServer) WebsocketHandler(w http.ResponseWriter, r *http.Reques } if unimplementedCommandsWebsocket[diceDBCmd.Cmd] { - writeResponse(conn, []byte("Command is not implemented with Websocket")) + WriteResponseWithRetries(conn, []byte("Command is not implemented with Websocket"), maxRetries) continue } @@ -191,6 +192,7 @@ func (s *WebsocketServer) processQwatchUpdates(clientIdentifierID uint32, conn * func (s *WebsocketServer) processResponse(conn *websocket.Conn, diceDBCmd *cmd.DiceDBCmd, response interface{}) error { var result interface{} var err error + maxRetries := config.DiceConfig.WebSocket.MaxWriteResponseRetries // check response type switch resp := response.(type) { @@ -202,7 +204,7 @@ func (s *WebsocketServer) processResponse(conn *websocket.Conn, diceDBCmd *cmd.D err = resp.EvalResponse.Error default: s.logger.Error("Unsupported response type") - writeResponse(conn, []byte("error: 500 Internal Server Error")) + WriteResponseWithRetries(conn, []byte("error: 500 Internal Server Error"), maxRetries) return nil } @@ -231,7 +233,7 @@ func (s *WebsocketServer) processResponse(conn *websocket.Conn, diceDBCmd *cmd.D responseValue, err = rp.DecodeOne() if err != nil { s.logger.Error("Error decoding response", "error", err) - writeResponse(conn, []byte("error: 500 Internal Server Error")) + WriteResponseWithRetries(conn, []byte("error: 500 Internal Server Error"), maxRetries) return nil } } else { @@ -253,7 +255,7 @@ func (s *WebsocketServer) processResponse(conn *websocket.Conn, diceDBCmd *cmd.D respBytes, err := json.Marshal(responseValue) if err != nil { s.logger.Error("Error marshaling json", "error", err) - writeResponse(conn, []byte("error: marshaling json")) + WriteResponseWithRetries(conn, []byte("error: marshaling json"), maxRetries) return nil } @@ -331,7 +333,7 @@ func writeResponse(conn *websocket.Conn, text []byte) { err := conn.WriteMessage(websocket.TextMessage, text) if err != nil { - slog.Error(fmt.Sprintf("Error writing response: %v", err)) slog.String("data: ", string(text)) + slog.Error(fmt.Sprintf("Error1 writing response: %v", err)) } } From 1e9148998920cb90a02023421e09ea8fa817b0d4 Mon Sep 17 00:00:00 2001 From: psr Date: Tue, 15 Oct 2024 13:29:00 +0530 Subject: [PATCH 11/25] moved test port to a constant --- integration_tests/commands/websocket/main_test.go | 2 +- integration_tests/commands/websocket/setup.go | 9 +++++++-- internal/server/websocketServer.go | 4 ++-- main.go | 2 +- 4 files changed, 11 insertions(+), 6 deletions(-) diff --git a/integration_tests/commands/websocket/main_test.go b/integration_tests/commands/websocket/main_test.go index 0090fe0b8..96d9f7de4 100644 --- a/integration_tests/commands/websocket/main_test.go +++ b/integration_tests/commands/websocket/main_test.go @@ -21,7 +21,7 @@ func TestMain(m *testing.M) { // checks for available port and then forks a goroutine // to start the server opts := TestServerOptions{ - Port: 8380, + Port: testPort, Logger: l, } RunWebsocketServer(context.Background(), &wg, opts) diff --git a/integration_tests/commands/websocket/setup.go b/integration_tests/commands/websocket/setup.go index dacd10523..847b4b9d5 100644 --- a/integration_tests/commands/websocket/setup.go +++ b/integration_tests/commands/websocket/setup.go @@ -4,6 +4,7 @@ import ( "context" "encoding/json" "errors" + "fmt" "log" "log/slog" "net/http" @@ -19,7 +20,10 @@ import ( "github.com/gorilla/websocket" ) -const URL = "ws://localhost:8380" +const ( + URL = "ws://localhost:8380" + testPort = 8380 +) type TestServerOptions struct { Port int @@ -104,7 +108,8 @@ func RunWebsocketServer(ctx context.Context, wg *sync.WaitGroup, opt TestServerO shardManager := shard.NewShardManager(1, watchChan, nil, globalErrChannel, opt.Logger) queryWatcherLocal := querymanager.NewQueryManager(opt.Logger) config.WebsocketPort = opt.Port - testServer := server.NewWebSocketServer(shardManager, opt.Logger) + testServer := server.NewWebSocketServer(shardManager, testPort, opt.Logger) + fmt.Println("Starting Websocket Test Server on port: ", testPort) shardManagerCtx, cancelShardManager := context.WithCancel(ctx) // run shard manager diff --git a/internal/server/websocketServer.go b/internal/server/websocketServer.go index 886c5b0a9..34d367f60 100644 --- a/internal/server/websocketServer.go +++ b/internal/server/websocketServer.go @@ -44,10 +44,10 @@ type WebsocketServer struct { logger *slog.Logger } -func NewWebSocketServer(shardManager *shard.ShardManager, logger *slog.Logger) *WebsocketServer { +func NewWebSocketServer(shardManager *shard.ShardManager, port int, logger *slog.Logger) *WebsocketServer { mux := http.NewServeMux() srv := &http.Server{ - Addr: fmt.Sprintf(":%d", config.WebsocketPort), + Addr: fmt.Sprintf(":%d", port), Handler: mux, ReadHeaderTimeout: 5 * time.Second, } diff --git a/main.go b/main.go index 679b9f84d..76332c8b1 100644 --- a/main.go +++ b/main.go @@ -193,7 +193,7 @@ func main() { }() } - websocketServer := server.NewWebSocketServer(shardManager, logr) + websocketServer := server.NewWebSocketServer(shardManager, config.WebsocketPort, logr) serverWg.Add(1) go func() { defer serverWg.Done() From 7307273711106aadad57eb62d5bd79f3cca0e3a3 Mon Sep 17 00:00:00 2001 From: psr Date: Tue, 15 Oct 2024 13:52:35 +0530 Subject: [PATCH 12/25] debugging --- integration_tests/commands/websocket/setup.go | 2 -- internal/server/websocketServer.go | 5 +++-- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/integration_tests/commands/websocket/setup.go b/integration_tests/commands/websocket/setup.go index 847b4b9d5..9c0596549 100644 --- a/integration_tests/commands/websocket/setup.go +++ b/integration_tests/commands/websocket/setup.go @@ -4,7 +4,6 @@ import ( "context" "encoding/json" "errors" - "fmt" "log" "log/slog" "net/http" @@ -109,7 +108,6 @@ func RunWebsocketServer(ctx context.Context, wg *sync.WaitGroup, opt TestServerO queryWatcherLocal := querymanager.NewQueryManager(opt.Logger) config.WebsocketPort = opt.Port testServer := server.NewWebSocketServer(shardManager, testPort, opt.Logger) - fmt.Println("Starting Websocket Test Server on port: ", testPort) shardManagerCtx, cancelShardManager := context.WithCancel(ctx) // run shard manager diff --git a/internal/server/websocketServer.go b/internal/server/websocketServer.go index 34d367f60..0c6334cbc 100644 --- a/internal/server/websocketServer.go +++ b/internal/server/websocketServer.go @@ -126,7 +126,8 @@ func (s *WebsocketServer) WebsocketHandler(w http.ResponseWriter, r *http.Reques _, msg, err := conn.ReadMessage() if err != nil { WriteResponseWithRetries(conn, []byte("error: command reading failed"), maxRetries) - continue + close(s.shutdownChan) + break } // parse message to dice command @@ -262,7 +263,7 @@ func (s *WebsocketServer) processResponse(conn *websocket.Conn, diceDBCmd *cmd.D // success // Write response with retries for transient errors if err := WriteResponseWithRetries(conn, respBytes, config.DiceConfig.WebSocket.MaxWriteResponseRetries); err != nil { - s.logger.Error(fmt.Sprintf("Error reading message: %v", err)) + s.logger.Error(fmt.Sprintf("Error writing message: %v", err)) return fmt.Errorf("error writing response: %v", err) } From 096114d6d2e6e3a6a4863a7fc92a28f96ed87277 Mon Sep 17 00:00:00 2001 From: psr Date: Wed, 16 Oct 2024 11:32:57 +0530 Subject: [PATCH 13/25] minor fixes --- internal/server/websocketServer.go | 62 +++++++++++++++--------------- 1 file changed, 32 insertions(+), 30 deletions(-) diff --git a/internal/server/websocketServer.go b/internal/server/websocketServer.go index 0c6334cbc..5bd2df71e 100644 --- a/internal/server/websocketServer.go +++ b/internal/server/websocketServer.go @@ -26,9 +26,11 @@ import ( "golang.org/x/exp/rand" ) -const QWatch = "QWATCH" -const Subscribe = "SUBSCRIBE" -const Qunwatch = "QUNWATCH" +const ( + QWatch = "QWATCH" + Qunwatch = "QUNWATCH" + Subscribe = "SUBSCRIBE" +) var unimplementedCommandsWebsocket = map[string]bool{ Qunwatch: true, @@ -86,7 +88,7 @@ func (s *WebsocketServer) Run(ctx context.Context) error { case <-ctx.Done(): case <-s.shutdownChan: err = diceerrors.ErrAborted - s.logger.Debug("Shutting down Websocket Server") + s.logger.Debug("Shutting down Websocket Server", slog.Any("time", time.Now())) } shutdownErr := s.websocketServer.Shutdown(websocketCtx) @@ -101,6 +103,9 @@ func (s *WebsocketServer) Run(ctx context.Context) error { defer wg.Done() s.logger.Info("Websocket Server running", slog.String("port", s.websocketServer.Addr[1:])) err = s.websocketServer.ListenAndServe() + if err != nil { + s.logger.Debug("Error in Websocket Server", slog.Any("time", time.Now()), slog.Any("error", err)) + } }() wg.Wait() @@ -125,8 +130,6 @@ func (s *WebsocketServer) WebsocketHandler(w http.ResponseWriter, r *http.Reques // read incoming message _, msg, err := conn.ReadMessage() if err != nil { - WriteResponseWithRetries(conn, []byte("error: command reading failed"), maxRetries) - close(s.shutdownChan) break } @@ -135,7 +138,9 @@ func (s *WebsocketServer) WebsocketHandler(w http.ResponseWriter, r *http.Reques if errors.Is(err, diceerrors.ErrEmptyCommand) { continue } else if err != nil { - WriteResponseWithRetries(conn, []byte("error: parsing failed"), maxRetries) + if err := WriteResponseWithRetries(conn, []byte("error: parsing failed"), maxRetries); err != nil { + s.logger.Debug(fmt.Sprintf("Error writing message: %v", err)) + } continue } @@ -145,7 +150,9 @@ func (s *WebsocketServer) WebsocketHandler(w http.ResponseWriter, r *http.Reques } if unimplementedCommandsWebsocket[diceDBCmd.Cmd] { - WriteResponseWithRetries(conn, []byte("Command is not implemented with Websocket"), maxRetries) + if err := WriteResponseWithRetries(conn, []byte("Command is not implemented with Websocket"), maxRetries); err != nil { + s.logger.Debug(fmt.Sprintf("Error writing message: %v", err)) + } continue } @@ -180,7 +187,7 @@ func (s *WebsocketServer) processQwatchUpdates(clientIdentifierID uint32, conn * case resp := <-s.qwatchResponseChan: if resp.ClientIdentifierID == clientIdentifierID { if err := s.processResponse(conn, dicDBCmd, resp); err != nil { - s.logger.Error("Error writing response to client. Shutting down goroutine for qwatch updates", slog.Any("clientIdentifierID", clientIdentifierID), slog.Any("error", err)) + s.logger.Debug("Error writing response to client. Shutting down goroutine for qwatch updates", slog.Any("clientIdentifierID", clientIdentifierID), slog.Any("error", err)) return } } @@ -204,8 +211,11 @@ func (s *WebsocketServer) processResponse(conn *websocket.Conn, diceDBCmd *cmd.D result = resp.EvalResponse.Result err = resp.EvalResponse.Error default: - s.logger.Error("Unsupported response type") - WriteResponseWithRetries(conn, []byte("error: 500 Internal Server Error"), maxRetries) + s.logger.Debug("Unsupported response type") + if err := WriteResponseWithRetries(conn, []byte("error: 500 Internal Server Error"), maxRetries); err != nil { + s.logger.Debug(fmt.Sprintf("Error writing message: %v", err)) + return fmt.Errorf("error writing response: %v", err) + } return nil } @@ -233,8 +243,11 @@ func (s *WebsocketServer) processResponse(conn *websocket.Conn, diceDBCmd *cmd.D responseValue, err = rp.DecodeOne() if err != nil { - s.logger.Error("Error decoding response", "error", err) - WriteResponseWithRetries(conn, []byte("error: 500 Internal Server Error"), maxRetries) + s.logger.Debug("Error decoding response", "error", err) + if err := WriteResponseWithRetries(conn, []byte("error: 500 Internal Server Error"), maxRetries); err != nil { + s.logger.Debug(fmt.Sprintf("Error writing message: %v", err)) + return fmt.Errorf("error writing response: %v", err) + } return nil } } else { @@ -255,15 +268,18 @@ func (s *WebsocketServer) processResponse(conn *websocket.Conn, diceDBCmd *cmd.D respBytes, err := json.Marshal(responseValue) if err != nil { - s.logger.Error("Error marshaling json", "error", err) - WriteResponseWithRetries(conn, []byte("error: marshaling json"), maxRetries) + s.logger.Debug("Error marshaling json", "error", err) + if err := WriteResponseWithRetries(conn, []byte("error: marshaling json"), maxRetries); err != nil { + s.logger.Debug(fmt.Sprintf("Error writing message: %v", err)) + return fmt.Errorf("error writing response: %v", err) + } return nil } // success // Write response with retries for transient errors if err := WriteResponseWithRetries(conn, respBytes, config.DiceConfig.WebSocket.MaxWriteResponseRetries); err != nil { - s.logger.Error(fmt.Sprintf("Error writing message: %v", err)) + s.logger.Debug(fmt.Sprintf("Error writing message: %v", err)) return fmt.Errorf("error writing response: %v", err) } @@ -324,17 +340,3 @@ func WriteResponseWithRetries(conn *websocket.Conn, text []byte, maxRetries int) return nil } - -func writeResponse(conn *websocket.Conn, text []byte) { - // Set a write deadline to prevent hanging - if err := conn.SetWriteDeadline(time.Now().Add(10 * time.Second)); err != nil { - slog.Error(fmt.Sprintf("Error setting write deadline: %v", err)) - return - } - - err := conn.WriteMessage(websocket.TextMessage, text) - if err != nil { - slog.String("data: ", string(text)) - slog.Error(fmt.Sprintf("Error1 writing response: %v", err)) - } -} From 7331d9119ccc27aae9ef03e85308fe99da3440c8 Mon Sep 17 00:00:00 2001 From: psr Date: Wed, 16 Oct 2024 11:43:51 +0530 Subject: [PATCH 14/25] fixed integration tests --- .../commands/websocket/get_test.go | 9 +- .../commands/websocket/main_test.go | 16 +-- .../commands/websocket/qwatch_test.go | 16 ++- .../commands/websocket/set_test.go | 112 ++++++++++++++---- integration_tests/commands/websocket/setup.go | 42 ++++--- ...ite_retries_test.go => writeretry_test.go} | 8 +- 6 files changed, 135 insertions(+), 68 deletions(-) rename integration_tests/commands/websocket/{websocket_write_retries_test.go => writeretry_test.go} (94%) diff --git a/integration_tests/commands/websocket/get_test.go b/integration_tests/commands/websocket/get_test.go index 606b28f9e..98d1d854a 100644 --- a/integration_tests/commands/websocket/get_test.go +++ b/integration_tests/commands/websocket/get_test.go @@ -4,7 +4,7 @@ import ( "testing" "time" - "gotest.tools/v3/assert" + "github.com/stretchr/testify/assert" ) func TestGet(t *testing.T) { @@ -19,9 +19,9 @@ func TestGet(t *testing.T) { }{ { name: "Get with expiration", - cmds: []string{"SET k v EX 4", "GET k", "GET k"}, + cmds: []string{"SET k v EX 1", "GET k", "GET k"}, expect: []interface{}{"OK", "v", "(nil)"}, - delays: []time.Duration{0, 0, 5 * time.Second}, + delays: []time.Duration{0, 0, 2 * time.Second}, }, } @@ -31,7 +31,8 @@ func TestGet(t *testing.T) { if tc.delays[i] > 0 { time.Sleep(tc.delays[i]) } - result := exec.FireCommand(conn, cmd) + result, err := exec.FireCommandAndReadResponse(conn, cmd) + assert.Nil(t, err) assert.Equal(t, tc.expect[i], result, "Value mismatch for cmd %s", cmd) } }) diff --git a/integration_tests/commands/websocket/main_test.go b/integration_tests/commands/websocket/main_test.go index 96d9f7de4..cc330a19d 100644 --- a/integration_tests/commands/websocket/main_test.go +++ b/integration_tests/commands/websocket/main_test.go @@ -21,10 +21,11 @@ func TestMain(m *testing.M) { // checks for available port and then forks a goroutine // to start the server opts := TestServerOptions{ - Port: testPort, + Port: testPort1, Logger: l, } - RunWebsocketServer(context.Background(), &wg, opts) + ctx, cancel := context.WithCancel(context.Background()) + RunWebsocketServer(ctx, &wg, opts) // Wait for the server to start time.Sleep(2 * time.Second) @@ -34,15 +35,10 @@ func TestMain(m *testing.M) { // Run the test suite exitCode := m.Run() - // abort - wg.Add(1) - go func() { - defer wg.Done() - conn := executor.ConnectToServer() - executor.FireCommand(conn, "abort") - executor.DisconnectServer(conn) - }() + conn := executor.ConnectToServer() + executor.FireCommand(conn, "abort") + cancel() wg.Wait() os.Exit(exitCode) } diff --git a/integration_tests/commands/websocket/qwatch_test.go b/integration_tests/commands/websocket/qwatch_test.go index c8e5c3b62..ebac2662f 100644 --- a/integration_tests/commands/websocket/qwatch_test.go +++ b/integration_tests/commands/websocket/qwatch_test.go @@ -3,7 +3,7 @@ package websocket import ( "testing" - testifyAssert "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/assert" ) func TestQWatch(t *testing.T) { @@ -25,23 +25,27 @@ func TestQWatch(t *testing.T) { cmds: []string{"QWATCH \"SELECT \""}, expect: "error parsing SQL statement: syntax error at position 8", }, + // TODO - once following query is registered, websocket will also attempt sending updates + // while keys are set for other tests in this package + // Add unregister test case to handle this scenario once qunwatch support is added { name: "Successful register", - cmds: []string{`QWATCH "SELECT $key, $value WHERE $key like 'k?'"`}, - expect: []interface{}{"qwatch", "SELECT $key, $value WHERE $key like 'k?'", []interface{}{}}, + cmds: []string{`QWATCH "SELECT $key, $value WHERE $key like 'qwatch-test-key?'"`}, + expect: []interface{}{"qwatch", "SELECT $key, $value WHERE $key like 'qwatch-test-key?'", []interface{}{}}, }, } for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { for _, cmd := range tc.cmds { - result := exec.FireCommand(conn, cmd) + result, err := exec.FireCommandAndReadResponse(conn, cmd) + assert.Nil(t, err) if _, ok := tc.expect.(string); ok { // compare strings - testifyAssert.Equal(t, tc.expect, result, "Value mismatch for cmd %s", cmd) + assert.Equal(t, tc.expect, result, "Value mismatch for cmd %s", cmd) } else { // compare lists - testifyAssert.ElementsMatch(t, tc.expect, result, "Value mismatch for cmd %s", cmd) + assert.ElementsMatch(t, tc.expect, result, "Value mismatch for cmd %s", cmd) } } }) diff --git a/integration_tests/commands/websocket/set_test.go b/integration_tests/commands/websocket/set_test.go index 964652995..3f224e725 100644 --- a/integration_tests/commands/websocket/set_test.go +++ b/integration_tests/commands/websocket/set_test.go @@ -6,7 +6,8 @@ import ( "testing" "time" - "gotest.tools/v3/assert" + "github.com/gorilla/websocket" + "github.com/stretchr/testify/assert" ) type TestCase struct { @@ -39,11 +40,13 @@ func TestSet(t *testing.T) { for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { conn := exec.ConnectToServer() - exec.FireCommand(conn, "del k") + + DeleteKey(t, conn, exec, "k") for i, cmd := range tc.commands { - result := exec.FireCommand(conn, cmd) - assert.DeepEqual(t, tc.expected[i], result) + result, err := exec.FireCommandAndReadResponse(conn, cmd) + assert.Nil(t, err) + assert.Equal(t, tc.expected[i], result) } }) } @@ -124,11 +127,14 @@ func TestSetWithOptions(t *testing.T) { for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { conn := exec.ConnectToServer() - exec.FireCommand(conn, "del k") - exec.FireCommand(conn, "del k1") - exec.FireCommand(conn, "del k2") + + DeleteKey(t, conn, exec, "k") + DeleteKey(t, conn, exec, "k1") + DeleteKey(t, conn, exec, "k2") + for i, cmd := range tc.commands { - result := exec.FireCommand(conn, cmd) + result, err := exec.FireCommandAndReadResponse(conn, cmd) + assert.Nil(t, err) assert.Equal(t, tc.expected[i], result) } }) @@ -143,32 +149,76 @@ func TestSetWithExat(t *testing.T) { t.Run("SET with EXAT", func(t *testing.T) { conn := exec.ConnectToServer() - exec.FireCommand(conn, "DEL k") - assert.Equal(t, "OK", exec.FireCommand(conn, fmt.Sprintf("SET k v EXAT %v", Etime)), "Value mismatch for cmd SET k v EXAT "+Etime) - assert.Equal(t, "v", exec.FireCommand(conn, "GET k"), "Value mismatch for cmd GET k") - assert.Assert(t, exec.FireCommand(conn, "TTL k").(float64) <= 5, "Value mismatch for cmd TTL k") + + DeleteKey(t, conn, exec, "k") + + resp, err := exec.FireCommandAndReadResponse(conn, fmt.Sprintf("SET k v EXAT %v", Etime)) + assert.Nil(t, err) + assert.Equal(t, "OK", resp, "Value mismatch for cmd SET k v EXAT "+Etime) + + resp, err = exec.FireCommandAndReadResponse(conn, "GET k") + assert.Nil(t, err) + assert.Equal(t, "v", resp, "Value mismatch for cmd GET k") + + resp, err = exec.FireCommandAndReadResponse(conn, "TTL k") + assert.Nil(t, err) + respFloat, ok := resp.(float64) + assert.True(t, ok) + assert.True(t, respFloat <= 5, "Value mismatch for cmd TTL k") + time.Sleep(3 * time.Second) - assert.Assert(t, exec.FireCommand(conn, "TTL k").(float64) <= 3, "Value mismatch for cmd TTL k") + resp, err = exec.FireCommandAndReadResponse(conn, "TTL k") + assert.Nil(t, err) + respFloat, ok = resp.(float64) + assert.True(t, ok) + assert.True(t, respFloat <= 3, "Value mismatch for cmd TTL k") + time.Sleep(3 * time.Second) - assert.Equal(t, "(nil)", exec.FireCommand(conn, "GET k"), "Value mismatch for cmd GET k") - assert.Equal(t, float64(-2), exec.FireCommand(conn, "TTL k"), "Value mismatch for cmd TTL k") + resp, err = exec.FireCommandAndReadResponse(conn, "GET k") + assert.Nil(t, err) + assert.Equal(t, "(nil)", resp, "Value mismatch for cmd GET k") + + resp, err = exec.FireCommandAndReadResponse(conn, "TTL k") + assert.Nil(t, err) + respFloat, ok = resp.(float64) + assert.True(t, ok) + assert.Equal(t, float64(-2), respFloat, "Value mismatch for cmd TTL k") }) t.Run("SET with invalid EXAT expires key immediately", func(t *testing.T) { conn := exec.ConnectToServer() - exec.FireCommand(conn, "DEL k") - assert.Equal(t, "OK", exec.FireCommand(conn, "SET k v EXAT "+BadTime), "Value mismatch for cmd SET k v EXAT "+BadTime) - assert.Equal(t, "(nil)", exec.FireCommand(conn, "GET k"), "Value mismatch for cmd GET k") - assert.Equal(t, float64(-2), exec.FireCommand(conn, "TTL k"), "Value mismatch for cmd TTL k") + + DeleteKey(t, conn, exec, "k") + + resp, err := exec.FireCommandAndReadResponse(conn, "SET k v EXAT "+BadTime) + assert.Nil(t, err) + assert.Equal(t, "OK", resp, "Value mismatch for cmd SET k v EXAT "+BadTime) + + resp, err = exec.FireCommandAndReadResponse(conn, "GET k") + assert.Nil(t, err) + assert.Equal(t, "(nil)", resp, "Value mismatch for cmd GET k") + + resp, err = exec.FireCommandAndReadResponse(conn, "TTL k") + assert.Nil(t, err) + respFloat, ok := resp.(float64) + assert.True(t, ok) + assert.Equal(t, float64(-2), respFloat, "Value mismatch for cmd TTL k") }) t.Run("SET with EXAT and PXAT returns syntax error", func(t *testing.T) { conn := exec.ConnectToServer() - exec.FireCommand(conn, "DEL k") - assert.Equal(t, "ERR syntax error", exec.FireCommand(conn, "SET k v PXAT "+Etime+" EXAT "+Etime), "Value mismatch for cmd SET k v PXAT "+Etime+" EXAT "+Etime) - assert.Equal(t, "(nil)", exec.FireCommand(conn, "GET k"), "Value mismatch for cmd GET k") + + DeleteKey(t, conn, exec, "k") + + resp, err := exec.FireCommandAndReadResponse(conn, "SET k v PXAT "+Etime+" EXAT "+Etime) + assert.Nil(t, err) + assert.Equal(t, "ERR syntax error", resp, "Value mismatch for cmd SET k v PXAT "+Etime+" EXAT "+Etime) + + resp, err = exec.FireCommandAndReadResponse(conn, "GET k") + assert.Nil(t, err) + assert.Equal(t, "(nil)", resp, "Value mismatch for cmd GET k") }) } @@ -185,14 +235,26 @@ func TestWithKeepTTLFlag(t *testing.T) { for i := 0; i < len(tcase.commands); i++ { cmd := tcase.commands[i] out := tcase.expected[i] - assert.Equal(t, out, exec.FireCommand(conn, cmd), "Value mismatch for cmd %s\n.", cmd) + + resp, err := exec.FireCommandAndReadResponse(conn, cmd) + assert.Nil(t, err) + assert.Equal(t, out, resp, "Value mismatch for cmd %s\n.", cmd) } } time.Sleep(2 * time.Second) - cmd := "GET k" out := "(nil)" + resp, err := exec.FireCommandAndReadResponse(conn, cmd) + assert.Nil(t, err) + assert.Equal(t, out, resp, "Value mismatch for cmd %s\n.", cmd) +} - assert.Equal(t, out, exec.FireCommand(conn, cmd), "Value mismatch for cmd %s\n.", cmd) +func DeleteKey(t *testing.T, conn *websocket.Conn, exec *WebsocketCommandExecutor, key string) { + cmd := "DEL " + key + resp, err := exec.FireCommandAndReadResponse(conn, cmd) + assert.Nil(t, err) + respFloat, ok := resp.(float64) + assert.True(t, ok, "error converting response to float64") + assert.True(t, respFloat == 1 || respFloat == 0, "unexpected response in %v: %v", cmd, resp) } diff --git a/integration_tests/commands/websocket/setup.go b/integration_tests/commands/websocket/setup.go index 9c0596549..aae05b646 100644 --- a/integration_tests/commands/websocket/setup.go +++ b/integration_tests/commands/websocket/setup.go @@ -4,7 +4,7 @@ import ( "context" "encoding/json" "errors" - "log" + "fmt" "log/slog" "net/http" "sync" @@ -20,8 +20,9 @@ import ( ) const ( - URL = "ws://localhost:8380" - testPort = 8380 + URL = "ws://localhost:8380" + testPort1 = 8380 + testPort2 = 8381 ) type TestServerOptions struct { @@ -64,33 +65,39 @@ func (e *WebsocketCommandExecutor) ConnectToServer() *websocket.Conn { return conn } -func (e *WebsocketCommandExecutor) FireCommand(conn *websocket.Conn, cmd string) interface{} { - command := []byte(cmd) - - // send request - err := conn.WriteMessage(websocket.TextMessage, command) +func (e *WebsocketCommandExecutor) FireCommandAndReadResponse(conn *websocket.Conn, cmd string) (interface{}, error) { + err := e.FireCommand(conn, cmd) if err != nil { - return nil + return nil, err } // read the response _, resp, err := conn.ReadMessage() if err != nil { - return nil + return nil, err } // marshal to json var respJSON interface{} if err = json.Unmarshal(resp, &respJSON); err != nil { - return nil + return nil, fmt.Errorf("error unmarshaling response") } - return respJSON + return respJSON, nil } -func (e *WebsocketCommandExecutor) DisconnectServer(conn *websocket.Conn) { - slog.Info("Disconnecting server") - conn.Close() +func (e *WebsocketCommandExecutor) FireCommand(conn *websocket.Conn, cmd string) error { + defer func() { + time.Sleep(100 * time.Millisecond) + }() + + // send request + err := conn.WriteMessage(websocket.TextMessage, []byte(cmd)) + if err != nil { + return err + } + + return nil } func (e *WebsocketCommandExecutor) Name() string { @@ -98,6 +105,7 @@ func (e *WebsocketCommandExecutor) Name() string { } func RunWebsocketServer(ctx context.Context, wg *sync.WaitGroup, opt TestServerOptions) { + logger := opt.Logger config.DiceConfig.Network.IOBufferLength = 16 config.DiceConfig.Persistence.WriteAOFOnCleanup = false @@ -107,7 +115,7 @@ func RunWebsocketServer(ctx context.Context, wg *sync.WaitGroup, opt TestServerO shardManager := shard.NewShardManager(1, watchChan, nil, globalErrChannel, opt.Logger) queryWatcherLocal := querymanager.NewQueryManager(opt.Logger) config.WebsocketPort = opt.Port - testServer := server.NewWebSocketServer(shardManager, testPort, opt.Logger) + testServer := server.NewWebSocketServer(shardManager, testPort1, opt.Logger) shardManagerCtx, cancelShardManager := context.WithCancel(ctx) // run shard manager @@ -134,7 +142,7 @@ func RunWebsocketServer(ctx context.Context, wg *sync.WaitGroup, opt TestServerO if errors.Is(srverr, derrors.ErrAborted) { return } - log.Printf("Websocket test server encountered an error: %v", srverr) + logger.Debug("Websocket test server encountered an error: %v", slog.Any("error", srverr)) } }() } diff --git a/integration_tests/commands/websocket/websocket_write_retries_test.go b/integration_tests/commands/websocket/writeretry_test.go similarity index 94% rename from integration_tests/commands/websocket/websocket_write_retries_test.go rename to integration_tests/commands/websocket/writeretry_test.go index a295ed089..b0b270fc8 100644 --- a/integration_tests/commands/websocket/websocket_write_retries_test.go +++ b/integration_tests/commands/websocket/writeretry_test.go @@ -1,10 +1,10 @@ package websocket import ( + "fmt" "net" "net/http" "net/url" - "os" "sync" "testing" "time" @@ -14,7 +14,7 @@ import ( "github.com/stretchr/testify/assert" ) -var serverAddr = "localhost:12345" +var serverAddr = fmt.Sprintf("localhost:%v", testPort2) var once sync.Once func TestWriteResponseWithRetries_Success(t *testing.T) { @@ -70,10 +70,6 @@ func TestWriteResponseWithRetries_EAGAINRetry(t *testing.T) { assert.Equal(t, 2, retries) } -func newSyscallError(syscall string, err error) *os.SyscallError { - return &os.SyscallError{Syscall: syscall, Err: err} -} - func startWebSocketServer() { http.HandleFunc("/ws", func(w http.ResponseWriter, r *http.Request) { conn, err := websocket.Upgrade(w, r, nil, 1024, 1024) From c3a087c056cb51344b1475d4000f3871084e8c61 Mon Sep 17 00:00:00 2001 From: psr Date: Wed, 16 Oct 2024 11:46:42 +0530 Subject: [PATCH 15/25] fixed linter checks --- integration_tests/commands/websocket/setup.go | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/integration_tests/commands/websocket/setup.go b/integration_tests/commands/websocket/setup.go index aae05b646..4ddc94acc 100644 --- a/integration_tests/commands/websocket/setup.go +++ b/integration_tests/commands/websocket/setup.go @@ -87,9 +87,7 @@ func (e *WebsocketCommandExecutor) FireCommandAndReadResponse(conn *websocket.Co } func (e *WebsocketCommandExecutor) FireCommand(conn *websocket.Conn, cmd string) error { - defer func() { - time.Sleep(100 * time.Millisecond) - }() + defer time.Sleep(100 * time.Millisecond) // send request err := conn.WriteMessage(websocket.TextMessage, []byte(cmd)) From 39687ef9785f8372690cf7a4a17bd71330c0fbf0 Mon Sep 17 00:00:00 2001 From: psr Date: Wed, 16 Oct 2024 12:27:13 +0530 Subject: [PATCH 16/25] fixed breaking tests --- .../commands/websocket/helper.go | 17 +++++++++++++ .../commands/websocket/hyperloglog_test.go | 9 +++---- .../commands/websocket/set_test.go | 24 ++++++------------- 3 files changed, 29 insertions(+), 21 deletions(-) create mode 100644 integration_tests/commands/websocket/helper.go diff --git a/integration_tests/commands/websocket/helper.go b/integration_tests/commands/websocket/helper.go new file mode 100644 index 000000000..ab724995e --- /dev/null +++ b/integration_tests/commands/websocket/helper.go @@ -0,0 +1,17 @@ +package websocket + +import ( + "testing" + + "github.com/gorilla/websocket" + "github.com/stretchr/testify/assert" +) + +func deleteKey(t *testing.T, conn *websocket.Conn, exec *WebsocketCommandExecutor, key string) { + cmd := "DEL " + key + resp, err := exec.FireCommandAndReadResponse(conn, cmd) + assert.Nil(t, err) + respFloat, ok := resp.(float64) + assert.True(t, ok, "error converting response to float64") + assert.True(t, respFloat == 1 || respFloat == 0, "unexpected response in %v: %v", cmd, resp) +} diff --git a/integration_tests/commands/websocket/hyperloglog_test.go b/integration_tests/commands/websocket/hyperloglog_test.go index fe3a27173..8a3ed0942 100644 --- a/integration_tests/commands/websocket/hyperloglog_test.go +++ b/integration_tests/commands/websocket/hyperloglog_test.go @@ -3,7 +3,7 @@ package websocket import ( "testing" - "gotest.tools/v3/assert" + "github.com/stretchr/testify/assert" ) func TestHyperLogLogCommands(t *testing.T) { @@ -82,11 +82,12 @@ func TestHyperLogLogCommands(t *testing.T) { for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { conn := exec.ConnectToServer() - exec.FireCommand(conn, "del k") + deleteKey(t, conn, exec, "k") for i, cmd := range tc.commands { - result := exec.FireCommand(conn, cmd) - assert.DeepEqual(t, tc.expected[i], result) + result, err := exec.FireCommandAndReadResponse(conn, cmd) + assert.Nil(t, err) + assert.Equal(t, tc.expected[i], result) } }) } diff --git a/integration_tests/commands/websocket/set_test.go b/integration_tests/commands/websocket/set_test.go index 3f224e725..ff96a1ac5 100644 --- a/integration_tests/commands/websocket/set_test.go +++ b/integration_tests/commands/websocket/set_test.go @@ -6,7 +6,6 @@ import ( "testing" "time" - "github.com/gorilla/websocket" "github.com/stretchr/testify/assert" ) @@ -41,7 +40,7 @@ func TestSet(t *testing.T) { t.Run(tc.name, func(t *testing.T) { conn := exec.ConnectToServer() - DeleteKey(t, conn, exec, "k") + deleteKey(t, conn, exec, "k") for i, cmd := range tc.commands { result, err := exec.FireCommandAndReadResponse(conn, cmd) @@ -128,9 +127,9 @@ func TestSetWithOptions(t *testing.T) { t.Run(tc.name, func(t *testing.T) { conn := exec.ConnectToServer() - DeleteKey(t, conn, exec, "k") - DeleteKey(t, conn, exec, "k1") - DeleteKey(t, conn, exec, "k2") + deleteKey(t, conn, exec, "k") + deleteKey(t, conn, exec, "k1") + deleteKey(t, conn, exec, "k2") for i, cmd := range tc.commands { result, err := exec.FireCommandAndReadResponse(conn, cmd) @@ -150,7 +149,7 @@ func TestSetWithExat(t *testing.T) { func(t *testing.T) { conn := exec.ConnectToServer() - DeleteKey(t, conn, exec, "k") + deleteKey(t, conn, exec, "k") resp, err := exec.FireCommandAndReadResponse(conn, fmt.Sprintf("SET k v EXAT %v", Etime)) assert.Nil(t, err) @@ -189,7 +188,7 @@ func TestSetWithExat(t *testing.T) { func(t *testing.T) { conn := exec.ConnectToServer() - DeleteKey(t, conn, exec, "k") + deleteKey(t, conn, exec, "k") resp, err := exec.FireCommandAndReadResponse(conn, "SET k v EXAT "+BadTime) assert.Nil(t, err) @@ -210,7 +209,7 @@ func TestSetWithExat(t *testing.T) { func(t *testing.T) { conn := exec.ConnectToServer() - DeleteKey(t, conn, exec, "k") + deleteKey(t, conn, exec, "k") resp, err := exec.FireCommandAndReadResponse(conn, "SET k v PXAT "+Etime+" EXAT "+Etime) assert.Nil(t, err) @@ -249,12 +248,3 @@ func TestWithKeepTTLFlag(t *testing.T) { assert.Nil(t, err) assert.Equal(t, out, resp, "Value mismatch for cmd %s\n.", cmd) } - -func DeleteKey(t *testing.T, conn *websocket.Conn, exec *WebsocketCommandExecutor, key string) { - cmd := "DEL " + key - resp, err := exec.FireCommandAndReadResponse(conn, cmd) - assert.Nil(t, err) - respFloat, ok := resp.(float64) - assert.True(t, ok, "error converting response to float64") - assert.True(t, respFloat == 1 || respFloat == 0, "unexpected response in %v: %v", cmd, resp) -} From faee66b04a4bd025e1df8a0253aa351ad4991c0d Mon Sep 17 00:00:00 2001 From: psr Date: Wed, 16 Oct 2024 12:36:07 +0530 Subject: [PATCH 17/25] fixed linter warnings --- integration_tests/commands/websocket/helper.go | 2 +- .../commands/websocket/hyperloglog_test.go | 2 +- integration_tests/commands/websocket/set_test.go | 14 +++++++------- 3 files changed, 9 insertions(+), 9 deletions(-) diff --git a/integration_tests/commands/websocket/helper.go b/integration_tests/commands/websocket/helper.go index ab724995e..ca14c5b43 100644 --- a/integration_tests/commands/websocket/helper.go +++ b/integration_tests/commands/websocket/helper.go @@ -7,7 +7,7 @@ import ( "github.com/stretchr/testify/assert" ) -func deleteKey(t *testing.T, conn *websocket.Conn, exec *WebsocketCommandExecutor, key string) { +func DeleteKey(t *testing.T, conn *websocket.Conn, exec *WebsocketCommandExecutor, key string) { cmd := "DEL " + key resp, err := exec.FireCommandAndReadResponse(conn, cmd) assert.Nil(t, err) diff --git a/integration_tests/commands/websocket/hyperloglog_test.go b/integration_tests/commands/websocket/hyperloglog_test.go index 8a3ed0942..65e02dc04 100644 --- a/integration_tests/commands/websocket/hyperloglog_test.go +++ b/integration_tests/commands/websocket/hyperloglog_test.go @@ -82,7 +82,7 @@ func TestHyperLogLogCommands(t *testing.T) { for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { conn := exec.ConnectToServer() - deleteKey(t, conn, exec, "k") + DeleteKey(t, conn, exec, "k") for i, cmd := range tc.commands { result, err := exec.FireCommandAndReadResponse(conn, cmd) diff --git a/integration_tests/commands/websocket/set_test.go b/integration_tests/commands/websocket/set_test.go index ff96a1ac5..9f31d4dde 100644 --- a/integration_tests/commands/websocket/set_test.go +++ b/integration_tests/commands/websocket/set_test.go @@ -40,7 +40,7 @@ func TestSet(t *testing.T) { t.Run(tc.name, func(t *testing.T) { conn := exec.ConnectToServer() - deleteKey(t, conn, exec, "k") + DeleteKey(t, conn, exec, "k") for i, cmd := range tc.commands { result, err := exec.FireCommandAndReadResponse(conn, cmd) @@ -127,9 +127,9 @@ func TestSetWithOptions(t *testing.T) { t.Run(tc.name, func(t *testing.T) { conn := exec.ConnectToServer() - deleteKey(t, conn, exec, "k") - deleteKey(t, conn, exec, "k1") - deleteKey(t, conn, exec, "k2") + DeleteKey(t, conn, exec, "k") + DeleteKey(t, conn, exec, "k1") + DeleteKey(t, conn, exec, "k2") for i, cmd := range tc.commands { result, err := exec.FireCommandAndReadResponse(conn, cmd) @@ -149,7 +149,7 @@ func TestSetWithExat(t *testing.T) { func(t *testing.T) { conn := exec.ConnectToServer() - deleteKey(t, conn, exec, "k") + DeleteKey(t, conn, exec, "k") resp, err := exec.FireCommandAndReadResponse(conn, fmt.Sprintf("SET k v EXAT %v", Etime)) assert.Nil(t, err) @@ -188,7 +188,7 @@ func TestSetWithExat(t *testing.T) { func(t *testing.T) { conn := exec.ConnectToServer() - deleteKey(t, conn, exec, "k") + DeleteKey(t, conn, exec, "k") resp, err := exec.FireCommandAndReadResponse(conn, "SET k v EXAT "+BadTime) assert.Nil(t, err) @@ -209,7 +209,7 @@ func TestSetWithExat(t *testing.T) { func(t *testing.T) { conn := exec.ConnectToServer() - deleteKey(t, conn, exec, "k") + DeleteKey(t, conn, exec, "k") resp, err := exec.FireCommandAndReadResponse(conn, "SET k v PXAT "+Etime+" EXAT "+Etime) assert.Nil(t, err) From 7a389a59aa3daef58a3d75638007cb50321d613f Mon Sep 17 00:00:00 2001 From: psr Date: Thu, 17 Oct 2024 13:25:26 +0530 Subject: [PATCH 18/25] renamed qwatch to q.watch --- internal/server/utils/redisCmdAdapter.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/internal/server/utils/redisCmdAdapter.go b/internal/server/utils/redisCmdAdapter.go index 18f281dd2..ce19a33bc 100644 --- a/internal/server/utils/redisCmdAdapter.go +++ b/internal/server/utils/redisCmdAdapter.go @@ -34,7 +34,7 @@ const ( JSON = "json" ) -const QWatch string = "QWATCH" +const QWatch string = "Q.WATCH" func ParseHTTPRequest(r *http.Request) (*cmd.DiceDBCmd, error) { commandParts := strings.Split(strings.TrimPrefix(r.URL.Path, "/"), "/") @@ -146,7 +146,7 @@ func ParseWebsocketMessage(msg []byte) (*cmd.DiceDBCmd, error) { // remove quotes from query string cmdStr, err := strconv.Unquote(cmdStr) if err != nil { - return nil, fmt.Errorf("error parsing qwatch query: %v", err) + return nil, fmt.Errorf("error parsing q.watch query: %v", err) } cmdArr = []string{cmdStr} } else { From ec046081302a6ea845e02d0e6a06293610bbd4e1 Mon Sep 17 00:00:00 2001 From: psr Date: Thu, 17 Oct 2024 13:25:47 +0530 Subject: [PATCH 19/25] fixed qwatch tests --- integration_tests/commands/websocket/qwatch_test.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/integration_tests/commands/websocket/qwatch_test.go b/integration_tests/commands/websocket/qwatch_test.go index ebac2662f..27c8009f0 100644 --- a/integration_tests/commands/websocket/qwatch_test.go +++ b/integration_tests/commands/websocket/qwatch_test.go @@ -17,12 +17,12 @@ func TestQWatch(t *testing.T) { }{ { name: "Wrong number of arguments", - cmds: []string{"QWATCH "}, - expect: "ERR wrong number of arguments for 'qwatch' command", + cmds: []string{"Q.WATCH "}, + expect: "ERR wrong number of arguments for 'q.watch' command", }, { name: "Invalid query", - cmds: []string{"QWATCH \"SELECT \""}, + cmds: []string{"Q.WATCH \"SELECT \""}, expect: "error parsing SQL statement: syntax error at position 8", }, // TODO - once following query is registered, websocket will also attempt sending updates @@ -30,8 +30,8 @@ func TestQWatch(t *testing.T) { // Add unregister test case to handle this scenario once qunwatch support is added { name: "Successful register", - cmds: []string{`QWATCH "SELECT $key, $value WHERE $key like 'qwatch-test-key?'"`}, - expect: []interface{}{"qwatch", "SELECT $key, $value WHERE $key like 'qwatch-test-key?'", []interface{}{}}, + cmds: []string{`Q.WATCH "SELECT $key, $value WHERE $key like 'test-key?'"`}, + expect: []interface{}{"q.watch", "SELECT $key, $value WHERE $key like 'test-key?'", []interface{}{}}, }, } From bd64c93f59461e694e88f83c1a5f369cf1d55bf9 Mon Sep 17 00:00:00 2001 From: psr Date: Thu, 17 Oct 2024 13:30:47 +0530 Subject: [PATCH 20/25] fixed json tests --- .../commands/websocket/json_test.go | 33 +++++++++++-------- 1 file changed, 20 insertions(+), 13 deletions(-) diff --git a/integration_tests/commands/websocket/json_test.go b/integration_tests/commands/websocket/json_test.go index 2a9b67965..dbc181505 100644 --- a/integration_tests/commands/websocket/json_test.go +++ b/integration_tests/commands/websocket/json_test.go @@ -1,8 +1,9 @@ package websocket import ( - "gotest.tools/v3/assert" "testing" + + "github.com/stretchr/testify/assert" ) func TestJSONClearOperations(t *testing.T) { @@ -10,10 +11,11 @@ func TestJSONClearOperations(t *testing.T) { conn := exec.ConnectToServer() defer conn.Close() - exec.FireCommand(conn, "DEL user") + DeleteKey(t, conn, exec, "user") defer func() { - resp := exec.FireCommand(conn, "DEL user") + resp, err := exec.FireCommandAndReadResponse(conn, "DEL user") + assert.Nil(t, err) assert.Equal(t, float64(1), resp) }() @@ -86,7 +88,8 @@ func TestJSONClearOperations(t *testing.T) { for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { for i, cmd := range tc.commands { - result := exec.FireCommand(conn, cmd) + result, err := exec.FireCommandAndReadResponse(conn, cmd) + assert.Nil(t, err) assert.Equal(t, tc.expected[i], result) } }) @@ -99,10 +102,11 @@ func TestJsonStrlen(t *testing.T) { defer conn.Close() - exec.FireCommand(conn, "DEL doc") + DeleteKey(t, conn, exec, "doc") defer func() { - resp := exec.FireCommand(conn, "DEL doc") + resp, err := exec.FireCommandAndReadResponse(conn, "DEL doc") + assert.Nil(t, err) assert.Equal(t, float64(1), resp) }() @@ -172,12 +176,13 @@ func TestJsonStrlen(t *testing.T) { for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { for i, cmd := range tc.commands { - result := exec.FireCommand(conn, cmd) + result, err := exec.FireCommandAndReadResponse(conn, cmd) + assert.Nil(t, err, "error: %v", err) stringResult, ok := result.(string) if ok { assert.Equal(t, tc.expected[i], stringResult) } else { - assert.Assert(t, arraysArePermutations(tc.expected[i].([]interface{}), result.([]interface{}))) + assert.True(t, arraysArePermutations(tc.expected[i].([]interface{}), result.([]interface{}))) } } }) @@ -189,7 +194,7 @@ func TestJsonObjLen(t *testing.T) { conn := exec.ConnectToServer() defer conn.Close() - exec.FireCommand(conn, "DEL obj") + DeleteKey(t, conn, exec, "obj") a := `{"name":"jerry","partner":{"name":"tom","language":["rust"]}}` b := `{"name":"jerry","partner":{"name":"tom","language":["rust"]},"partner2":{"name":"spike","language":["go","rust"]}}` @@ -197,7 +202,8 @@ func TestJsonObjLen(t *testing.T) { d := `["this","is","an","array"]` defer func() { - resp := exec.FireCommand(conn, "DEL obj") + resp, err := exec.FireCommandAndReadResponse(conn, "DEL obj") + assert.Nil(t, err) assert.Equal(t, float64(1), resp) }() @@ -259,13 +265,14 @@ func TestJsonObjLen(t *testing.T) { } for _, tcase := range testCases { - exec.FireCommand(conn, "DEL obj") + DeleteKey(t, conn, exec, "obj") t.Run(tcase.name, func(t *testing.T) { for i := 0; i < len(tcase.commands); i++ { cmd := tcase.commands[i] out := tcase.expected[i] - result := exec.FireCommand(conn, cmd) - assert.DeepEqual(t, out, result) + result, err := exec.FireCommandAndReadResponse(conn, cmd) + assert.Nil(t, err) + assert.Equal(t, out, result) } }) } From 4e44c019b1954012ab828641666b01a313d61196 Mon Sep 17 00:00:00 2001 From: psr Date: Thu, 17 Oct 2024 13:34:12 +0530 Subject: [PATCH 21/25] fixed unit tests to use Q.WATCH --- internal/server/utils/redisCmdAdapter_test.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/internal/server/utils/redisCmdAdapter_test.go b/internal/server/utils/redisCmdAdapter_test.go index fb1c9da42..a046a5284 100644 --- a/internal/server/utils/redisCmdAdapter_test.go +++ b/internal/server/utils/redisCmdAdapter_test.go @@ -388,14 +388,14 @@ func TestParseWebsocketMessage(t *testing.T) { }, { name: "Test simple QWATCH command", - message: "qwatch \"select $key, $value where $key like 'k?'\"", - expectedCmd: "QWATCH", + message: "q.watch \"select $key, $value where $key like 'k?'\"", + expectedCmd: "Q.WATCH", expectedArgs: []string{"select $key, $value where $key like 'k?'"}, }, { name: "Test complex QWATCH command", - message: "qwatch \"SELECT $key, $value WHERE $key LIKE 'player:*' AND '$value.score' > 10 ORDER BY $value.score DESC LIMIT 5\"", - expectedCmd: "QWATCH", + message: "q.watch \"SELECT $key, $value WHERE $key LIKE 'player:*' AND '$value.score' > 10 ORDER BY $value.score DESC LIMIT 5\"", + expectedCmd: "Q.WATCH", expectedArgs: []string{"SELECT $key, $value WHERE $key LIKE 'player:*' AND '$value.score' > 10 ORDER BY $value.score DESC LIMIT 5"}, }, } From 0544da0632387f5a5792b2293bd2aeb07cd70356 Mon Sep 17 00:00:00 2001 From: psr Date: Thu, 17 Oct 2024 13:34:44 +0530 Subject: [PATCH 22/25] minor fix --- internal/server/utils/redisCmdAdapter_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/internal/server/utils/redisCmdAdapter_test.go b/internal/server/utils/redisCmdAdapter_test.go index a046a5284..929b3b1f9 100644 --- a/internal/server/utils/redisCmdAdapter_test.go +++ b/internal/server/utils/redisCmdAdapter_test.go @@ -387,13 +387,13 @@ func TestParseWebsocketMessage(t *testing.T) { expectedArgs: []string{"", "$..field", `{"field":"value"}`}, }, { - name: "Test simple QWATCH command", + name: "Test simple Q.WATCH command", message: "q.watch \"select $key, $value where $key like 'k?'\"", expectedCmd: "Q.WATCH", expectedArgs: []string{"select $key, $value where $key like 'k?'"}, }, { - name: "Test complex QWATCH command", + name: "Test complex Q.WATCH command", message: "q.watch \"SELECT $key, $value WHERE $key LIKE 'player:*' AND '$value.score' > 10 ORDER BY $value.score DESC LIMIT 5\"", expectedCmd: "Q.WATCH", expectedArgs: []string{"SELECT $key, $value WHERE $key LIKE 'player:*' AND '$value.score' > 10 ORDER BY $value.score DESC LIMIT 5"}, From 97a44964bb20335e6f10247de51021ccefea603c Mon Sep 17 00:00:00 2001 From: psr Date: Fri, 18 Oct 2024 15:41:03 +0530 Subject: [PATCH 23/25] added warning failing to read message from client --- internal/server/websocketServer.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/internal/server/websocketServer.go b/internal/server/websocketServer.go index 7ca776bd5..a411ff6d8 100644 --- a/internal/server/websocketServer.go +++ b/internal/server/websocketServer.go @@ -128,6 +128,7 @@ func (s *WebsocketServer) WebsocketHandler(w http.ResponseWriter, r *http.Reques // read incoming message _, msg, err := conn.ReadMessage() if err != nil { + s.logger.Warn("failed to read message from client", slog.Any("localAddr", conn.LocalAddr())) break } @@ -162,7 +163,7 @@ func (s *WebsocketServer) WebsocketHandler(w http.ResponseWriter, r *http.Reques WebsocketOp: true, } - // handle qwatch commands + // handle q.watch commands if diceDBCmd.Cmd == Qwatch || diceDBCmd.Cmd == Subscribe { clientIdentifierID := generateUniqueInt32(r) sp.Client = comm.NewHTTPQwatchClient(s.qwatchResponseChan, clientIdentifierID) @@ -185,7 +186,7 @@ func (s *WebsocketServer) processQwatchUpdates(clientIdentifierID uint32, conn * case resp := <-s.qwatchResponseChan: if resp.ClientIdentifierID == clientIdentifierID { if err := s.processResponse(conn, dicDBCmd, resp); err != nil { - s.logger.Debug("Error writing response to client. Shutting down goroutine for qwatch updates", slog.Any("clientIdentifierID", clientIdentifierID), slog.Any("error", err)) + s.logger.Debug("Error writing response to client. Shutting down goroutine for q.watch updates", slog.Any("clientIdentifierID", clientIdentifierID), slog.Any("error", err)) return } } From 4fc97a277c1f41a0694a71cf51b382728bc30f92 Mon Sep 17 00:00:00 2001 From: psr Date: Fri, 18 Oct 2024 15:47:04 +0530 Subject: [PATCH 24/25] updated docs --- docs/src/content/docs/commands/QWATCH.md | 34 ++++++++++++------------ internal/server/websocketServer.go | 1 + 2 files changed, 18 insertions(+), 17 deletions(-) diff --git a/docs/src/content/docs/commands/QWATCH.md b/docs/src/content/docs/commands/QWATCH.md index 6a0181243..dcd359450 100644 --- a/docs/src/content/docs/commands/QWATCH.md +++ b/docs/src/content/docs/commands/QWATCH.md @@ -1,9 +1,9 @@ --- -title: QWATCH -description: The `QWATCH` command is a novel feature designed to provide real-time updates to clients based on changes in underlying data. +title: Q.WATCH +description: The `Q.WATCH` command is a novel feature designed to provide real-time updates to clients based on changes in underlying data. --- -The `QWATCH` command is a novel feature designed to provide real-time updates to clients based on changes in underlying data. It operates similarly to the `SUBSCRIBE` command but focuses on SQL-like queries over data structures. Whenever data modifications affect the query's results, the updated result set is pushed to the subscribed client. This eliminates the need for clients to constantly poll for changes. +The `Q.WATCH` command is a novel feature designed to provide real-time updates to clients based on changes in underlying data. It operates similarly to the `SUBSCRIBE` command but focuses on SQL-like queries over data structures. Whenever data modifications affect the query's results, the updated result set is pushed to the subscribed client. This eliminates the need for clients to constantly poll for changes. This command is what makes DiceDB different from Redis and uniquely positions it as the easiest and most intuitive way to build real-time reactive applications like leaderboards. @@ -13,12 +13,12 @@ This command is what makes DiceDB different from Redis and uniquely positions it | -------- | --------- | | TCP-RESP | ✅ | | HTTP | ✅ | -| WebSocket| ❌ | +| WebSocket| ✅ | ## Syntax ``` -QWATCH dsql-query +Q.WATCH dsql-query ``` ## Parameters @@ -72,12 +72,12 @@ Supported features: ## Example Usage with Query Flow -Let's explore a practical example of using the `QWATCH` command to create a real-time leaderboard for a game match, including filtering with a `WHERE` clause. +Let's explore a practical example of using the `Q.WATCH` command to create a real-time leaderboard for a game match, including filtering with a `WHERE` clause. ### Query ```bash -127.0.0.1:7379> QWATCH "SELECT $key, $value WHERE $key like 'match:100:*' AND $value > 10 ORDER BY $value DESC LIMIT 3" +127.0.0.1:7379> Q.WATCH "SELECT $key, $value WHERE $key like 'match:100:*' AND $value > 10 ORDER BY $value DESC LIMIT 3" ``` This query does the following: @@ -91,11 +91,11 @@ This query does the following: Imagine we're tracking player scores in a game match with ID 100. Each player's score is stored in a key formatted as `match:100:user:`. -Let's walk through a series of updates and see how the `QWATCH` command responds. Please note +Let's walk through a series of updates and see how the `Q.WATCH` command responds. Please note that the response will be RESP encoded and parsing will be handled by the SDK that you are using. 1. Initial state (empty leaderboard): - QWATCH response: `[] (empty array)` + Q.WATCH response: `[] (empty array)` 2. Player 0 scores 5 points: @@ -103,7 +103,7 @@ that the response will be RESP encoded and parsing will be handled by the SDK th 127.0.0.1:7379> SET match:100:user:0 5 ``` - QWATCH response: `[] (no change, score <= 10)` + Q.WATCH response: `[] (no change, score <= 10)` 3. Player 1 scores 15 points: @@ -111,7 +111,7 @@ that the response will be RESP encoded and parsing will be handled by the SDK th 127.0.0.1:7379> SET match:100:user:1 15 ``` - QWATCH response: `[["match:100:user:1", "15"]]` + Q.WATCH response: `[["match:100:user:1", "15"]]` 4. Player 2 scores 20 points: @@ -119,7 +119,7 @@ that the response will be RESP encoded and parsing will be handled by the SDK th 127.0.0.1:7379> SET match:100:user:2 20 ``` - QWATCH response: `[["match:100:user:2", "20"], ["match:100:user:1", "15"]]` + Q.WATCH response: `[["match:100:user:2", "20"], ["match:100:user:1", "15"]]` 5. Player 3 scores 12 points: @@ -127,7 +127,7 @@ that the response will be RESP encoded and parsing will be handled by the SDK th 127.0.0.1:7379> SET match:100:user:3 12 ``` - QWATCH response: `[["match:100:user:2", "20"], ["match:100:user:1", "15"], ["match:100:user:3", "12"]]` + Q.WATCH response: `[["match:100:user:2", "20"], ["match:100:user:1", "15"], ["match:100:user:3", "12"]]` 6. Player 4 scores 25 points: @@ -135,21 +135,21 @@ that the response will be RESP encoded and parsing will be handled by the SDK th 127.0.0.1:7379> SET match:100:user:4 25 ``` - QWATCH response: `[["match:100:user:4", "25"], ["match:100:user:2", "20"], ["match:100:user:1", "15"]]` + Q.WATCH response: `[["match:100:user:4", "25"], ["match:100:user:2", "20"], ["match:100:user:1", "15"]]` 7. Player 0 improves their score to 30: ```bash 127.0.0.1:7379> SET match:100:user:0 30 ``` - QWATCH response: `[["match:100:user:0", "30"], ["match:100:user:4", "25"], ["match:100:user:2", "20"]]` + Q.WATCH response: `[["match:100:user:0", "30"], ["match:100:user:4", "25"], ["match:100:user:2", "20"]]` -This example demonstrates how `QWATCH` provides real-time updates as the leaderboard changes, always keeping clients informed of the top 3 scores above 10, without the need for constant polling. +This example demonstrates how `Q.WATCH` provides real-time updates as the leaderboard changes, always keeping clients informed of the top 3 scores above 10, without the need for constant polling. ## Error Handling - `ERR invalid query`: If the provided query is malformed or unsupported. - `ERR syntax error`: If the query syntax is incorrect. -- `ERR unknown command`: If the `QWATCH` command is not implemented. +- `ERR unknown command`: If the `Q.WATCH` command is not implemented. - `ERR max number of subscriptions reached`: If the maximum number of allowed subscriptions is exceeded. ## Best Practices diff --git a/internal/server/websocketServer.go b/internal/server/websocketServer.go index a411ff6d8..3e38488d5 100644 --- a/internal/server/websocketServer.go +++ b/internal/server/websocketServer.go @@ -143,6 +143,7 @@ func (s *WebsocketServer) WebsocketHandler(w http.ResponseWriter, r *http.Reques continue } + // TODO - on abort, close client connection instead of closing server? if diceDBCmd.Cmd == Abort { close(s.shutdownChan) break From a90fceb8b0e423364ffd43de7f952f9b272c5727 Mon Sep 17 00:00:00 2001 From: psr Date: Sat, 19 Oct 2024 16:52:30 +0530 Subject: [PATCH 25/25] minor fix --- integration_tests/commands/websocket/main_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/integration_tests/commands/websocket/main_test.go b/integration_tests/commands/websocket/main_test.go index 9940a4cb0..cc330a19d 100644 --- a/integration_tests/commands/websocket/main_test.go +++ b/integration_tests/commands/websocket/main_test.go @@ -25,7 +25,6 @@ func TestMain(m *testing.M) { Logger: l, } ctx, cancel := context.WithCancel(context.Background()) - defer cancel() RunWebsocketServer(ctx, &wg, opts) // Wait for the server to start @@ -39,6 +38,7 @@ func TestMain(m *testing.M) { conn := executor.ConnectToServer() executor.FireCommand(conn, "abort") + cancel() wg.Wait() os.Exit(exitCode) }