diff --git a/ksql-common/src/main/java/io/confluent/ksql/util/timestamp/MetadataTimestampExtractionPolicy.java b/ksql-common/src/main/java/io/confluent/ksql/util/timestamp/MetadataTimestampExtractionPolicy.java index d966da17bdab..d6f07c0ade69 100644 --- a/ksql-common/src/main/java/io/confluent/ksql/util/timestamp/MetadataTimestampExtractionPolicy.java +++ b/ksql-common/src/main/java/io/confluent/ksql/util/timestamp/MetadataTimestampExtractionPolicy.java @@ -20,27 +20,38 @@ import org.apache.kafka.streams.processor.FailOnInvalidTimestamp; import org.apache.kafka.streams.processor.TimestampExtractor; +import java.util.Objects; + @Immutable public class MetadataTimestampExtractionPolicy implements TimestampExtractionPolicy { + private final TimestampExtractor timestampExtractor; @JsonCreator - public MetadataTimestampExtractionPolicy(){} + public MetadataTimestampExtractionPolicy() { + this(new FailOnInvalidTimestamp()); + } + + public MetadataTimestampExtractionPolicy(final TimestampExtractor timestampExtractor) { + this.timestampExtractor = timestampExtractor; + } @Override public TimestampExtractor create(final int columnIndex) { - return new FailOnInvalidTimestamp(); + return timestampExtractor; } @Override public int hashCode() { - return this.getClass().hashCode(); + return Objects.hash(this.getClass(), timestampExtractor.getClass()); } @Override public boolean equals(final Object other) { - if (this == other) { - return true; + if (!(other instanceof MetadataTimestampExtractionPolicy)) { + return false; } - return other instanceof MetadataTimestampExtractionPolicy; + + final MetadataTimestampExtractionPolicy that = (MetadataTimestampExtractionPolicy)other; + return timestampExtractor.getClass() == that.timestampExtractor.getClass(); } } diff --git a/ksql-common/src/main/java/io/confluent/ksql/util/timestamp/TimestampExtractionPolicyFactory.java b/ksql-common/src/main/java/io/confluent/ksql/util/timestamp/TimestampExtractionPolicyFactory.java index 509844fc7133..5b598c04e85f 100644 --- a/ksql-common/src/main/java/io/confluent/ksql/util/timestamp/TimestampExtractionPolicyFactory.java +++ b/ksql-common/src/main/java/io/confluent/ksql/util/timestamp/TimestampExtractionPolicyFactory.java @@ -21,6 +21,8 @@ import io.confluent.ksql.schema.ksql.SqlBaseType; import io.confluent.ksql.util.KsqlException; import java.util.Optional; +import org.apache.kafka.streams.processor.FailOnInvalidTimestamp; +import org.apache.kafka.streams.processor.TimestampExtractor; public final class TimestampExtractionPolicyFactory { @@ -31,9 +33,18 @@ public static TimestampExtractionPolicy create( final LogicalSchema schema, final Optional timestampColumnName, final Optional timestampFormat + ) { + return create(schema, timestampColumnName, timestampFormat, new FailOnInvalidTimestamp()); + } + + public static TimestampExtractionPolicy create( + final LogicalSchema schema, + final Optional timestampColumnName, + final Optional timestampFormat, + final TimestampExtractor defaultTimestampExtractor ) { if (!timestampColumnName.isPresent()) { - return new MetadataTimestampExtractionPolicy(); + return new MetadataTimestampExtractionPolicy(defaultTimestampExtractor); } final String fieldName = timestampColumnName.get().toUpperCase(); diff --git a/ksql-common/src/test/java/io/confluent/ksql/util/timestamp/MetadataTimestampExtractionPolicyTest.java b/ksql-common/src/test/java/io/confluent/ksql/util/timestamp/MetadataTimestampExtractionPolicyTest.java index 9befcf39b569..4f5e8252edd1 100644 --- a/ksql-common/src/test/java/io/confluent/ksql/util/timestamp/MetadataTimestampExtractionPolicyTest.java +++ b/ksql-common/src/test/java/io/confluent/ksql/util/timestamp/MetadataTimestampExtractionPolicyTest.java @@ -16,6 +16,7 @@ package io.confluent.ksql.util.timestamp; import com.google.common.testing.EqualsTester; +import org.apache.kafka.streams.processor.UsePreviousTimeOnInvalidTimestamp; import org.junit.Test; public class MetadataTimestampExtractionPolicyTest { @@ -25,6 +26,10 @@ public void shouldTestEqualityCorrectly() { .addEqualityGroup( new MetadataTimestampExtractionPolicy(), new MetadataTimestampExtractionPolicy()) + .addEqualityGroup( + new MetadataTimestampExtractionPolicy(new UsePreviousTimeOnInvalidTimestamp()), + new MetadataTimestampExtractionPolicy(new UsePreviousTimeOnInvalidTimestamp()) + ) .testEquals(); } } diff --git a/ksql-common/src/test/java/io/confluent/ksql/util/timestamp/TimestampExtractionPolicyFactoryTest.java b/ksql-common/src/test/java/io/confluent/ksql/util/timestamp/TimestampExtractionPolicyFactoryTest.java index 365d1f223914..51b8a9251877 100644 --- a/ksql-common/src/test/java/io/confluent/ksql/util/timestamp/TimestampExtractionPolicyFactoryTest.java +++ b/ksql-common/src/test/java/io/confluent/ksql/util/timestamp/TimestampExtractionPolicyFactoryTest.java @@ -24,6 +24,8 @@ import java.util.Optional; import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.SchemaBuilder; +import org.apache.kafka.streams.processor.FailOnInvalidTimestamp; +import org.apache.kafka.streams.processor.UsePreviousTimeOnInvalidTimestamp; import org.junit.Test; public class TimestampExtractionPolicyFactoryTest { @@ -40,6 +42,23 @@ public void shouldCreateMetadataPolicyWhenTimestampFieldNotProvided() { // Then: assertThat(result, instanceOf(MetadataTimestampExtractionPolicy.class)); + assertThat(result.create(0), instanceOf(FailOnInvalidTimestamp.class)); + } + + @Test + public void shouldCreateMetadataPolicyWithCustomExtractorWhenTimestampFileNotProvided() { + // When: + final TimestampExtractionPolicy result = TimestampExtractionPolicyFactory + .create( + LogicalSchema.of(schemaBuilder.build()), + Optional.empty(), + Optional.empty(), + new UsePreviousTimeOnInvalidTimestamp() + ); + + // Then: + assertThat(result, instanceOf(MetadataTimestampExtractionPolicy.class)); + assertThat(result.create(0), instanceOf(UsePreviousTimeOnInvalidTimestamp.class)); } @Test diff --git a/ksql-engine/src/main/java/io/confluent/ksql/ddl/commands/CreateSourceCommand.java b/ksql-engine/src/main/java/io/confluent/ksql/ddl/commands/CreateSourceCommand.java index 07b8817e3d8c..e44ea69a950a 100644 --- a/ksql-engine/src/main/java/io/confluent/ksql/ddl/commands/CreateSourceCommand.java +++ b/ksql-engine/src/main/java/io/confluent/ksql/ddl/commands/CreateSourceCommand.java @@ -40,6 +40,9 @@ import io.confluent.ksql.util.timestamp.TimestampExtractionPolicyFactory; import java.util.Optional; import java.util.Set; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.processor.FailOnInvalidTimestamp; +import org.apache.kafka.streams.processor.TimestampExtractor; /** * Base class of create table/stream command @@ -99,7 +102,11 @@ abstract class CreateSourceCommand implements DdlCommand { this.keyField = KeyField.none(); } - this.timestampExtractionPolicy = buildTimestampExtractor(statement.getProperties(), schema); + this.timestampExtractionPolicy = buildTimestampExtractor( + getDefaultTimestampExtractor(ksqlConfig), + statement.getProperties(), + schema + ); this.serdeOptions = serdeOptionsSupplier.build( schema, @@ -125,6 +132,20 @@ KsqlTopic getTopic() { return topic; } + private TimestampExtractor getDefaultTimestampExtractor(final KsqlConfig ksqlConfig) { + try { + final Class defaultTimestampExtractor = (Class) ksqlConfig + .getKsqlStreamConfigProps().get( + StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG + ); + + return (TimestampExtractor) defaultTimestampExtractor.newInstance(); + } catch (final Exception e) { + // TODO: Need to log invalid timestamp configuration here? + return new FailOnInvalidTimestamp(); + } + } + private static LogicalSchema buildSchema(final TableElements tableElements) { if (Iterables.isEmpty(tableElements)) { throw new KsqlException("The statement does not define any columns."); @@ -146,13 +167,14 @@ private static KsqlTopic buildTopic( } private static TimestampExtractionPolicy buildTimestampExtractor( + final TimestampExtractor defaultTimestampExtractor, final CreateSourceProperties properties, final LogicalSchema schema ) { final Optional timestampName = properties.getTimestampColumnName(); final Optional timestampFormat = properties.getTimestampFormat(); return TimestampExtractionPolicyFactory - .create(schema, timestampName, timestampFormat); + .create(schema, timestampName, timestampFormat, defaultTimestampExtractor); } private static void validateSerdeCanHandleSchemas( diff --git a/ksql-engine/src/test/java/io/confluent/ksql/ddl/commands/CreateSourceCommandTest.java b/ksql-engine/src/test/java/io/confluent/ksql/ddl/commands/CreateSourceCommandTest.java index ad953dc696da..a3e30495413f 100644 --- a/ksql-engine/src/test/java/io/confluent/ksql/ddl/commands/CreateSourceCommandTest.java +++ b/ksql-engine/src/test/java/io/confluent/ksql/ddl/commands/CreateSourceCommandTest.java @@ -21,6 +21,7 @@ import static io.confluent.ksql.serde.Format.JSON; import static io.confluent.ksql.serde.Format.KAFKA; import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.is; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.verify; @@ -64,6 +65,9 @@ import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.SchemaBuilder; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.processor.FailOnInvalidTimestamp; +import org.apache.kafka.streams.processor.UsePreviousTimeOnInvalidTimestamp; import org.junit.Before; import org.junit.Rule; import org.junit.Test; @@ -104,6 +108,7 @@ public class CreateSourceCommandTest { @Mock private Serde serde; + @Mock private KsqlConfig ksqlConfig; @@ -114,8 +119,6 @@ public void setUp() { givenPropertiesWith(ImmutableMap.of()); when(kafkaTopicClient.isTopicExists(any())).thenReturn(true); - ksqlConfig = new KsqlConfig(ImmutableMap.of()); - when(serdeFactories.create(any(), any(), any(), any(), any(), any())).thenReturn(serde); when(serviceContext.getTopicClient()).thenReturn(kafkaTopicClient); @@ -230,6 +233,51 @@ public void shouldBuildSerdeOptions() { assertThat(cmd.getSerdeOptions(), is(SOME_SERDE_OPTIONS)); } + @Test + public void shouldCreateSourceWithDefaultFailOnInvalidTimestamp() { + // When: + final TestCmd cmd = createCmd(); + + // Then: + assertThat(cmd.timestampExtractionPolicy.create(0), instanceOf(FailOnInvalidTimestamp.class)); + } + + @Test + public void shouldCreateSourceWithOverrideTimestampFromConfig() { + // Given: + when(ksqlConfig.getKsqlStreamConfigProps()).thenReturn( + ImmutableMap.of(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, + UsePreviousTimeOnInvalidTimestamp.class) + ); + + // When: + final TestCmd cmd = createCmd(); + + // Then: + assertThat( + cmd.timestampExtractionPolicy.create(0), + instanceOf(UsePreviousTimeOnInvalidTimestamp.class) + ); + } + + @Test + public void shouldCreateSourceWithFailedOnInvalidTimestampIfConfigTimestampIsInvalid() { + // Given: + when(ksqlConfig.getKsqlStreamConfigProps()).thenReturn( + ImmutableMap.of(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, + "No timestamp class!") + ); + + // When: + final TestCmd cmd = createCmd(); + + // Then: + assertThat( + cmd.timestampExtractionPolicy.create(0), + instanceOf(FailOnInvalidTimestamp.class) + ); + } + @Test public void shouldBuildSchemaWithImplicitKeyField() { // Given: