From eb2c37eae6097b4cd1013036c5adb75666974421 Mon Sep 17 00:00:00 2001 From: Charley DAVID Date: Mon, 10 Jul 2023 19:26:58 +0200 Subject: [PATCH 1/4] fix(client): XCLAIM & XAUTOCLAIM after a TRIM might return nils --- .../client/lib/commands/XAUTOCLAIM.spec.ts | 64 ++++++++++++++++++- packages/client/lib/commands/XAUTOCLAIM.ts | 6 +- packages/client/lib/commands/XCLAIM.spec.ts | 40 ++++++++++++ packages/client/lib/commands/XCLAIM.ts | 2 +- .../lib/commands/generic-transformers.spec.ts | 33 ++++++++++ .../lib/commands/generic-transformers.ts | 28 +++++--- 6 files changed, 158 insertions(+), 15 deletions(-) diff --git a/packages/client/lib/commands/XAUTOCLAIM.spec.ts b/packages/client/lib/commands/XAUTOCLAIM.spec.ts index 4447a06d77..a9d6e8f14e 100644 --- a/packages/client/lib/commands/XAUTOCLAIM.spec.ts +++ b/packages/client/lib/commands/XAUTOCLAIM.spec.ts @@ -23,7 +23,7 @@ describe('XAUTOCLAIM', () => { }); }); - testUtils.testWithClient('client.xAutoClaim', async client => { + testUtils.testWithClient('client.xAutoClaim without messages', async client => { await Promise.all([ client.xGroupCreate('key', 'group', '$', { MKSTREAM: true @@ -39,4 +39,66 @@ describe('XAUTOCLAIM', () => { } ); }, GLOBAL.SERVERS.OPEN); + + testUtils.testWithClient('client.xAutoClaim with messages', async client => { + const [,,id,] = await Promise.all([ + client.xGroupCreate('key', 'group', '$', { + MKSTREAM: true + }), + client.xGroupCreateConsumer('key', 'group', 'consumer'), + client.xAdd('key', '*', { foo: 'bar' }), + client.xReadGroup('group', 'consumer', { key: 'key', id: '>' }) + ]); + + assert.deepEqual( + await client.xAutoClaim('key', 'group', 'consumer', 1, '0-0'), + { + nextId: '0-0', + messages: [{ + id, + message: Object.create(null, { 'foo': { + value: 'bar', + configurable: true, + enumerable: true + } }) + }] + } + ); + }, GLOBAL.SERVERS.OPEN); + + testUtils.testWithClient('client.xAutoClaim with trimmed messages', async client => { + const [,,,,,id2,] = await Promise.all([ + client.xGroupCreate('key', 'group', '$', { + MKSTREAM: true + }), + client.xGroupCreateConsumer('key', 'group', 'consumer'), + client.xAdd('key', '*', { foo: 'bar' }), + client.xReadGroup('group', 'consumer', { key: 'key', id: '>' }), + client.xTrim('key', 'MAXLEN', 0), + client.xAdd('key', '*', { bar: 'baz' }), + client.xReadGroup('group', 'consumer', { key: 'key', id: '>' }), + ]); + + assert.deepEqual( + await client.xAutoClaim('key', 'group', 'consumer', 1, '0-0'), + { + nextId: '0-0', + messages: testUtils.isVersionGreaterThan([7, 0]) ? [{ + id: id2, + message: Object.create(null, { 'bar': { + value: 'baz', + configurable: true, + enumerable: true + } }) + }] : [null, { + id: id2, + message: Object.create(null, { 'bar': { + value: 'baz', + configurable: true, + enumerable: true + } }) + }] + } + ); + }, GLOBAL.SERVERS.OPEN); }); diff --git a/packages/client/lib/commands/XAUTOCLAIM.ts b/packages/client/lib/commands/XAUTOCLAIM.ts index 4bf46057ba..831563981a 100644 --- a/packages/client/lib/commands/XAUTOCLAIM.ts +++ b/packages/client/lib/commands/XAUTOCLAIM.ts @@ -1,5 +1,5 @@ import { RedisCommandArgument, RedisCommandArguments } from '.'; -import { StreamMessagesReply, transformStreamMessagesReply } from './generic-transformers'; +import { StreamMessagesNullReply, transformStreamMessagesNullReply } from './generic-transformers'; export const FIRST_KEY_INDEX = 1; @@ -28,12 +28,12 @@ type XAutoClaimRawReply = [RedisCommandArgument, Array]; interface XAutoClaimReply { nextId: RedisCommandArgument; - messages: StreamMessagesReply; + messages: StreamMessagesNullReply; } export function transformReply(reply: XAutoClaimRawReply): XAutoClaimReply { return { nextId: reply[0], - messages: transformStreamMessagesReply(reply[1]) + messages: transformStreamMessagesNullReply(reply[1]) }; } diff --git a/packages/client/lib/commands/XCLAIM.spec.ts b/packages/client/lib/commands/XCLAIM.spec.ts index 141a62ab77..ad95f34277 100644 --- a/packages/client/lib/commands/XCLAIM.spec.ts +++ b/packages/client/lib/commands/XCLAIM.spec.ts @@ -87,4 +87,44 @@ describe('XCLAIM', () => { [] ); }, GLOBAL.SERVERS.OPEN); + + testUtils.testWithClient('client.xClaim with a message', async client => { + const [,,id,] = await Promise.all([ + client.xGroupCreate('key', 'group', '$', { + MKSTREAM: true + }), + client.xGroupCreateConsumer('key', 'group', 'consumer'), + client.xAdd('key', '*', { foo: 'bar' }), + client.xReadGroup('group', 'consumer', { key: 'key', id: '>' }) + ]); + + assert.deepEqual( + await client.xClaim('key', 'group', 'consumer', 1, id), + [{ + id, + message: Object.create(null, { 'foo': { + value: 'bar', + configurable: true, + enumerable: true + } }) + }] + ); + }, GLOBAL.SERVERS.OPEN); + + testUtils.testWithClient('client.xClaim with a trimmed message', async client => { + const [,,id,,,] = await Promise.all([ + client.xGroupCreate('key', 'group', '$', { + MKSTREAM: true + }), + client.xGroupCreateConsumer('key', 'group', 'consumer'), + client.xAdd('key', '*', { foo: 'bar' }), + client.xReadGroup('group', 'consumer', { key: 'key', id: '>' }), + client.xTrim('key', 'MAXLEN', 0), + ]); + + assert.deepEqual( + await client.xClaim('key', 'group', 'consumer', 1, id), + testUtils.isVersionGreaterThan([7, 0]) ? []: [null] + ); + }, GLOBAL.SERVERS.OPEN); }); diff --git a/packages/client/lib/commands/XCLAIM.ts b/packages/client/lib/commands/XCLAIM.ts index bc38f9b9e9..e7b458e237 100644 --- a/packages/client/lib/commands/XCLAIM.ts +++ b/packages/client/lib/commands/XCLAIM.ts @@ -45,4 +45,4 @@ export function transformArguments( return args; } -export { transformStreamMessagesReply as transformReply } from './generic-transformers'; +export { transformStreamMessagesNullReply as transformReply } from './generic-transformers'; diff --git a/packages/client/lib/commands/generic-transformers.spec.ts b/packages/client/lib/commands/generic-transformers.spec.ts index 301cab0a75..60caf26eaa 100644 --- a/packages/client/lib/commands/generic-transformers.spec.ts +++ b/packages/client/lib/commands/generic-transformers.spec.ts @@ -9,6 +9,7 @@ import { transformStringNumberInfinityArgument, transformTuplesReply, transformStreamMessagesReply, + transformStreamMessagesNullReply, transformStreamsMessagesReply, transformSortedSetWithScoresReply, pushGeoCountArgument, @@ -219,6 +220,38 @@ describe('Generic Transformers', () => { ); }); + it('transformStreamMessagesNullReply', () => { + assert.deepEqual( + transformStreamMessagesNullReply([null, ['0-0', ['0key', '0value']]]), + [null, { + id: '0-0', + message: Object.create(null, { + '0key': { + value: '0value', + configurable: true, + enumerable: true + } + }) + }] + ); + }); + + it('transformStreamMessagesNullReply', () => { + assert.deepEqual( + transformStreamMessagesNullReply([null, ['0-1', ['11key', '11value']]]), + [null, { + id: '0-1', + message: Object.create(null, { + '11key': { + value: '11value', + configurable: true, + enumerable: true + } + }) + }] + ); + }); + describe('transformStreamsMessagesReply', () => { it('null', () => { assert.equal( diff --git a/packages/client/lib/commands/generic-transformers.ts b/packages/client/lib/commands/generic-transformers.ts index 5048de9399..4cf610a036 100644 --- a/packages/client/lib/commands/generic-transformers.ts +++ b/packages/client/lib/commands/generic-transformers.ts @@ -92,19 +92,27 @@ export interface StreamMessageReply { message: Record; } -export type StreamMessagesReply = Array; +export function transformStreamMessageReply([id, message]: Array): StreamMessageReply { + return { + id, + message: transformTuplesReply(message) + }; +} -export function transformStreamMessagesReply(reply: Array): StreamMessagesReply { - const messages = []; +export function transformStreamMessageNullReply(reply: Array): StreamMessageReply | null { + if (reply === null) return null; + return transformStreamMessageReply(reply); +} - for (const [id, message] of reply) { - messages.push({ - id, - message: transformTuplesReply(message) - }); - } - return messages; +export type StreamMessagesReply = Array; +export function transformStreamMessagesReply(reply: Array): StreamMessagesReply { + return reply.map(transformStreamMessageReply); +} + +export type StreamMessagesNullReply = Array; +export function transformStreamMessagesNullReply(reply: Array): StreamMessagesNullReply { + return reply.map(transformStreamMessageNullReply); } export type StreamsMessagesReply = Array<{ From 8a8bec6c7baa46320377ffd3b254af334261cd1e Mon Sep 17 00:00:00 2001 From: Charley DAVID Date: Sat, 22 Jul 2023 10:33:37 +0200 Subject: [PATCH 2/4] fix(client): Fix race condition in specs --- .../client/lib/commands/XAUTOCLAIM.spec.ts | 42 +++++++------------ packages/client/lib/commands/XCLAIM.spec.ts | 30 +++++-------- packages/test-utils/lib/dockers.ts | 2 +- packages/test-utils/lib/index.ts | 1 + 4 files changed, 28 insertions(+), 47 deletions(-) diff --git a/packages/client/lib/commands/XAUTOCLAIM.spec.ts b/packages/client/lib/commands/XAUTOCLAIM.spec.ts index a9d6e8f14e..9c48753a60 100644 --- a/packages/client/lib/commands/XAUTOCLAIM.spec.ts +++ b/packages/client/lib/commands/XAUTOCLAIM.spec.ts @@ -25,9 +25,7 @@ describe('XAUTOCLAIM', () => { testUtils.testWithClient('client.xAutoClaim without messages', async client => { await Promise.all([ - client.xGroupCreate('key', 'group', '$', { - MKSTREAM: true - }), + client.xGroupCreate('key', 'group', '$', { MKSTREAM: true }), client.xGroupCreateConsumer('key', 'group', 'consumer'), ]); @@ -41,17 +39,13 @@ describe('XAUTOCLAIM', () => { }, GLOBAL.SERVERS.OPEN); testUtils.testWithClient('client.xAutoClaim with messages', async client => { - const [,,id,] = await Promise.all([ - client.xGroupCreate('key', 'group', '$', { - MKSTREAM: true - }), - client.xGroupCreateConsumer('key', 'group', 'consumer'), - client.xAdd('key', '*', { foo: 'bar' }), - client.xReadGroup('group', 'consumer', { key: 'key', id: '>' }) - ]); + await client.xGroupCreate('key', 'group', '$', { MKSTREAM: true }); + await client.xGroupCreateConsumer('key', 'group', 'consumer'); + const id = await client.xAdd('key', '*', { foo: 'bar' }); + await client.xReadGroup('group', 'consumer', { key: 'key', id: '>' }); assert.deepEqual( - await client.xAutoClaim('key', 'group', 'consumer', 1, '0-0'), + await client.xAutoClaim('key', 'group', 'consumer', 0, '0-0'), { nextId: '0-0', messages: [{ @@ -67,31 +61,27 @@ describe('XAUTOCLAIM', () => { }, GLOBAL.SERVERS.OPEN); testUtils.testWithClient('client.xAutoClaim with trimmed messages', async client => { - const [,,,,,id2,] = await Promise.all([ - client.xGroupCreate('key', 'group', '$', { - MKSTREAM: true - }), - client.xGroupCreateConsumer('key', 'group', 'consumer'), - client.xAdd('key', '*', { foo: 'bar' }), - client.xReadGroup('group', 'consumer', { key: 'key', id: '>' }), - client.xTrim('key', 'MAXLEN', 0), - client.xAdd('key', '*', { bar: 'baz' }), - client.xReadGroup('group', 'consumer', { key: 'key', id: '>' }), - ]); + await client.xGroupCreate('key', 'group', '$', { MKSTREAM: true }); + await client.xGroupCreateConsumer('key', 'group', 'consumer'); + await client.xAdd('key', '*', { foo: 'bar' }); + await client.xReadGroup('group', 'consumer', { key: 'key', id: '>' }); + await client.xTrim('key', 'MAXLEN', 0); + const id = await client.xAdd('key', '*', { bar: 'baz' }); + await client.xReadGroup('group', 'consumer', { key: 'key', id: '>' }); assert.deepEqual( - await client.xAutoClaim('key', 'group', 'consumer', 1, '0-0'), + await client.xAutoClaim('key', 'group', 'consumer', 0, '0-0'), { nextId: '0-0', messages: testUtils.isVersionGreaterThan([7, 0]) ? [{ - id: id2, + id, message: Object.create(null, { 'bar': { value: 'baz', configurable: true, enumerable: true } }) }] : [null, { - id: id2, + id, message: Object.create(null, { 'bar': { value: 'baz', configurable: true, diff --git a/packages/client/lib/commands/XCLAIM.spec.ts b/packages/client/lib/commands/XCLAIM.spec.ts index ad95f34277..6626e84c73 100644 --- a/packages/client/lib/commands/XCLAIM.spec.ts +++ b/packages/client/lib/commands/XCLAIM.spec.ts @@ -83,23 +83,18 @@ describe('XCLAIM', () => { }); assert.deepEqual( - await client.xClaim('key', 'group', 'consumer', 1, '0-0'), + await client.xClaim('key', 'group', 'consumer', 0, '0-0'), [] ); }, GLOBAL.SERVERS.OPEN); testUtils.testWithClient('client.xClaim with a message', async client => { - const [,,id,] = await Promise.all([ - client.xGroupCreate('key', 'group', '$', { - MKSTREAM: true - }), - client.xGroupCreateConsumer('key', 'group', 'consumer'), - client.xAdd('key', '*', { foo: 'bar' }), - client.xReadGroup('group', 'consumer', { key: 'key', id: '>' }) - ]); + await client.xGroupCreate('key', 'group', '$', { MKSTREAM: true }); + const id = await client.xAdd('key', '*', { foo: 'bar' }); + await client.xReadGroup('group', 'consumer', { key: 'key', id: '>' }); assert.deepEqual( - await client.xClaim('key', 'group', 'consumer', 1, id), + await client.xClaim('key', 'group', 'consumer', 0, id), [{ id, message: Object.create(null, { 'foo': { @@ -112,18 +107,13 @@ describe('XCLAIM', () => { }, GLOBAL.SERVERS.OPEN); testUtils.testWithClient('client.xClaim with a trimmed message', async client => { - const [,,id,,,] = await Promise.all([ - client.xGroupCreate('key', 'group', '$', { - MKSTREAM: true - }), - client.xGroupCreateConsumer('key', 'group', 'consumer'), - client.xAdd('key', '*', { foo: 'bar' }), - client.xReadGroup('group', 'consumer', { key: 'key', id: '>' }), - client.xTrim('key', 'MAXLEN', 0), - ]); + await client.xGroupCreate('key', 'group', '$', { MKSTREAM: true }); + const id = await client.xAdd('key', '*', { foo: 'bar' }); + await client.xReadGroup('group', 'consumer', { key: 'key', id: '>' }); + await client.xTrim('key', 'MAXLEN', 0); assert.deepEqual( - await client.xClaim('key', 'group', 'consumer', 1, id), + await client.xClaim('key', 'group', 'consumer', 0, id), testUtils.isVersionGreaterThan([7, 0]) ? []: [null] ); }, GLOBAL.SERVERS.OPEN); diff --git a/packages/test-utils/lib/dockers.ts b/packages/test-utils/lib/dockers.ts index a7e1c610ee..7ad523247c 100644 --- a/packages/test-utils/lib/dockers.ts +++ b/packages/test-utils/lib/dockers.ts @@ -14,7 +14,7 @@ interface ErrorWithCode extends Error { async function isPortAvailable(port: number): Promise { try { - const socket = createConnection({ port }); + const socket = createConnection({ host: '0.0.0.0', port }); await once(socket, 'connect'); socket.end(); } catch (err) { diff --git a/packages/test-utils/lib/index.ts b/packages/test-utils/lib/index.ts index b9195c5717..c0b7a61a1f 100644 --- a/packages/test-utils/lib/index.ts +++ b/packages/test-utils/lib/index.ts @@ -136,6 +136,7 @@ export default class TestUtils { const client = RedisClient.create({ ...options?.clientOptions, + url: 'redis://0.0.0.0', socket: { ...options?.clientOptions?.socket, port: (await dockerPromise).port From 6021b32be2fdc95fb43811d861dd75c0f7f1e97f Mon Sep 17 00:00:00 2001 From: Leibale Eidelman Date: Mon, 18 Sep 2023 18:56:17 -0400 Subject: [PATCH 3/4] revert test utils changes --- packages/test-utils/lib/dockers.ts | 2 +- packages/test-utils/lib/index.ts | 1 - 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/packages/test-utils/lib/dockers.ts b/packages/test-utils/lib/dockers.ts index 7ad523247c..a7e1c610ee 100644 --- a/packages/test-utils/lib/dockers.ts +++ b/packages/test-utils/lib/dockers.ts @@ -14,7 +14,7 @@ interface ErrorWithCode extends Error { async function isPortAvailable(port: number): Promise { try { - const socket = createConnection({ host: '0.0.0.0', port }); + const socket = createConnection({ port }); await once(socket, 'connect'); socket.end(); } catch (err) { diff --git a/packages/test-utils/lib/index.ts b/packages/test-utils/lib/index.ts index c0b7a61a1f..b9195c5717 100644 --- a/packages/test-utils/lib/index.ts +++ b/packages/test-utils/lib/index.ts @@ -136,7 +136,6 @@ export default class TestUtils { const client = RedisClient.create({ ...options?.clientOptions, - url: 'redis://0.0.0.0', socket: { ...options?.clientOptions?.socket, port: (await dockerPromise).port From 76d36f223a0814044c4ae449f53a40aaae830dcc Mon Sep 17 00:00:00 2001 From: Leibale Eidelman Date: Tue, 19 Sep 2023 17:21:08 -0400 Subject: [PATCH 4/4] make tests faster --- .../client/lib/commands/XAUTOCLAIM.spec.ts | 94 ++++++++++--------- 1 file changed, 49 insertions(+), 45 deletions(-) diff --git a/packages/client/lib/commands/XAUTOCLAIM.spec.ts b/packages/client/lib/commands/XAUTOCLAIM.spec.ts index 9c48753a60..bae914bda0 100644 --- a/packages/client/lib/commands/XAUTOCLAIM.spec.ts +++ b/packages/client/lib/commands/XAUTOCLAIM.spec.ts @@ -24,71 +24,75 @@ describe('XAUTOCLAIM', () => { }); testUtils.testWithClient('client.xAutoClaim without messages', async client => { - await Promise.all([ + const [,, reply] = await Promise.all([ client.xGroupCreate('key', 'group', '$', { MKSTREAM: true }), client.xGroupCreateConsumer('key', 'group', 'consumer'), + client.xAutoClaim('key', 'group', 'consumer', 1, '0-0') ]); - assert.deepEqual( - await client.xAutoClaim('key', 'group', 'consumer', 1, '0-0'), - { - nextId: '0-0', - messages: [] - } - ); + assert.deepEqual(reply, { + nextId: '0-0', + messages: [] + }); }, GLOBAL.SERVERS.OPEN); testUtils.testWithClient('client.xAutoClaim with messages', async client => { - await client.xGroupCreate('key', 'group', '$', { MKSTREAM: true }); - await client.xGroupCreateConsumer('key', 'group', 'consumer'); - const id = await client.xAdd('key', '*', { foo: 'bar' }); - await client.xReadGroup('group', 'consumer', { key: 'key', id: '>' }); + const [,, id,, reply] = await Promise.all([ + client.xGroupCreate('key', 'group', '$', { MKSTREAM: true }), + client.xGroupCreateConsumer('key', 'group', 'consumer'), + client.xAdd('key', '*', { foo: 'bar' }), + client.xReadGroup('group', 'consumer', { key: 'key', id: '>' }), + client.xAutoClaim('key', 'group', 'consumer', 0, '0-0') + ]); - assert.deepEqual( - await client.xAutoClaim('key', 'group', 'consumer', 0, '0-0'), - { - nextId: '0-0', - messages: [{ - id, - message: Object.create(null, { 'foo': { + assert.deepEqual(reply, { + nextId: '0-0', + messages: [{ + id, + message: Object.create(null, { + foo: { value: 'bar', configurable: true, enumerable: true - } }) - }] - } - ); + } + }) + }] + }); }, GLOBAL.SERVERS.OPEN); testUtils.testWithClient('client.xAutoClaim with trimmed messages', async client => { - await client.xGroupCreate('key', 'group', '$', { MKSTREAM: true }); - await client.xGroupCreateConsumer('key', 'group', 'consumer'); - await client.xAdd('key', '*', { foo: 'bar' }); - await client.xReadGroup('group', 'consumer', { key: 'key', id: '>' }); - await client.xTrim('key', 'MAXLEN', 0); - const id = await client.xAdd('key', '*', { bar: 'baz' }); - await client.xReadGroup('group', 'consumer', { key: 'key', id: '>' }); + const [,,,,, id,, reply] = await Promise.all([ + client.xGroupCreate('key', 'group', '$', { MKSTREAM: true }), + client.xGroupCreateConsumer('key', 'group', 'consumer'), + client.xAdd('key', '*', { foo: 'bar' }), + client.xReadGroup('group', 'consumer', { key: 'key', id: '>' }), + client.xTrim('key', 'MAXLEN', 0), + client.xAdd('key', '*', { bar: 'baz' }), + client.xReadGroup('group', 'consumer', { key: 'key', id: '>' }), + client.xAutoClaim('key', 'group', 'consumer', 0, '0-0') + ]); - assert.deepEqual( - await client.xAutoClaim('key', 'group', 'consumer', 0, '0-0'), - { - nextId: '0-0', - messages: testUtils.isVersionGreaterThan([7, 0]) ? [{ - id, - message: Object.create(null, { 'bar': { + assert.deepEqual(reply, { + nextId: '0-0', + messages: testUtils.isVersionGreaterThan([7, 0]) ? [{ + id, + message: Object.create(null, { + bar: { value: 'baz', configurable: true, enumerable: true - } }) - }] : [null, { - id, - message: Object.create(null, { 'bar': { + } + }) + }] : [null, { + id, + message: Object.create(null, { + bar: { value: 'baz', configurable: true, enumerable: true - } }) - }] - } - ); + } + }) + }] + }); }, GLOBAL.SERVERS.OPEN); });