diff --git a/kafka-client/build.gradle.kts b/kafka-client/build.gradle.kts
index 717b13e4b..8b54294ae 100644
--- a/kafka-client/build.gradle.kts
+++ b/kafka-client/build.gradle.kts
@@ -138,6 +138,7 @@ dependencies {
   implementation(libs.bundles.scylla)
   implementation(libs.bundles.commons)
   implementation(libs.mongodb.driver.sync)
+  implementation(libs.bundles.dynamo)
   implementation(libs.bundles.otel)
   implementation(libs.bundles.grpc)
   implementation(libs.protobuf.java.util)
diff --git a/kafka-client/src/main/java/dev/responsive/kafka/api/ResponsiveKafkaStreams.java b/kafka-client/src/main/java/dev/responsive/kafka/api/ResponsiveKafkaStreams.java
index c908d7fba..049c990da 100644
--- a/kafka-client/src/main/java/dev/responsive/kafka/api/ResponsiveKafkaStreams.java
+++ b/kafka-client/src/main/java/dev/responsive/kafka/api/ResponsiveKafkaStreams.java
@@ -12,6 +12,7 @@
 
 package dev.responsive.kafka.api;
 
+import static dev.responsive.kafka.api.config.ResponsiveConfig.DYNAMODB_ENDPOINT_CONFIG;
 import static dev.responsive.kafka.api.config.ResponsiveConfig.METRICS_ENABLED_CONFIG;
 import static dev.responsive.kafka.api.config.ResponsiveConfig.MONGO_ADDITIONAL_CONNECTION_STRING_PARAMS_CONFIG;
 import static dev.responsive.kafka.api.config.ResponsiveConfig.MONGO_ENDPOINT_CONFIG;
@@ -41,6 +42,7 @@
 import dev.responsive.kafka.internal.config.ResponsiveStreamsConfig;
 import dev.responsive.kafka.internal.db.CassandraClientFactory;
 import dev.responsive.kafka.internal.db.DefaultCassandraClientFactory;
+import dev.responsive.kafka.internal.db.DynamoClient;
 import dev.responsive.kafka.internal.db.mongo.CollectionCreationOptions;
 import dev.responsive.kafka.internal.db.mongo.ResponsiveMongoClient;
 import dev.responsive.kafka.internal.db.rs3.RS3TableFactory;
@@ -61,6 +63,7 @@
 import dev.responsive.kafka.internal.utils.SessionUtil;
 import java.io.File;
 import java.io.IOException;
+import java.net.URI;
 import java.nio.file.Files;
 import java.time.Duration;
 import java.util.Base64;
@@ -93,6 +96,11 @@
 import org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
+import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
+import software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient;
+import software.amazon.awssdk.regions.Region;
+import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
 
 public class ResponsiveKafkaStreams extends KafkaStreams {
 
@@ -532,23 +540,26 @@ public Params build() {
       final var admin = responsiveKafkaClientSupplier.getAdmin(responsiveConfig.originals());
       if (compatibilityMode == CompatibilityMode.METRICS_ONLY) {
         sessionClients = new SessionClients(
-            Optional.empty(), Optional.empty(), Optional.empty(), false, admin);
+            Optional.empty(), Optional.empty(), Optional.empty(), Optional.empty(), false, admin);
         return this;
       }
 
       final var backendType = ConfigUtils.storageBackend(responsiveConfig);
       switch (backendType) {
         case CASSANDRA:
+          LOG.info("using Cassandra responsive store");
           final var cqlSession = cassandraFactory.createCqlSession(responsiveConfig, metrics);
           sessionClients = new SessionClients(
               Optional.empty(),
               Optional.of(cassandraFactory.createClient(cqlSession, responsiveConfig)),
               Optional.empty(),
+              Optional.empty(),
               false,
               admin
           );
           break;
         case MONGO_DB:
+          LOG.info("using MongoDB responsive store");
           final var hostname = responsiveConfig.getString(MONGO_ENDPOINT_CONFIG);
           final String clientId = responsiveConfig.getString(MONGO_USERNAME_CONFIG);
           final Password clientSecret = responsiveConfig.getPassword(MONGO_PASSWORD_CONFIG);
@@ -570,6 +581,27 @@ public Params build() {
               )),
               Optional.empty(),
               Optional.empty(),
+              Optional.empty(),
+              false,
+              admin
+          );
+          break;
+        case DYNAMO_DB:
+          LOG.info("using DynamoDB responsive store");
+          final var dynamoEndpoint = responsiveConfig.getString(DYNAMODB_ENDPOINT_CONFIG);
+          final var dynamoDB = DynamoDbAsyncClient.builder()
+              .region(Region.US_WEST_2)
+              // TODO(agavra): support real credentials
+              .credentialsProvider(StaticCredentialsProvider.create(
+                  AwsBasicCredentials.create("dummykey", "dummysecret")))
+              .endpointOverride(URI.create(dynamoEndpoint))
+              .httpClientBuilder(NettyNioAsyncHttpClient.builder())
+              .build();
+          sessionClients = new SessionClients(
+              Optional.empty(),
+              Optional.empty(),
+              Optional.empty(),
+              Optional.of(new DynamoClient(dynamoDB, responsiveConfig)),
               false,
               admin
           );
@@ -580,6 +612,7 @@ public Params build() {
               Optional.empty(),
               Optional.empty(),
               Optional.empty(),
+              Optional.empty(),
               true,
               admin
           );
@@ -592,6 +625,7 @@ public Params build() {
               Optional.empty(),
               Optional.empty(),
               Optional.of(new RS3TableFactory(rs3Host, rs3Port)),
+              Optional.empty(),
               false,
               admin
           );
diff --git a/kafka-client/src/main/java/dev/responsive/kafka/api/config/ResponsiveConfig.java b/kafka-client/src/main/java/dev/responsive/kafka/api/config/ResponsiveConfig.java
index 07bceaf43..0ce44e056 100644
--- a/kafka-client/src/main/java/dev/responsive/kafka/api/config/ResponsiveConfig.java
+++ b/kafka-client/src/main/java/dev/responsive/kafka/api/config/ResponsiveConfig.java
@@ -119,6 +119,11 @@ public class ResponsiveConfig extends AbstractConfig {
   public static final String RS3_PORT_CONFIG = "responsive.rs3.port";
   private static final String RS3_PORT_DOC = "The port to use when connecting to RS3.";
 
+  // ------------------ DynamoDB specific configurations -----------------------
+
+  public static final String DYNAMODB_ENDPOINT_CONFIG = "responsive.dynamodb.endpoint";
+  private static final String DYNAMODB_ENDPOINT_DOC = "The DynamoDB endpoint to connect to.";
+
   // ------------------ ScyllaDB specific configurations ----------------------
 
   public static final String CASSANDRA_USERNAME_CONFIG = "responsive.cassandra.username";
@@ -370,6 +375,16 @@ public class ResponsiveConfig extends AbstractConfig {
           MONGO_ENDPOINT_DOC
       )
 
+      // dynamodb connection configurations
+      .define(
+          DYNAMODB_ENDPOINT_CONFIG,
+          Type.STRING,
+          null,
+          new ConfigDef.NonEmptyString(),
+          Importance.HIGH,
+          DYNAMODB_ENDPOINT_DOC
+      )
+
       // cassandra connection configurations
       .define(
           CASSANDRA_USERNAME_CONFIG,
diff --git a/kafka-client/src/main/java/dev/responsive/kafka/api/config/StorageBackend.java b/kafka-client/src/main/java/dev/responsive/kafka/api/config/StorageBackend.java
index a9c384d21..ca386853e 100644
--- a/kafka-client/src/main/java/dev/responsive/kafka/api/config/StorageBackend.java
+++ b/kafka-client/src/main/java/dev/responsive/kafka/api/config/StorageBackend.java
@@ -17,6 +17,7 @@
 public enum StorageBackend {
   CASSANDRA,
   MONGO_DB,
+  DYNAMO_DB,
   IN_MEMORY,
   RS3;
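Note: with the DYNAMODB_ENDPOINT_CONFIG definition and the new DYNAMO_DB enum value in place,
an application opts into this backend through the existing STORAGE_BACKEND_TYPE_CONFIG, passed
to ResponsiveKafkaStreams like any other config. A minimal sketch (the endpoint value is
illustrative, e.g. a local DynamoDB instance):

    final Map<String, Object> props = new HashMap<>();
    props.put(ResponsiveConfig.STORAGE_BACKEND_TYPE_CONFIG, StorageBackend.DYNAMO_DB.name());
    props.put(ResponsiveConfig.DYNAMODB_ENDPOINT_CONFIG, "http://localhost:8000");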
diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/DynamoClient.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/DynamoClient.java
new file mode 100644
index 000000000..40a182086
--- /dev/null
+++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/DynamoClient.java
@@ -0,0 +1,50 @@
+/*
+ * Copyright 2024 Responsive Computing, Inc.
+ *
+ * This source code is licensed under the Responsive Business Source License Agreement v1.0
+ * available at:
+ *
+ * https://www.responsive.dev/legal/responsive-bsl-10
+ *
+ * This software requires a valid Commercial License Key for production use. Trial and commercial
+ * licenses can be obtained at https://www.responsive.dev
+ */
+
+package dev.responsive.kafka.internal.db;
+
+import dev.responsive.kafka.api.config.ResponsiveConfig;
+import dev.responsive.kafka.internal.db.dynamo.DynamoKVTable;
+import dev.responsive.kafka.internal.db.partitioning.TablePartitioner;
+import dev.responsive.kafka.internal.db.spec.DefaultTableSpec;
+import dev.responsive.kafka.internal.stores.TtlResolver;
+import java.util.Optional;
+import java.util.concurrent.TimeoutException;
+import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
+
+public class DynamoClient {
+
+  private final ResponsiveConfig responsiveConfig;
+  private final TableCache<DynamoKVTable> kvTableCache;
+
+  public DynamoClient(
+      final DynamoDbAsyncClient dynamoDB,
+      final ResponsiveConfig responsiveConfig
+  ) {
+    this.responsiveConfig = responsiveConfig;
+    kvTableCache = new TableCache<>(
+        spec -> new DynamoKVTable(
+            dynamoDB,
+            spec.tableName()
+        ));
+  }
+
+  public DynamoKVTable kvTable(
+      final String name,
+      final Optional<TtlResolver<?, ?>> ttlResolver
+  ) throws InterruptedException, TimeoutException {
+    return kvTableCache.create(
+        new DefaultTableSpec(name, TablePartitioner.defaultPartitioner(), ttlResolver)
+    );
+  }
+}
diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/dynamo/DynamoKVFlushManager.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/dynamo/DynamoKVFlushManager.java
new file mode 100644
index 000000000..e2512ae32
--- /dev/null
+++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/dynamo/DynamoKVFlushManager.java
@@ -0,0 +1,93 @@
+/*
+ * Copyright 2024 Responsive Computing, Inc.
+ *
+ * This source code is licensed under the Responsive Business Source License Agreement v1.0
+ * available at:
+ *
+ * https://www.responsive.dev/legal/responsive-bsl-10
+ *
+ * This software requires a valid Commercial License Key for production use. Trial and commercial
+ * licenses can be obtained at https://www.responsive.dev
+ */
+
+package dev.responsive.kafka.internal.db.dynamo;
+
+import dev.responsive.kafka.internal.db.KVFlushManager;
+import dev.responsive.kafka.internal.db.RemoteWriter;
+import dev.responsive.kafka.internal.db.partitioning.TablePartitioner;
+import dev.responsive.kafka.internal.stores.RemoteWriteResult;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.common.utils.LogContext;
+import org.slf4j.Logger;
+import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
+
+public class DynamoKVFlushManager extends KVFlushManager {
+
+  private final String logPrefix;
+  private final Logger log;
+
+  private final DynamoKVTable table;
+
+  private final int kafkaPartition;
+  private final DynamoDbAsyncClient dynamo;
+  private final TablePartitioner<Bytes, Integer> partitioner;
+
+  public DynamoKVFlushManager(
+      final DynamoKVTable table,
+      final int kafkaPartition,
+      final DynamoDbAsyncClient dynamo
+  ) {
+    this.table = table;
+    this.kafkaPartition = kafkaPartition;
+    this.dynamo = dynamo;
+
+    partitioner = TablePartitioner.defaultPartitioner();
+    logPrefix = String.format("%s[%d] kv-store {epoch=%d} ",
+        table.name(), kafkaPartition, table.localEpoch(kafkaPartition));
+    log = new LogContext(logPrefix).logger(DynamoKVFlushManager.class);
+  }
+
+  @Override
+  public String tableName() {
+    return table.name();
+  }
+
+  @Override
+  public TablePartitioner<Bytes, Integer> partitioner() {
+    return partitioner;
+  }
+
+  @Override
+  public RemoteWriter<Bytes, Integer> createWriter(
+      final Integer tablePartition,
+      final long consumedOffset
+  ) {
+    return new DynamoWriter<>(table, kafkaPartition, tablePartition, dynamo);
+  }
+
+  @Override
+  public String failedFlushInfo(final long batchOffset, final Integer failedTablePartition) {
+    // TODO(agavra): add persisted epoch
+    return String.format("<batchOffset=%d, persistedOffset=%d>, <localEpoch=%d, persistedEpoch=%d>",
+        batchOffset, table.fetchOffset(kafkaPartition),
+        table.localEpoch(kafkaPartition), 0);
+  }
+
+  @Override
+  public String logPrefix() {
+    return logPrefix;
+  }
+
+  @Override
+  public RemoteWriteResult<Integer> updateOffset(final long consumedOffset) {
+    try {
+      table.setOffset(kafkaPartition, consumedOffset);
+      return RemoteWriteResult.success(kafkaPartition);
+    } catch (final Exception e) {
+      // TODO(agavra): improve error handling to be closer to MongoKVFlushManager
+      log.error("Failed to update offset", e);
+      return RemoteWriteResult.failure(kafkaPartition);
+    }
+  }
+}
diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/dynamo/DynamoKVTable.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/dynamo/DynamoKVTable.java
new file mode 100644
index 000000000..df39a5e2e
--- /dev/null
+++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/dynamo/DynamoKVTable.java
@@ -0,0 +1,269 @@
+/*
+ * Copyright 2024 Responsive Computing, Inc.
+ *
+ * This source code is licensed under the Responsive Business Source License Agreement v1.0
+ * available at:
+ *
+ * https://www.responsive.dev/legal/responsive-bsl-10
+ *
+ * This software requires a valid Commercial License Key for production use. Trial and commercial
+ * licenses can be obtained at https://www.responsive.dev
+ */
+
+package dev.responsive.kafka.internal.db.dynamo;
+
+import static dev.responsive.kafka.internal.stores.ResponsiveStoreRegistration.NO_COMMITTED_OFFSET;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import dev.responsive.kafka.internal.db.KVFlushManager;
+import dev.responsive.kafka.internal.db.RemoteKVTable;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutionException;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.core.SdkBytes;
+import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
+import software.amazon.awssdk.services.dynamodb.model.AttributeDefinition;
+import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
+import software.amazon.awssdk.services.dynamodb.model.BillingMode;
+import software.amazon.awssdk.services.dynamodb.model.CreateTableRequest;
+import software.amazon.awssdk.services.dynamodb.model.DeleteItemRequest;
+import software.amazon.awssdk.services.dynamodb.model.DynamoDbRequest;
+import software.amazon.awssdk.services.dynamodb.model.GetItemRequest;
+import software.amazon.awssdk.services.dynamodb.model.GetItemResponse;
+import software.amazon.awssdk.services.dynamodb.model.KeySchemaElement;
+import software.amazon.awssdk.services.dynamodb.model.KeyType;
+import software.amazon.awssdk.services.dynamodb.model.PutItemRequest;
+import software.amazon.awssdk.services.dynamodb.model.ResourceInUseException;
+import software.amazon.awssdk.services.dynamodb.model.ReturnValue;
+import software.amazon.awssdk.services.dynamodb.model.ScalarAttributeType;
+import software.amazon.awssdk.services.dynamodb.model.UpdateItemRequest;
+
+public class DynamoKVTable implements RemoteKVTable<DynamoDbRequest> {
+
+  private static final Logger LOG = LoggerFactory.getLogger(DynamoKVTable.class);
+  private static final ObjectMapper MAPPER = new ObjectMapper();
+
+  private final String name;
+  private final DynamoDbAsyncClient dynamoDB;
+  private final ConcurrentMap<Integer, Long> kafkaPartitionToEpoch = new ConcurrentHashMap<>();
+
+  public DynamoKVTable(
+      final DynamoDbAsyncClient dynamoDB,
+      final String name
+  ) {
+    this.dynamoDB = dynamoDB;
+    this.name = name;
+    createTable(name, ScalarAttributeType.B);
+    createTable(metaTableName(name), ScalarAttributeType.N);
+  }
+
+  private void createTable(final String name, final ScalarAttributeType keyType) {
+    final var createTable = CreateTableRequest.builder()
+        .tableName(name)
+        .keySchema(KeySchemaElement.builder()
+            .attributeName("key")
+            .keyType(KeyType.HASH)
+            .build())
+        .attributeDefinitions(AttributeDefinition.builder()
+            .attributeName("key")
+            .attributeType(keyType)
+            .build())
+        .billingMode(BillingMode.PAY_PER_REQUEST) // TODO(agavra): allow provisioned throughput
+        .build();
+
+    try {
+      dynamoDB
+          .createTable(createTable)
+          .thenApply(resp -> resp.tableDescription().tableName())
+          .exceptionally(e -> {
+            if (e.getCause() instanceof ResourceInUseException) {
+              // ignored: the table already exists
+              return name;
+            } else {
+              throw new RuntimeException(e);
+            }
+          })
+          .get();
+    } catch (InterruptedException | ExecutionException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Override
+  public KVFlushManager init(final int kafkaPartition) {
+    try {
+      final UpdateItemRequest update = UpdateItemRequest.builder()
+          .tableName(metaTableName(this.name))
+          .key(Map.of(
+              "key", AttributeValue.fromN(String.valueOf(kafkaPartition))
+          ))
+          .updateExpression(
+              "SET epoch = if_not_exists(epoch, :zero) + :one, #o = if_not_exists(#o, :offsetValue)"
+          )
+          .expressionAttributeNames(Map.of(
+              "#o", "offset"
+          ))
+          .expressionAttributeValues(Map.of(
+              ":zero", AttributeValue.fromN("0"),
+              ":one", AttributeValue.fromN("1"),
+              ":offsetValue", AttributeValue.fromN(String.valueOf(NO_COMMITTED_OFFSET))
+          ))
+          .returnValues(ReturnValue.UPDATED_NEW)
+          .build();
+
+      final var result = dynamoDB.updateItem(update).get();
+      final var epoch = result.attributes().get("epoch").n();
+      kafkaPartitionToEpoch.put(kafkaPartition, Long.parseLong(epoch));
+    } catch (final Exception e) {
+      throw new RuntimeException(e);
+    }
+
+    return new DynamoKVFlushManager(this, kafkaPartition, dynamoDB);
+  }
+
+  @Override
+  public byte[] get(final int kafkaPartition, final Bytes key, final long streamTimeMs) {
+    // TODO(agavra): support minValidTs
+    final GetItemRequest req = GetItemRequest.builder()
+        .tableName(name)
+        .key(Map.of("key", AttributeValue.fromB(SdkBytes.fromByteArray(key.get()))))
+        .build();
+
+    try {
+      final GetItemResponse resp = dynamoDB.getItem(req).get();
+      return resp.hasItem() ? resp.item().get("val").b().asByteArray() : null;
+    } catch (InterruptedException | ExecutionException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Override
+  public KeyValueIterator<Bytes, byte[]> range(
+      final int kafkaPartition,
+      final Bytes from,
+      final Bytes to,
+      final long streamTimeMs
+  ) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public KeyValueIterator<Bytes, byte[]> all(final int kafkaPartition, final long streamTimeMs) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public long approximateNumEntries(final int kafkaPartition) {
+    LOG.warn("approximateNumEntries is not yet implemented for DynamoDB");
+    return 0;
+  }
+
+  @Override
+  public String name() {
+    return name;
+  }
+
+  @Override
+  public PutItemRequest insert(
+      final int kafkaPartition,
+      final Bytes key,
+      final byte[] value,
+      final long epochMillis
+  ) {
+    final long epoch = kafkaPartitionToEpoch.get(kafkaPartition);
+    // TODO(agavra): account for TTL index and tombstones once delete is updated
+    return PutItemRequest.builder()
+        .tableName(name)
+        .item(Map.of(
+            "key", AttributeValue.fromB(SdkBytes.fromByteArray(key.get())),
+            "val", AttributeValue.fromB(SdkBytes.fromByteArray(value)),
+            "epoch", AttributeValue.fromN(String.valueOf(epoch)),
+            "ts", AttributeValue.fromN(String.valueOf(epochMillis))
+        ))
+        .conditionExpression("attribute_not_exists(epoch) OR epoch <= :epochValue")
+        .expressionAttributeValues(Map.of(
+            ":epochValue", AttributeValue.fromN(String.valueOf(epoch))
+        ))
+        .build();
+  }
+
+  @Override
+  public DeleteItemRequest delete(final int kafkaPartition, final Bytes key) {
+    // TODO(agavra): replace me with tombstone and ttl index
+    final long epoch = kafkaPartitionToEpoch.get(kafkaPartition);
+    return DeleteItemRequest.builder()
+        .tableName(name)
+        .key(Map.of("key", AttributeValue.fromB(SdkBytes.fromByteArray(key.get()))))
+        .conditionExpression("attribute_not_exists(epoch) OR epoch <= :epochValue")
+        .expressionAttributeValues(Map.of(
+            ":epochValue", AttributeValue.fromN(String.valueOf(epoch))
+        ))
+        .build();
+  }
+
+  public void setOffset(final int kafkaPartition, final long offset) {
+    final long epoch = kafkaPartitionToEpoch.get(kafkaPartition);
+
+    try {
+      final var req = PutItemRequest.builder()
+          .tableName(metaTableName(name))
+          .item(Map.of(
+              "key", AttributeValue.fromN(String.valueOf(kafkaPartition)),
+              "epoch", AttributeValue.fromN(String.valueOf(epoch)),
+              "offset", AttributeValue.fromN(String.valueOf(offset))
+          ))
+          .conditionExpression("attribute_not_exists(epoch) OR epoch <= :epochValue")
+          .expressionAttributeValues(Map.of(
+              ":epochValue", AttributeValue.fromN(String.valueOf(epoch))
+          ))
+          .build();
+
+      // TODO(agavra): should we check if it worked (re: epoch)?
+      dynamoDB.putItem(req).get();
+    } catch (final Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Override
+  public long fetchOffset(final int kafkaPartition) {
+    final var offset = getMeta(kafkaPartition).get("offset").n();
+    return Long.parseLong(offset);
+  }
+
+  private static String metaTableName(final String tableName) {
+    return tableName + "_md";
+  }
+
+  public long localEpoch(final int kafkaPartition) {
+    return kafkaPartitionToEpoch.get(kafkaPartition);
+  }
+
+  public long fetchEpoch(final int kafkaPartition) {
+    final var epoch = getMeta(kafkaPartition).get("epoch").n();
+    return Long.parseLong(epoch);
+  }
+
+  private Map<String, AttributeValue> getMeta(final int kafkaPartition) {
+    final var req = GetItemRequest.builder()
+        .tableName(metaTableName(name))
+        .key(Map.of("key", AttributeValue.fromN(String.valueOf(kafkaPartition))))
+        .build();
+    try {
+      final var resp = dynamoDB.getItem(req).get();
+      if (resp.hasItem()) {
+        return resp.item();
+      }
+      throw new IllegalStateException(
+          "Could not find metadata row for partition " + kafkaPartition + " in table "
+              + metaTableName(name));
+    } catch (InterruptedException | ExecutionException e) {
+      throw new RuntimeException(e);
+    }
+  }
+}
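Note: the conditional expressions above are this backend's zombie-fencing mechanism: init()
atomically reserves a new epoch for the kafka partition via the if_not_exists(...) + :one update,
and every data write carries "attribute_not_exists(epoch) OR epoch <= :epochValue" so that a
writer holding a stale (lower) epoch is rejected once a newer writer has touched the key. A
standalone sketch of that rejection, assuming a table named "example" with the key schema from
createTable() above and a DynamoDbAsyncClient called dynamoDB:

    // a newer writer (epoch 2) has already written this key
    dynamoDB.putItem(PutItemRequest.builder()
        .tableName("example")
        .item(Map.of(
            "key", AttributeValue.fromB(SdkBytes.fromByteArray(new byte[] {1})),
            "val", AttributeValue.fromB(SdkBytes.fromByteArray(new byte[] {2})),
            "epoch", AttributeValue.fromN("2")))
        .build()).get();

    // a stale writer (epoch 1) is fenced: the stored epoch (2) is not <= 1, so the condition fails
    dynamoDB.putItem(PutItemRequest.builder()
        .tableName("example")
        .item(Map.of(
            "key", AttributeValue.fromB(SdkBytes.fromByteArray(new byte[] {1})),
            "val", AttributeValue.fromB(SdkBytes.fromByteArray(new byte[] {3})),
            "epoch", AttributeValue.fromN("1")))
        .conditionExpression("attribute_not_exists(epoch) OR epoch <= :epochValue")
        .expressionAttributeValues(Map.of(":epochValue", AttributeValue.fromN("1")))
        .build()).get(); // throws ExecutionException caused by ConditionalCheckFailedException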
diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/dynamo/DynamoWriter.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/dynamo/DynamoWriter.java
new file mode 100644
index 000000000..c285c1a09
--- /dev/null
+++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/dynamo/DynamoWriter.java
@@ -0,0 +1,90 @@
+/*
+ * Copyright 2024 Responsive Computing, Inc.
+ *
+ * This source code is licensed under the Responsive Business Source License Agreement v1.0
+ * available at:
+ *
+ * https://www.responsive.dev/legal/responsive-bsl-10
+ *
+ * This software requires a valid Commercial License Key for production use. Trial and commercial
+ * licenses can be obtained at https://www.responsive.dev
+ */
+
+package dev.responsive.kafka.internal.db.dynamo;
+
+import com.google.common.collect.Iterators;
+import dev.responsive.kafka.internal.db.RemoteTable;
+import dev.responsive.kafka.internal.db.RemoteWriter;
+import dev.responsive.kafka.internal.stores.RemoteWriteResult;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionStage;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
+import software.amazon.awssdk.services.dynamodb.model.DeleteItemRequest;
+import software.amazon.awssdk.services.dynamodb.model.DynamoDbRequest;
+import software.amazon.awssdk.services.dynamodb.model.PutItemRequest;
+
+public class DynamoWriter<K, P> implements RemoteWriter<K, P> {
+
+  private static final Logger LOG = LoggerFactory.getLogger(DynamoWriter.class);
+
+  private final RemoteTable<K, DynamoDbRequest> table;
+  private final int kafkaPartition;
+  private final P tablePartition;
+
+  private final List<DynamoDbRequest> accumulatedWrites = new ArrayList<>();
+  private final DynamoDbAsyncClient dynamo;
+
+  public DynamoWriter(
+      final RemoteTable<K, DynamoDbRequest> table,
+      final int kafkaPartition,
+      final P tablePartition,
+      final DynamoDbAsyncClient dynamo
+  ) {
+    this.table = table;
+    this.kafkaPartition = kafkaPartition;
+    this.tablePartition = tablePartition;
+    this.dynamo = dynamo;
+  }
+
+  @Override
+  public void insert(final K key, final byte[] value, final long epochMillis) {
+    accumulatedWrites.add(table.insert(kafkaPartition, key, value, epochMillis));
+  }
+
+  @Override
+  public void delete(final K key) {
+    accumulatedWrites.add(table.delete(kafkaPartition, key));
+  }
+
+  @Override
+  public CompletionStage<RemoteWriteResult<P>> flush() {
+    if (accumulatedWrites.isEmpty()) {
+      LOG.info("Skipping empty bulk write for partition {}", tablePartition);
+      return CompletableFuture.completedFuture(RemoteWriteResult.success(tablePartition));
+    }
+
+    try {
+      final List<CompletableFuture<?>> futures = new ArrayList<>();
+      final var requests = Iterators.consumingIterator(accumulatedWrites.iterator());
+      while (requests.hasNext()) {
+        final var request = requests.next();
+        if (request instanceof PutItemRequest) {
+          futures.add(dynamo.putItem((PutItemRequest) request));
+        } else if (request instanceof DeleteItemRequest) {
+          futures.add(dynamo.deleteItem((DeleteItemRequest) request));
+        } else {
+          throw new IllegalArgumentException("Unexpected accumulated request: " + request);
+        }
+      }
+      CompletableFuture.allOf(futures.toArray(CompletableFuture[]::new)).join();
+      return CompletableFuture.completedFuture(RemoteWriteResult.success(tablePartition));
+    } catch (final Exception e) {
+      LOG.error("Failed to flush to {}[{}].", table.name(), tablePartition, e);
+      return CompletableFuture.completedFuture(RemoteWriteResult.failure(tablePartition));
+    }
+  }
+}
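Note: flush() issues the accumulated writes as individual conditional putItem/deleteItem calls
rather than a single BatchWriteItem. That is presumably deliberate: BatchWriteItem does not
support condition expressions, and the epoch fencing baked into the requests produced by
DynamoKVTable.insert/delete depends on them.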
diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/stores/PartitionedOperations.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/stores/PartitionedOperations.java
index 01664e4ad..2d788d4a5 100644
--- a/kafka-client/src/main/java/dev/responsive/kafka/internal/stores/PartitionedOperations.java
+++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/stores/PartitionedOperations.java
@@ -102,6 +102,9 @@ public static PartitionedOperations create(
       case MONGO_DB:
         table = createMongo(params, sessionClients, ttlResolver);
         break;
+      case DYNAMO_DB:
+        table = createDynamoDb(params, sessionClients, ttlResolver);
+        break;
       case IN_MEMORY:
         table = createInMemory(params, ttlResolver);
         break;
@@ -247,6 +250,14 @@ private static RemoteKVTable<?> createRS3(
     return sessionClients.rs3TableFactory().kvTable(params.name().tableName());
   }
 
+  private static RemoteKVTable<?> createDynamoDb(
+      final ResponsiveKeyValueParams params,
+      final SessionClients sessionClients,
+      final Optional<TtlResolver<?, ?>> ttlResolver
+  ) throws InterruptedException, TimeoutException {
+    return sessionClients.dynamoDbClient().kvTable(params.name().tableName(), ttlResolver);
+  }
+
   @SuppressWarnings("rawtypes")
   public PartitionedOperations(
       final Logger log,
diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/utils/SessionClients.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/utils/SessionClients.java
index a1c33c3f3..d9d40b4a9 100644
--- a/kafka-client/src/main/java/dev/responsive/kafka/internal/utils/SessionClients.java
+++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/utils/SessionClients.java
@@ -16,6 +16,7 @@
 import dev.responsive.kafka.api.config.ResponsiveConfig;
 import dev.responsive.kafka.api.config.StorageBackend;
 import dev.responsive.kafka.internal.db.CassandraClient;
+import dev.responsive.kafka.internal.db.DynamoClient;
 import dev.responsive.kafka.internal.db.mongo.ResponsiveMongoClient;
 import dev.responsive.kafka.internal.db.rs3.RS3TableFactory;
 import dev.responsive.kafka.internal.metrics.ResponsiveMetrics;
@@ -35,6 +36,7 @@ public class SessionClients {
   private final Optional<ResponsiveMongoClient> mongoClient;
   private final Optional<CassandraClient> cassandraClient;
   private final Optional<RS3TableFactory> rs3TableFactory;
+  private final Optional<DynamoClient> dynamoDbClient;
   private final boolean inMemory;
   private final Admin admin;
 
@@ -47,12 +49,14 @@ public SessionClients(
       final Optional<ResponsiveMongoClient> mongoClient,
       final Optional<CassandraClient> cassandraClient,
       final Optional<RS3TableFactory> rs3TableFactory,
+      final Optional<DynamoClient> dynamoDbClient,
       final boolean inMemory,
       final Admin admin
   ) {
     this.mongoClient = mongoClient;
     this.cassandraClient = cassandraClient;
     this.rs3TableFactory = rs3TableFactory;
+    this.dynamoDbClient = dynamoDbClient;
     this.inMemory = inMemory;
     this.admin = admin;
   }
@@ -77,6 +81,8 @@ public StorageBackend storageBackend() {
       return StorageBackend.CASSANDRA;
     } else if (rs3TableFactory.isPresent()) {
       return StorageBackend.RS3;
+    } else if (dynamoDbClient.isPresent()) {
+      return StorageBackend.DYNAMO_DB;
     } else if (inMemory) {
       return StorageBackend.IN_MEMORY;
     } else {
@@ -120,6 +126,17 @@ public CassandraClient cassandraClient() {
     return cassandraClient.get();
   }
 
+  public DynamoClient dynamoDbClient() {
+    if (dynamoDbClient.isEmpty()) {
+      final IllegalStateException fatalException =
+          new IllegalStateException("DynamoDB client was missing");
+      LOG.error(fatalException.getMessage(), fatalException);
+      throw fatalException;
+    }
+
+    return dynamoDbClient.get();
+  }
+
   public Admin admin() {
     return admin;
   }
diff --git a/kafka-client/src/test/java/dev/responsive/kafka/integration/MinimalIntegrationTest.java b/kafka-client/src/test/java/dev/responsive/kafka/integration/MinimalIntegrationTest.java
index ac5795370..e7442bd6e 100644
--- a/kafka-client/src/test/java/dev/responsive/kafka/integration/MinimalIntegrationTest.java
+++ b/kafka-client/src/test/java/dev/responsive/kafka/integration/MinimalIntegrationTest.java
@@ -69,7 +69,7 @@ public class MinimalIntegrationTest {
 
   @RegisterExtension
-  static ResponsiveExtension EXTENSION = new ResponsiveExtension(StorageBackend.MONGO_DB);
+  static ResponsiveExtension EXTENSION = new ResponsiveExtension(StorageBackend.DYNAMO_DB);
 
   private static final String INPUT_TOPIC = "input";
   private static final String OUTPUT_TOPIC = "output";
diff --git a/kafka-client/src/test/java/dev/responsive/kafka/integration/ResponsiveKeyValueStoreEosIntegrationTest.java b/kafka-client/src/test/java/dev/responsive/kafka/integration/ResponsiveKeyValueStoreEosIntegrationTest.java
index 8a7fa1f5f..d2085a121 100644
--- a/kafka-client/src/test/java/dev/responsive/kafka/integration/ResponsiveKeyValueStoreEosIntegrationTest.java
+++ b/kafka-client/src/test/java/dev/responsive/kafka/integration/ResponsiveKeyValueStoreEosIntegrationTest.java
@@ -74,15 +74,17 @@
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.TestInfo;
-import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.api.extension.RegisterExtension;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.EnumSource;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-@ExtendWith(ResponsiveExtension.class)
 public class ResponsiveKeyValueStoreEosIntegrationTest {
 
+  @RegisterExtension
+  static ResponsiveExtension EXTENSION = new ResponsiveExtension();
+
   private static final Logger LOG =
       LoggerFactory.getLogger(ResponsiveKeyValueStoreEosIntegrationTest.class);
diff --git a/kafka-client/src/test/java/dev/responsive/kafka/integration/ResponsiveKeyValueStoreRestoreIntegrationTest.java b/kafka-client/src/test/java/dev/responsive/kafka/integration/ResponsiveKeyValueStoreRestoreIntegrationTest.java
index d523df003..83eebf0fd 100644
--- a/kafka-client/src/test/java/dev/responsive/kafka/integration/ResponsiveKeyValueStoreRestoreIntegrationTest.java
+++ b/kafka-client/src/test/java/dev/responsive/kafka/integration/ResponsiveKeyValueStoreRestoreIntegrationTest.java
@@ -12,6 +12,7 @@
 
 package dev.responsive.kafka.integration;
 
+import static dev.responsive.kafka.api.config.ResponsiveConfig.DYNAMODB_ENDPOINT_CONFIG;
 import static dev.responsive.kafka.api.config.ResponsiveConfig.MONGO_ENDPOINT_CONFIG;
 import static dev.responsive.kafka.api.config.ResponsiveConfig.MONGO_PASSWORD_CONFIG;
 import static dev.responsive.kafka.api.config.ResponsiveConfig.MONGO_USERNAME_CONFIG;
@@ -55,10 +56,12 @@
 import dev.responsive.kafka.api.config.StorageBackend;
 import dev.responsive.kafka.api.stores.ResponsiveKeyValueParams;
 import dev.responsive.kafka.api.stores.ResponsiveStores;
+import dev.responsive.kafka.internal.config.ConfigUtils;
 import dev.responsive.kafka.internal.db.CassandraClient;
 import dev.responsive.kafka.internal.db.CassandraClientFactory;
 import dev.responsive.kafka.internal.db.DefaultCassandraClientFactory;
 import dev.responsive.kafka.internal.db.RemoteKVTable;
+import dev.responsive.kafka.internal.db.dynamo.DynamoKVTable;
 import dev.responsive.kafka.internal.db.mongo.CollectionCreationOptions;
 import dev.responsive.kafka.internal.db.mongo.MongoKVTable;
 import dev.responsive.kafka.internal.db.partitioning.TablePartitioner;
@@ -71,6 +74,7 @@
 import dev.responsive.kafka.testutils.IntegrationTestUtils.MockResponsiveKafkaStreams;
 import dev.responsive.kafka.testutils.ResponsiveConfigParam;
 import dev.responsive.kafka.testutils.ResponsiveExtension;
+import java.net.URI;
 import java.time.Duration;
 import java.util.Collection;
 import java.util.HashMap;
@@ -114,6 +118,11 @@
 import org.junit.jupiter.api.extension.RegisterExtension;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.EnumSource;
+import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
+import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
+import software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient;
+import software.amazon.awssdk.regions.Region;
+import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
 
 public class ResponsiveKeyValueStoreRestoreIntegrationTest {
 
@@ -352,41 +361,55 @@ private RemoteKVTable<?> remoteKVTable(
       final ResponsiveConfig config,
       final TopicPartition changelog
   ) throws InterruptedException, TimeoutException {
-    final RemoteKVTable table;
-    if (type == KVSchema.FACT) {
-      final CassandraClient cassandraClient = defaultFactory.createClient(
-          defaultFactory.createCqlSession(config, null),
-          config);
-
-      table = cassandraClient.factFactory()
-          .create(new DefaultTableSpec(
-              aggName(),
-              TablePartitioner.defaultPartitioner(),
-              TtlResolver.NO_TTL
-          ));
-
-    } else if (type == KVSchema.KEY_VALUE) {
-      final var hostname = config.getString(MONGO_ENDPOINT_CONFIG);
-      final String user = config.getString(MONGO_USERNAME_CONFIG);
-      final Password pass = config.getPassword(MONGO_PASSWORD_CONFIG);
-      final var mongoClient = SessionUtil.connect(
-          hostname,
-          user,
-          pass == null ? null : pass.value(),
-          "",
-          null
-      );
-      table = new MongoKVTable(
-          mongoClient,
-          aggName(),
-          CollectionCreationOptions.fromConfig(config),
-          TtlResolver.NO_TTL
-      );
-      table.init(0);
-    } else {
-      throw new IllegalArgumentException("Unsupported type: " + type);
+    RemoteKVTable<?> table;
+    switch (ConfigUtils.storageBackend(config)) {
+      case CASSANDRA:
+        final CassandraClient cassandraClient = defaultFactory.createClient(
+            defaultFactory.createCqlSession(config, null),
+            config);
+
+        table = cassandraClient.factFactory()
+            .create(new DefaultTableSpec(
+                aggName(),
+                TablePartitioner.defaultPartitioner(),
+                TtlResolver.NO_TTL
+            ));
+        break;
+      case MONGO_DB:
+        final var hostname = config.getString(MONGO_ENDPOINT_CONFIG);
+        final String user = config.getString(MONGO_USERNAME_CONFIG);
+        final Password pass = config.getPassword(MONGO_PASSWORD_CONFIG);
+        final var mongoClient = SessionUtil.connect(
+            hostname,
+            user,
+            pass == null ? null : pass.value(),
+            "",
+            null
+        );
+        table = new MongoKVTable(
+            mongoClient,
+            aggName(),
+            CollectionCreationOptions.fromConfig(config),
+            TtlResolver.NO_TTL
+        );
+        break;
+      case DYNAMO_DB:
+        final var dynamoEndpoint = config.getString(DYNAMODB_ENDPOINT_CONFIG);
+        final var dynamoDB = DynamoDbAsyncClient.builder()
+            .region(Region.US_WEST_2)
+            .credentialsProvider(StaticCredentialsProvider.create(
+                AwsBasicCredentials.create("dummykey", "dummysecret")))
+            .endpointOverride(URI.create(dynamoEndpoint))
+            .httpClientBuilder(NettyNioAsyncHttpClient.builder())
+            .build();
+        table = new DynamoKVTable(dynamoDB, aggName());
+        break;
+      default:
+        throw new IllegalArgumentException(ConfigUtils.storageBackend(config).toString());
     }
+
+    table.init(0);
     return table;
   }
@@ -553,11 +576,8 @@ private Map<String, Object> getMutableProperties(final KVSchema type) {
     final Map<String, Object> properties = new HashMap<>(responsiveProps);
 
     if (type == KVSchema.FACT) {
+      // override the default since only cassandra has different behavior for fact stores
       properties.put(ResponsiveConfig.STORAGE_BACKEND_TYPE_CONFIG, StorageBackend.CASSANDRA.name());
-    } else if (type == KVSchema.KEY_VALUE) {
-      properties.put(ResponsiveConfig.STORAGE_BACKEND_TYPE_CONFIG, StorageBackend.MONGO_DB.name());
-    } else {
-      throw new IllegalArgumentException("Unexpected schema type: " + type.name());
     }
 
     properties.put(KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class);
diff --git a/kafka-client/src/test/java/dev/responsive/kafka/internal/stores/CommitBufferTest.java b/kafka-client/src/test/java/dev/responsive/kafka/internal/stores/CommitBufferTest.java
index 5e3961f5e..84c3938de 100644
--- a/kafka-client/src/test/java/dev/responsive/kafka/internal/stores/CommitBufferTest.java
+++ b/kafka-client/src/test/java/dev/responsive/kafka/internal/stores/CommitBufferTest.java
@@ -173,6 +173,7 @@ public void before(
         Optional.empty(),
         Optional.of(client),
         Optional.empty(),
+        Optional.empty(),
         false,
         admin
     );
diff --git a/kafka-client/src/test/java/dev/responsive/kafka/testutils/DynamoDbContainer.java b/kafka-client/src/test/java/dev/responsive/kafka/testutils/DynamoDbContainer.java
new file mode 100644
index 000000000..0df6c3949
--- /dev/null
+++ b/kafka-client/src/test/java/dev/responsive/kafka/testutils/DynamoDbContainer.java
@@ -0,0 +1,32 @@
+/*
+ * Copyright 2024 Responsive Computing, Inc.
+ *
+ * This source code is licensed under the Responsive Business Source License Agreement v1.0
+ * available at:
+ *
+ * https://www.responsive.dev/legal/responsive-bsl-10
+ *
+ * This software requires a valid Commercial License Key for production use. Trial and commercial
+ * licenses can be obtained at https://www.responsive.dev
+ */
+
+package dev.responsive.kafka.testutils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.utility.DockerImageName;
+
+public class DynamoDbContainer extends GenericContainer<DynamoDbContainer> {
+  private static final Logger LOG = LoggerFactory.getLogger(DynamoDbContainer.class);
+
+  public DynamoDbContainer(final DockerImageName dockerImageName) {
+    super(dockerImageName);
+    this.withExposedPorts(8000);
+    this.withCommand("-jar DynamoDBLocal.jar -inMemory -sharedDb");
+  }
+
+  public String getConnectionString() {
+    return "http://" + getHost() + ":" + getMappedPort(8000);
+  }
+}
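Note: DynamoDB Local listens on port 8000 inside the container, so getConnectionString() resolves
the mapped host port. Wiring a test client against it would look roughly like the client
construction in ResponsiveKafkaStreams above (DynamoDB Local accepts arbitrary static
credentials, hence the dummy key/secret):

    final DynamoDbContainer dynamo = new DynamoDbContainer(TestConstants.DYNAMODB);
    dynamo.start();
    final DynamoDbAsyncClient client = DynamoDbAsyncClient.builder()
        .region(Region.US_WEST_2)
        .credentialsProvider(StaticCredentialsProvider.create(
            AwsBasicCredentials.create("dummykey", "dummysecret")))
        .endpointOverride(URI.create(dynamo.getConnectionString()))
        .httpClientBuilder(NettyNioAsyncHttpClient.builder())
        .build();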
diff --git a/kafka-client/src/test/java/dev/responsive/kafka/testutils/ResponsiveExtension.java b/kafka-client/src/test/java/dev/responsive/kafka/testutils/ResponsiveExtension.java
index 6d2a29bdd..0cb92e637 100644
--- a/kafka-client/src/test/java/dev/responsive/kafka/testutils/ResponsiveExtension.java
+++ b/kafka-client/src/test/java/dev/responsive/kafka/testutils/ResponsiveExtension.java
@@ -17,6 +17,7 @@
 import static dev.responsive.kafka.api.config.ResponsiveConfig.CASSANDRA_DESIRED_NUM_PARTITION_CONFIG;
 import static dev.responsive.kafka.api.config.ResponsiveConfig.CASSANDRA_HOSTNAME_CONFIG;
 import static dev.responsive.kafka.api.config.ResponsiveConfig.CASSANDRA_PORT_CONFIG;
+import static dev.responsive.kafka.api.config.ResponsiveConfig.DYNAMODB_ENDPOINT_CONFIG;
 import static dev.responsive.kafka.api.config.ResponsiveConfig.MONGO_ENDPOINT_CONFIG;
 import static dev.responsive.kafka.api.config.ResponsiveConfig.RESPONSIVE_ENV_CONFIG;
 import static dev.responsive.kafka.api.config.ResponsiveConfig.RESPONSIVE_LICENSE_CONFIG;
@@ -67,6 +68,7 @@ public class ResponsiveExtension
       .withEnv("KAFKA_GROUP_MAX_SESSION_TIMEOUT_MS", "60000")
       .withReuse(true);
   public static MongoDBContainer mongo = new MongoDBContainer(TestConstants.MONGODB);
+  public static DynamoDbContainer dynamo = new DynamoDbContainer(TestConstants.DYNAMODB);
 
   public static Admin admin;
 
@@ -98,7 +100,7 @@ public static void startAll() {
     LOG.info("Starting up Responsive test harness at {}", start);
 
     final var kafkaFuture = Startables.deepStart(kafka);
-    final var storageFuture = Startables.deepStart(cassandra, mongo);
+    final var storageFuture = Startables.deepStart(cassandra, mongo, dynamo);
     try {
       kafkaFuture.get();
       admin = Admin.create(Map.of(BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers()));
@@ -115,6 +117,7 @@
   public static void stopAll() {
     cassandra.stop();
     mongo.stop();
+    dynamo.stop();
     kafka.stop();
     admin.close();
   }
@@ -152,6 +155,8 @@ public Object resolveParameter(
       return kafka;
     } else if (parameterContext.getParameter().getType() == RS3Container.class) {
       return rs3;
+    } else if (parameterContext.getParameter().getType() == DynamoDbContainer.class) {
+      return dynamo;
     } else if (parameterContext.getParameter().getType() == Admin.class) {
       return admin;
     } else if (isContainerConfig(parameterContext)) {
@@ -173,6 +178,8 @@ public Object resolveParameter(
       map.put(STORAGE_BACKEND_TYPE_CONFIG, StorageBackend.RS3.name());
       map.put(RS3_HOSTNAME_CONFIG, "localhost");
       map.put(RS3_PORT_CONFIG, rs3.getMappedPort(50051));
+    } else if (backend == StorageBackend.DYNAMO_DB) {
+      map.put(STORAGE_BACKEND_TYPE_CONFIG, StorageBackend.DYNAMO_DB.name());
     }
 
     // throw in configuration for all backends since some tests are parameterized with multiple
@@ -181,6 +188,7 @@ public Object resolveParameter(
     map.put(CASSANDRA_PORT_CONFIG, cassandra.getContactPoint().getPort());
     map.put(CASSANDRA_DATACENTER_CONFIG, cassandra.getLocalDatacenter());
     map.put(MONGO_ENDPOINT_CONFIG, mongo.getConnectionString());
+    map.put(DYNAMODB_ENDPOINT_CONFIG, dynamo.getConnectionString());
 
     if (parameterContext.getParameter().getType().equals(Map.class)) {
       return map;
diff --git a/kafka-client/src/test/java/dev/responsive/kafka/testutils/TestConstants.java b/kafka-client/src/test/java/dev/responsive/kafka/testutils/TestConstants.java
index 7aae1d30e..cc52feae9 100644
--- a/kafka-client/src/test/java/dev/responsive/kafka/testutils/TestConstants.java
+++ b/kafka-client/src/test/java/dev/responsive/kafka/testutils/TestConstants.java
@@ -19,5 +19,7 @@ public class TestConstants {
   public static final DockerImageName CASSANDRA = DockerImageName.parse("cassandra:4.1.0");
   public static final DockerImageName KAFKA = DockerImageName.parse("confluentinc/cp-kafka:7.3.2");
   public static final DockerImageName MONGODB = DockerImageName.parse("mongo:7.0.2");
+  public static final DockerImageName DYNAMODB =
+      DockerImageName.parse("amazon/dynamodb-local:2.2.1");
 }
\ No newline at end of file
diff --git a/settings.gradle.kts b/settings.gradle.kts
index 72225c6d0..ac938e048 100644
--- a/settings.gradle.kts
+++ b/settings.gradle.kts
@@ -52,6 +52,7 @@ dependencyResolutionManagement {
     version("log4j2", "2.20.0")
     version("mongoDB", "4.10.2")
     version("fabric8", "6.13.4")
+    version("aws", "2.29.20")
 
     library("jackson", "com.fasterxml.jackson.datatype", "jackson-datatype-jdk8").versionRef("jackson")
 
@@ -67,6 +68,10 @@ dependencyResolutionManagement {
     library("mongodb-driver-core", "org.mongodb", "mongodb-driver-core").versionRef("mongoDB")
     library("mongodb-driver-sync", "org.mongodb", "mongodb-driver-sync").versionRef("mongoDB")
 
+    library("dynamodb-sdk", "software.amazon.awssdk", "dynamodb").versionRef("aws")
+    library("netty-nio-client", "software.amazon.awssdk", "netty-nio-client").versionRef("aws")
+    bundle("dynamo", listOf("dynamodb-sdk", "netty-nio-client"))
+
    library("javaoperatorsdk", "io.javaoperatorsdk", "operator-framework").versionRef("javaoperatorsdk")
 
     library("grpc-netty", "io.grpc", "grpc-netty").versionRef("grpc")