Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Node: BLMOVE #2027

Merged
merged 15 commits into from
Jul 30, 2024
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
#### Changes
* Node: Added BLMOVE command ([#2027](https://github.com/valkey-io/valkey-glide/pull/2027))
* Node: Exported client configuration types ([#2023](https://github.com/valkey-io/valkey-glide/pull/2023))
* Java, Python: Update docs for GEOSEARCH command ([#2017](https://github.com/valkey-io/valkey-glide/pull/2017))
* Node: Added FUNCTION LIST command ([#2019](https://github.com/valkey-io/valkey-glide/pull/2019))
Expand Down
48 changes: 48 additions & 0 deletions node/src/BaseClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ import {
createLInsert,
createLLen,
createLMove,
createBLMove,
createLPop,
createLPos,
createLPush,
Expand Down Expand Up @@ -1623,6 +1624,53 @@ export class BaseClient {
);
}

/**
* Blocks the connection until it pops atomically and removes the left/right-most element to the
* list stored at `source` depending on `whereFrom`, and pushes the element at the first/last element
* of the list stored at `destination` depending on `whereTo`.
* `BLMOVE` is the blocking variant of {@link lmove}.
*
* @remarks
* 1. When in cluster mode, both `source` and `destination` must map to the same hash slot.
* 2. `BLMOVE` is a client blocking command, see https://github.com/aws/glide-for-redis/wiki/General-Concepts#blocking-commands for more details and best practices.
*
* See https://valkey.io/commands/blmove/ for details.
*
* @param source - The key to the source list.
* @param destination - The key to the destination list.
* @param whereFrom - The {@link ListDirection} to remove the element from.
* @param whereTo - The {@link ListDirection} to add the element to.
* @param timeout - The number of seconds to wait for a blocking operation to complete. A value of `0` will block indefinitely.
* @returns The popped element, or `null` if `source` does not exist or if the operation timed-out.
*
* since Valkey version 6.2.0.
*
* @example
* ```typescript
* await client.lpush("testKey1", ["two", "one"]);
* await client.lpush("testKey2", ["four", "three"]);
* const result = await client.blmove("testKey1", "testKey2", ListDirection.LEFT, ListDirection.LEFT, 0.1);
* console.log(result); // Output: "one"
*
* const result2 = await client.lrange("testKey1", 0, -1);
* console.log(result2); // Output: "two"
*
* const updated_array2 = await client.lrange("testKey2", 0, -1);
* console.log(updated_array2); // Output: ["one", "three", "four"]
* ```
*/
public async blmove(
source: string,
destination: string,
whereFrom: ListDirection,
whereTo: ListDirection,
timeout: number,
): Promise<string | null> {
return this.createWritePromise(
createBLMove(source, destination, whereFrom, whereTo, timeout),
);
}

/**
* Sets the list element at `index` to `element`.
* The index is zero-based, so `0` means the first element, `1` the second element and so on.
Expand Down
19 changes: 19 additions & 0 deletions node/src/Commands.ts
Original file line number Diff line number Diff line change
Expand Up @@ -615,6 +615,25 @@ export function createLMove(
]);
}

/**
* @internal
*/
export function createBLMove(
source: string,
destination: string,
whereFrom: ListDirection,
whereTo: ListDirection,
timeout: number,
): command_request.Command {
return createCommand(RequestType.BLMove, [
source,
destination,
whereFrom,
whereTo,
timeout.toString(),
]);
}

/**
* @internal
*/
Expand Down
36 changes: 36 additions & 0 deletions node/src/Transaction.ts
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ import {
createLInsert,
createLLen,
createLMove,
createBLMove,
createLPop,
createLPos,
createLPush,
Expand Down Expand Up @@ -819,6 +820,41 @@ export class BaseTransaction<T extends BaseTransaction<T>> {
);
}

/**
*
* Blocks the connection until it pops atomically and removes the left/right-most element to the
* list stored at `source` depending on `whereFrom`, and pushes the element at the first/last element
* of the list stored at `destination` depending on `whereTo`.
* `BLMOVE` is the blocking variant of {@link lmove}.
*
* @remarks
* 1. When in cluster mode, both `source` and `destination` must map to the same hash slot.
* 2. `BLMOVE` is a client blocking command, see https://github.com/aws/glide-for-redis/wiki/General-Concepts#blocking-commands for more details and best practices.
*
* See https://valkey.io/commands/blmove/ for details.
*
* @param source - The key to the source list.
* @param destination - The key to the destination list.
* @param whereFrom - The {@link ListDirection} to remove the element from.
* @param whereTo - The {@link ListDirection} to add the element to.
* @param timeout - The number of seconds to wait for a blocking operation to complete. A value of `0` will block indefinitely.
*
* Command Response - The popped element, or `null` if `source` does not exist or if the operation timed-out.
*
* since Valkey version 6.2.0.
*/
public blmove(
source: string,
destination: string,
whereFrom: ListDirection,
whereTo: ListDirection,
timeout: number,
): T {
return this.addAndReturn(
createBLMove(source, destination, whereFrom, whereTo, timeout),
);
}

/**
* Sets the list element at `index` to `element`.
* The index is zero-based, so `0` means the first element, `1` the second element and so on.
Expand Down
33 changes: 33 additions & 0 deletions node/tests/RedisClient.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import {
transactionTest,
validateTransactionResponse,
} from "./TestUtilities";
import { ListDirection } from "..";

/* eslint-disable @typescript-eslint/no-var-requires */

Expand Down Expand Up @@ -126,6 +127,38 @@ describe("GlideClient", () => {
},
);

it.each([ProtocolVersion.RESP2, ProtocolVersion.RESP3])(
"check that blocking commands returns never timeout_%p",
async (protocol) => {
client = await GlideClient.createClient(
getClientConfigurationOption(
cluster.getAddresses(),
protocol,
300,
),
);

const blmovePromise = client.blmove(
"source",
"destination",
ListDirection.LEFT,
ListDirection.LEFT,
0.1,
);
const timeoutPromise = new Promise((resolve) => {
setTimeout(resolve, 500);
});

try {
await Promise.race([blmovePromise, timeoutPromise]);
} finally {
Promise.resolve(blmovePromise);
client.close();
}
},
5000,
);

it.each([ProtocolVersion.RESP2, ProtocolVersion.RESP3])(
"select dbsize flushdb test %p",
async (protocol) => {
Expand Down
8 changes: 8 additions & 0 deletions node/tests/RedisClusterClient.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import {
FunctionListResponse,
GlideClusterClient,
InfoOptions,
ListDirection,
ProtocolVersion,
Routes,
ScoreFilter,
Expand Down Expand Up @@ -324,6 +325,13 @@ describe("GlideClusterClient", () => {

if (gte(cluster.getVersion(), "6.2.0")) {
promises.push(
client.blmove(
"abc",
"def",
ListDirection.LEFT,
ListDirection.LEFT,
0.2,
),
client.zdiff(["abc", "zxy", "lkn"]),
client.zdiffWithScores(["abc", "zxy", "lkn"]),
client.zdiffstore("abc", ["zxy", "lkn"]),
Expand Down
127 changes: 127 additions & 0 deletions node/tests/SharedTests.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1362,6 +1362,133 @@ export function runBaseTests<Context>(config: {
config.timeout,
);

it.each([ProtocolVersion.RESP2, ProtocolVersion.RESP3])(
`blmove list_%p`,
async (protocol) => {
await runTest(async (client: BaseClient, cluster) => {
if (cluster.checkIfServerVersionLessThan("6.2.0")) {
return;
}

const key1 = "{key}-1" + uuidv4();
const key2 = "{key}-2" + uuidv4();
const lpushArgs1 = ["2", "1"];
const lpushArgs2 = ["4", "3"];

// Initialize the tests
expect(await client.lpush(key1, lpushArgs1)).toEqual(2);
expect(await client.lpush(key2, lpushArgs2)).toEqual(2);

// Move from LEFT to LEFT with blocking
checkSimple(
await client.blmove(
key1,
key2,
ListDirection.LEFT,
ListDirection.LEFT,
0.1,
),
).toEqual("1");

// Move from LEFT to RIGHT with blocking
checkSimple(
await client.blmove(
key1,
key2,
ListDirection.LEFT,
ListDirection.RIGHT,
0.1,
),
).toEqual("2");

checkSimple(await client.lrange(key2, 0, -1)).toEqual([
"1",
"3",
"4",
"2",
]);
checkSimple(await client.lrange(key1, 0, -1)).toEqual([]);

// Move from RIGHT to LEFT non-existing destination with blocking
checkSimple(
await client.blmove(
key2,
key1,
ListDirection.RIGHT,
ListDirection.LEFT,
0.1,
),
).toEqual("2");

checkSimple(await client.lrange(key2, 0, -1)).toEqual([
"1",
"3",
"4",
]);
checkSimple(await client.lrange(key1, 0, -1)).toEqual(["2"]);

// Move from RIGHT to RIGHT with blocking
checkSimple(
await client.blmove(
key2,
key1,
ListDirection.RIGHT,
ListDirection.RIGHT,
0.1,
),
).toEqual("4");

checkSimple(await client.lrange(key2, 0, -1)).toEqual([
"1",
"3",
]);
checkSimple(await client.lrange(key1, 0, -1)).toEqual([
"2",
"4",
]);

// Non-existing source key with blocking
expect(
await client.blmove(
"{key}-non_existing_key" + uuidv4(),
key1,
ListDirection.LEFT,
ListDirection.LEFT,
0.1,
),
).toEqual(null);

// Non-list source key with blocking
const key3 = "{key}-3" + uuidv4();
checkSimple(await client.set(key3, "value")).toEqual("OK");
await expect(
client.blmove(
key3,
key1,
ListDirection.LEFT,
ListDirection.LEFT,
0.1,
),
).rejects.toThrow(RequestError);

// Non-list destination key
await expect(
client.blmove(
key1,
key3,
ListDirection.LEFT,
ListDirection.LEFT,
0.1,
),
).rejects.toThrow(RequestError);

// TODO: add test case with 0 timeout (no timeout) should never time out,
// but we wrap the test with timeout to avoid test failing or stuck forever
}, protocol);
},
config.timeout,
);

it.each([ProtocolVersion.RESP2, ProtocolVersion.RESP3])(
`lset test_%p`,
async (protocol) => {
Expand Down
19 changes: 14 additions & 5 deletions node/tests/TestUtilities.ts
Original file line number Diff line number Diff line change
Expand Up @@ -542,13 +542,22 @@ export async function transactionTest(
field + "3",
]);

baseTransaction.lpopCount(key5, 2);
responseData.push(["lpopCount(key5, 2)", [field + "2"]]);
} else {
baseTransaction.lpopCount(key5, 2);
responseData.push(["lpopCount(key5, 2)", [field + "3", field + "2"]]);
baseTransaction.blmove(
key20,
key5,
ListDirection.LEFT,
ListDirection.LEFT,
3,
);
responseData.push([
"blmove(key20, key5, ListDirection.LEFT, ListDirection.LEFT, 3)",
field + "3",
]);
}

baseTransaction.lpopCount(key5, 2);
responseData.push(["lpopCount(key5, 2)", [field + "3", field + "2"]]);

baseTransaction.linsert(
key5,
InsertPosition.Before,
Expand Down
Loading