Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Node: FT.AGGREGATE #2554

Merged
merged 5 commits into from
Nov 7, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
* Java: Added `JSON.SET` and `JSON.GET` ([#2462](https://github.com/valkey-io/valkey-glide/pull/2462))
* Node: Added `FT.CREATE` ([#2501](https://github.com/valkey-io/valkey-glide/pull/2501))
* Node: Added `FT.INFO` ([#2540](https://github.com/valkey-io/valkey-glide/pull/2540))
* Node: Added `FT.AGGREGATE` ([#2554](https://github.com/valkey-io/valkey-glide/pull/2554))
* Java: Added `JSON.DEBUG` ([#2520](https://github.com/valkey-io/valkey-glide/pull/2520))
* Java: Added `JSON.ARRINSERT` and `JSON.ARRLEN` ([#2476](https://github.com/valkey-io/valkey-glide/pull/2476))
* Java: Added `JSON.ARRPOP` ([#2486](https://github.com/valkey-io/valkey-glide/pull/2486))
Expand Down
20 changes: 20 additions & 0 deletions node/npm/glide/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,16 @@ function initialize() {
VectorFieldAttributesHnsw,
FtCreateOptions,
FtInfoReturnType,
FtAggregateOptions,
FtAggregateClauseType,
FtAggregateClause,
FtAggregateLimit,
FtAggregateFilter,
FtAggregateGroupBy,
FtAggregateReducer,
FtAggregateSortBy,
FtAggregateSortProperty,
FtAggregateApply,
GlideRecord,
GlideString,
JsonGetOptions,
Expand Down Expand Up @@ -246,6 +256,16 @@ function initialize() {
VectorFieldAttributesHnsw,
FtCreateOptions,
FtInfoReturnType,
FtAggregateOptions,
FtAggregateClauseType,
FtAggregateClause,
FtAggregateLimit,
FtAggregateFilter,
FtAggregateGroupBy,
FtAggregateReducer,
FtAggregateSortBy,
FtAggregateSortProperty,
FtAggregateApply,
GlideRecord,
GlideJson,
GlideString,
Expand Down
162 changes: 161 additions & 1 deletion node/src/server-modules/GlideFt.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,17 @@ import {
} from "../BaseClient";
import { GlideClient } from "../GlideClient";
import { GlideClusterClient } from "../GlideClusterClient";
import { Field, FtCreateOptions } from "./GlideFtOptions";
import {
Field,
FtAggregateClauseType,
FtAggregateOptions,
FtCreateOptions,
} from "./GlideFtOptions";

// Can't disable that rule for specific lines, because prettier moves the comment with "eslint-disable-line"
// Disabling for the entire file. `FT.SEARCH`, `FT.AGGREGATE` and `FT.PROFILE` return types depend on the
// search query and user data formats.
/* eslint-disable @typescript-eslint/no-explicit-any */

/** Data type of {@link GlideFt.info | info} command response. */
type FtInfoReturnType = Record<
Expand Down Expand Up @@ -204,6 +214,156 @@ export class GlideFt {
}) as Promise<"OK">;
}

/**
* Runs a search query on an index, and perform aggregate transformations on the results.
*
* @param client - The client to execute the command.
* @param indexName - The index name.
* @param query - The text query to search.
* @param options - Additional parameters for the command - see {@link FtAggregateOptions} and {@link DecoderOption}.
* @returns Results of the last stage of the pipeline.
*
* @example
* ```typescript
* const options: FtAggregateOptions = {
* loadFields: ["__key"],
* clauses: [
* {
* type: FtAggregateClauseType.GROUPBY,
* properties: ["@condition"],
* reducers: [
* {
* function: "TOLIST",
* args: ["__key"],
* name: "bicycles",
* },
* ],
* },
* ],
* };
* const result = await GlideFt.aggregate("myIndex", "*", options);
* console.log(result); // Output:
* // [
* // [
* // {
* // key: "condition",
* // value: "refurbished"
* // },
* // {
* // key: "bicycles",
* // value: [ "bicycle:9" ]
* // }
* // ],
* // [
* // {
* // key: "condition",
* // value: "used"
* // },
* // {
* // key: "bicycles",
* // value: [ "bicycle:1", "bicycle:2", "bicycle:3" ]
* // }
* // ],
* // [
* // {
* // key: "condition",
* // value: "new"
* // },
* // {
* // key: "bicycles",
* // value: [ "bicycle:0", "bicycle:5" ]
* // }
* // ]
* // ]
* ```
*/
static async aggregate(
client: GlideClient | GlideClusterClient,
indexName: GlideString,
query: GlideString,
options?: DecoderOption & FtAggregateOptions,
): Promise<GlideRecord<any>[]> {
Yury-Fridlyand marked this conversation as resolved.
Show resolved Hide resolved
const args: GlideString[] = ["FT.AGGREGATE", indexName, query];

if (options) {
if (options.loadAll) args.push("LOAD", "*");
if (options.loadFields)
Yury-Fridlyand marked this conversation as resolved.
Show resolved Hide resolved
args.push(
"LOAD",
options.loadFields.length.toString(),
...options.loadFields,
);

if (options.timeout)
args.push("TIMEOUT", options.timeout.toString());

if (options.params && options.params.length) {
Yury-Fridlyand marked this conversation as resolved.
Show resolved Hide resolved
args.push(
"PARAMS",
(options.params.length * 2).toString(),
...options.params.flatMap((pair) => pair),
);
}

if (options.clauses) {
for (const clause of options.clauses) {
switch (clause.type) {
case FtAggregateClauseType.LIMIT:
args.push(
clause.type,
clause.offset.toString(),
clause.count.toString(),
);
break;
case FtAggregateClauseType.FILTER:
args.push(clause.type, clause.expression);
break;
case FtAggregateClauseType.GROUPBY:
args.push(
clause.type,
clause.properties.length.toString(),
...clause.properties,
);

for (const reducer of clause.reducers) {
args.push(
"REDUCE",
reducer.function,
reducer.args.length.toString(),
...reducer.args,
);
if (reducer.name) args.push("AS", reducer.name);
}

break;
case FtAggregateClauseType.SORTBY:
args.push(
clause.type,
(clause.properties.length * 2).toString(),
);
for (const property of clause.properties)
args.push(property.property, property.order);
if (clause.max)
args.push("MAX", clause.max.toString());
break;
case FtAggregateClauseType.APPLY:
args.push(
clause.type,
clause.expression,
"AS",
clause.name,
);
break;
}
}
}
}

return _handleCustomCommand(client, args, options) as Promise<
GlideRecord<any>[]
>;
}

/**
* Returns information about a given index.
*
Expand Down
117 changes: 116 additions & 1 deletion node/src/server-modules/GlideFtOptions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
*/

import { GlideString } from "../BaseClient";
import { GlideFt } from "./GlideFt"; // eslint-disable-line @typescript-eslint/no-unused-vars

interface BaseField {
/** The name of the field. */
Expand Down Expand Up @@ -109,7 +110,7 @@ export type VectorFieldAttributesHnsw = VectorFieldAttributes & {
export type Field = TextField | TagField | NumericField | VectorField;

/**
* Represents the input options to be used in the FT.CREATE command.
* Represents the input options to be used in the {@link GlideFt.create | FT.CREATE} command.
* All fields in this class are optional inputs for FT.CREATE.
*/
export interface FtCreateOptions {
Expand All @@ -118,3 +119,117 @@ export interface FtCreateOptions {
/** The prefix of the key to be indexed. */
prefixes?: GlideString[];
}

/** Additional parameters for {@link GlideFt.aggregate | FT.AGGREGATE} command. */
export type FtAggregateOptions = {
/** Query timeout in milliseconds. */
timeout?: number;
/**
* {@link FtAggregateFilter | FILTER}, {@link FtAggregateLimit | LIMIT}, {@link FtAggregateGroupBy | GROUPBY},
* {@link FtAggregateSortBy | SORTBY} and {@link FtAggregateApply | APPLY} clauses, that can be repeated
* multiple times in any order and be freely intermixed. They are applied in the order specified,
* with the output of one clause feeding the input of the next clause.
*/
clauses?: FtAggregateClause[];
Yury-Fridlyand marked this conversation as resolved.
Show resolved Hide resolved
/** The key/value pairs can be referenced from within the query expression. */
params?: [GlideString, GlideString][];
} & (
| {
/** List of fields to load from the index. */
loadFields?: GlideString[];
/** `loadAll` and `loadFields` are mutually exclusive. */
loadAll?: never;
}
| {
/** Option to load all fields declared in the index */
loadAll?: boolean;
/** `loadAll` and `loadFields` are mutually exclusive. */
loadFields?: never;
}
);

/** The {@link GlideFt.aggregate | FT.AGGREGATE} clause type. */
export enum FtAggregateClauseType {
LIMIT = "LIMIT",
FILTER = "FILTER",
GROUPBY = "GROUPBY",
SORTBY = "SORTBY",
APPLY = "APPLY",
}

export type FtAggregateClause =
Yury-Fridlyand marked this conversation as resolved.
Show resolved Hide resolved
| FtAggregateLimit
| FtAggregateFilter
| FtAggregateGroupBy
| FtAggregateSortBy
| FtAggregateApply;

/** A clause for limiting the number of retained records. */
export interface FtAggregateLimit {
type: FtAggregateClauseType.LIMIT;
Yury-Fridlyand marked this conversation as resolved.
Show resolved Hide resolved
/** Starting point from which the records have to be retained. */
offset: number;
/** The total number of records to be retained. */
count: number;
}

/**
* A clause for filtering the results using predicate expression relating to values in each result.
* It is applied post query and relate to the current state of the pipeline.
*/
export interface FtAggregateFilter {
type: FtAggregateClauseType.FILTER;
/** The expression to filter the results. */
expression: GlideString;
}

/** A clause for grouping the results in the pipeline based on one or more properties. */
export interface FtAggregateGroupBy {
type: FtAggregateClauseType.GROUPBY;
/** The list of properties to be used for grouping the results in the pipeline. */
properties: GlideString[];
/** The list of functions that handles the group entries by performing multiple aggregate operations. */
reducers: FtAggregateReducer[];
}

/**
* A clause for reducing the matching results in each group using a reduction function.
* The matching results are reduced into a single record.
*/
export interface FtAggregateReducer {
/** The reduction function name for the respective group. */
function: string;
/** The list of arguments for the reducer. */
args: GlideString[];
/** User defined property name for the reducer. */
name?: GlideString;
}

/** A clause for sorting the pipeline up until the point of SORTBY, using a list of properties. */
export interface FtAggregateSortBy {
type: FtAggregateClauseType.SORTBY;
/** A list of sorting parameters for the sort operation. */
properties: FtAggregateSortProperty[];
/** The MAX value for optimizing the sorting, by sorting only for the n-largest elements. */
max?: number;
}

/** A single property for the {@link FtAggregateSortBy | SORTBY} clause. */
export interface FtAggregateSortProperty {
/** The sorting parameter. */
property: GlideString;
/** The order for the sorting. */
order: "ASC" | "DESC";
Yury-Fridlyand marked this conversation as resolved.
Show resolved Hide resolved
}

/**
* A clause for applying a 1-to-1 transformation on one or more properties and stores the result
* as a new property down the pipeline or replaces any property using this transformation.
*/
export interface FtAggregateApply {
type: FtAggregateClauseType.APPLY;
/** The transformation expression. */
expression: GlideString;
/** The new property name to store the result of apply. This name can be referenced by further operations down the pipeline. */
name: GlideString;
}
Loading
Loading