Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add minId overloads #2842

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions src/StackExchange.Redis/Interfaces/IDatabase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2775,6 +2775,16 @@ IEnumerable<SortedSetEntry> SortedSetScan(
/// <remarks><seealso href="https://redis.io/topics/streams-intro"/></remarks>
long StreamTrim(RedisKey key, int maxLength, bool useApproximateMaxLength = false, CommandFlags flags = CommandFlags.None);

/// <summary>
/// Trim the stream to a specified minimum timestamp.
/// </summary>
/// <param name="key">The key of the stream.</param>
/// <param name="minId">All entries with an id (timestamp) earlier minId will be removed.</param>
/// <param name="flags">The flags to use for this operation.</param>
/// <returns>The number of messages removed from the stream.</returns>
/// <remarks><seealso href="https://redis.io/topics/streams-intro"/></remarks>
long StreamTrim(RedisKey key, RedisValue minId, CommandFlags flags);

/// <summary>
/// If key already exists and is a string, this command appends the value at the end of the string.
/// If key does not exist it is created and set as an empty string, so APPEND will be similar to SET in this special case.
Expand Down
3 changes: 3 additions & 0 deletions src/StackExchange.Redis/Interfaces/IDatabaseAsync.cs
Original file line number Diff line number Diff line change
Expand Up @@ -672,6 +672,9 @@ IAsyncEnumerable<SortedSetEntry> SortedSetScanAsync(
/// <inheritdoc cref="IDatabase.StreamTrim(RedisKey, int, bool, CommandFlags)"/>
Task<long> StreamTrimAsync(RedisKey key, int maxLength, bool useApproximateMaxLength = false, CommandFlags flags = CommandFlags.None);

/// <inheritdoc cref="IDatabase.StreamTrim(RedisKey, RedisValue, CommandFlags)"/>
Task<long> StreamTrimAsync(RedisKey key, RedisValue minId, CommandFlags flags);

/// <inheritdoc cref="IDatabase.StringAppend(RedisKey, RedisValue, CommandFlags)"/>
Task<long> StringAppendAsync(RedisKey key, RedisValue value, CommandFlags flags = CommandFlags.None);

Expand Down
3 changes: 3 additions & 0 deletions src/StackExchange.Redis/KeyspaceIsolation/KeyPrefixed.cs
Original file line number Diff line number Diff line change
Expand Up @@ -642,6 +642,9 @@ public Task<RedisStream[]> StreamReadGroupAsync(StreamPosition[] streamPositions
public Task<long> StreamTrimAsync(RedisKey key, int maxLength, bool useApproximateMaxLength = false, CommandFlags flags = CommandFlags.None) =>
Inner.StreamTrimAsync(ToInner(key), maxLength, useApproximateMaxLength, flags);

public Task<long> StreamTrimAsync(RedisKey key, RedisValue minId, CommandFlags flags) =>
Inner.StreamTrimAsync(ToInner(key), minId, flags);

public Task<long> StringAppendAsync(RedisKey key, RedisValue value, CommandFlags flags = CommandFlags.None) =>
Inner.StringAppendAsync(ToInner(key), value, flags);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -624,6 +624,9 @@ public RedisStream[] StreamReadGroup(StreamPosition[] streamPositions, RedisValu
public long StreamTrim(RedisKey key, int maxLength, bool useApproximateMaxLength = false, CommandFlags flags = CommandFlags.None) =>
Inner.StreamTrim(ToInner(key), maxLength, useApproximateMaxLength, flags);

public long StreamTrim(RedisKey key, RedisValue minId, CommandFlags flags) =>
Inner.StreamTrim(ToInner(key), minId, flags);

public long StringAppend(RedisKey key, RedisValue value, CommandFlags flags = CommandFlags.None) =>
Inner.StringAppend(ToInner(key), value, flags);

Expand Down
4 changes: 3 additions & 1 deletion src/StackExchange.Redis/PublicAPI/PublicAPI.Unshipped.txt
Original file line number Diff line number Diff line change
@@ -1 +1,3 @@
#nullable enable
#nullable enable
StackExchange.Redis.IDatabase.StreamTrim(StackExchange.Redis.RedisKey key, StackExchange.Redis.RedisValue minId, StackExchange.Redis.CommandFlags flags) -> long
StackExchange.Redis.IDatabaseAsync.StreamTrimAsync(StackExchange.Redis.RedisKey key, StackExchange.Redis.RedisValue minId, StackExchange.Redis.CommandFlags flags) -> System.Threading.Tasks.Task<long>!
14 changes: 14 additions & 0 deletions src/StackExchange.Redis/RedisDatabase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3007,6 +3007,20 @@ public Task<long> StreamTrimAsync(RedisKey key, int maxLength, bool useApproxima
return ExecuteAsync(msg, ResultProcessor.Int64);
}

public long StreamTrim(RedisKey key, RedisValue minId, CommandFlags flags = CommandFlags.None)
{
var values = new[] { StreamConstants.MinId, minId };
var msg = Message.Create(Database, flags, RedisCommand.XTRIM, key, values);
return ExecuteSync(msg, ResultProcessor.Int64);
}

public Task<long> StreamTrimAsync(RedisKey key, RedisValue minId, CommandFlags flags = CommandFlags.None)
{
var values = new[] { StreamConstants.MinId, minId };
var msg = Message.Create(Database, flags, RedisCommand.XTRIM, key, values);
return ExecuteAsync(msg, ResultProcessor.Int64);
}

public long StringAppend(RedisKey key, RedisValue value, CommandFlags flags = CommandFlags.None)
{
var msg = Message.Create(Database, flags, RedisCommand.APPEND, key, value);
Expand Down
1 change: 1 addition & 0 deletions src/StackExchange.Redis/StreamConstants.cs
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ internal static class StreamConstants
internal static readonly RedisValue SetId = "SETID";

internal static readonly RedisValue MaxLen = "MAXLEN";
internal static readonly RedisValue MinId = "MINID";

internal static readonly RedisValue MkStream = "MKSTREAM";

Expand Down
7 changes: 7 additions & 0 deletions tests/StackExchange.Redis.Tests/KeyPrefixedDatabaseTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1202,6 +1202,13 @@ public void StreamTrim()
mock.Received().StreamTrim("prefix:key", 1000, true, CommandFlags.None);
}

[Fact]
public void StreamTrimMinId()
{
prefixed.StreamTrim("key", 1111111111, CommandFlags.None);
mock.Received().StreamTrim("prefix:key", 1111111111, CommandFlags.None);
}

[Fact]
public void StringAppend()
{
Expand Down
7 changes: 7 additions & 0 deletions tests/StackExchange.Redis.Tests/KeyPrefixedTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1118,6 +1118,13 @@ public async Task StreamTrimAsync()
await mock.Received().StreamTrimAsync("prefix:key", 1000, true, CommandFlags.None);
}

[Fact]
public async Task StreamTrimMinIdAsync()
{
await prefixed.StreamTrimAsync("key", 1111111111, CommandFlags.None);
await mock.Received().StreamTrimAsync("prefix:key", 1111111111, CommandFlags.None);
}

[Fact]
public async Task StringAppendAsync()
{
Expand Down
20 changes: 20 additions & 0 deletions tests/StackExchange.Redis.Tests/StreamTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1916,6 +1916,26 @@ public void StreamTrimLength()
Assert.Equal(1, len);
}

[Fact]
public void StreamTrimMinId()
{
using var conn = Create(require: RedisFeatures.v5_0_0);

var db = conn.GetDatabase();
var key = Me();

// Add a couple items and check length.
db.StreamAdd(key, "field1", "value1", 1111111110);
db.StreamAdd(key, "field2", "value2", 1111111111);
db.StreamAdd(key, "field3", "value3", 1111111112);

var numRemoved = db.StreamTrim(key, 1111111111, CommandFlags.None);
var len = db.StreamLength(key);

Assert.Equal(1, numRemoved);
Assert.Equal(2, len);
}

[Fact]
public void StreamVerifyLength()
{
Expand Down
Loading