Skip to content

Commit

Permalink
Feat: add xadd, xlen, xrange, xrevrange and xread for stream support (s…
Browse files Browse the repository at this point in the history
…tipsan#449)

Support basic stream operations (https://redis.io/topics/streams-intro) for
the upcoming Redis 5 release. Some features are missing, such as consumer
groups, capped streams or some of the auxiliary commands.

Event ids are auto incremented integers in the form of `1-0`, `2-0`,
etc. Events are stored internally as an array of arrays and for every event an
additional record is kept whether the event has been already polled or not.

```
const redis = new MockRedis({
  mystream: [
    ["1-0", ["key1", "val1", "key2", "val2"]],
    ["2-0", ["key1", "val1", "key2", "val2"]]
  ],
  "stream:mystream:1-0": {polled: false},
  "stream:mystream:2-0": {polled: false},
})
```

To poll events in a blocking manner is supported and the poll happens every
100ms.

```
// This will resolve once a new event becomes available.
redis.xread("BLOCK", "0", "STREAMS", "mystream", "$").then(console.log);
// After 1 second add a new event, that will resolve the waiting `XREAD`
// command.
setTimeout(() => redis.xadd("mystream", "*", "key", "val"), 1000);
```
`ioredis` doesn't support Redis streams yet, but there is [pull request for `redis-commands`](NodeRedis/redis-commands#20) to add support. In the meantime `ioredis`' `createBuiltinCommand` can be used to add support manually:

```
import Redis from "ioredis";

const {string: xadd} = Redis.prototype.createBuiltinCommand("xadd");
const {string: xread} = Redis.prototype.createBuiltinCommand("xread");
Redis.prototype.xadd = xadd;
Redis.prototype.xread = xread;
const redis = new Redis();
```
  • Loading branch information
critocrito authored and stipsan committed Jun 23, 2018
1 parent e86ebd9 commit 4103c63
Show file tree
Hide file tree
Showing 12 changed files with 593 additions and 0 deletions.
7 changes: 7 additions & 0 deletions compat.md
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,13 @@
| [unwatch](http://redis.io/commands/UNWATCH) | :white_check_mark: | :x: |
| [wait](http://redis.io/commands/WAIT) | :white_check_mark: | :x: |
| [watch](http://redis.io/commands/WATCH) | :white_check_mark: | :x: |
| [xadd](http://redis.io/commands/XADD) | :x: | :white_check_mark: |
| [xlen](http://redis.io/commands/XLEN) | :x: | :white_check_mark: |
| [xpending](http://redis.io/commands/XPENDING) | :x: | :x: |
| [xrange](http://redis.io/commands/XRANGE) | :x: | :white_check_mark: |
| [xread](http://redis.io/commands/XREAD) | :x: | :white_check_mark: |
| [xreadgroup](http://redis.io/commands/XREADGROUP) | :x: | :x: |
| [xrevrange](http://redis.io/commands/XREVRANGE) | :x: | :x: |
| [zadd](http://redis.io/commands/ZADD) | :white_check_mark: | :white_check_mark: |
| [zcard](http://redis.io/commands/ZCARD) | :white_check_mark: | :x: |
| [zcount](http://redis.io/commands/ZCOUNT) | :white_check_mark: | :x: |
Expand Down
5 changes: 5 additions & 0 deletions src/commands/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,11 @@ export * from './time';
export * from './ttl';
export * from './type';
export * from './unsubscribe';
export * from './xadd';
export * from './xlen';
export * from './xrange';
export * from './xread';
export * from './xrevrange';
export * from './zadd';
export * from './zincrby';
export * from './zrange';
Expand Down
21 changes: 21 additions & 0 deletions src/commands/xadd.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
export function xadd(stream, id, ...args) {
if (!stream || !id || args.length === 0 || args.length % 2 !== 0) {
throw new Error("ERR wrong number of arguments for 'xadd' command");
}
if (!this.data.has(stream)) {
this.data.set(stream, []);
}
const eventId = `${id === '*' ? this.data.get(stream).length + 1 : id}-0`;
const list = this.data.get(stream);

if (list.length > 0 && list[0][0] === `${eventId}`) {
throw new Error(
'ERR The ID specified in XADD is equal or smaller than the target stream top item'
);
}

this.data.set(`stream:${stream}:${eventId}`, { polled: false });
this.data.set(stream, list.concat([[`${eventId}`, [...args]]]));

return `${eventId}`;
}
3 changes: 3 additions & 0 deletions src/commands/xlen.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
export function xlen(stream) {
return (this.data.get(stream) || []).length;
}
27 changes: 27 additions & 0 deletions src/commands/xrange.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
export function xrange(stream, start, end, ...args) {
if (!stream || !start || !end) {
throw new Error("ERR wrong number of arguments for 'xrange' command");
}

const [COUNT, count] = args;

if (COUNT && !count) {
throw new Error('ERR syntax error');
}

if (!this.data.has(stream)) {
return [];
}

const list = this.data.get(stream);

const min = start === '-' ? -Infinity : start;
const max = end === '+' ? Infinity : end;

const result = list.filter(
([eventId]) => min <= parseInt(eventId, 10) && max >= parseInt(eventId, 10)
);

if (count) return result.slice(0, count);
return result;
}
68 changes: 68 additions & 0 deletions src/commands/xread.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
export function xread(option, ...args) {
const pollStream = (stream, id, count = 1) => {
const data = this.data.get(stream);
if (!data) return [];
return data.reduce((memo, [eventId, ...row]) => {
const { polled } = this.data.get(`stream:${stream}:${eventId}`);
if (!polled && (id === '$' || eventId >= id) && memo.length < count) {
this.data.set(`stream:${stream}:${eventId}`, { polled: true });
return [[stream, [eventId, ...row]]].concat(memo);
}
return memo;
}, []);
};

const { op, opVal, rest } =
option === 'STREAMS'
? { op: 'COUNT', opVal: Infinity, rest: args }
: {
op: option,
opVal: parseInt(args[0], 10),
rest: args.slice(2),
};

if (['COUNT', 'BLOCK'].indexOf(op) < 0) {
throw new Error('ERR syntax error');
}

if (rest.length % 2 !== 0) {
throw new Error(
"ERR Unbalanced XREAD list of streams: for each stream key an ID or '$' must be specified."
);
}

// Turn ["stream1", "stream2", "id1", "id2"] into tuples of
// [["stream1", "id1"], ["stream2", "id2"]]
const toPoll = rest.reduce((memo, arg, i) => {
const chunk = Math.floor(i / 2);
const tuple = memo[chunk] || [];
// eslint-disable-next-line no-param-reassign
memo[chunk] = tuple.concat(arg);
return memo;
}, []);

const pollEvents = (streams, countVal) =>
streams.reduce(
(memo, [stream, id]) => pollStream(stream, id, countVal).concat(memo),
[]
);

return op === 'BLOCK'
? new Promise(resolve => {
let timeElapsed = 0;
const f = () =>
setTimeout(() => {
if (opVal > 0 && timeElapsed < opVal) return resolve(null);
const events = pollEvents(toPoll, 1);
if (events.length > 0) return resolve(events);
timeElapsed += 100;
return f();
}, 100);
f();
})
: new Promise(resolve => {
const events = pollEvents(toPoll, opVal);
if (events.length === 0) return resolve(null);
return resolve(events.slice().reverse());
});
}
29 changes: 29 additions & 0 deletions src/commands/xrevrange.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
export function xrevrange(stream, end, start, ...args) {
if (!stream || !start || !end) {
throw new Error("ERR wrong number of arguments for 'xrevrange' command");
}

const [COUNT, count] = args;

if (COUNT && !count) {
throw new Error('ERR syntax error');
}

if (!this.data.has(stream)) {
return [];
}

const list = this.data
.get(stream)
.slice()
.reverse();
const min = start === '-' ? -Infinity : start;
const max = end === '+' ? Infinity : end;

const result = list.filter(
([eventId]) => min <= parseInt(eventId, 10) && max >= parseInt(eventId, 10)
);

if (count) return result.slice(0, count);
return result;
}
55 changes: 55 additions & 0 deletions test/commands/xadd.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
import expect from 'expect';

import MockRedis from '../../src';

describe('xadd', () => {
it('should add events to a stream', () => {
const redis = new MockRedis();
return redis
.xadd('stream', '*', 'key', 'val')
.then(id => {
expect(id).toBe('1-0');
expect(redis.data.get('stream')).toEqual([['1-0', ['key', 'val']]]);
expect(redis.data.get(`stream:stream:${id}`)).toEqual({
polled: false,
});
})
.then(() => redis.xadd('stream', '*', 'key', 'val'))
.then(id => {
expect(id).toBe('2-0');
expect(redis.data.get('stream')).toEqual([
['1-0', ['key', 'val']],
['2-0', ['key', 'val']],
]);
expect(redis.data.get(`stream:stream:${id}`)).toEqual({
polled: false,
});
});
});

it('should throw with an illegal amount of arguments', () => {
const redis = new MockRedis();
return Promise.all([
redis.xadd().catch(err => err.message),
redis.xadd('stream').catch(err => err.message),
redis.xadd('stream', '*').catch(err => err.message),
redis.xadd('stream', '*', 'one').catch(err => err.message),
]).then(errors =>
errors.forEach(err =>
expect(err).toBe("ERR wrong number of arguments for 'xadd' command")
)
);
});

it('should throw with a duplicate id', () => {
const redis = new MockRedis();
redis
.xadd('stream', '*', 'key', 'value')
.then(id => redis.xadd('stream', id, 'key', 'value'))
.catch(err =>
expect(err.message).toBe(
'ERR The ID specified in XADD is equal or smaller than the target stream top item'
)
);
});
});
23 changes: 23 additions & 0 deletions test/commands/xlen.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
import expect from 'expect';

import MockRedis from '../../src';

describe('xlen', () => {
it('should return the number of events in the stream', () => {
const redis = new MockRedis({
data: {
stream: [
['3-0', ['key', 'val']],
['2-0', ['key', 'val']],
['1-0', ['key', 'val']],
],
},
});
return redis.xlen('stream').then(len => expect(len).toBe(3));
});

it('should return 0 for a non existing stream', () => {
const redis = new MockRedis();
return redis.xlen('non-existing').then(len => expect(len).toBe(0));
});
});
98 changes: 98 additions & 0 deletions test/commands/xrange.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
import expect from 'expect';

import MockRedis from '../../src';

describe('xrange', () => {
it('returns an empty list on a non existing stream', () => {
const redis = new MockRedis();
return redis
.xrange('non-existing', '-', '+')
.then(events => expect(events).toEqual([]));
});

it('returns the contents of the stream', () => {
const redis = new MockRedis({
data: {
stream: [
['1-0', ['key', 'val']],
['2-0', ['key', 'val']],
['3-0', ['key', 'val']],
['4-0', ['key', 'val']],
],
'stream:stream:1-0': { polled: false },
'stream:stream:2-0': { polled: false },
'stream:stream:3-0': { polled: false },
'stream:stream:4-0': { polled: false },
},
});
return Promise.all([
redis.xrange('stream', '-', '+'),
redis.xrange('stream', '2', '+'),
redis.xrange('stream', '-', '2'),
redis.xrange('stream', '2', '3'),
redis.xrange('stream', '2', '2'),
]).then(([events1, events2, events3, events4, events5]) => {
expect(events1).toEqual([
['1-0', ['key', 'val']],
['2-0', ['key', 'val']],
['3-0', ['key', 'val']],
['4-0', ['key', 'val']],
]);
expect(events2).toEqual([
['2-0', ['key', 'val']],
['3-0', ['key', 'val']],
['4-0', ['key', 'val']],
]);
expect(events3).toEqual([
['1-0', ['key', 'val']],
['2-0', ['key', 'val']],
]);
expect(events4).toEqual([
['2-0', ['key', 'val']],
['3-0', ['key', 'val']],
]);
expect(events5).toEqual([['2-0', ['key', 'val']]]);
});
});

it('should limit the count of events', () => {
const redis = new MockRedis({
data: {
stream: [
['1-0', ['key', 'val']],
['2-0', ['key', 'val']],
['3-0', ['key', 'val']],
],
'stream:stream:1-0': { polled: false },
'stream:stream:2-0': { polled: false },
'stream:stream:3-0': { polled: false },
},
});
return redis.xrange('stream', '-', '+', 'COUNT', '2').then(events => {
expect(events).toEqual([
['1-0', ['key', 'val']],
['2-0', ['key', 'val']],
]);
});
});

it('should throw with a wrong number of arguments', () => {
const redis = new MockRedis();
Promise.all([
redis.xrange('stream', '-').catch(err => err.message),
redis.xrange('stream').catch(err => err.message),
redis.xrange().catch(err => err.message),
]).then(errors =>
errors.forEach(err =>
expect(err).toBe("ERR wrong number of arguments for 'xrange' command")
)
);
});

it('should throw with a missing count', () => {
const redis = new MockRedis();
redis
.xrange('stream', '-', '+', 'COUNT')
.catch(err => expect(err.message).toBe('ERR syntax error'));
});
});
Loading

0 comments on commit 4103c63

Please sign in to comment.