Skip to content

Commit

Permalink
Java: re-enabled xclaim binary tests (valkey-io#1867)
Browse files Browse the repository at this point in the history
Java: always return `byte[]` if `encoding_utf8` is `false` for the following types: `BulkString`, `SimpleString` & `VerbatimString`
  • Loading branch information
eifrah-aws authored Jul 8, 2024
1 parent 0bf25d1 commit aa4fbae
Show file tree
Hide file tree
Showing 2 changed files with 204 additions and 192 deletions.
380 changes: 190 additions & 190 deletions java/integTest/src/test/java/glide/SharedCommandTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -7799,196 +7799,196 @@ public void xpending_xclaim(BaseClient client) {
assertEquals(2, pending_results_extended.length);
}

// @SneakyThrows
// @ParameterizedTest(autoCloseArguments = false)
// @MethodSource("getClients")
// public void xpending_xclaim_binary(BaseClient client) {

// GlideString key = gs(UUID.randomUUID().toString());
// GlideString groupName = gs("group" + UUID.randomUUID());
// GlideString zeroStreamId = gs("0");
// GlideString consumer1 = gs("consumer-1-" + UUID.randomUUID());
// GlideString consumer2 = gs("consumer-2-" + UUID.randomUUID());

// // create group and consumer for the group
// assertEquals(
// OK,
// client
// .xgroupCreate(
// key, groupName, zeroStreamId,
// StreamGroupOptions.builder().makeStream().build())
// .get());
// assertTrue(client.xgroupCreateConsumer(key, groupName, consumer1).get());
// assertTrue(client.xgroupCreateConsumer(key, groupName, consumer2).get());

// // Add two stream entries for consumer 1
// GlideString streamid_1 = client.xadd(key, Map.of(gs("field1"), gs("value1"))).get();
// assertNotNull(streamid_1);
// GlideString streamid_2 = client.xadd(key, Map.of(gs("field2"), gs("value2"))).get();
// assertNotNull(streamid_2);

// // read the entire stream for the consumer and mark messages as pending
// var result_1 = client.xreadgroup(Map.of(key, gs(">")), groupName, consumer1).get();
// assertDeepEquals(
// Map.of(
// key,
// Map.of(
// streamid_1, new GlideString[][] {{gs("field1"), gs("value1")}},
// streamid_2, new GlideString[][] {{gs("field2"),
// gs("value2")}})),
// result_1);

// // Add three stream entries for consumer 2
// GlideString streamid_3 = client.xadd(key, Map.of(gs("field3"), gs("value3"))).get();
// assertNotNull(streamid_3);
// GlideString streamid_4 = client.xadd(key, Map.of(gs("field4"), gs("value4"))).get();
// assertNotNull(streamid_4);
// GlideString streamid_5 = client.xadd(key, Map.of(gs("field5"), gs("value5"))).get();
// assertNotNull(streamid_5);

// // read the entire stream for the consumer and mark messages as pending
// var result_2 = client.xreadgroup(Map.of(key, gs(">")), groupName, consumer2).get();
// assertDeepEquals(
// Map.of(
// key,
// Map.of(
// streamid_3, new GlideString[][] {{gs("field3"), gs("value3")}},
// streamid_4, new GlideString[][] {{gs("field4"), gs("value4")}},
// streamid_5, new GlideString[][] {{gs("field5"),
// gs("value5")}})),
// result_2);

// Object[] pending_results = client.xpending(key, groupName).get();
// Object[] expectedResult = {
// Long.valueOf(5L),
// streamid_1,
// streamid_5,
// new Object[][] {{consumer1, gs("2")}, {consumer2, gs("3")}}
// };
// assertDeepEquals(expectedResult, pending_results);

// // ensure idle_time > 0
// Thread.sleep(2000);
// Object[][] pending_results_extended =
// client.xpending(key, groupName, InfRangeBound.MIN, InfRangeBound.MAX,
// 10L).get();

// // because of idle time return, we have to remove it from the expected results
// // and check it separately
// assertArrayEquals(
// new Object[] {streamid_1, consumer1, 1L},
// ArrayUtils.remove(pending_results_extended[0], 2));
// assertTrue((Long) pending_results_extended[0][2] > 0L);

// assertArrayEquals(
// new Object[] {streamid_2, consumer1, 1L},
// ArrayUtils.remove(pending_results_extended[1], 2));
// assertTrue((Long) pending_results_extended[1][2] > 0L);

// assertArrayEquals(
// new Object[] {streamid_3, consumer2, 1L},
// ArrayUtils.remove(pending_results_extended[2], 2));
// assertTrue((Long) pending_results_extended[2][2] >= 0L);

// assertArrayEquals(
// new Object[] {streamid_4, consumer2, 1L},
// ArrayUtils.remove(pending_results_extended[3], 2));
// assertTrue((Long) pending_results_extended[3][2] >= 0L);

// assertArrayEquals(
// new Object[] {streamid_5, consumer2, 1L},
// ArrayUtils.remove(pending_results_extended[4], 2));
// assertTrue((Long) pending_results_extended[4][2] >= 0L);

// // use claim to claim stream 3 and 5 for consumer 1
// var claimResults =
// client
// .xclaim(key, groupName, consumer1, 0L, new GlideString[] {streamid_3,
// streamid_5})
// .get();

// assertDeepEquals(
// Map.of(
// streamid_3,
// new GlideString[][] {{gs("field3"), gs("value3")}},
// streamid_5,
// new GlideString[][] {{gs("field5"), gs("value5")}}),
// claimResults);

// var claimResultsJustId =
// client
// .xclaimJustId(key, groupName, consumer1, 0L, new GlideString[]
// {streamid_3, streamid_5})
// .get();
// assertArrayEquals(new GlideString[] {streamid_3, streamid_5}, claimResultsJustId);

// // add one more stream
// GlideString streamid_6 = client.xadd(key, Map.of(gs("field6"), gs("value6"))).get();
// assertNotNull(streamid_6);

// // using force, we can xclaim the message without reading it
// var claimForceResults =
// client
// .xclaim(
// key,
// groupName,
// consumer2,
// 0L,
// new GlideString[] {streamid_6},
// StreamClaimOptions.builder().force().retryCount(99L).build())
// .get();
// assertDeepEquals(
// Map.of(streamid_6, new GlideString[][] {{gs("field6"), gs("value6")}}),
// claimForceResults);

// Object[][] forcePendingResults =
// client.xpending(key, groupName, IdBound.of(streamid_6), IdBound.of(streamid_6),
// 1L).get();
// assertEquals(streamid_6, forcePendingResults[0][0]);
// assertEquals(consumer2, forcePendingResults[0][1]);
// assertEquals(99L, forcePendingResults[0][3]);

// // acknowledge streams 2, 3, 4, and 6 and remove them from the xpending results
// assertEquals(
// 4L,
// client
// .xack(
// key, groupName, new GlideString[] {streamid_2, streamid_3,
// streamid_4, streamid_6})
// .get());

// pending_results_extended =
// client
// .xpending(key, groupName, IdBound.ofExclusive(streamid_3),
// InfRangeBound.MAX, 10L)
// .get();
// assertEquals(1, pending_results_extended.length);
// assertEquals(streamid_5, pending_results_extended[0][0]);
// assertEquals(consumer1, pending_results_extended[0][1]);

// pending_results_extended =
// client
// .xpending(key, groupName, InfRangeBound.MIN,
// IdBound.ofExclusive(streamid_5), 10L)
// .get();
// assertEquals(1, pending_results_extended.length);
// assertEquals(streamid_1, pending_results_extended[0][0]);
// assertEquals(consumer1, pending_results_extended[0][1]);

// pending_results_extended =
// client
// .xpending(
// key,
// groupName,
// InfRangeBound.MIN,
// InfRangeBound.MAX,
// 10L,
//
// StreamPendingOptionsBinary.builder().minIdleTime(1L).consumer(consumer1).build())
// .get();
// // note: streams ID 1 and 5 are still pending, all others were acknowledged
// assertEquals(2, pending_results_extended.length);
// }
@SneakyThrows
@ParameterizedTest(autoCloseArguments = false)
@MethodSource("getClients")
public void xpending_xclaim_binary(BaseClient client) {

GlideString key = gs(UUID.randomUUID().toString());
GlideString groupName = gs("groupbin" + UUID.randomUUID());
GlideString zeroStreamId = gs("0");
GlideString consumer1 = gs("consumer-1-" + UUID.randomUUID());
GlideString consumer2 = gs("consumer-2-" + UUID.randomUUID());

// create group and consumer for the group
assertEquals(
OK,
client
.xgroupCreate(
key, groupName, zeroStreamId, StreamGroupOptions.builder().makeStream().build())
.get());
assertTrue(client.xgroupCreateConsumer(key, groupName, consumer1).get());
assertTrue(client.xgroupCreateConsumer(key, groupName, consumer2).get());

// Add two stream entries for consumer 1
GlideString streamid_1 = client.xadd(key, Map.of(gs("field1"), gs("value1"))).get();
assertNotNull(streamid_1);
GlideString streamid_2 = client.xadd(key, Map.of(gs("field2"), gs("value2"))).get();
assertNotNull(streamid_2);

// read the entire stream for the consumer and mark messages as pending
var result_1 = client.xreadgroup(Map.of(key, gs(">")), groupName, consumer1).get();
assertDeepEquals(
Map.of(
key,
Map.of(
streamid_1, new GlideString[][] {{gs("field1"), gs("value1")}},
streamid_2, new GlideString[][] {{gs("field2"), gs("value2")}})),
result_1);

// Add three stream entries for consumer 2
GlideString streamid_3 = client.xadd(key, Map.of(gs("field3"), gs("value3"))).get();
assertNotNull(streamid_3);
GlideString streamid_4 = client.xadd(key, Map.of(gs("field4"), gs("value4"))).get();
assertNotNull(streamid_4);
GlideString streamid_5 = client.xadd(key, Map.of(gs("field5"), gs("value5"))).get();
assertNotNull(streamid_5);

// read the entire stream for the consumer and mark messages as pending
var result_2 = client.xreadgroup(Map.of(key, gs(">")), groupName, consumer2).get();
assertDeepEquals(
Map.of(
key,
Map.of(
streamid_3, new GlideString[][] {{gs("field3"), gs("value3")}},
streamid_4, new GlideString[][] {{gs("field4"), gs("value4")}},
streamid_5, new GlideString[][] {{gs("field5"), gs("value5")}})),
result_2);

Object[] pending_results = client.xpending(key, groupName).get();
Object[] expectedResult = {
Long.valueOf(5L),
streamid_1,
streamid_5,
new Object[][] {{consumer1, gs("2")}, {consumer2, gs("3")}}
};
assertDeepEquals(expectedResult, pending_results);

// ensure idle_time > 0
Thread.sleep(2000);
Object[][] pending_results_extended =
client.xpending(key, groupName, InfRangeBound.MIN, InfRangeBound.MAX, 10L).get();

System.out.println("xpending result:");
for (int i = 0; i < pending_results_extended.length; i++) {
System.out.println((GlideString) pending_results_extended[i][0]);
}

// because of idle time return, we have to remove it from the expected results
// and check it separately
assertArrayEquals(
new Object[] {streamid_1, consumer1, 1L},
ArrayUtils.remove(pending_results_extended[0], 2));
assertTrue((Long) pending_results_extended[0][2] > 0L);

assertArrayEquals(
new Object[] {streamid_2, consumer1, 1L},
ArrayUtils.remove(pending_results_extended[1], 2));
assertTrue((Long) pending_results_extended[1][2] > 0L);

assertArrayEquals(
new Object[] {streamid_3, consumer2, 1L},
ArrayUtils.remove(pending_results_extended[2], 2));
assertTrue((Long) pending_results_extended[2][2] >= 0L);

assertArrayEquals(
new Object[] {streamid_4, consumer2, 1L},
ArrayUtils.remove(pending_results_extended[3], 2));
assertTrue((Long) pending_results_extended[3][2] >= 0L);

assertArrayEquals(
new Object[] {streamid_5, consumer2, 1L},
ArrayUtils.remove(pending_results_extended[4], 2));
assertTrue((Long) pending_results_extended[4][2] >= 0L);

// use claim to claim stream 3 and 5 for consumer 1
var claimResults =
client
.xclaim(key, groupName, consumer1, 0L, new GlideString[] {streamid_3, streamid_5})
.get();
assertNotNull(claimResults);
assertEquals(claimResults.size(), 2);
for (var e : claimResults.entrySet()) {
System.out.println("Key: " + e.getKey().getString());
}

assertNotNull(claimResults.get(streamid_5));
assertNotNull(claimResults.get(streamid_3));
assertDeepEquals(
Map.of(
streamid_3,
new GlideString[][] {{gs("field3"), gs("value3")}},
streamid_5,
new GlideString[][] {{gs("field5"), gs("value5")}}),
claimResults);

var claimResultsJustId =
client
.xclaimJustId(key, groupName, consumer1, 0L, new GlideString[] {streamid_3, streamid_5})
.get();
assertArrayEquals(new GlideString[] {streamid_3, streamid_5}, claimResultsJustId);

// add one more stream
GlideString streamid_6 = client.xadd(key, Map.of(gs("field6"), gs("value6"))).get();
assertNotNull(streamid_6);

// using force, we can xclaim the message without reading it
var claimForceResults =
client
.xclaim(
key,
groupName,
consumer2,
0L,
new GlideString[] {streamid_6},
StreamClaimOptions.builder().force().retryCount(99L).build())
.get();
assertDeepEquals(
Map.of(streamid_6, new GlideString[][] {{gs("field6"), gs("value6")}}), claimForceResults);

Object[][] forcePendingResults =
client.xpending(key, groupName, IdBound.of(streamid_6), IdBound.of(streamid_6), 1L).get();
assertEquals(streamid_6, forcePendingResults[0][0]);
assertEquals(consumer2, forcePendingResults[0][1]);
assertEquals(99L, forcePendingResults[0][3]);

// acknowledge streams 2, 3, 4, and 6 and remove them from the xpending results
assertEquals(
4L,
client
.xack(
key, groupName, new GlideString[] {streamid_2, streamid_3, streamid_4, streamid_6})
.get());

pending_results_extended =
client
.xpending(key, groupName, IdBound.ofExclusive(streamid_3), InfRangeBound.MAX, 10L)
.get();
assertEquals(1, pending_results_extended.length);
assertEquals(streamid_5, pending_results_extended[0][0]);
assertEquals(consumer1, pending_results_extended[0][1]);

pending_results_extended =
client
.xpending(key, groupName, InfRangeBound.MIN, IdBound.ofExclusive(streamid_5), 10L)
.get();
assertEquals(1, pending_results_extended.length);
assertEquals(streamid_1, pending_results_extended[0][0]);
assertEquals(consumer1, pending_results_extended[0][1]);

pending_results_extended =
client
.xpending(
key,
groupName,
InfRangeBound.MIN,
InfRangeBound.MAX,
10L,
StreamPendingOptionsBinary.builder().minIdleTime(1L).consumer(consumer1).build())
.get();
// note: streams ID 1 and 5 are still pending, all others were acknowledged
assertEquals(2, pending_results_extended.length);
}

@SneakyThrows
@ParameterizedTest(autoCloseArguments = false)
Expand Down
Loading

0 comments on commit aa4fbae

Please sign in to comment.