Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat: add xadd, xlen, xrange, xrevrange and xread for stream support #449

Merged
merged 3 commits into from
Jun 23, 2018

Conversation

critocrito
Copy link
Contributor

Support basic stream operations (https://redis.io/topics/streams-intro) for
the upcoming Redis 5 release. Some features are missing, such as consumer
groups, capped streams or some of the auxiliary commands.

Event ids are auto incremented integers in the form of 1-0, 2-0,
etc. Events are stored internally as an array of arrays and for every event an
additional record is kept whether the event has been already polled or not.

const redis = new MockRedis({
  mystream: [
    ["1-0", ["key1", "val1", "key2", "val2"]],
    ["2-0", ["key1", "val1", "key2", "val2"]]
  ],
  "stream:mystream:1-0": {polled: false},
  "stream:mystream:2-0": {polled: false},
})

To poll events in a blocking manner is supported and the poll happens every
100ms.

// This will resolve once a new event becomes available.
redis.xread("BLOCK", "0", "STREAMS", "mystream", "$").then(console.log);
// After 1 second add a new event, that will resolve the waiting `XREAD`
// command.
setTimeout(() => redis.xadd("mystream", "*", "key", "val"), 1000);

ioredis doesn't support Redis streams yet, but there is pull request for redis-commands to add support. In the meantime ioredis' createBuiltinCommand can be used to add support manually:

import Redis from "ioredis";

const {string: xadd} = Redis.prototype.createBuiltinCommand("xadd");
const {string: xread} = Redis.prototype.createBuiltinCommand("xread");
Redis.prototype.xadd = xadd;
Redis.prototype.xread = xread;
const redis = new Redis();

critocrito and others added 3 commits June 20, 2018 10:56
Support basic stream operations (https://redis.io/topics/streams-intro) for
the upcoming Redis 5 release. Some features are missing, such as consumer
groups, capped streams or some of the auxiliary commands.

Event ids are auto incremented integers in the form of `1-0`, `2-0`,
etc. Events are stored internally as an array of arrays and for every event an
additional record is kept whether the event has been already polled or not.

```
const redis = new MockRedis({
  mystream: [
    ["1-0", ["key1", "val1", "key2", "val2"]],
    ["2-0", ["key1", "val1", "key2", "val2"]]
  ],
  "stream:mystream:1-0": {polled: false},
  "stream:mystream:2-0": {polled: false},
})
```

To poll events in a blocking manner is supported and the poll happens every
100ms.

```
// This will resolve once a new event becomes available.
redis.xread("BLOCK", "0", "STREAMS", "mystream", "$").then(console.log);
// After 1 second add a new event, that will resolve the waiting `XREAD`
// command.
setTimeout(() => redis.xadd("mystream", "*", "key", "val"), 1000);
```
Copy link
Owner

@stipsan stipsan left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hey thanks for this! I'll merge and prepare a release 😄

@stipsan stipsan merged commit 4103c63 into stipsan:master Jun 23, 2018
stipsan added a commit that referenced this pull request Jun 23, 2018
## Added

### `xadd`, `xlen`, `xrange`, `xrevrange` and `xread` for stream support (#449 @critocrito)

Support basic stream operations (https://redis.io/topics/streams-intro) for
the upcoming Redis 5 release. Some features are missing, such as consumer
groups, capped streams or some of the auxiliary commands.

Event ids are auto incremented integers in the form of `1-0`, `2-0`,
etc. Events are stored internally as an array of arrays and for every event an
additional record is kept whether the event has been already polled or not.

```
const redis = new MockRedis({
  mystream: [
    ["1-0", ["key1", "val1", "key2", "val2"]],
    ["2-0", ["key1", "val1", "key2", "val2"]]
  ],
  "stream:mystream:1-0": {polled: false},
  "stream:mystream:2-0": {polled: false},
})
```

To poll events in a blocking manner is supported and the poll happens every
100ms.

```
// This will resolve once a new event becomes available.
redis.xread("BLOCK", "0", "STREAMS", "mystream", "$").then(console.log);
// After 1 second add a new event, that will resolve the waiting `XREAD`
// command.
setTimeout(() => redis.xadd("mystream", "*", "key", "val"), 1000);
```
`ioredis` doesn't support Redis streams yet, but there is [pull request for `redis-commands`](NodeRedis/redis-commands#20) to add support. In the meantime `ioredis`' `createBuiltinCommand` can be used to add support manually:

```
import Redis from "ioredis";

const {string: xadd} = Redis.prototype.createBuiltinCommand("xadd");
const {string: xread} = Redis.prototype.createBuiltinCommand("xread");
Redis.prototype.xadd = xadd;
Redis.prototype.xread = xread;
const redis = new Redis();
```
@critocrito critocrito deleted the redis-5-streams branch June 25, 2018 15:33
@erulabs
Copy link

erulabs commented Jun 26, 2018

@critocrito Heyo! I can't seem to get this working on redis 5.0-rc3 and redis unstable, and it seems like the client stays blocked on the XREAD and the XADD is never sent to the Redis server. info clients shows a blocked_client, and command logging shows that I don't actually ever get the XADD. Is there some non-mocked trick to getting this working? My understanding is that XREAD is intentionally blocking, and there is no equivalent to "UNSUBSCRIBE" for streams, so in this example case we would need two redis handles (for adding and reading accordingly)

Note: The example / ioredis code all works perfectly fine (against a real redis5.0-rc3), but two Redis handles are required.

@critocrito
Copy link
Contributor Author

Hello @erulabs, I'm not sure I exactly understand where your problem lies. My application has a discrete process that pushes events on a stream, and a discreet process that polls events from a stream. Those processes really operate independently from each other, so yes, they do have separate handles to Redis.

You can use the same handle within the same process if you use asynchronous IO using Promises. I can only point you to examples how to use this code: the unit tests of ioredis-mock and here again some unit tests in some other project that I work on.

@erulabs
Copy link

erulabs commented Jun 27, 2018

Just that the example code above of:

// This will resolve once a new event becomes available.
redis.xread("BLOCK", "0", "STREAMS", "mystream", "$").then(console.log);
// After 1 second add a new event, that will resolve the waiting `XREAD`
// command.
setTimeout(() => redis.xadd("mystream", "*", "key", "val"), 1000);

will not work against a real, running redis 5 service, since the xadd will never be executed, as the xread is blocking the redis connection. It doesn't block the node.js thread, so the xadd javascript function is called, but the real redis server would not get that call, and thus the xread promise would not resolve. If, from another process, you executed an xadd, then the xread would resolve.

I can provide some example code if you want (or maybe a container so its easy to replicate against a running redis service), but just wanted to point out that the example and the mock test would not work in the real world.

@critocrito
Copy link
Contributor Author

Ahh I see. Yeah, the above code snippet relates to a unit test using ioredis-mock. I have never tried the exact same code on a real redis client. Thanks for pointing it out. My real world use case doesn't push and pull to the same stream within the same process or on the same client handle.

@erulabs
Copy link

erulabs commented Jun 27, 2018

No problem - thanks for the library! Worth noting that I brought this up on the redis mailing lists and Antirez himself came to the rescue, and we now have a "CLIENT UNBLOCK" syntax (here is the discussion: https://groups.google.com/forum/#!topic/redis-db/bfgZ-3QgCmM)! Doesn't exactly resolve the syntax above, but it's a very nice feature!

@stipsan
Copy link
Owner

stipsan commented Jul 18, 2018

Thanks for the detailed info @erulabs and @critocrito!

I would love for this to behave more accurately. The whole point of this library is to be as accurate in its emulation as possible. Right now there is no emulation of things like network connectivity, sentinels or pub/sub. I'll make an issue linking to the information above to ensure it'll be handled correctly in the future.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants