Skip to content

Commit

Permalink
Merge branch 'main' into niharika-sortcmds
Browse files Browse the repository at this point in the history
  • Loading branch information
niharikabhavaraju committed Jan 12, 2025
2 parents 8c7943d + d31d3fa commit 733196e
Show file tree
Hide file tree
Showing 11 changed files with 737 additions and 45 deletions.
2 changes: 1 addition & 1 deletion glide-core/redis-rs/redis/src/cluster_async/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2666,7 +2666,7 @@ where
}
}

async fn calculate_topology_from_random_nodes<'a, C>(
async fn calculate_topology_from_random_nodes<C>(
inner: &Core<C>,
num_of_nodes_to_query: usize,
curr_retry: usize,
Expand Down
18 changes: 3 additions & 15 deletions glide-core/redis-rs/redis/src/cluster_topology.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,24 +76,12 @@ pub(crate) fn slot(key: &[u8]) -> u16 {
}

fn get_hashtag(key: &[u8]) -> Option<&[u8]> {
let open = key.iter().position(|v| *v == b'{');
let open = match open {
Some(open) => open,
None => return None,
};
let open = key.iter().position(|v| *v == b'{')?;

let close = key[open..].iter().position(|v| *v == b'}');
let close = match close {
Some(close) => close,
None => return None,
};
let close = key[open..].iter().position(|v| *v == b'}')?;

let rv = &key[open + 1..open + close];
if rv.is_empty() {
None
} else {
Some(rv)
}
(!rv.is_empty()).then_some(rv)
}

/// Returns the slot that matches `key`.
Expand Down
12 changes: 6 additions & 6 deletions glide-core/redis-rs/redis/src/sentinel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -343,7 +343,7 @@ fn get_valid_replicas_addresses(
}

#[cfg(feature = "aio")]
async fn async_get_valid_replicas_addresses<'a>(
async fn async_get_valid_replicas_addresses(
replicas: Vec<HashMap<String, String>>,
node_connection_info: &SentinelNodeConnectionInfo,
) -> Vec<ConnectionInfo> {
Expand Down Expand Up @@ -608,15 +608,15 @@ impl Sentinel {
self.async_try_all_sentinels(sentinel_masters_cmd()).await
}

async fn async_get_sentinel_replicas<'a>(
async fn async_get_sentinel_replicas(
&mut self,
service_name: &'a str,
service_name: &str,
) -> RedisResult<Vec<HashMap<String, String>>> {
self.async_try_all_sentinels(sentinel_replicas_cmd(service_name))
.await
}

async fn async_find_master_address<'a>(
async fn async_find_master_address(
&mut self,
service_name: &str,
node_connection_info: &SentinelNodeConnectionInfo,
Expand All @@ -625,7 +625,7 @@ impl Sentinel {
async_find_valid_master(masters, service_name, node_connection_info).await
}

async fn async_find_valid_replica_addresses<'a>(
async fn async_find_valid_replica_addresses(
&mut self,
service_name: &str,
node_connection_info: &SentinelNodeConnectionInfo,
Expand Down Expand Up @@ -667,7 +667,7 @@ impl Sentinel {
/// There is no guarantee that we'll actually be connecting to a different replica
/// in the next call, but in a static set of replicas (no replicas added or
/// removed), on average we'll choose each replica the same number of times.
pub async fn async_replica_rotate_for<'a>(
pub async fn async_replica_rotate_for(
&mut self,
service_name: &str,
node_connection_info: Option<&SentinelNodeConnectionInfo>,
Expand Down
2 changes: 1 addition & 1 deletion glide-core/redis-rs/redis/tests/test_cluster_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1178,7 +1178,7 @@ mod test_cluster_scan_async {
for key in excepted_keys.iter() {
assert!(keys.contains(key));
}
assert!(keys.len() > 0);
assert!(!keys.is_empty());
}

#[tokio::test]
Expand Down
115 changes: 115 additions & 0 deletions go/api/base_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -1476,6 +1476,101 @@ func (client *baseClient) BZPopMin(keys []string, timeoutSecs float64) (Result[K
return handleKeyWithMemberAndScoreResponse(result)
}

// Returns the specified range of elements in the sorted set stored at `key`.
// `ZRANGE` can perform different types of range queries: by index (rank), by the score, or by lexicographical order.
//
// To get the elements with their scores, see [ZRangeWithScores].
//
// See [valkey.io] for more details.
//
// Parameters:
//
// key - The key of the sorted set.
// rangeQuery - The range query object representing the type of range query to perform.
// - For range queries by index (rank), use [RangeByIndex].
// - For range queries by lexicographical order, use [RangeByLex].
// - For range queries by score, use [RangeByScore].
//
// Return value:
//
// An array of elements within the specified range.
// If `key` does not exist, it is treated as an empty sorted set, and the command returns an empty array.
//
// Example:
//
// // Retrieve all members of a sorted set in ascending order
// result, err := client.ZRange("my_sorted_set", options.NewRangeByIndexQuery(0, -1))
//
// // Retrieve members within a score range in descending order
//
// query := options.NewRangeByScoreQuery(options.NewScoreBoundary(3, false),
// options.NewInfiniteScoreBoundary(options.NegativeInfinity)).
//
// .SetReverse()
// result, err := client.ZRange("my_sorted_set", query)
// // `result` contains members which have scores within the range of negative infinity to 3, in descending order
//
// [valkey.io]: https://valkey.io/commands/zrange/
func (client *baseClient) ZRange(key string, rangeQuery options.ZRangeQuery) ([]Result[string], error) {
args := make([]string, 0, 10)
args = append(args, key)
args = append(args, rangeQuery.ToArgs()...)
result, err := client.executeCommand(C.ZRange, args)
if err != nil {
return nil, err
}

return handleStringArrayResponse(result)
}

// Returns the specified range of elements with their scores in the sorted set stored at `key`.
// `ZRANGE` can perform different types of range queries: by index (rank), by the score, or by lexicographical order.
//
// See [valkey.io] for more details.
//
// Parameters:
//
// key - The key of the sorted set.
// rangeQuery - The range query object representing the type of range query to perform.
// - For range queries by index (rank), use [RangeByIndex].
// - For range queries by score, use [RangeByScore].
//
// Return value:
//
// A map of elements and their scores within the specified range.
// If `key` does not exist, it is treated as an empty sorted set, and the command returns an empty map.
//
// Example:
//
// // Retrieve all members of a sorted set in ascending order
// result, err := client.ZRangeWithScores("my_sorted_set", options.NewRangeByIndexQuery(0, -1))
//
// // Retrieve members within a score range in descending order
//
// query := options.NewRangeByScoreQuery(options.NewScoreBoundary(3, false),
// options.NewInfiniteScoreBoundary(options.NegativeInfinity)).
//
// SetReverse()
// result, err := client.ZRangeWithScores("my_sorted_set", query)
// // `result` contains members with scores within the range of negative infinity to 3, in descending order
//
// [valkey.io]: https://valkey.io/commands/zrange/
func (client *baseClient) ZRangeWithScores(
key string,
rangeQuery options.ZRangeQueryWithScores,
) (map[Result[string]]Result[float64], error) {
args := make([]string, 0, 10)
args = append(args, key)
args = append(args, rangeQuery.ToArgs()...)
args = append(args, "WITHSCORES")
result, err := client.executeCommand(C.ZRange, args)
if err != nil {
return nil, err
}

return handleStringDoubleMapResponse(result)
}

func (client *baseClient) Persist(key string) (Result[bool], error) {
result, err := client.executeCommand(C.Persist, []string{key})
if err != nil {
Expand Down Expand Up @@ -1516,6 +1611,26 @@ func (client *baseClient) ZRevRankWithScore(key string, member string) (Result[i
return handleLongAndDoubleOrNullResponse(result)
}

func (client *baseClient) XTrim(key string, options *options.XTrimOptions) (Result[int64], error) {
xTrimArgs, err := options.ToArgs()
if err != nil {
return CreateNilInt64Result(), err
}
result, err := client.executeCommand(C.XTrim, append([]string{key}, xTrimArgs...))
if err != nil {
return CreateNilInt64Result(), err
}
return handleLongResponse(result)
}

func (client *baseClient) XLen(key string) (Result[int64], error) {
result, err := client.executeCommand(C.XLen, []string{key})
if err != nil {
return CreateNilInt64Result(), err
}
return handleLongResponse(result)
}

func (client *baseClient) Sort(key string) ([]Result[string], error) {
result, err := client.executeCommand(C.Sort, []string{key})
if err != nil {
Expand Down
38 changes: 18 additions & 20 deletions go/api/options/stream_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,36 +85,34 @@ func NewXTrimOptionsWithMaxLen(threshold int64) *XTrimOptions {
}

// Match exactly on the threshold.
func (xto *XTrimOptions) SetExactTrimming() *XTrimOptions {
xto.exact = triStateBoolTrue
return xto
func (xTrimOptions *XTrimOptions) SetExactTrimming() *XTrimOptions {
xTrimOptions.exact = triStateBoolTrue
return xTrimOptions
}

// Trim in a near-exact manner, which is more efficient.
func (xto *XTrimOptions) SetNearlyExactTrimming() *XTrimOptions {
xto.exact = triStateBoolFalse
return xto
func (xTrimOptions *XTrimOptions) SetNearlyExactTrimming() *XTrimOptions {
xTrimOptions.exact = triStateBoolFalse
return xTrimOptions
}

// Max number of stream entries to be trimmed for non-exact match.
func (xto *XTrimOptions) SetNearlyExactTrimmingAndLimit(limit int64) *XTrimOptions {
xto.exact = triStateBoolFalse
xto.limit = limit
return xto
func (xTrimOptions *XTrimOptions) SetNearlyExactTrimmingAndLimit(limit int64) *XTrimOptions {
xTrimOptions.exact = triStateBoolFalse
xTrimOptions.limit = limit
return xTrimOptions
}

func (xto *XTrimOptions) ToArgs() ([]string, error) {
args := []string{}
args = append(args, xto.method)
if xto.exact == triStateBoolTrue {
func (xTrimOptions *XTrimOptions) ToArgs() ([]string, error) {
args := []string{xTrimOptions.method}
if xTrimOptions.exact == triStateBoolTrue {
args = append(args, "=")
} else if xto.exact == triStateBoolFalse {
} else if xTrimOptions.exact == triStateBoolFalse {
args = append(args, "~")
}
args = append(args, xto.threshold)
if xto.limit > 0 {
args = append(args, "LIMIT", utils.IntToString(xto.limit))
args = append(args, xTrimOptions.threshold)
if xTrimOptions.limit > 0 {
args = append(args, "LIMIT", utils.IntToString(xTrimOptions.limit))
}
var err error
return args, err
return args, nil
}
Loading

0 comments on commit 733196e

Please sign in to comment.