Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Node: add ZUNIONSTORE #2145

Merged
merged 14 commits into from
Aug 19, 2024
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
#### Changes
* Node: Added ZUNIONSTORE command ([#2145](https://github.com/valkey-io/valkey-glide/pull/2145))
* Java: Added PUBSUB CHANNELS, NUMPAT and NUMSUB commands ([#2105](https://github.com/valkey-io/valkey-glide/pull/2105))
* Java: Added binary support for custom command ([#2109](https://github.com/valkey-io/valkey-glide/pull/2109))
* Node: Added SSCAN command ([#2132](https://github.com/valkey-io/valkey-glide/pull/2132))
Expand Down
37 changes: 37 additions & 0 deletions node/src/BaseClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,8 @@ import {
createZRevRankWithScore,
createZScan,
createZScore,
createZUnionStore,
TimeUnit,
cyip10 marked this conversation as resolved.
Show resolved Hide resolved
} from "./Commands";
import {
ClosingError,
Expand Down Expand Up @@ -3417,6 +3419,41 @@ export class BaseClient {
return this.createWritePromise(createZScore(key, member));
}

/**
* Computes the union of sorted sets given by the specified `keys` and stores the result in `destination`.
* If `destination` already exists, it is overwritten. Otherwise, a new sorted set will be created.
* To get the result directly, see `zunion_withscores`.
cyip10 marked this conversation as resolved.
Show resolved Hide resolved
*
* @see {@link https://valkey.io/commands/zunionstore/|valkey.io} for details.
cyip10 marked this conversation as resolved.
Show resolved Hide resolved
* @param destination - The key of the destination sorted set.
* @param keys - The keys of the sorted sets with possible formats:
* string[] - for keys only.
* KeyWeight[] - for weighted keys with score multipliers.
cyip10 marked this conversation as resolved.
Show resolved Hide resolved
* @param aggregationType - Specifies the aggregation strategy to apply when combining the scores of elements. See `AggregationType`.
cyip10 marked this conversation as resolved.
Show resolved Hide resolved
* @returns - The number of elements in the resulting sorted set stored at `destination`.
cyip10 marked this conversation as resolved.
Show resolved Hide resolved
*
* * @example
* ```typescript
* // Example usage of zunionstore command with an existing key
* await client.zadd("key1", {"member1": 10.5, "member2": 8.2})
* await client.zadd("key2", {"member1": 9.5})
* await client.zunionstore("my_sorted_set", ["key1", "key2"]) // Output: 2 - Indicates that the sorted set "my_sorted_set" contains two elements.
* await client.zrangeWithScores("my_sorted_set", RangeByIndex(0, -1)) // Output: {'member1': 20, 'member2': 8.2} - "member1" is now stored in "my_sorted_set" with score of 20 and "member2" with score of 8.2.
* await client.zunionstore("my_sorted_set", ["key1", "key2"] , AggregationType.MAX ) // Output: 2 - Indicates that the sorted set "my_sorted_set" contains two elements, and each score is the maximum score between the sets.
* await client.zrangeWithScores("my_sorted_set", RangeByIndex(0, -1)) // Output: {'member1': 10.5, 'member2': 8.2} - "member1" is now stored in "my_sorted_set" with score of 10.5 and "member2" with score of 8.2.
* await client.zunionstore("my_sorted_set", ["key1, "key2], {weights: [2, 1]}) // Output: 30.5 for "member1" and 16.4 for "member2"
cyip10 marked this conversation as resolved.
Show resolved Hide resolved
* ```
*/
public async zunionstore(
destination: string,
keys: string[] | KeyWeight[],
aggregationType?: AggregationType,
): Promise<number> {
return this.createWritePromise(
createZUnionStore(destination, keys, aggregationType),
);
}

/**
* Returns the scores associated with the specified `members` in the sorted set stored at `key`.
*
Expand Down
12 changes: 12 additions & 0 deletions node/src/Commands.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1525,6 +1525,18 @@ export function createZScore(
return createCommand(RequestType.ZScore, [key, member]);
}

/**
* @internal
*/
export function createZUnionStore(
destination: string,
keys: string[] | KeyWeight[],
aggregationType?: AggregationType,
): command_request.Command {
const args = createZCmdStoreArgs(destination, keys, aggregationType);
return createCommand(RequestType.ZUnionStore, args);
}

/**
* @internal
*/
Expand Down
25 changes: 25 additions & 0 deletions node/src/Transaction.ts
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,8 @@ import {
createZRevRankWithScore,
createZScan,
createZScore,
createZUnionStore,
TimeUnit,
} from "./Commands";
import { command_request } from "./ProtobufMessage";

Expand Down Expand Up @@ -1751,6 +1753,29 @@ export class BaseTransaction<T extends BaseTransaction<T>> {
return this.addAndReturn(createZScore(key, member));
}

/**
* Computes the union of sorted sets given by the specified `keys` and stores the result in `destination`.
* If `destination` already exists, it is overwritten. Otherwise, a new sorted set will be created.
* To get the result directly, see `zunion_withscores`.
*
cyip10 marked this conversation as resolved.
Show resolved Hide resolved
* @see {@link https://valkey.io/commands/zunionstore/|valkey.io} for details.
* @param destination - The key of the destination sorted set.
* @param keys - The keys of the sorted sets with possible formats:
* string[] - for keys only.
* KeyWeight[] - for weighted keys with score multipliers.
* @param aggregationType - Specifies the aggregation strategy to apply when combining the scores of elements. See `AggregationType`.
* Command Response - The number of elements in the resulting sorted set stored at `destination`.
jonathanl-bq marked this conversation as resolved.
Show resolved Hide resolved
*/
public zunionstore(
destination: string,
keys: string[] | KeyWeight[],
aggregationType?: AggregationType,
): T {
return this.addAndReturn(
createZUnionStore(destination, keys, aggregationType),
);
}

/**
* Returns the scores associated with the specified `members` in the sorted set stored at `key`.
*
Expand Down
190 changes: 190 additions & 0 deletions node/tests/SharedTests.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3772,6 +3772,196 @@ export function runBaseTests<Context>(config: {
config.timeout,
);

// ZUnionStore command tests
async function zunionStoreWithMaxAggregation(client: BaseClient) {
const key1 = "{testKey}:1-" + uuidv4();
const key2 = "{testKey}:2-" + uuidv4();
const key3 = "{testKey}:3-" + uuidv4();
const range = {
start: 0,
stop: -1,
};

const membersScores1 = { one: 1.0, two: 2.0 };
const membersScores2 = { one: 1.5, two: 2.5, three: 3.5 };

expect(await client.zadd(key1, membersScores1)).toEqual(2);
expect(await client.zadd(key2, membersScores2)).toEqual(3);

// Union results are aggregated by the MAX score of elements
expect(await client.zunionStore(key3, [key1, key2], "MAX")).toEqual(3);
const zunionstoreMapMax = await client.zrangeWithScores(key3, range);
const expectedMapMax = {
one: 1.5,
two: 2.5,
three: 3.5,
};
expect(zunionstoreMapMax).toEqual(expectedMapMax);
}

async function zunionStoreWithMinAggregation(client: BaseClient) {
const key1 = "{testKey}:1-" + uuidv4();
const key2 = "{testKey}:2-" + uuidv4();
const key3 = "{testKey}:3-" + uuidv4();
const range = {
start: 0,
stop: -1,
};

const membersScores1 = { one: 1.0, two: 2.0 };
const membersScores2 = { one: 1.5, two: 2.5, three: 3.5 };

expect(await client.zadd(key1, membersScores1)).toEqual(2);
expect(await client.zadd(key2, membersScores2)).toEqual(3);

// Union results are aggregated by the MIN score of elements
expect(await client.zunionStore(key3, [key1, key2], "MIN")).toEqual(3);
const zunionstoreMapMin = await client.zrangeWithScores(key3, range);
const expectedMapMin = {
one: 1.0,
two: 2.0,
three: 3.5,
};
expect(zunionstoreMapMin).toEqual(expectedMapMin);
}

async function zunionStoreWithSumAggregation(client: BaseClient) {
const key1 = "{testKey}:1-" + uuidv4();
const key2 = "{testKey}:2-" + uuidv4();
const key3 = "{testKey}:3-" + uuidv4();
const range = {
start: 0,
stop: -1,
};

const membersScores1 = { one: 1.0, two: 2.0 };
const membersScores2 = { one: 1.5, two: 2.5, three: 3.5 };

expect(await client.zadd(key1, membersScores1)).toEqual(2);
expect(await client.zadd(key2, membersScores2)).toEqual(3);

// Union results are aggregated by the SUM score of elements
expect(await client.zunionStore(key3, [key1, key2], "SUM")).toEqual(3);
const zunionstoreMapSum = await client.zrangeWithScores(key3, range);
const expectedMapSum = {
one: 2.5,
two: 4.5,
three: 3.5,
};
expect(zunionstoreMapSum).toEqual(expectedMapSum);
}

async function zunionStoreBasicTest(client: BaseClient) {
const key1 = "{testKey}:1-" + uuidv4();
const key2 = "{testKey}:2-" + uuidv4();
const key3 = "{testKey}:3-" + uuidv4();
const range = {
start: 0,
stop: -1,
};

const membersScores1 = { one: 1.0, two: 2.0 };
const membersScores2 = { one: 2.0, two: 3.0, three: 4.0 };

expect(await client.zadd(key1, membersScores1)).toEqual(2);
expect(await client.zadd(key2, membersScores2)).toEqual(3);

expect(await client.zunionStore(key3, [key1, key2])).toEqual(3);
const zunionstoreMap = await client.zrangeWithScores(key3, range);
const expectedMap = {
one: 3.0,
three: 4.0,
two: 5.0,
};
expect(zunionstoreMap).toEqual(expectedMap);
}

async function zunionStoreWithWeightsAndAggregation(client: BaseClient) {
const key1 = "{testKey}:1-" + uuidv4();
const key2 = "{testKey}:2-" + uuidv4();
const key3 = "{testKey}:3-" + uuidv4();
const range = {
start: 0,
stop: -1,
};
const membersScores1 = { one: 1.0, two: 2.0 };
const membersScores2 = { one: 1.5, two: 2.5, three: 3.5 };

expect(await client.zadd(key1, membersScores1)).toEqual(2);
expect(await client.zadd(key2, membersScores2)).toEqual(3);

// Scores are multiplied by 2.0 for key1 and key2 during aggregation.
expect(
await client.zunionStore(
key3,
[
[key1, 2.0],
[key2, 2.0],
],
"SUM",
),
).toEqual(3);
const zunionstoreMapMultiplied = await client.zrangeWithScores(
key3,
range,
);
const expectedMapMultiplied = {
one: 5.0,
three: 7.0,
two: 9.0,
};
expect(zunionstoreMapMultiplied).toEqual(expectedMapMultiplied);
}

async function zunionStoreEmptyCases(client: BaseClient) {
const key1 = "{testKey}:1-" + uuidv4();
const key2 = "{testKey}:2-" + uuidv4();
const range = {
start: 0,
stop: -1,
};
const membersScores1 = { one: 1.0, two: 2.0 };

expect(await client.zadd(key1, membersScores1)).toEqual(2);

// Non existing key
expect(
await client.zunionStore(key2, [
key1,
"{testKey}-non_existing_key",
]),
).toEqual(2);

const zunionstore_map_nonexistingkey = await client.zrangeWithScores(
key2,
range,
);

const expectedMapMultiplied = {
one: 1.0,
two: 2.0,
};
expect(zunionstore_map_nonexistingkey).toEqual(expectedMapMultiplied);

// Empty list check
await expect(client.zunionStore("{xyz}", [])).rejects.toThrow();
}

it.each([ProtocolVersion.RESP2, ProtocolVersion.RESP3])(
`zunionstore test_%p`,
async (protocol) => {
await runTest(async (client: BaseClient) => {
await zunionStoreBasicTest(client);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why did you choose to put everything into a single test case? Better to split this into separate test cases that can be run separately.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It was Adar's desicion and I asked to split it (I keep asking)

await zunionStoreWithMaxAggregation(client);
await zunionStoreWithMinAggregation(client);
await zunionStoreWithSumAggregation(client);
await zunionStoreWithWeightsAndAggregation(client);
await zunionStoreEmptyCases(client);
}, protocol);
},
config.timeout,
);

it.each([ProtocolVersion.RESP2, ProtocolVersion.RESP3])(
`zmscore test_%p`,
async (protocol) => {
Expand Down
Loading