Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Support timestamp protobuf serde #6927

Merged
merged 4 commits into from
Feb 10, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@
import io.confluent.kafka.schemaregistry.ParsedSchema;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchema;
import io.confluent.kafka.serializers.protobuf.AbstractKafkaProtobufSerializer;
import io.confluent.kafka.serializers.subject.DefaultReferenceSubjectNameStrategy;
import io.confluent.ksql.KsqlExecutionContext;
import io.confluent.ksql.execution.ddl.commands.CreateSourceCommand;
import io.confluent.ksql.parser.properties.with.SourcePropertiesUtil;
Expand Down Expand Up @@ -158,7 +161,8 @@ private void registerSchemas(
config,
statementText,
registerIfSchemaExists,
getSRSubject(kafkaTopic, true)
getSRSubject(kafkaTopic, true),
true
);

registerSchema(
Expand All @@ -169,7 +173,8 @@ private void registerSchemas(
config,
statementText,
registerIfSchemaExists,
getSRSubject(kafkaTopic, false)
getSRSubject(kafkaTopic, false),
false
);
}

Expand All @@ -181,7 +186,8 @@ private void registerSchema(
final KsqlConfig config,
final String statementText,
final boolean registerIfSchemaExists,
final String subject
final String subject,
final boolean isKey
) {
final Format format = FormatFactory.of(formatInfo);
if (!format.supportsFeature(SerdeFeature.SCHEMA_INFERENCE)) {
Expand All @@ -205,8 +211,22 @@ private void registerSchema(
final ParsedSchema parsedSchema = translator.toParsedSchema(
PersistenceSchema.from(schema, serdeFeatures)
);

srClient.register(subject, parsedSchema);
if (parsedSchema instanceof ProtobufSchema) {
final ProtobufSchema resolved = AbstractKafkaProtobufSerializer.resolveDependencies(
srClient,
true,
false,
true,
null,
new DefaultReferenceSubjectNameStrategy(),
topic,
isKey,
(ProtobufSchema) parsedSchema
);
srClient.register(subject, resolved);
} else {
srClient.register(subject, parsedSchema);
}
}
} catch (IOException | RestClientException e) {
throw new KsqlStatementException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ static SchemaRegistryClient createProxy(final SchemaRegistryClient delegate) {
.forward("testCompatibility",
methodParams(String.class, ParsedSchema.class), delegate)
.swallow("deleteSubject", methodParams(String.class), Collections.emptyList())
.forward("getVersion", methodParams(String.class, ParsedSchema.class), delegate)
.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,9 @@
import io.confluent.kafka.schemaregistry.avro.AvroSchema;
import io.confluent.kafka.schemaregistry.client.SchemaMetadata;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.rest.entities.SchemaReference;
import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchema;
import io.confluent.ksql.KsqlExecutionContext;
import io.confluent.ksql.config.SessionConfig;
import io.confluent.ksql.engine.KsqlPlan;
Expand Down Expand Up @@ -69,6 +71,8 @@
import io.confluent.ksql.util.KsqlSchemaRegistryNotConfiguredException;
import io.confluent.ksql.util.KsqlStatementException;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import org.apache.avro.Schema;
import org.apache.avro.Schema.Type;
Expand Down Expand Up @@ -96,6 +100,17 @@ public class SchemaRegisterInjectorTest {
+ "\"namespace\":\"io.confluent.ksql.avro_schemas\",\"fields\":"
+ "[{\"name\":\"F1\",\"type\":[\"null\",\"string\"],\"default\":null}],"
+ "\"connect.name\":\"io.confluent.ksql.avro_schemas.KsqlDataSourceSchema\"}");
private static final ProtobufSchema TIMESTAMP_SCHEMA = new ProtobufSchema(
"syntax = \"proto3\"; package google.protobuf;"
+ "option java_package = \"com.google.protobuf\";"
+ "option java_outer_classname = \"TimestampProto\";\n"
+ "option java_multiple_files = true;"
+ "message Timestamp {int64 seconds = 1; int32 nanos = 2;}");
private static final List<SchemaReference> REFERENCE_LIST =
Arrays.asList(new SchemaReference("google/protobuf/timestamp.proto", "google/protobuf/timestamp.proto", 0));
private static final ProtobufSchema PROTOBUF_SCHEMA_WITH_REFS = new ProtobufSchema(
"syntax = \"proto3\"; import \"google/protobuf/timestamp.proto\";"
+ "message ConnectDefault1 {google.protobuf.Timestamp F1 = 1;}").copy(REFERENCE_LIST);

@Mock
private ServiceContext serviceContext;
Expand Down Expand Up @@ -360,6 +375,25 @@ public void shouldSupportPrimitiveValueSchemasInCreateAsStmts() throws Exception
verify(schemaRegistryClient).register("SINK-value", AVRO_UNWRAPPED_VALUE_SCHEMA);
}

@Test
public void shouldRegisterDependanciesForProtobuf() throws Exception {
// Given:
givenStatement("CREATE STREAM source (f1 TIMESTAMP) "
+ "WITH ("
+ " kafka_topic='expectedName', "
+ " key_format='KAFKA', "
+ " value_format='PROTOBUF', "
+ " partitions=1 "
+ ");");

// When:
injector.inject(statement);

// Then:
verify(schemaRegistryClient).register("google/protobuf/timestamp.proto", TIMESTAMP_SCHEMA);
verify(schemaRegistryClient).register("expectedName-value", PROTOBUF_SCHEMA_WITH_REFS);
}

private void givenStatement(final String sql) {
final PreparedStatement<?> preparedStatement =
parser.prepare(parser.parse(sql).get(0), metaStore);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ public static Collection<TestCase<SchemaRegistryClient>> getMethodsToTest() {
.ignore("testCompatibility", String.class, ParsedSchema.class)
.ignore("deleteSubject", String.class)
.ignore("getAllSubjects")
.ignore("getVersion", String.class, ParsedSchema.class)
.build();
}

Expand Down Expand Up @@ -159,5 +160,17 @@ public void shouldSwallowRegister() throws Exception {
// Then:
verifyZeroInteractions(delegate);
}

@Test
public void shouldGetVersion() throws Exception {
// Given:
when(delegate.getVersion("some subject", schema)).thenReturn(6);

// When:
final int version = sandboxedClient.getVersion("some subject", schema);

// Then:
assertThat(version, is(6));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ public static Optional<ParsedSchema> buildSchema(final JsonNode schema, final St
return Optional.of(new JsonSchema(schemaString));
} else if (format.equalsIgnoreCase(ProtobufFormat.NAME)) {
// since Protobuf schemas are not valid JSON, the schema JsonNode in
// this case is just a string
// this case is just a string.
final String schemaString = schema.textValue();
return Optional.of(new ProtobufSchema(schemaString));
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
{
"plan" : [ {
"@type" : "ksqlPlanV1",
"statementText" : "CREATE STREAM TEST (ID STRING KEY, TIME TIMESTAMP) WITH (KAFKA_TOPIC='test', KEY_FORMAT='KAFKA', VALUE_FORMAT='PROTOBUF');",
"ddlCommand" : {
"@type" : "createStreamV1",
"sourceName" : "TEST",
"schema" : "`ID` STRING KEY, `TIME` TIMESTAMP",
"topicName" : "test",
"formats" : {
"keyFormat" : {
"format" : "KAFKA"
},
"valueFormat" : {
"format" : "PROTOBUF"
}
},
"orReplace" : false
}
}, {
"@type" : "ksqlPlanV1",
"statementText" : "CREATE STREAM TEST2 AS SELECT *\nFROM TEST TEST\nEMIT CHANGES",
"ddlCommand" : {
"@type" : "createStreamV1",
"sourceName" : "TEST2",
"schema" : "`ID` STRING KEY, `TIME` TIMESTAMP",
"topicName" : "TEST2",
"formats" : {
"keyFormat" : {
"format" : "KAFKA"
},
"valueFormat" : {
"format" : "PROTOBUF"
}
},
"orReplace" : false
},
"queryPlan" : {
"sources" : [ "TEST" ],
"sink" : "TEST2",
"physicalPlan" : {
"@type" : "streamSinkV1",
"properties" : {
"queryContext" : "TEST2"
},
"source" : {
"@type" : "streamSelectV1",
"properties" : {
"queryContext" : "Project"
},
"source" : {
"@type" : "streamSourceV1",
"properties" : {
"queryContext" : "KsqlTopic/Source"
},
"topicName" : "test",
"formats" : {
"keyFormat" : {
"format" : "KAFKA"
},
"valueFormat" : {
"format" : "PROTOBUF"
}
},
"sourceSchema" : "`ID` STRING KEY, `TIME` TIMESTAMP"
},
"keyColumnNames" : [ "ID" ],
"selectExpressions" : [ "TIME AS TIME" ]
},
"formats" : {
"keyFormat" : {
"format" : "KAFKA"
},
"valueFormat" : {
"format" : "PROTOBUF"
}
},
"topicName" : "TEST2"
},
"queryId" : "CSAS_TEST2_0"
}
} ],
"configs" : {
"ksql.extension.dir" : "ext",
"ksql.streams.cache.max.bytes.buffering" : "0",
"ksql.security.extension.class" : null,
"metric.reporters" : "",
"ksql.transient.prefix" : "transient_",
"ksql.query.status.running.threshold.seconds" : "300",
"ksql.streams.default.deserialization.exception.handler" : "io.confluent.ksql.errors.LogMetricAndContinueExceptionHandler",
"ksql.output.topic.name.prefix" : "",
"ksql.query.pull.enable.standby.reads" : "false",
"ksql.persistence.default.format.key" : "KAFKA",
"ksql.query.persistent.max.bytes.buffering.total" : "-1",
"ksql.query.error.max.queue.size" : "10",
"ksql.variable.substitution.enable" : "true",
"ksql.internal.topic.min.insync.replicas" : "1",
"ksql.streams.shutdown.timeout.ms" : "300000",
"ksql.internal.topic.replicas" : "1",
"ksql.insert.into.values.enabled" : "true",
"ksql.query.pull.max.allowed.offset.lag" : "9223372036854775807",
"ksql.query.pull.max.qps" : "2147483647",
"ksql.access.validator.enable" : "auto",
"ksql.streams.bootstrap.servers" : "localhost:0",
"ksql.query.pull.metrics.enabled" : "false",
"ksql.create.or.replace.enabled" : "true",
"ksql.metrics.extension" : null,
"ksql.hidden.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses",
"ksql.cast.strings.preserve.nulls" : "true",
"ksql.authorization.cache.max.entries" : "10000",
"ksql.pull.queries.enable" : "true",
"ksql.suppress.enabled" : "false",
"ksql.sink.window.change.log.additional.retention" : "1000000",
"ksql.readonly.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses",
"ksql.query.persistent.active.limit" : "2147483647",
"ksql.persistence.wrap.single.values" : null,
"ksql.authorization.cache.expiry.time.secs" : "30",
"ksql.query.retry.backoff.initial.ms" : "15000",
"ksql.query.transient.max.bytes.buffering.total" : "-1",
"ksql.schema.registry.url" : "",
"ksql.properties.overrides.denylist" : "",
"ksql.streams.auto.offset.reset" : "earliest",
"ksql.connect.url" : "http://localhost:8083",
"ksql.service.id" : "some.ksql.service.id",
"ksql.streams.default.production.exception.handler" : "io.confluent.ksql.errors.ProductionExceptionHandlerUtil$LogAndFailProductionExceptionHandler",
"ksql.streams.commit.interval.ms" : "2000",
"ksql.query.pull.table.scan.enabled" : "false",
"ksql.streams.auto.commit.interval.ms" : "0",
"ksql.streams.topology.optimization" : "all",
"ksql.query.retry.backoff.max.ms" : "900000",
"ksql.streams.num.stream.threads" : "4",
"ksql.timestamp.throw.on.invalid" : "false",
"ksql.metrics.tags.custom" : "",
"ksql.persistence.default.format.value" : null,
"ksql.udfs.enabled" : "true",
"ksql.udf.enable.security.manager" : "true",
"ksql.connect.worker.config" : "",
"ksql.udf.collect.metrics" : "false",
"ksql.query.pull.thread.pool.size" : "100",
"ksql.persistent.prefix" : "query_",
"ksql.metastore.backup.location" : "",
"ksql.error.classifier.regex" : "",
"ksql.suppress.buffer.size.bytes" : "-1"
}
}
Loading