Skip to content

Commit

Permalink
feat: serde for TIME type (#7664)
Browse files Browse the repository at this point in the history
* feat: add TIME and DATE types

* Add the DateType and TimeType files

* feat: serde for TIME type

* turn off ClassDataAbstractionCoupling for json deserializer

* use ints instead of long

* add check for negative and times bigger than one day

* remove unused import

* add QTT test
  • Loading branch information
Zara Lim authored Jun 22, 2021
1 parent b465236 commit 7537d87
Show file tree
Hide file tree
Showing 30 changed files with 1,504 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.data.Time;
import org.apache.kafka.connect.data.Timestamp;
import org.apache.kafka.connect.storage.Converter;

Expand Down Expand Up @@ -123,7 +124,11 @@ private Object specToConnect(final Object spec, final Schema schema) {

switch (schema.type()) {
case INT32:
return Integer.valueOf(spec.toString());
final Integer intVal = Integer.valueOf(spec.toString());
if (Time.LOGICAL_NAME.equals(schema.name())) {
return new java.sql.Time(intVal);
}
return intVal;
case INT64:
final Long longVal = Long.valueOf(spec.toString());
if (Timestamp.LOGICAL_NAME.equals(schema.name())) {
Expand Down Expand Up @@ -237,6 +242,10 @@ private Object connectToSpec(
}
return data;
case INT32:
if (Time.LOGICAL_NAME.equals(schema.name())) {
return Time.fromLogical(schema, (Date) data);
}
return data;
case FLOAT32:
case FLOAT64:
case BOOLEAN:
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
{
"plan" : [ {
"@type" : "ksqlPlanV1",
"statementText" : "CREATE STREAM TEST (ID STRING KEY, TIME TIME) WITH (KAFKA_TOPIC='test', KEY_FORMAT='KAFKA', VALUE_FORMAT='AVRO');",
"ddlCommand" : {
"@type" : "createStreamV1",
"sourceName" : "TEST",
"schema" : "`ID` STRING KEY, `TIME` TIME",
"topicName" : "test",
"formats" : {
"keyFormat" : {
"format" : "KAFKA"
},
"valueFormat" : {
"format" : "AVRO"
}
},
"orReplace" : false
}
}, {
"@type" : "ksqlPlanV1",
"statementText" : "CREATE STREAM TEST2 AS SELECT *\nFROM TEST TEST\nEMIT CHANGES",
"ddlCommand" : {
"@type" : "createStreamV1",
"sourceName" : "TEST2",
"schema" : "`ID` STRING KEY, `TIME` TIME",
"topicName" : "TEST2",
"formats" : {
"keyFormat" : {
"format" : "KAFKA"
},
"valueFormat" : {
"format" : "AVRO"
}
},
"orReplace" : false
},
"queryPlan" : {
"sources" : [ "TEST" ],
"sink" : "TEST2",
"physicalPlan" : {
"@type" : "streamSinkV1",
"properties" : {
"queryContext" : "TEST2"
},
"source" : {
"@type" : "streamSelectV1",
"properties" : {
"queryContext" : "Project"
},
"source" : {
"@type" : "streamSourceV1",
"properties" : {
"queryContext" : "KsqlTopic/Source"
},
"topicName" : "test",
"formats" : {
"keyFormat" : {
"format" : "KAFKA"
},
"valueFormat" : {
"format" : "AVRO"
}
},
"sourceSchema" : "`ID` STRING KEY, `TIME` TIME"
},
"keyColumnNames" : [ "ID" ],
"selectExpressions" : [ "TIME AS TIME" ]
},
"formats" : {
"keyFormat" : {
"format" : "KAFKA"
},
"valueFormat" : {
"format" : "AVRO"
}
},
"topicName" : "TEST2"
},
"queryId" : "CSAS_TEST2_0"
}
} ],
"configs" : {
"ksql.extension.dir" : "ext",
"ksql.streams.cache.max.bytes.buffering" : "0",
"ksql.security.extension.class" : null,
"metric.reporters" : "",
"ksql.transient.prefix" : "transient_",
"ksql.query.status.running.threshold.seconds" : "300",
"ksql.streams.default.deserialization.exception.handler" : "io.confluent.ksql.errors.LogMetricAndContinueExceptionHandler",
"ksql.output.topic.name.prefix" : "",
"ksql.query.pull.enable.standby.reads" : "false",
"ksql.persistence.default.format.key" : "KAFKA",
"ksql.query.persistent.max.bytes.buffering.total" : "-1",
"ksql.queryanonymizer.logs_enabled" : "true",
"ksql.query.error.max.queue.size" : "10",
"ksql.variable.substitution.enable" : "true",
"ksql.internal.topic.min.insync.replicas" : "1",
"ksql.streams.shutdown.timeout.ms" : "300000",
"ksql.internal.topic.replicas" : "1",
"ksql.insert.into.values.enabled" : "true",
"ksql.query.pull.max.allowed.offset.lag" : "9223372036854775807",
"ksql.query.pull.max.qps" : "2147483647",
"ksql.access.validator.enable" : "auto",
"ksql.streams.bootstrap.servers" : "localhost:0",
"ksql.queryanonymizer.cluster_namespace" : null,
"ksql.query.pull.metrics.enabled" : "true",
"ksql.create.or.replace.enabled" : "true",
"ksql.metrics.extension" : null,
"ksql.hidden.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses",
"ksql.cast.strings.preserve.nulls" : "true",
"ksql.authorization.cache.max.entries" : "10000",
"ksql.pull.queries.enable" : "true",
"ksql.lambdas.enabled" : "true",
"ksql.suppress.enabled" : "false",
"ksql.query.push.scalable.enabled" : "false",
"ksql.sink.window.change.log.additional.retention" : "1000000",
"ksql.readonly.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses",
"ksql.query.persistent.active.limit" : "2147483647",
"ksql.persistence.wrap.single.values" : null,
"ksql.authorization.cache.expiry.time.secs" : "30",
"ksql.query.retry.backoff.initial.ms" : "15000",
"ksql.query.transient.max.bytes.buffering.total" : "-1",
"ksql.schema.registry.url" : "",
"ksql.properties.overrides.denylist" : "",
"ksql.query.pull.max.concurrent.requests" : "2147483647",
"ksql.streams.auto.offset.reset" : "earliest",
"ksql.connect.url" : "http://localhost:8083",
"ksql.service.id" : "some.ksql.service.id",
"ksql.streams.default.production.exception.handler" : "io.confluent.ksql.errors.ProductionExceptionHandlerUtil$LogAndFailProductionExceptionHandler",
"ksql.query.pull.interpreter.enabled" : "true",
"ksql.streams.commit.interval.ms" : "2000",
"ksql.query.pull.table.scan.enabled" : "false",
"ksql.streams.auto.commit.interval.ms" : "0",
"ksql.streams.topology.optimization" : "all",
"ksql.query.retry.backoff.max.ms" : "900000",
"ksql.streams.num.stream.threads" : "4",
"ksql.timestamp.throw.on.invalid" : "false",
"ksql.metrics.tags.custom" : "",
"ksql.persistence.default.format.value" : null,
"ksql.udfs.enabled" : "true",
"ksql.udf.enable.security.manager" : "true",
"ksql.connect.worker.config" : "",
"ksql.nested.error.set.null" : "true",
"ksql.udf.collect.metrics" : "false",
"ksql.query.pull.thread.pool.size" : "100",
"ksql.persistent.prefix" : "query_",
"ksql.metastore.backup.location" : "",
"ksql.error.classifier.regex" : "",
"ksql.suppress.buffer.size.bytes" : "-1"
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
{
"version" : "7.0.0",
"timestamp" : 1624323349243,
"path" : "query-validation-tests/time.json",
"schemas" : {
"CSAS_TEST2_0.TEST2" : {
"schema" : "`ID` STRING KEY, `TIME` TIME",
"keyFormat" : {
"format" : "KAFKA"
},
"valueFormat" : {
"format" : "AVRO"
}
},
"CSAS_TEST2_0.KsqlTopic.Source" : {
"schema" : "`ID` STRING KEY, `TIME` TIME",
"keyFormat" : {
"format" : "KAFKA"
},
"valueFormat" : {
"format" : "AVRO"
}
}
},
"testCase" : {
"name" : "AVRO in/out",
"inputs" : [ {
"topic" : "test",
"key" : null,
"value" : {
"TIME" : 10
}
} ],
"outputs" : [ {
"topic" : "TEST2",
"key" : null,
"value" : {
"TIME" : 10
}
} ],
"topics" : [ {
"name" : "TEST2",
"replicas" : 1,
"numPartitions" : 4
}, {
"name" : "test",
"valueSchema" : {
"type" : "record",
"name" : "KsqlDataSourceSchema",
"namespace" : "io.confluent.ksql.avro_schemas",
"fields" : [ {
"name" : "TIME",
"type" : [ "null", {
"type" : "int",
"connect.version" : 1,
"connect.name" : "org.apache.kafka.connect.data.Time",
"logicalType" : "time-millis"
} ],
"default" : null
} ],
"connect.name" : "io.confluent.ksql.avro_schemas.KsqlDataSourceSchema"
},
"valueFormat" : "AVRO",
"replicas" : 1,
"numPartitions" : 4
} ],
"statements" : [ "CREATE STREAM TEST (ID STRING KEY, time TIME) WITH (kafka_topic='test', value_format='AVRO');", "CREATE STREAM TEST2 AS SELECT * FROM TEST;" ],
"post" : {
"sources" : [ {
"name" : "TEST",
"type" : "STREAM",
"schema" : "`ID` STRING KEY, `TIME` TIME",
"keyFormat" : {
"format" : "KAFKA"
},
"valueFormat" : "AVRO",
"keyFeatures" : [ ],
"valueFeatures" : [ ]
}, {
"name" : "TEST2",
"type" : "STREAM",
"schema" : "`ID` STRING KEY, `TIME` TIME",
"keyFormat" : {
"format" : "KAFKA"
},
"valueFormat" : "AVRO",
"keyFeatures" : [ ],
"valueFeatures" : [ ]
} ],
"topics" : {
"topics" : [ {
"name" : "test",
"keyFormat" : {
"format" : "KAFKA"
},
"valueFormat" : {
"format" : "AVRO"
},
"partitions" : 4,
"valueSchema" : {
"type" : "record",
"name" : "KsqlDataSourceSchema",
"namespace" : "io.confluent.ksql.avro_schemas",
"fields" : [ {
"name" : "TIME",
"type" : [ "null", {
"type" : "int",
"connect.version" : 1,
"connect.name" : "org.apache.kafka.connect.data.Time",
"logicalType" : "time-millis"
} ],
"default" : null
} ],
"connect.name" : "io.confluent.ksql.avro_schemas.KsqlDataSourceSchema"
}
}, {
"name" : "TEST2",
"keyFormat" : {
"format" : "KAFKA"
},
"valueFormat" : {
"format" : "AVRO"
},
"partitions" : 4,
"valueSchema" : {
"type" : "record",
"name" : "KsqlDataSourceSchema",
"namespace" : "io.confluent.ksql.avro_schemas",
"fields" : [ {
"name" : "TIME",
"type" : [ "null", {
"type" : "int",
"logicalType" : "time-millis"
} ],
"default" : null
} ]
}
} ]
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
Topologies:
Sub-topology: 0
Source: KSTREAM-SOURCE-0000000000 (topics: [test])
--> KSTREAM-TRANSFORMVALUES-0000000001
Processor: KSTREAM-TRANSFORMVALUES-0000000001 (stores: [])
--> Project
<-- KSTREAM-SOURCE-0000000000
Processor: Project (stores: [])
--> KSTREAM-SINK-0000000003
<-- KSTREAM-TRANSFORMVALUES-0000000001
Sink: KSTREAM-SINK-0000000003 (topic: TEST2)
<-- Project

Loading

0 comments on commit 7537d87

Please sign in to comment.