diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/schema/ksql/inference/SchemaRegisterInjector.java b/ksqldb-engine/src/main/java/io/confluent/ksql/schema/ksql/inference/SchemaRegisterInjector.java index 546b2758befa..667beb1f281a 100644 --- a/ksqldb-engine/src/main/java/io/confluent/ksql/schema/ksql/inference/SchemaRegisterInjector.java +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/schema/ksql/inference/SchemaRegisterInjector.java @@ -20,6 +20,9 @@ import io.confluent.kafka.schemaregistry.ParsedSchema; import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient; import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException; +import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchema; +import io.confluent.kafka.serializers.protobuf.AbstractKafkaProtobufSerializer; +import io.confluent.kafka.serializers.subject.DefaultReferenceSubjectNameStrategy; import io.confluent.ksql.KsqlExecutionContext; import io.confluent.ksql.execution.ddl.commands.CreateSourceCommand; import io.confluent.ksql.parser.properties.with.SourcePropertiesUtil; @@ -158,7 +161,8 @@ private void registerSchemas( config, statementText, registerIfSchemaExists, - getSRSubject(kafkaTopic, true) + getSRSubject(kafkaTopic, true), + true ); registerSchema( @@ -169,7 +173,8 @@ private void registerSchemas( config, statementText, registerIfSchemaExists, - getSRSubject(kafkaTopic, false) + getSRSubject(kafkaTopic, false), + false ); } @@ -181,7 +186,8 @@ private void registerSchema( final KsqlConfig config, final String statementText, final boolean registerIfSchemaExists, - final String subject + final String subject, + final boolean isKey ) { final Format format = FormatFactory.of(formatInfo); if (!format.supportsFeature(SerdeFeature.SCHEMA_INFERENCE)) { @@ -205,8 +211,22 @@ private void registerSchema( final ParsedSchema parsedSchema = translator.toParsedSchema( PersistenceSchema.from(schema, serdeFeatures) ); - - srClient.register(subject, parsedSchema); + if (parsedSchema instanceof ProtobufSchema) { + final ProtobufSchema resolved = AbstractKafkaProtobufSerializer.resolveDependencies( + srClient, + true, + false, + true, + null, + new DefaultReferenceSubjectNameStrategy(), + topic, + isKey, + (ProtobufSchema) parsedSchema + ); + srClient.register(subject, resolved); + } else { + srClient.register(subject, parsedSchema); + } } } catch (IOException | RestClientException e) { throw new KsqlStatementException( diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/services/SandboxedSchemaRegistryClient.java b/ksqldb-engine/src/main/java/io/confluent/ksql/services/SandboxedSchemaRegistryClient.java index 494fa661b2a3..9a9ae17434c6 100644 --- a/ksqldb-engine/src/main/java/io/confluent/ksql/services/SandboxedSchemaRegistryClient.java +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/services/SandboxedSchemaRegistryClient.java @@ -45,6 +45,7 @@ static SchemaRegistryClient createProxy(final SchemaRegistryClient delegate) { .forward("testCompatibility", methodParams(String.class, ParsedSchema.class), delegate) .swallow("deleteSubject", methodParams(String.class), Collections.emptyList()) + .forward("getVersion", methodParams(String.class, ParsedSchema.class), delegate) .build(); } diff --git a/ksqldb-engine/src/test/java/io/confluent/ksql/schema/ksql/inference/SchemaRegisterInjectorTest.java b/ksqldb-engine/src/test/java/io/confluent/ksql/schema/ksql/inference/SchemaRegisterInjectorTest.java index b0a532d40903..9745a52f3b5e 100644 --- a/ksqldb-engine/src/test/java/io/confluent/ksql/schema/ksql/inference/SchemaRegisterInjectorTest.java +++ b/ksqldb-engine/src/test/java/io/confluent/ksql/schema/ksql/inference/SchemaRegisterInjectorTest.java @@ -35,7 +35,9 @@ import io.confluent.kafka.schemaregistry.avro.AvroSchema; import io.confluent.kafka.schemaregistry.client.SchemaMetadata; import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient; +import io.confluent.kafka.schemaregistry.client.rest.entities.SchemaReference; import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException; +import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchema; import io.confluent.ksql.KsqlExecutionContext; import io.confluent.ksql.config.SessionConfig; import io.confluent.ksql.engine.KsqlPlan; @@ -69,6 +71,8 @@ import io.confluent.ksql.util.KsqlSchemaRegistryNotConfiguredException; import io.confluent.ksql.util.KsqlStatementException; import java.io.IOException; +import java.util.Arrays; +import java.util.List; import java.util.Optional; import org.apache.avro.Schema; import org.apache.avro.Schema.Type; @@ -96,6 +100,17 @@ public class SchemaRegisterInjectorTest { + "\"namespace\":\"io.confluent.ksql.avro_schemas\",\"fields\":" + "[{\"name\":\"F1\",\"type\":[\"null\",\"string\"],\"default\":null}]," + "\"connect.name\":\"io.confluent.ksql.avro_schemas.KsqlDataSourceSchema\"}"); + private static final ProtobufSchema TIMESTAMP_SCHEMA = new ProtobufSchema( + "syntax = \"proto3\"; package google.protobuf;" + + "option java_package = \"com.google.protobuf\";" + + "option java_outer_classname = \"TimestampProto\";\n" + + "option java_multiple_files = true;" + + "message Timestamp {int64 seconds = 1; int32 nanos = 2;}"); + private static final List REFERENCE_LIST = + Arrays.asList(new SchemaReference("google/protobuf/timestamp.proto", "google/protobuf/timestamp.proto", 0)); + private static final ProtobufSchema PROTOBUF_SCHEMA_WITH_REFS = new ProtobufSchema( + "syntax = \"proto3\"; import \"google/protobuf/timestamp.proto\";" + + "message ConnectDefault1 {google.protobuf.Timestamp F1 = 1;}").copy(REFERENCE_LIST); @Mock private ServiceContext serviceContext; @@ -360,6 +375,25 @@ public void shouldSupportPrimitiveValueSchemasInCreateAsStmts() throws Exception verify(schemaRegistryClient).register("SINK-value", AVRO_UNWRAPPED_VALUE_SCHEMA); } + @Test + public void shouldRegisterDependanciesForProtobuf() throws Exception { + // Given: + givenStatement("CREATE STREAM source (f1 TIMESTAMP) " + + "WITH (" + + " kafka_topic='expectedName', " + + " key_format='KAFKA', " + + " value_format='PROTOBUF', " + + " partitions=1 " + + ");"); + + // When: + injector.inject(statement); + + // Then: + verify(schemaRegistryClient).register("google/protobuf/timestamp.proto", TIMESTAMP_SCHEMA); + verify(schemaRegistryClient).register("expectedName-value", PROTOBUF_SCHEMA_WITH_REFS); + } + private void givenStatement(final String sql) { final PreparedStatement preparedStatement = parser.prepare(parser.parse(sql).get(0), metaStore); diff --git a/ksqldb-engine/src/test/java/io/confluent/ksql/services/SandboxedSchemaRegistryClientTest.java b/ksqldb-engine/src/test/java/io/confluent/ksql/services/SandboxedSchemaRegistryClientTest.java index 0194947bc98e..0aa81fd9fd2d 100644 --- a/ksqldb-engine/src/test/java/io/confluent/ksql/services/SandboxedSchemaRegistryClientTest.java +++ b/ksqldb-engine/src/test/java/io/confluent/ksql/services/SandboxedSchemaRegistryClientTest.java @@ -63,6 +63,7 @@ public static Collection> getMethodsToTest() { .ignore("testCompatibility", String.class, ParsedSchema.class) .ignore("deleteSubject", String.class) .ignore("getAllSubjects") + .ignore("getVersion", String.class, ParsedSchema.class) .build(); } @@ -159,5 +160,17 @@ public void shouldSwallowRegister() throws Exception { // Then: verifyZeroInteractions(delegate); } + + @Test + public void shouldGetVersion() throws Exception { + // Given: + when(delegate.getVersion("some subject", schema)).thenReturn(6); + + // When: + final int version = sandboxedClient.getVersion("some subject", schema); + + // Then: + assertThat(version, is(6)); + } } } \ No newline at end of file diff --git a/ksqldb-functional-tests/src/main/java/io/confluent/ksql/test/utils/SerdeUtil.java b/ksqldb-functional-tests/src/main/java/io/confluent/ksql/test/utils/SerdeUtil.java index bf8a28004fef..4f907074fcd1 100644 --- a/ksqldb-functional-tests/src/main/java/io/confluent/ksql/test/utils/SerdeUtil.java +++ b/ksqldb-functional-tests/src/main/java/io/confluent/ksql/test/utils/SerdeUtil.java @@ -98,7 +98,7 @@ public static Optional buildSchema(final JsonNode schema, final St return Optional.of(new JsonSchema(schemaString)); } else if (format.equalsIgnoreCase(ProtobufFormat.NAME)) { // since Protobuf schemas are not valid JSON, the schema JsonNode in - // this case is just a string + // this case is just a string. final String schemaString = schema.textValue(); return Optional.of(new ProtobufSchema(schemaString)); } diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/timestamp_-_PROTOBUF_in_out/6.2.0_1612233798997/plan.json b/ksqldb-functional-tests/src/test/resources/historical_plans/timestamp_-_PROTOBUF_in_out/6.2.0_1612233798997/plan.json new file mode 100644 index 000000000000..dce10b087d28 --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/timestamp_-_PROTOBUF_in_out/6.2.0_1612233798997/plan.json @@ -0,0 +1,145 @@ +{ + "plan" : [ { + "@type" : "ksqlPlanV1", + "statementText" : "CREATE STREAM TEST (ID STRING KEY, TIME TIMESTAMP) WITH (KAFKA_TOPIC='test', KEY_FORMAT='KAFKA', VALUE_FORMAT='PROTOBUF');", + "ddlCommand" : { + "@type" : "createStreamV1", + "sourceName" : "TEST", + "schema" : "`ID` STRING KEY, `TIME` TIMESTAMP", + "topicName" : "test", + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "PROTOBUF" + } + }, + "orReplace" : false + } + }, { + "@type" : "ksqlPlanV1", + "statementText" : "CREATE STREAM TEST2 AS SELECT *\nFROM TEST TEST\nEMIT CHANGES", + "ddlCommand" : { + "@type" : "createStreamV1", + "sourceName" : "TEST2", + "schema" : "`ID` STRING KEY, `TIME` TIMESTAMP", + "topicName" : "TEST2", + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "PROTOBUF" + } + }, + "orReplace" : false + }, + "queryPlan" : { + "sources" : [ "TEST" ], + "sink" : "TEST2", + "physicalPlan" : { + "@type" : "streamSinkV1", + "properties" : { + "queryContext" : "TEST2" + }, + "source" : { + "@type" : "streamSelectV1", + "properties" : { + "queryContext" : "Project" + }, + "source" : { + "@type" : "streamSourceV1", + "properties" : { + "queryContext" : "KsqlTopic/Source" + }, + "topicName" : "test", + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "PROTOBUF" + } + }, + "sourceSchema" : "`ID` STRING KEY, `TIME` TIMESTAMP" + }, + "keyColumnNames" : [ "ID" ], + "selectExpressions" : [ "TIME AS TIME" ] + }, + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "PROTOBUF" + } + }, + "topicName" : "TEST2" + }, + "queryId" : "CSAS_TEST2_0" + } + } ], + "configs" : { + "ksql.extension.dir" : "ext", + "ksql.streams.cache.max.bytes.buffering" : "0", + "ksql.security.extension.class" : null, + "metric.reporters" : "", + "ksql.transient.prefix" : "transient_", + "ksql.query.status.running.threshold.seconds" : "300", + "ksql.streams.default.deserialization.exception.handler" : "io.confluent.ksql.errors.LogMetricAndContinueExceptionHandler", + "ksql.output.topic.name.prefix" : "", + "ksql.query.pull.enable.standby.reads" : "false", + "ksql.persistence.default.format.key" : "KAFKA", + "ksql.query.persistent.max.bytes.buffering.total" : "-1", + "ksql.query.error.max.queue.size" : "10", + "ksql.variable.substitution.enable" : "true", + "ksql.internal.topic.min.insync.replicas" : "1", + "ksql.streams.shutdown.timeout.ms" : "300000", + "ksql.internal.topic.replicas" : "1", + "ksql.insert.into.values.enabled" : "true", + "ksql.query.pull.max.allowed.offset.lag" : "9223372036854775807", + "ksql.query.pull.max.qps" : "2147483647", + "ksql.access.validator.enable" : "auto", + "ksql.streams.bootstrap.servers" : "localhost:0", + "ksql.query.pull.metrics.enabled" : "false", + "ksql.create.or.replace.enabled" : "true", + "ksql.metrics.extension" : null, + "ksql.hidden.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses", + "ksql.cast.strings.preserve.nulls" : "true", + "ksql.authorization.cache.max.entries" : "10000", + "ksql.pull.queries.enable" : "true", + "ksql.suppress.enabled" : "false", + "ksql.sink.window.change.log.additional.retention" : "1000000", + "ksql.readonly.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses", + "ksql.query.persistent.active.limit" : "2147483647", + "ksql.persistence.wrap.single.values" : null, + "ksql.authorization.cache.expiry.time.secs" : "30", + "ksql.query.retry.backoff.initial.ms" : "15000", + "ksql.query.transient.max.bytes.buffering.total" : "-1", + "ksql.schema.registry.url" : "", + "ksql.properties.overrides.denylist" : "", + "ksql.streams.auto.offset.reset" : "earliest", + "ksql.connect.url" : "http://localhost:8083", + "ksql.service.id" : "some.ksql.service.id", + "ksql.streams.default.production.exception.handler" : "io.confluent.ksql.errors.ProductionExceptionHandlerUtil$LogAndFailProductionExceptionHandler", + "ksql.streams.commit.interval.ms" : "2000", + "ksql.query.pull.table.scan.enabled" : "false", + "ksql.streams.auto.commit.interval.ms" : "0", + "ksql.streams.topology.optimization" : "all", + "ksql.query.retry.backoff.max.ms" : "900000", + "ksql.streams.num.stream.threads" : "4", + "ksql.timestamp.throw.on.invalid" : "false", + "ksql.metrics.tags.custom" : "", + "ksql.persistence.default.format.value" : null, + "ksql.udfs.enabled" : "true", + "ksql.udf.enable.security.manager" : "true", + "ksql.connect.worker.config" : "", + "ksql.udf.collect.metrics" : "false", + "ksql.query.pull.thread.pool.size" : "100", + "ksql.persistent.prefix" : "query_", + "ksql.metastore.backup.location" : "", + "ksql.error.classifier.regex" : "", + "ksql.suppress.buffer.size.bytes" : "-1" + } +} \ No newline at end of file diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/timestamp_-_PROTOBUF_in_out/6.2.0_1612233798997/spec.json b/ksqldb-functional-tests/src/test/resources/historical_plans/timestamp_-_PROTOBUF_in_out/6.2.0_1612233798997/spec.json new file mode 100644 index 000000000000..5f43a9179d91 --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/timestamp_-_PROTOBUF_in_out/6.2.0_1612233798997/spec.json @@ -0,0 +1,100 @@ +{ + "version" : "6.2.0", + "timestamp" : 1612233798997, + "path" : "query-validation-tests/timestamp.json", + "schemas" : { + "CSAS_TEST2_0.TEST2" : { + "schema" : "`ID` STRING KEY, `TIME` TIMESTAMP", + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "PROTOBUF" + } + }, + "CSAS_TEST2_0.KsqlTopic.Source" : { + "schema" : "`ID` STRING KEY, `TIME` TIMESTAMP", + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "PROTOBUF" + } + } + }, + "testCase" : { + "name" : "PROTOBUF in/out", + "inputs" : [ { + "topic" : "test", + "key" : null, + "value" : { + "time" : 10 + } + } ], + "outputs" : [ { + "topic" : "TEST2", + "key" : null, + "value" : { + "TIME" : 10 + } + } ], + "topics" : [ { + "name" : "TEST2", + "replicas" : 1, + "numPartitions" : 4 + }, { + "name" : "test", + "valueSchema" : "syntax = \"proto3\";\n\nimport \"google/protobuf/timestamp.proto\";\n\nmessage ConnectDefault1 {\n google.protobuf.Timestamp TIME = 1;\n}\n", + "valueFormat" : "PROTOBUF", + "replicas" : 1, + "numPartitions" : 4 + } ], + "statements" : [ "CREATE STREAM TEST (ID STRING KEY, time TIMESTAMP) WITH (kafka_topic='test', value_format='PROTOBUF');", "CREATE STREAM TEST2 AS SELECT * FROM TEST;" ], + "post" : { + "sources" : [ { + "name" : "TEST", + "type" : "STREAM", + "schema" : "`ID` STRING KEY, `TIME` TIMESTAMP", + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : "PROTOBUF", + "keyFeatures" : [ ], + "valueFeatures" : [ ] + }, { + "name" : "TEST2", + "type" : "STREAM", + "schema" : "`ID` STRING KEY, `TIME` TIMESTAMP", + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : "PROTOBUF", + "keyFeatures" : [ ], + "valueFeatures" : [ ] + } ], + "topics" : { + "topics" : [ { + "name" : "test", + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "PROTOBUF" + }, + "partitions" : 4, + "valueSchema" : "syntax = \"proto3\";\n\nimport \"google/protobuf/timestamp.proto\";\n\nmessage ConnectDefault1 {\n google.protobuf.Timestamp TIME = 1;\n}\n" + }, { + "name" : "TEST2", + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "PROTOBUF" + }, + "partitions" : 4, + "valueSchema" : "syntax = \"proto3\";\n\nimport \"google/protobuf/timestamp.proto\";\n\nmessage ConnectDefault1 {\n google.protobuf.Timestamp TIME = 1;\n}\n" + } ] + } + } + } +} \ No newline at end of file diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/timestamp_-_PROTOBUF_in_out/6.2.0_1612233798997/topology b/ksqldb-functional-tests/src/test/resources/historical_plans/timestamp_-_PROTOBUF_in_out/6.2.0_1612233798997/topology new file mode 100644 index 000000000000..e080cd675668 --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/timestamp_-_PROTOBUF_in_out/6.2.0_1612233798997/topology @@ -0,0 +1,13 @@ +Topologies: + Sub-topology: 0 + Source: KSTREAM-SOURCE-0000000000 (topics: [test]) + --> KSTREAM-TRANSFORMVALUES-0000000001 + Processor: KSTREAM-TRANSFORMVALUES-0000000001 (stores: []) + --> Project + <-- KSTREAM-SOURCE-0000000000 + Processor: Project (stores: []) + --> KSTREAM-SINK-0000000003 + <-- KSTREAM-TRANSFORMVALUES-0000000001 + Sink: KSTREAM-SINK-0000000003 (topic: TEST2) + <-- Project + diff --git a/ksqldb-functional-tests/src/test/resources/query-validation-tests/timestamp.json b/ksqldb-functional-tests/src/test/resources/query-validation-tests/timestamp.json index 958cba55b6c5..6aebe6e57345 100644 --- a/ksqldb-functional-tests/src/test/resources/query-validation-tests/timestamp.json +++ b/ksqldb-functional-tests/src/test/resources/query-validation-tests/timestamp.json @@ -40,6 +40,19 @@ {"topic": "TEST2", "value": {"TIME": 10}} ] }, + { + "name": "PROTOBUF in/out", + "statements": [ + "CREATE STREAM TEST (ID STRING KEY, time TIMESTAMP) WITH (kafka_topic='test', value_format='PROTOBUF');", + "CREATE STREAM TEST2 AS SELECT * FROM TEST;" + ], + "inputs": [ + {"topic": "test", "value": {"time": 10}} + ], + "outputs": [ + {"topic": "TEST2", "value": {"TIME": 10}} + ] + }, { "name": "equal - timestamp timestamp", "statements": [ diff --git a/ksqldb-serde/src/test/java/io/confluent/ksql/serde/protobuf/KsqlProtobufDeserializerTest.java b/ksqldb-serde/src/test/java/io/confluent/ksql/serde/protobuf/KsqlProtobufDeserializerTest.java index 1c51f8fd916b..b4b2e531af04 100644 --- a/ksqldb-serde/src/test/java/io/confluent/ksql/serde/protobuf/KsqlProtobufDeserializerTest.java +++ b/ksqldb-serde/src/test/java/io/confluent/ksql/serde/protobuf/KsqlProtobufDeserializerTest.java @@ -21,6 +21,7 @@ import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.SchemaBuilder; import org.apache.kafka.connect.data.Struct; +import org.apache.kafka.connect.data.Timestamp; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; @@ -33,7 +34,6 @@ import io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient; import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient; import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig; -import io.confluent.kafka.serializers.protobuf.KafkaProtobufSerializer; import io.confluent.ksql.util.DecimalUtil; import io.confluent.ksql.util.KsqlConfig; @@ -85,6 +85,26 @@ public void shouldDeserializeDecimalField() { assertThat(result, is(value)); } + @Test + public void shouldDeserializeTimestampField() { + final ConnectSchema schema = (ConnectSchema) SchemaBuilder.struct() + .field("f0", Timestamp.SCHEMA) + .build(); + + // Given: + final Deserializer deserializer = + givenDeserializerForSchema(schema, + Struct.class); + final Struct value = new Struct(schema).put("f0", new java.sql.Timestamp(2000)); + final byte[] bytes = givenConnectSerialized(value, schema); + + // When: + final Object result = deserializer.deserialize(SOME_TOPIC, bytes); + + // Then: + assertThat(result, is(value)); + } + private byte[] givenConnectSerialized( final Object value, final Schema connectSchema diff --git a/ksqldb-serde/src/test/java/io/confluent/ksql/serde/protobuf/KsqlProtobufSerializerTest.java b/ksqldb-serde/src/test/java/io/confluent/ksql/serde/protobuf/KsqlProtobufSerializerTest.java index 45374dcca681..3337d09347c5 100644 --- a/ksqldb-serde/src/test/java/io/confluent/ksql/serde/protobuf/KsqlProtobufSerializerTest.java +++ b/ksqldb-serde/src/test/java/io/confluent/ksql/serde/protobuf/KsqlProtobufSerializerTest.java @@ -18,6 +18,7 @@ import com.google.common.collect.ImmutableMap; import com.google.protobuf.Descriptors; import com.google.protobuf.Message; +import com.google.protobuf.Timestamp; import org.apache.kafka.common.serialization.Deserializer; import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.connect.data.ConnectSchema; @@ -63,6 +64,13 @@ public class KsqlProtobufSerializerTest { " { key: \"scale\", value: \"2\" }\n" + " ]}];\n" + "}\n"); + private static final ParsedSchema TIMESTAMP_SCHEMA = + parseProtobufSchema( + "syntax = \"proto3\";\n" + + "\n" + + "import \"google/protobuf/timestamp.proto\";\n" + + "\n" + + "message ConnectDefault1 {google.protobuf.Timestamp F1 = 1;}\n"); private static final String SOME_TOPIC = "bob"; @@ -94,6 +102,16 @@ public void shouldSerializeDecimalField() { ); } + @Test + public void shouldSerializeTimestampField() { + shouldSerializeFieldTypeCorrectly( + org.apache.kafka.connect.data.Timestamp.SCHEMA, + new java.sql.Timestamp(2000), + TIMESTAMP_SCHEMA, + Timestamp.newBuilder().setSeconds(2).setNanos(0).build() + ); + } + @SuppressWarnings("unchecked") private T deserialize(final byte[] serializedRow) { @@ -123,8 +141,7 @@ private void shouldSerializeFieldTypeCorrectly( final Message record = deserialize(bytes); assertThat(record.getAllFields().size(), equalTo(1)); Descriptors.FieldDescriptor field = record.getDescriptorForType().findFieldByName("field0"); - assertThat(DecimalUtils.toBigDecimal((Message) record.getField(field)), - equalTo(DecimalUtils.toBigDecimal(protobufValue))); + assertThat(record.getField(field).toString(), equalTo(protobufValue.toString())); } private Serializer givenSerializerForSchema(