Skip to content

Commit

Permalink
Try testing
Browse files Browse the repository at this point in the history
  • Loading branch information
adatzer committed Nov 7, 2024
1 parent c4b233c commit ee24797
Showing 1 changed file with 49 additions and 0 deletions.
49 changes: 49 additions & 0 deletions shard_consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,55 @@ func TestShardConsumer(t *testing.T) {
assert.Equal(t, 100, len(result))
}

// TestExpiredShardIteratorError aims to prove that on ErrCodeExpiredIteratorException
// we recover
func TestExpiredShardIteratorError(t *testing.T) {
if testing.Short() {
t.Skip("skipping test in short mode.")
}
streamName := "TestExpiredShardIterator_stream"

k, dynamo := kinesisAndDynamoInstances()

defer func() {
err := cleanupTestEnvironment(t, k, dynamo, streamName)
require.NoError(t, err, "Problems cleaning up the test environment")
}()

err := setupTestEnvironment(t, k, dynamo, streamName, 1)
require.NoError(t, err, "Problems setting up the test environment")

config := NewConfig().WithBufferSize(1000)
config = config.WithShardCheckFrequency(500 * time.Millisecond)
config = config.WithLeaderActionFrequency(500 * time.Millisecond)

kinsumer1, err := NewWithInterfaces(k, dynamo, streamName, *applicationName, "client_1", config)

desc, err := k.DescribeStream(&kinesis.DescribeStreamInput{
StreamName: &streamName,
Limit: aws.Int64(shardLimit),
})
require.NoError(t, err, "Error describing stream")
shard := *desc.StreamDescription.Shards[0].ShardId // Get shard ID

// Consume a shard manually
kinsumer1.waitGroup.Add(1) // consume will mark waitgroup as done on exit, so we add to it to avoid a panic


go kinsumer1.consume(shard)

// sleep 6 minutes to cause expired iterator, then send data
//time.Sleep(6 * time.Minute)

go spamStreamModified(t, k, 100, streamName, 0)

result := readEventsToSlice(kinsumer1.records, 5*time.Second)

assert.Equal(t, 100, len(result))


}

// TestForcefulOwnershipChange aims to isolate the basic conditions where a client claims ownership of a shard before another client has released it.
func TestForcefulOwnershipChange(t *testing.T) {
if testing.Short() {
Expand Down

0 comments on commit ee24797

Please sign in to comment.