Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Python: FT.AGGREGATE command added #2530

Merged
merged 13 commits into from
Oct 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
#### Changes
* Python: FT.AGGREGATE command added ([#2530](https://github.com/valkey-io/valkey-glide/pull/2530))
* Python: Add JSON.OBJLEN command ([#2495](https://github.com/valkey-io/valkey-glide/pull/2495))
* Python: FT.EXPLAIN and FT.EXPLAINCLI commands added([#2508](https://github.com/valkey-io/valkey-glide/pull/2508))
* Python: Python FT.INFO command added([#2429](https://github.com/valkey-io/valkey-glide/pull/2494))
Expand Down
20 changes: 20 additions & 0 deletions python/python/glide/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,17 @@
UpdateOptions,
)
from glide.async_commands.server_modules import ft, json
from glide.async_commands.server_modules.ft_options.ft_aggregate_options import (
FtAggregateApply,
FtAggregateClause,
FtAggregateFilter,
FtAggregateGroupBy,
FtAggregateLimit,
FtAggregateOptions,
FtAggregateReducer,
FtAggregateSortBy,
FtAggregateSortProperty,
)
from glide.async_commands.server_modules.ft_options.ft_create_options import (
DataType,
DistanceMetricType,
Expand Down Expand Up @@ -273,4 +284,13 @@
"FtSearchLimit",
"ReturnField",
"FtSeachOptions",
"FtAggregateApply",
"FtAggregateFilter",
"FtAggregateClause",
"FtAggregateLimit",
"FtAggregateOptions",
"FtAggregateGroupBy",
"FtAggregateReducer",
"FtAggregateSortBy",
"FtAggregateSortProperty",
]
1 change: 1 addition & 0 deletions python/python/glide/async_commands/command_args.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ class OrderBy(Enum):
This enum is used for the following commands:
- `SORT`: General sorting in ascending or descending order.
- `GEOSEARCH`: Sorting items based on their proximity to a center point.
- `FT.AGGREGATE`: Used in the SortBy clause of the FT.AGGREGATE command.
"""

ASC = "ASC"
Expand Down
32 changes: 31 additions & 1 deletion python/python/glide/async_commands/server_modules/ft.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,11 @@
module for `vector search` commands.
"""

from typing import List, Mapping, Optional, Union, cast
from typing import Any, List, Mapping, Optional, Union, cast

from glide.async_commands.server_modules.ft_options.ft_aggregate_options import (
FtAggregateOptions,
)
from glide.async_commands.server_modules.ft_options.ft_constants import (
CommandNames,
FtCreateKeywords,
Expand Down Expand Up @@ -276,3 +279,30 @@ async def explaincli(
"""
args: List[TEncodable] = [CommandNames.FT_EXPLAINCLI, indexName, query]
return cast(List[TEncodable], await client.custom_command(args))


async def aggregate(
    client: TGlideClient,
    indexName: TEncodable,
    query: TEncodable,
    options: Optional[FtAggregateOptions] = None,
) -> List[Mapping[TEncodable, Any]]:
    """
    A superset of the FT.SEARCH command, it allows substantial additional processing of the keys selected by the query expression.

    Args:
        client (TGlideClient): The client to execute the command.
        indexName (TEncodable): The index name for which the query is written.
        query (TEncodable): The search query, same as the query passed as an argument to FT.SEARCH.
        options (Optional[FtAggregateOptions]): The optional arguments for the command. When omitted or None,
            only the index name and the query are sent.

    Returns:
        List[Mapping[TEncodable, Any]]: An array containing a mapping of field name and associated value as returned after the last stage of the command.

    Examples:
        >>> from glide import ft
        >>> result = await ft.aggregate(glide_client, "myIndex", "*", FtAggregateOptions(loadFields=["__key"], clauses=[FtAggregateGroupBy(["@condition"], [FtAggregateReducer("COUNT", [], "bicycles")])]))
            [{b'condition': b'refurbished', b'bicycles': b'1'}, {b'condition': b'new', b'bicycles': b'5'}, {b'condition': b'used', b'bicycles': b'4'}]
    """
    args: List[TEncodable] = [CommandNames.FT_AGGREGATE, indexName, query]
    if options is not None:
        # The options object serializes itself into the trailing command arguments.
        args.extend(options.to_args())
    return cast(List[Mapping[TEncodable, Any]], await client.custom_command(args))
Original file line number Diff line number Diff line change
@@ -0,0 +1,293 @@
# Copyright Valkey GLIDE Project Contributors - SPDX Identifier: Apache-2.0
from abc import ABC, abstractmethod
from enum import Enum
from typing import List, Mapping, Optional

from glide.async_commands.command_args import OrderBy
from glide.async_commands.server_modules.ft_options.ft_constants import (
FtAggregateKeywords,
)
from glide.constants import TEncodable


class FtAggregateClause(ABC):
    """
    Common abstract parent of every clause that can be placed in an FT.AGGREGATE pipeline.
    """

    @abstractmethod
    def to_args(self) -> List[TEncodable]:
        """
        Build the command arguments contributed by this clause.

        Returns:
            List[TEncodable]: The arguments for the clause of the FT.AGGREGATE command. The base implementation yields an empty list.
        """
        return []


class FtAggregateLimit(FtAggregateClause):
    """
    Clause that restricts how many records are kept in the pipeline.
    """

    def __init__(self, offset: int, count: int):
        """
        Create a Limit clause.

        Args:
            offset (int): Starting point from which the records have to be retained.
            count (int): The total number of records to be retained.
        """
        self.offset = offset
        self.count = count

    def to_args(self) -> List[TEncodable]:
        """
        Serialize the Limit clause.

        Returns:
            List[TEncodable]: The LIMIT keyword followed by the offset and the count, both rendered as strings.
        """
        limit_args: List[TEncodable] = [FtAggregateKeywords.LIMIT]
        limit_args.extend(str(bound) for bound in (self.offset, self.count))
        return limit_args


class FtAggregateFilter(FtAggregateClause):
    """
    Clause that filters the results with a predicate expression over the values in each result.
    It is applied post query and relates to the current state of the pipeline.
    """

    def __init__(self, expression: TEncodable):
        """
        Create a Filter clause.

        Args:
            expression (TEncodable): The expression to filter the results.
        """
        self.expression = expression

    def to_args(self) -> List[TEncodable]:
        """
        Serialize the Filter clause.

        Returns:
            List[TEncodable]: The FILTER keyword followed by the filter expression.
        """
        filter_args: List[TEncodable] = [FtAggregateKeywords.FILTER]
        filter_args.append(self.expression)
        return filter_args


class FtAggregateReducer:
    """
    Reduction applied to the matching results of each group; the matching results are reduced into a single record.
    """

    def __init__(
        self,
        function: TEncodable,
        args: List[TEncodable],
        name: Optional[TEncodable] = None,
    ):
        """
        Create a reducer.

        Args:
            function (TEncodable): The name of the reduction function for the respective group.
            args (List[TEncodable]): The list of arguments for the reducer.
            name (Optional[TEncodable]): User defined property name for the reducer.
        """
        self.function = function
        self.args = args
        self.name = name

    def to_args(self) -> List[TEncodable]:
        """
        Serialize the reducer.

        Returns:
            List[TEncodable]: The REDUCE keyword, the function, the argument count and the arguments,
                followed by an AS/name pair when a non-empty name was given.
        """
        header: List[TEncodable] = [
            FtAggregateKeywords.REDUCE,
            self.function,
            str(len(self.args)),
        ]
        reducer_args: List[TEncodable] = header + self.args
        if not self.name:
            return reducer_args
        reducer_args.append(FtAggregateKeywords.AS)
        reducer_args.append(self.name)
        return reducer_args


class FtAggregateGroupBy(FtAggregateClause):
    """
    Clause that groups the pipeline results by one or more properties.
    """

    def __init__(
        self, properties: List[TEncodable], reducers: List[FtAggregateReducer]
    ):
        """
        Create a GroupBy clause.

        Args:
            properties (List[TEncodable]): The list of properties to be used for grouping the results in the pipeline.
            reducers (List[FtAggregateReducer]): The list of functions that handles the group entries by performing multiple aggregate operations.
        """
        self.properties = properties
        self.reducers = reducers

    def to_args(self) -> List[TEncodable]:
        """
        Serialize the GroupBy clause.

        Returns:
            List[TEncodable]: The GROUPBY keyword, the property count and the properties,
                followed by the arguments of every reducer in order.
        """
        header: List[TEncodable] = [
            FtAggregateKeywords.GROUPBY,
            str(len(self.properties)),
        ]
        group_args: List[TEncodable] = header + self.properties
        # A falsy reducers value (empty list or None) contributes nothing.
        for reducer in self.reducers or []:
            group_args.extend(reducer.to_args())
        return group_args


class FtAggregateSortProperty:
    """
    A single property, with its sort direction, for the SortBy clause.
    """

    def __init__(self, property: TEncodable, order: OrderBy):
        """
        Create a sort property.

        Args:
            property (TEncodable): The sorting parameter.
            order (OrderBy): The order for the sorting. This option can be added for each property.
        """
        self.property = property
        self.order = order

    def to_args(self) -> List[TEncodable]:
        """
        Serialize the sort property.

        Returns:
            List[TEncodable]: The property followed by the value of its order enum member.
        """
        direction = self.order.value
        return [self.property, direction]


class FtAggregateSortBy(FtAggregateClause):
    """
    A clause for sorting the pipeline up until the point of SORTBY, using a list of properties.
    """

    def __init__(
        self, properties: List[FtAggregateSortProperty], max: Optional[int] = None
    ):
        """
        Initialize a new FtAggregateSortBy instance.

        Args:
            properties (List[FtAggregateSortProperty]): A list of sorting parameters for the sort operation.
            max (Optional[int]): The MAX value for optimizing the sorting, by sorting only for the n-largest elements.
                When None, no MAX argument is sent.
        """
        self.properties = properties
        self.max = max

    def to_args(self) -> List[TEncodable]:
        """
        Get the arguments for the SortBy clause.

        Returns:
            List[TEncodable]: A list of arguments for the SortBy clause.
        """
        args: List[TEncodable] = [
            FtAggregateKeywords.SORTBY,
            # The count is doubled: it is computed as two tokens per sort property.
            str(len(self.properties) * 2),
        ]
        # Named `sort_property` so the builtin `property` is not shadowed.
        for sort_property in self.properties:
            args.extend(sort_property.to_args())
        # Compare against None so an explicit 0 is forwarded instead of being silently dropped.
        if self.max is not None:
            args.extend([FtAggregateKeywords.MAX, str(self.max)])
        return args


class FtAggregateApply(FtAggregateClause):
    """
    Clause that applies a 1-to-1 transformation on one or more properties and stores the result as a new property
    down the pipeline, or replaces any property using this transformation.
    """

    def __init__(self, expression: TEncodable, name: TEncodable):
        """
        Create an Apply clause.

        Args:
            expression (TEncodable): The expression to be transformed.
            name (TEncodable): The new property name to store the result of apply. This name can be referenced by further APPLY/SORTBY/GROUPBY/REDUCE operations down the pipeline.
        """
        self.expression = expression
        self.name = name

    def to_args(self) -> List[TEncodable]:
        """
        Serialize the Apply clause.

        Returns:
            List[TEncodable]: The APPLY keyword and the expression, followed by the AS keyword and the property name.
        """
        apply_args: List[TEncodable] = [FtAggregateKeywords.APPLY, self.expression]
        apply_args += [FtAggregateKeywords.AS, self.name]
        return apply_args


class FtAggregateOptions:
    """
    This class represents the optional arguments for the FT.AGGREGATE command.
    """

    def __init__(
        self,
        loadAll: Optional[bool] = False,
        loadFields: Optional[List[TEncodable]] = None,
        timeout: Optional[int] = None,
        params: Optional[Mapping[TEncodable, TEncodable]] = None,
        clauses: Optional[List[FtAggregateClause]] = None,
    ):
        """
        Initialize a new FtAggregateOptions instance.

        Args:
            loadAll (Optional[bool]): An option to load all fields declared in the index.
            loadFields (Optional[List[TEncodable]]): An option to load only the fields passed in this list. Ignored when `loadAll` is set.
            timeout (Optional[int]): Overrides the timeout parameter of the module. When None, no TIMEOUT argument is sent.
            params (Optional[Mapping[TEncodable, TEncodable]]): The key/value pairs can be referenced from within the query expression.
            clauses (Optional[List[FtAggregateClause]]): FILTER, LIMIT, GROUPBY, SORTBY and APPLY clauses, that can be repeated multiple times in any order and be freely intermixed. They are applied in the order specified, with the output of one clause feeding the input of the next clause.
        """
        self.loadAll = loadAll
        # Each instance gets its own empty container; mutable defaults in the
        # signature would be shared by every instance created without the argument.
        self.loadFields: List[TEncodable] = [] if loadFields is None else loadFields
        self.timeout = timeout
        self.params: Mapping[TEncodable, TEncodable] = {} if params is None else params
        self.clauses: List[FtAggregateClause] = [] if clauses is None else clauses

    def to_args(self) -> List[TEncodable]:
        """
        Get the optional arguments for the FT.AGGREGATE command.

        Returns:
            List[TEncodable]: A list of optional arguments for the FT.AGGREGATE command.
        """
        args: List[TEncodable] = []
        # `loadAll` takes precedence over an explicit field list.
        if self.loadAll:
            args.extend([FtAggregateKeywords.LOAD, "*"])
        elif self.loadFields:
            args.extend([FtAggregateKeywords.LOAD, str(len(self.loadFields))])
            args.extend(self.loadFields)
        # Compare against None so an explicit 0 is forwarded instead of being silently dropped.
        if self.timeout is not None:
            args.extend([FtAggregateKeywords.TIMEOUT, str(self.timeout)])
        if self.params:
            # The count covers both the name and the value of every pair.
            args.extend([FtAggregateKeywords.PARAMS, str(len(self.params) * 2)])
            for name, value in self.params.items():
                args.extend([name, value])
        if self.clauses:
            # Clauses are emitted in the order given.
            for clause in self.clauses:
                args.extend(clause.to_args())
        return args
Loading
Loading