Skip to content

Commit

Permalink
#1008: Added Q.WATCH support to Websocket (#1090)
Browse files Browse the repository at this point in the history
  • Loading branch information
psrvere authored Oct 21, 2024
1 parent 26e7e02 commit 846cef3
Show file tree
Hide file tree
Showing 10 changed files with 376 additions and 137 deletions.
50 changes: 25 additions & 25 deletions docs/src/content/docs/commands/QWATCH.md
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
---
title: QWATCH
description: The `QWATCH` command is a novel feature designed to provide real-time updates to clients based on changes in underlying data.
title: Q.WATCH
description: The `Q.WATCH` command is a novel feature designed to provide real-time updates to clients based on changes in underlying data.
---

The `QWATCH` command is a novel feature designed to provide real-time updates to clients based on changes in underlying
The `Q.WATCH` command is a novel feature designed to provide real-time updates to clients based on changes in underlying
data. It operates similarly to the `SUBSCRIBE` command but focuses on SQL-like queries over data structures. Whenever
data modifications affect the query's results, the updated result set is pushed to the subscribed client. This
eliminates the need for clients to constantly poll for changes.
Expand All @@ -13,16 +13,16 @@ to build real-time reactive applications like leaderboards.

## Protocol Support

| Protocol | Supported |
|-----------|-----------|
| TCP-RESP | |
| HTTP | |
| WebSocket | |
| Protocol | Supported |
| -------- | --------- |
| TCP-RESP ||
| HTTP ||
| WebSocket| |

## Syntax

```bash
QWATCH <dsql-query>
Q.WATCH <dsql-query>
```

## Parameters
Expand Down Expand Up @@ -96,7 +96,7 @@ Supported conditions:

1. `Missing query`

- Error Message: `(error) ERROR wrong number of arguments for 'qwatch' command`
- Error Message: `(error) ERROR wrong number of arguments for 'q.watch' command`
- Occurs if no DSQL Query is provided.

2. `Invalid query`:
Expand All @@ -113,12 +113,12 @@ Supported conditions:

### Basic Usage

Let's explore a practical example of using the `QWATCH` command to create a real-time leaderboard for a game match,
Let's explore a practical example of using the `Q.WATCH` command to create a real-time leaderboard for a game match,
including filtering with a `WHERE` clause.

```bash
127.0.0.1:7379> QWATCH "SELECT $key, $value WHERE $key like 'match:100:*' AND $value > 10 ORDER BY $value DESC LIMIT 3"
qwatch from SELECT $key, $value WHERE $key like 'match:100:*' AND $value > 10 ORDER BY $value asc: []
127.0.0.1:7379> Q.WATCH "SELECT $key, $value WHERE $key like 'match:100:*' AND $value > 10 ORDER BY $value DESC LIMIT 3"
q.watch from SELECT $key, $value WHERE $key like 'match:100:*' AND $value > 10 ORDER BY $value asc: []
```

This query does the following:
Expand All @@ -133,7 +133,7 @@ This query does the following:
Imagine we're tracking player scores in a game match with ID 100. Each player's score is stored in a key formatted as
`match:100:user:<userID>`.

Let's walk through a series of updates and see how the `QWATCH` command responds. Please note
Let's walk through a series of updates and see how the `Q.WATCH` command responds. Please note
that the response will be RESP encoded and parsing will be handled by the SDK that you are using.

1. Initial state (empty leaderboard): `[]`
Expand All @@ -152,9 +152,9 @@ that the response will be RESP encoded and parsing will be handled by the SDK th
127.0.0.1:7379> SET match:100:user:1 15
```

QWATCH Response:
Q.WATCH Response:
```bash
qwatch from SELECT $key, $value WHERE $key like 'match:100:*' and $value > 100 ORDER BY $value asc: `[["match:100:user:1", "15"]]`
q.watch from SELECT $key, $value WHERE $key like 'match:100:*' and $value > 100 ORDER BY $value asc: `[["match:100:user:1", "15"]]`
```

4. Player 2 scores 20 points:
Expand All @@ -163,9 +163,9 @@ that the response will be RESP encoded and parsing will be handled by the SDK th
127.0.0.1:7379> SET match:100:user:2 20
```

QWATCH Response:
Q.WATCH Response:
```bash
qwatch from SELECT $key, $value WHERE $key like 'match:100:*' and $value > 100 ORDER BY $value asc: `[["match:100:user:2", "20"], ["match:100:user:1", "15"]]`
q.watch from SELECT $key, $value WHERE $key like 'match:100:*' and $value > 100 ORDER BY $value asc: `[["match:100:user:2", "20"], ["match:100:user:1", "15"]]`
```

5. Player 3 scores 12 points:
Expand All @@ -174,9 +174,9 @@ that the response will be RESP encoded and parsing will be handled by the SDK th
127.0.0.1:7379> SET match:100:user:3 12
```

QWATCH Response:
Q.WATCH Response:
```bash
qwatch from SELECT $key, $value WHERE $key like 'match:100:*' and $value > 100 ORDER BY $value asc: `[["match:100:user:2", "20"], ["match:100:user:1", "15"], ["match:100:user:3", "12"]]`
q.watch from SELECT $key, $value WHERE $key like 'match:100:*' and $value > 100 ORDER BY $value asc: `[["match:100:user:2", "20"], ["match:100:user:1", "15"], ["match:100:user:3", "12"]]`
```

6. Player 4 scores 25 points:
Expand All @@ -185,9 +185,9 @@ that the response will be RESP encoded and parsing will be handled by the SDK th
127.0.0.1:7379> SET match:100:user:4 25
```

QWATCH Response:
Q.WATCH Response:
```bash
qwatch from SELECT $key, $value WHERE $key like 'match:100:*' and $value > 100 ORDER BY $value asc: `[["match:100:user:4", "25"], ["match:100:user:2", "20"], ["match:100:user:1", "15"]]`
q.watch from SELECT $key, $value WHERE $key like 'match:100:*' and $value > 100 ORDER BY $value asc: `[["match:100:user:4", "25"], ["match:100:user:2", "20"], ["match:100:user:1", "15"]]`
```

7. Player 0 improves their score to 30:
Expand All @@ -196,12 +196,12 @@ that the response will be RESP encoded and parsing will be handled by the SDK th
127.0.0.1:7379> SET match:100:user:0 30
```

QWATCH Response:
Q.WATCH Response:
```bash
qwatch from SELECT $key, $value WHERE $key like 'match:100:*' and $value > 100 ORDER BY $value asc: `[["match:100:user:0", "30"], ["match:100:user:4", "25"], ["match:100:user:2", "20"]]`
q.watch from SELECT $key, $value WHERE $key like 'match:100:*' and $value > 100 ORDER BY $value asc: `[["match:100:user:0", "30"], ["match:100:user:4", "25"], ["match:100:user:2", "20"]]`
```

This example demonstrates how `QWATCH` provides real-time updates as the leaderboard changes, always keeping clients
This example demonstrates how `Q.WATCH` provides real-time updates as the leaderboard changes, always keeping clients
informed of the top 3 scores above 10, without the need for constant polling.

## Best Practices
Expand Down
3 changes: 1 addition & 2 deletions integration_tests/commands/websocket/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ func TestMain(m *testing.M) {
Logger: l,
}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
RunWebsocketServer(ctx, &wg, opts)

// Wait for the server to start
Expand All @@ -36,10 +35,10 @@ func TestMain(m *testing.M) {
// Run the test suite
exitCode := m.Run()

// abort
conn := executor.ConnectToServer()
executor.FireCommand(conn, "abort")

cancel()
wg.Wait()
os.Exit(exitCode)
}
53 changes: 53 additions & 0 deletions integration_tests/commands/websocket/qwatch_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package websocket

import (
"testing"

"github.com/stretchr/testify/assert"
)

func TestQWatch(t *testing.T) {
exec := NewWebsocketCommandExecutor()
conn := exec.ConnectToServer()

testCases := []struct {
name string
cmds []string
expect interface{}
}{
{
name: "Wrong number of arguments",
cmds: []string{"Q.WATCH "},
expect: "ERR wrong number of arguments for 'q.watch' command",
},
{
name: "Invalid query",
cmds: []string{"Q.WATCH \"SELECT \""},
expect: "error parsing SQL statement: syntax error at position 8",
},
// TODO - once following query is registered, websocket will also attempt sending updates
// while keys are set for other tests in this package
// Add unregister test case to handle this scenario once qunwatch support is added
{
name: "Successful register",
cmds: []string{`Q.WATCH "SELECT $key, $value WHERE $key like 'test-key?'"`},
expect: []interface{}{"q.watch", "SELECT $key, $value WHERE $key like 'test-key?'", []interface{}{}},
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
for _, cmd := range tc.cmds {
result, err := exec.FireCommandAndReadResponse(conn, cmd)
assert.Nil(t, err)
if _, ok := tc.expect.(string); ok {
// compare strings
assert.Equal(t, tc.expect, result, "Value mismatch for cmd %s", cmd)
} else {
// compare lists
assert.ElementsMatch(t, tc.expect, result, "Value mismatch for cmd %s", cmd)
}
}
})
}
}
12 changes: 11 additions & 1 deletion integration_tests/commands/websocket/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (

"github.com/dicedb/dice/config"
derrors "github.com/dicedb/dice/internal/errors"
"github.com/dicedb/dice/internal/querymanager"
"github.com/dicedb/dice/internal/server"
"github.com/dicedb/dice/internal/shard"
dstore "github.com/dicedb/dice/internal/store"
Expand Down Expand Up @@ -108,7 +109,9 @@ func RunWebsocketServer(ctx context.Context, wg *sync.WaitGroup, opt TestServerO
globalErrChannel := make(chan error)
watchChan := make(chan dstore.QueryWatchEvent, config.DiceConfig.Performance.WatchChanBufSize)
shardManager := shard.NewShardManager(1, watchChan, nil, globalErrChannel, opt.Logger)
testServer := server.NewWebSocketServer(shardManager, watchChan, testPort1, opt.Logger)
queryWatcherLocal := querymanager.NewQueryManager(opt.Logger)
config.WebsocketPort = opt.Port
testServer := server.NewWebSocketServer(shardManager, testPort1, opt.Logger)
shardManagerCtx, cancelShardManager := context.WithCancel(ctx)

// run shard manager
Expand All @@ -118,6 +121,13 @@ func RunWebsocketServer(ctx context.Context, wg *sync.WaitGroup, opt TestServerO
shardManager.Run(shardManagerCtx)
}()

// run query manager
wg.Add(1)
go func() {
defer wg.Done()
queryWatcherLocal.Run(ctx, watchChan)
}()

// start websocket server
wg.Add(1)
go func() {
Expand Down
4 changes: 2 additions & 2 deletions internal/eval/eval.go
Original file line number Diff line number Diff line change
Expand Up @@ -1776,7 +1776,7 @@ func evalMULTI(args []string, store *dstore.Store) []byte {
// Every time a key in the watch list is modified, the client will be sent a response
// containing the new value of the key along with the operation that was performed on it.
// Contains only one argument, the query to be watched.
func EvalQWATCH(args []string, httpOp bool, client *comm.Client, store *dstore.Store) []byte {
func EvalQWATCH(args []string, httpOp, websocketOp bool, client *comm.Client, store *dstore.Store) []byte {
if len(args) != 1 {
return diceerrors.NewErrArity("Q.WATCH")
}
Expand All @@ -1795,7 +1795,7 @@ func EvalQWATCH(args []string, httpOp bool, client *comm.Client, store *dstore.S
})
var watchSubscription querymanager.QuerySubscription

if httpOp {
if httpOp || websocketOp {
watchSubscription = querymanager.QuerySubscription{
Subscribe: true,
Query: query,
Expand Down
11 changes: 1 addition & 10 deletions internal/eval/execute.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,6 @@ func ExecuteCommand(c *cmd.DiceDBCmd, client *comm.Client, store *dstore.Store,
return &EvalResponse{Result: diceerrors.NewErrWithFormattedMessage("unknown command '%s', with args beginning with: %s", c.Cmd, strings.Join(c.Args, " ")), Error: nil}
}

// Till the time we refactor to handle QWATCH differently for websocket
if websocketOp {
if diceCmd.IsMigrated {
return diceCmd.NewEval(c.Args, store)
}

return &EvalResponse{Result: diceCmd.Eval(c.Args, store), Error: nil}
}

// Temporary logic till we move all commands to new eval logic.
// MigratedDiceCmds map contains refactored eval commands
// For any command we will first check in the existing map
Expand All @@ -40,7 +31,7 @@ func ExecuteCommand(c *cmd.DiceDBCmd, client *comm.Client, store *dstore.Store,
// Old implementation kept as it is, but we will be moving
// to the new implementation soon for all commands
case "SUBSCRIBE", "Q.WATCH":
return &EvalResponse{Result: EvalQWATCH(c.Args, httpOp, client, store), Error: nil}
return &EvalResponse{Result: EvalQWATCH(c.Args, httpOp, websocketOp, client, store), Error: nil}
case "UNSUBSCRIBE", "Q.UNWATCH":
return &EvalResponse{Result: EvalQUNWATCH(c.Args, httpOp, client), Error: nil}
case auth.Cmd:
Expand Down
38 changes: 32 additions & 6 deletions internal/server/utils/redisCmdAdapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"fmt"
"io"
"net/http"
"strconv"
"strings"

"github.com/dicedb/dice/internal/cmd"
Expand Down Expand Up @@ -33,6 +34,8 @@ const (
JSON = "json"
)

const QWatch string = "Q.WATCH"

func ParseHTTPRequest(r *http.Request) (*cmd.DiceDBCmd, error) {
commandParts := strings.Split(strings.TrimPrefix(r.URL.Path, "/"), "/")
if len(commandParts) == 0 {
Expand Down Expand Up @@ -117,16 +120,39 @@ func ParseHTTPRequest(r *http.Request) (*cmd.DiceDBCmd, error) {
}

func ParseWebsocketMessage(msg []byte) (*cmd.DiceDBCmd, error) {
cmdStr := string(msg)
cmdStr = strings.TrimSpace(cmdStr)

cmdStr := strings.TrimSpace(string(msg))
if cmdStr == "" {
return nil, diceerrors.ErrEmptyCommand
}

cmdArr := strings.Split(cmdStr, " ")
command := strings.ToUpper(cmdArr[0])
cmdArr = cmdArr[1:] // args
var command string
idx := strings.Index(cmdStr, " ")
// handle commands with no args
if idx == -1 {
command = strings.ToUpper(cmdStr)
return &cmd.DiceDBCmd{
Cmd: command,
Args: nil,
}, nil
}

// handle commands with args
command = strings.ToUpper(cmdStr[:idx])
cmdStr = cmdStr[idx+1:]

var cmdArr []string // args
// handle qwatch commands
if command == QWatch {
// remove quotes from query string
cmdStr, err := strconv.Unquote(cmdStr)
if err != nil {
return nil, fmt.Errorf("error parsing q.watch query: %v", err)
}
cmdArr = []string{cmdStr}
} else {
// handle other commands
cmdArr = strings.Split(cmdStr, " ")
}

// if key prefix is empty for JSON.INGEST command
// add "" to cmdArr
Expand Down
Loading

0 comments on commit 846cef3

Please sign in to comment.