Skip to content

Commit

Permalink
Java: FT.AGGREGATE. (#2468)
Browse files Browse the repository at this point in the history
* `FT.AGGREGATE`.

Signed-off-by: Yury-Fridlyand <yury.fridlyand@improving.com>
  • Loading branch information
Yury-Fridlyand authored Oct 18, 2024
1 parent fa12563 commit 87412e8
Show file tree
Hide file tree
Showing 5 changed files with 934 additions and 0 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
* Java: Added `FT.CREATE` ([#2414](https://github.com/valkey-io/valkey-glide/pull/2414))
* Java: Added `FT.DROPINDEX` ([#2440](https://github.com/valkey-io/valkey-glide/pull/2440))
* Java: Added `FT.SEARCH` ([#2439](https://github.com/valkey-io/valkey-glide/pull/2439))
* Java: Added `FT.AGGREGATE` ([#2466](https://github.com/valkey-io/valkey-glide/pull/2466))
* Java: Added `JSON.SET` and `JSON.GET` ([#2462](https://github.com/valkey-io/valkey-glide/pull/2462))
* Core: Update routing for commands from server modules ([#2461](https://github.com/valkey-io/valkey-glide/pull/2461))

Expand Down
66 changes: 66 additions & 0 deletions glide-core/src/client/value_conversion.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ pub(crate) enum ExpectedReturnType<'a> {
ArrayOfStrings,
ArrayOfBools,
ArrayOfDoubleOrNull,
FTAggregateReturnType,
FTSearchReturnType,
Lolwut,
ArrayOfStringAndArrays,
Expand Down Expand Up @@ -893,6 +894,70 @@ pub(crate) fn convert_to_expected_type(
)
.into()),
},
ExpectedReturnType::FTAggregateReturnType => match value {
/*
Example of the response
1) "3"
2) 1) "condition"
2) "refurbished"
3) "bicylces"
4) 1) "bicycle:9"
3) 1) "condition"
2) "used"
3) "bicylces"
4) 1) "bicycle:1"
2) "bicycle:2"
3) "bicycle:3"
4) "bicycle:4"
4) 1) "condition"
2) "new"
3) "bicylces"
4) 1) "bicycle:5"
2) "bicycle:6"
Converting response to (array of maps)
1) 1# "condition" => "refurbished"
2# "bicylces" =>
1) "bicycle:9"
2) 1# "condition" => "used"
2# "bicylces" =>
1) "bicycle:1"
2) "bicycle:2"
3) "bicycle:3"
4) "bicycle:4"
3) 1# "condition" => "new"
2# "bicylces" =>
1) "bicycle:5"
2) "bicycle:6"
Very first element in the response is meaningless and should be ignored.
*/
Value::Array(array) => {
let mut res = Vec::with_capacity(array.len() - 1);
for aggregation in array.into_iter().skip(1) {
let Value::Array(fields) = aggregation else {
return Err((
ErrorKind::TypeError,
"Response couldn't be converted for FT.AGGREGATION",
format!("(`fields` was {:?})", get_value_type(&aggregation)),
)
.into());
};
res.push(convert_array_to_map_by_type(
fields,
None,
None,
)?);
}
Ok(Value::Array(res))
}
_ => Err((
ErrorKind::TypeError,
"Response couldn't be converted to FT.AGGREGATION",
format!("(response was {:?})", get_value_type(&value)),
)
.into()),
},
ExpectedReturnType::FTSearchReturnType => match value {
/*
Example of the response
Expand Down Expand Up @@ -1303,6 +1368,7 @@ pub(crate) fn expected_type_for_cmd(cmd: &Cmd) -> Option<ExpectedReturnType> {
key_type: &None,
value_type: &None,
}),
b"FT.AGGREGATE" => Some(ExpectedReturnType::FTAggregateReturnType),
b"FT.SEARCH" => Some(ExpectedReturnType::FTSearchReturnType),
_ => None,
}
Expand Down
169 changes: 169 additions & 0 deletions java/client/src/main/java/glide/api/commands/servermodules/FT.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,20 @@
package glide.api.commands.servermodules;

import static glide.api.models.GlideString.gs;
import static glide.utils.ArrayTransformUtils.castArray;
import static glide.utils.ArrayTransformUtils.concatenateArrays;

import glide.api.BaseClient;
import glide.api.GlideClient;
import glide.api.GlideClusterClient;
import glide.api.models.ClusterValue;
import glide.api.models.GlideString;
import glide.api.models.commands.FT.FTAggregateOptions;
import glide.api.models.commands.FT.FTCreateOptions;
import glide.api.models.commands.FT.FTCreateOptions.FieldInfo;
import glide.api.models.commands.FT.FTSearchOptions;
import java.util.Arrays;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Stream;
import lombok.NonNull;
Expand Down Expand Up @@ -305,6 +308,172 @@ public static CompletableFuture<String> dropindex(
return executeCommand(client, new GlideString[] {gs("FT.DROPINDEX"), indexName}, false);
}

/**
* Runs a search query on an index, and perform aggregate transformations on the results.
*
* @param client The client to execute the command.
* @param indexName The index name.
* @param query The text query to search.
* @return Results of the last stage of the pipeline.
* @example
* <pre>{@code
* // example of using the API:
* FT.aggregate(client, "myIndex", "*").get();
* // the response contains data in the following format:
* Map<GlideString, Object>[] response = new Map[] {
* Map.of(
* gs("condition"), gs("refurbished"),
* gs("bicycles"), new Object[] { gs("bicycle:9") }
* ),
* Map.of(
* gs("condition"), gs("used"),
* gs("bicycles"), new Object[] { gs("bicycle:1"), gs("bicycle:2"), gs("bicycle:3") }
* ),
* Map.of(
* gs("condition"), gs("new"),
* gs("bicycles"), new Object[] { gs("bicycle:0"), gs("bicycle:5") }
* )
* };
* }</pre>
*/
public static CompletableFuture<Map<GlideString, Object>[]> aggregate(
@NonNull BaseClient client, @NonNull String indexName, @NonNull String query) {
return aggregate(client, gs(indexName), gs(query));
}

/**
* Runs a search query on an index, and perform aggregate transformations on the results.
*
* @param client The client to execute the command.
* @param indexName The index name.
* @param query The text query to search.
* @param options Additional parameters for the command - see {@link FTAggregateOptions}.
* @return Results of the last stage of the pipeline.
* @example
* <pre>{@code
* // example of using the API:
* FTAggregateOptions options = FTAggregateOptions.builder()
* .loadFields(new String[] {"__key"})
* .addExpression(
* new FTAggregateOptions.GroupBy(
* new String[] {"@condition"},
* new Reducer[] {
* new Reducer("TOLIST", new String[] {"__key"}, "bicycles")
* }))
* .build();
* FT.aggregate(client, "myIndex", "*", options).get();
* // the response contains data in the following format:
* Map<GlideString, Object>[] response = new Map[] {
* Map.of(
* gs("condition"), gs("refurbished"),
* gs("bicycles"), new Object[] { gs("bicycle:9") }
* ),
* Map.of(
* gs("condition"), gs("used"),
* gs("bicycles"), new Object[] { gs("bicycle:1"), gs("bicycle:2"), gs("bicycle:3") }
* ),
* Map.of(
* gs("condition"), gs("new"),
* gs("bicycles"), new Object[] { gs("bicycle:0"), gs("bicycle:5") }
* )
* };
* }</pre>
*/
public static CompletableFuture<Map<GlideString, Object>[]> aggregate(
@NonNull BaseClient client,
@NonNull String indexName,
@NonNull String query,
@NonNull FTAggregateOptions options) {
return aggregate(client, gs(indexName), gs(query), options);
}

/**
* Runs a search query on an index, and perform aggregate transformations on the results.
*
* @param client The client to execute the command.
* @param indexName The index name.
* @param query The text query to search.
* @return Results of the last stage of the pipeline.
* @example
* <pre>{@code
* // example of using the API:
* FT.aggregate(client, gs("myIndex"), gs("*")).get();
* // the response contains data in the following format:
* Map<GlideString, Object>[] response = new Map[] {
* Map.of(
* gs("condition"), gs("refurbished"),
* gs("bicycles"), new Object[] { gs("bicycle:9") }
* ),
* Map.of(
* gs("condition"), gs("used"),
* gs("bicycles"), new Object[] { gs("bicycle:1"), gs("bicycle:2"), gs("bicycle:3") }
* ),
* Map.of(
* gs("condition"), gs("new"),
* gs("bicycles"), new Object[] { gs("bicycle:0"), gs("bicycle:5") }
* )
* };
* }</pre>
*/
@SuppressWarnings("unchecked")
public static CompletableFuture<Map<GlideString, Object>[]> aggregate(
@NonNull BaseClient client, @NonNull GlideString indexName, @NonNull GlideString query) {
var args = new GlideString[] {gs("FT.AGGREGATE"), indexName, query};
return FT.<Object[]>executeCommand(client, args, false)
.thenApply(res -> castArray(res, Map.class));
}

/**
* Runs a search query on an index, and perform aggregate transformations on the results.
*
* @param client The client to execute the command.
* @param indexName The index name.
* @param query The text query to search.
* @param options Additional parameters for the command - see {@link FTAggregateOptions}.
* @return Results of the last stage of the pipeline.
* @example
* <pre>{@code
* // example of using the API:
* FTAggregateOptions options = FTAggregateOptions.builder()
* .loadFields(new String[] {"__key"})
* .addExpression(
* new FTAggregateOptions.GroupBy(
* new String[] {"@condition"},
* new Reducer[] {
* new Reducer("TOLIST", new String[] {"__key"}, "bicycles")
* }))
* .build();
* FT.aggregate(client, gs("myIndex"), gs("*"), options).get();
* // the response contains data in the following format:
* Map<GlideString, Object>[] response = new Map[] {
* Map.of(
* gs("condition"), gs("refurbished"),
* gs("bicycles"), new Object[] { gs("bicycle:9") }
* ),
* Map.of(
* gs("condition"), gs("used"),
* gs("bicycles"), new Object[] { gs("bicycle:1"), gs("bicycle:2"), gs("bicycle:3") }
* ),
* Map.of(
* gs("condition"), gs("new"),
* gs("bicycles"), new Object[] { gs("bicycle:0"), gs("bicycle:5") }
* )
* };
* }</pre>
*/
@SuppressWarnings("unchecked")
public static CompletableFuture<Map<GlideString, Object>[]> aggregate(
@NonNull BaseClient client,
@NonNull GlideString indexName,
@NonNull GlideString query,
@NonNull FTAggregateOptions options) {
var args =
concatenateArrays(
new GlideString[] {gs("FT.AGGREGATE"), indexName, query}, options.toArgs());
return FT.<Object[]>executeCommand(client, args, false)
.thenApply(res -> castArray(res, Map.class));
}

/**
* A wrapper for custom command API.
*
Expand Down
Loading

0 comments on commit 87412e8

Please sign in to comment.