Skip to content

Commit

Permalink
fix: default timestamp extractor override is not working
Browse files Browse the repository at this point in the history
The use of ksql.streams.default.timestamp.extractor when creating
a stream/table is not working. The new value is persisted in the
command topic, but KSQL always uses the default FailOnInvalidTimestamp.

This patch fixes KSQL so it honors the new default specified.
  • Loading branch information
spena committed Aug 6, 2019
1 parent d5d2791 commit 7970a25
Show file tree
Hide file tree
Showing 6 changed files with 127 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,27 +20,38 @@
import org.apache.kafka.streams.processor.FailOnInvalidTimestamp;
import org.apache.kafka.streams.processor.TimestampExtractor;

import java.util.Objects;

@Immutable
public class MetadataTimestampExtractionPolicy implements TimestampExtractionPolicy {
private final TimestampExtractor timestampExtractor;

@JsonCreator
public MetadataTimestampExtractionPolicy(){}
public MetadataTimestampExtractionPolicy() {
this(new FailOnInvalidTimestamp());
}

public MetadataTimestampExtractionPolicy(final TimestampExtractor timestampExtractor) {
this.timestampExtractor = timestampExtractor;
}

@Override
public TimestampExtractor create(final int columnIndex) {
return new FailOnInvalidTimestamp();
return timestampExtractor;
}

@Override
public int hashCode() {
return this.getClass().hashCode();
return Objects.hash(this.getClass(), timestampExtractor.getClass());
}

@Override
public boolean equals(final Object other) {
if (this == other) {
return true;
if (!(other instanceof MetadataTimestampExtractionPolicy)) {
return false;
}
return other instanceof MetadataTimestampExtractionPolicy;

final MetadataTimestampExtractionPolicy that = (MetadataTimestampExtractionPolicy)other;
return timestampExtractor.getClass() == that.timestampExtractor.getClass();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import io.confluent.ksql.schema.ksql.SqlBaseType;
import io.confluent.ksql.util.KsqlException;
import java.util.Optional;
import org.apache.kafka.streams.processor.FailOnInvalidTimestamp;
import org.apache.kafka.streams.processor.TimestampExtractor;

public final class TimestampExtractionPolicyFactory {

Expand All @@ -31,9 +33,18 @@ public static TimestampExtractionPolicy create(
final LogicalSchema schema,
final Optional<String> timestampColumnName,
final Optional<String> timestampFormat
) {
return create(schema, timestampColumnName, timestampFormat, new FailOnInvalidTimestamp());
}

public static TimestampExtractionPolicy create(
final LogicalSchema schema,
final Optional<String> timestampColumnName,
final Optional<String> timestampFormat,
final TimestampExtractor defaultTimestampExtractor
) {
if (!timestampColumnName.isPresent()) {
return new MetadataTimestampExtractionPolicy();
return new MetadataTimestampExtractionPolicy(defaultTimestampExtractor);
}

final String fieldName = timestampColumnName.get().toUpperCase();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package io.confluent.ksql.util.timestamp;

import com.google.common.testing.EqualsTester;
import org.apache.kafka.streams.processor.UsePreviousTimeOnInvalidTimestamp;
import org.junit.Test;

public class MetadataTimestampExtractionPolicyTest {
Expand All @@ -25,6 +26,10 @@ public void shouldTestEqualityCorrectly() {
.addEqualityGroup(
new MetadataTimestampExtractionPolicy(),
new MetadataTimestampExtractionPolicy())
.addEqualityGroup(
new MetadataTimestampExtractionPolicy(new UsePreviousTimeOnInvalidTimestamp()),
new MetadataTimestampExtractionPolicy(new UsePreviousTimeOnInvalidTimestamp())
)
.testEquals();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import java.util.Optional;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.streams.processor.FailOnInvalidTimestamp;
import org.apache.kafka.streams.processor.UsePreviousTimeOnInvalidTimestamp;
import org.junit.Test;

public class TimestampExtractionPolicyFactoryTest {
Expand All @@ -40,6 +42,23 @@ public void shouldCreateMetadataPolicyWhenTimestampFieldNotProvided() {

// Then:
assertThat(result, instanceOf(MetadataTimestampExtractionPolicy.class));
assertThat(result.create(0), instanceOf(FailOnInvalidTimestamp.class));
}

// Fixes typo in method name: "TimestampFileNotProvided" -> "TimestampFieldNotProvided",
// matching the sibling test shouldCreateMetadataPolicyWhenTimestampFieldNotProvided.
@Test
public void shouldCreateMetadataPolicyWithCustomExtractorWhenTimestampFieldNotProvided() {
  // When:
  final TimestampExtractionPolicy result = TimestampExtractionPolicyFactory
      .create(
          LogicalSchema.of(schemaBuilder.build()),
          Optional.empty(),
          Optional.empty(),
          new UsePreviousTimeOnInvalidTimestamp()
      );

  // Then: the policy is metadata-based and honours the supplied extractor.
  assertThat(result, instanceOf(MetadataTimestampExtractionPolicy.class));
  assertThat(result.create(0), instanceOf(UsePreviousTimeOnInvalidTimestamp.class));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,9 @@
import io.confluent.ksql.util.timestamp.TimestampExtractionPolicyFactory;
import java.util.Optional;
import java.util.Set;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.processor.FailOnInvalidTimestamp;
import org.apache.kafka.streams.processor.TimestampExtractor;

/**
* Base class of create table/stream command
Expand Down Expand Up @@ -99,7 +102,11 @@ abstract class CreateSourceCommand implements DdlCommand {
this.keyField = KeyField.none();
}

this.timestampExtractionPolicy = buildTimestampExtractor(statement.getProperties(), schema);
this.timestampExtractionPolicy = buildTimestampExtractor(
getDefaultTimestampExtractor(ksqlConfig),
statement.getProperties(),
schema
);

this.serdeOptions = serdeOptionsSupplier.build(
schema,
Expand All @@ -125,6 +132,20 @@ KsqlTopic getTopic() {
return topic;
}

/**
 * Resolves the default {@link TimestampExtractor} from the Kafka Streams
 * {@code default.timestamp.extractor} property carried by the KSQL config.
 *
 * <p>Falls back to {@link FailOnInvalidTimestamp} if the property is missing,
 * is not a class, or the class cannot be instantiated.
 *
 * @param ksqlConfig the KSQL config holding the streams properties
 * @return the configured extractor, or {@code FailOnInvalidTimestamp}
 */
private TimestampExtractor getDefaultTimestampExtractor(final KsqlConfig ksqlConfig) {
  try {
    final Class<?> extractorClass = (Class<?>) ksqlConfig
        .getKsqlStreamConfigProps()
        .get(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG);

    // getDeclaredConstructor().newInstance() replaces the deprecated
    // Class.newInstance(), which silently rethrows checked constructor exceptions.
    return (TimestampExtractor) extractorClass.getDeclaredConstructor().newInstance();
  } catch (final Exception e) {
    // Deliberate best-effort: a missing or invalid config falls back to the
    // historical default. NOTE(review): consider logging the failure here.
    return new FailOnInvalidTimestamp();
  }
}

private static LogicalSchema buildSchema(final TableElements tableElements) {
if (Iterables.isEmpty(tableElements)) {
throw new KsqlException("The statement does not define any columns.");
Expand All @@ -146,13 +167,14 @@ private static KsqlTopic buildTopic(
}

private static TimestampExtractionPolicy buildTimestampExtractor(
final TimestampExtractor defaultTimestampExtractor,
final CreateSourceProperties properties,
final LogicalSchema schema
) {
final Optional<String> timestampName = properties.getTimestampColumnName();
final Optional<String> timestampFormat = properties.getTimestampFormat();
return TimestampExtractionPolicyFactory
.create(schema, timestampName, timestampFormat);
.create(schema, timestampName, timestampFormat, defaultTimestampExtractor);
}

private static void validateSerdeCanHandleSchemas(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import static io.confluent.ksql.serde.Format.JSON;
import static io.confluent.ksql.serde.Format.KAFKA;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.verify;
Expand Down Expand Up @@ -64,6 +65,9 @@
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.processor.FailOnInvalidTimestamp;
import org.apache.kafka.streams.processor.UsePreviousTimeOnInvalidTimestamp;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
Expand Down Expand Up @@ -104,6 +108,7 @@ public class CreateSourceCommandTest {
@Mock
private Serde<GenericRow> serde;

@Mock
private KsqlConfig ksqlConfig;


Expand All @@ -114,8 +119,6 @@ public void setUp() {
givenPropertiesWith(ImmutableMap.of());
when(kafkaTopicClient.isTopicExists(any())).thenReturn(true);

ksqlConfig = new KsqlConfig(ImmutableMap.of());

when(serdeFactories.create(any(), any(), any(), any(), any(), any())).thenReturn(serde);

when(serviceContext.getTopicClient()).thenReturn(kafkaTopicClient);
Expand Down Expand Up @@ -230,6 +233,51 @@ public void shouldBuildSerdeOptions() {
assertThat(cmd.getSerdeOptions(), is(SOME_SERDE_OPTIONS));
}

// With no timestamp-extractor config stubbed on ksqlConfig, creating a source
// must fall back to the FailOnInvalidTimestamp default.
@Test
public void shouldCreateSourceWithDefaultFailOnInvalidTimestamp() {
  // When:
  final TestCmd cmd = createCmd();

  // Then:
  assertThat(cmd.timestampExtractionPolicy.create(0), instanceOf(FailOnInvalidTimestamp.class));
}

// When default.timestamp.extractor resolves to a custom extractor class, the
// created source must use that extractor rather than FailOnInvalidTimestamp.
@Test
public void shouldCreateSourceWithOverrideTimestampFromConfig() {
  // Given:
  when(ksqlConfig.getKsqlStreamConfigProps()).thenReturn(
      ImmutableMap.of(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
          UsePreviousTimeOnInvalidTimestamp.class)
  );

  // When:
  final TestCmd cmd = createCmd();

  // Then:
  assertThat(
      cmd.timestampExtractionPolicy.create(0),
      instanceOf(UsePreviousTimeOnInvalidTimestamp.class)
  );
}

// A config value that is not a TimestampExtractor class (here a plain String)
// must not fail source creation; it falls back to FailOnInvalidTimestamp.
@Test
public void shouldCreateSourceWithFailedOnInvalidTimestampIfConfigTimestampIsInvalid() {
  // Given:
  when(ksqlConfig.getKsqlStreamConfigProps()).thenReturn(
      ImmutableMap.of(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
          "No timestamp class!")
  );

  // When:
  final TestCmd cmd = createCmd();

  // Then:
  assertThat(
      cmd.timestampExtractionPolicy.create(0),
      instanceOf(FailOnInvalidTimestamp.class)
  );
}

@Test
public void shouldBuildSchemaWithImplicitKeyField() {
// Given:
Expand Down

0 comments on commit 7970a25

Please sign in to comment.