Skip to content

Commit

Permalink
refactor: simplify how code builds value Serde (#3148)
Browse files Browse the repository at this point in the history
Simplifies how client code builds `Serde<GenericRow>`.

Previously, client code has to build a `KsqlSerdeFactory` and then pass
this to `GenericRowSerDe` to get a `Serde<GenericRow>`.

Now, client code only needs one call to `GenericRowSerDe.from`.
`GenericRowSerDe` also supports dependency injection via the new
`ValueSerdeFactory` interface.

`KsqlSerdeFactory` is now an implementation detail, encapsulated in the
serde module.

This code is highly in flux and will likely change again.
  • Loading branch information
big-andy-coates authored Aug 2, 2019
1 parent 268710a commit d5d2791
Show file tree
Hide file tree
Showing 41 changed files with 1,053 additions and 606 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,19 +27,21 @@
import io.confluent.ksql.logging.processing.ProcessingLogContext;
import io.confluent.ksql.schema.ksql.LogicalSchema;
import io.confluent.ksql.schema.ksql.PhysicalSchema;
import io.confluent.ksql.serde.Format;
import io.confluent.ksql.serde.FormatInfo;
import io.confluent.ksql.serde.GenericRowSerDe;
import io.confluent.ksql.serde.SerdeOption;
import io.confluent.ksql.serde.avro.KsqlAvroSerdeFactory;
import io.confluent.ksql.serde.json.KsqlJsonSerdeFactory;
import io.confluent.ksql.util.KsqlConfig;
import io.confluent.ksql.util.Pair;
import io.confluent.ksql.util.SchemaUtil;
import java.io.InputStream;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Collections;
import java.util.Optional;
import java.util.Random;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import org.apache.avro.Schema;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
Expand Down Expand Up @@ -171,37 +173,43 @@ private static org.apache.kafka.connect.data.Schema convertFieldNamesToUppercase
private static Serde<GenericRow> getJsonSerdeHelper(
final org.apache.kafka.connect.data.Schema schema
) {
final PhysicalSchema physicalSchema = PhysicalSchema.from(
LogicalSchema.of(KEY_SCHEMA, schema),
SerdeOption.none()
return getGenericRowSerde(
FormatInfo.of(Format.JSON, Optional.empty()),
schema,
() -> null
);

return GenericRowSerDe.from(
new KsqlJsonSerdeFactory(),
physicalSchema,
new KsqlConfig(Collections.emptyMap()),
() -> null,
"benchmark",
ProcessingLogContext.create());
}

private static Serde<GenericRow> getAvroSerde(
final org.apache.kafka.connect.data.Schema schema
) {
final SchemaRegistryClient schemaRegistryClient = new MockSchemaRegistryClient();

return getGenericRowSerde(
FormatInfo.of(Format.AVRO, Optional.of("benchmarkSchema")),
schema,
() -> schemaRegistryClient
);
}

private static Serde<GenericRow> getGenericRowSerde(
final FormatInfo format,
final org.apache.kafka.connect.data.Schema schema,
final Supplier<SchemaRegistryClient> schemaRegistryClientFactory
) {
final PhysicalSchema physicalSchema = PhysicalSchema.from(
LogicalSchema.of(KEY_SCHEMA, schema),
SerdeOption.none()
);

return GenericRowSerDe.from(
new KsqlAvroSerdeFactory("benchmarkSchema"),
format,
physicalSchema,
new KsqlConfig(Collections.emptyMap()),
() -> schemaRegistryClient,
schemaRegistryClientFactory,
"benchmark",
ProcessingLogContext.create());
ProcessingLogContext.create()
);
}
}

Expand Down
60 changes: 6 additions & 54 deletions ksql-common/src/main/java/io/confluent/ksql/serde/KeyFormat.java
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ public static KeyFormat windowed(
) {
return new KeyFormat(
FormatInfo.of(format, Optional.empty()),
Optional.of(new WindowInfo(windowType, windowSize))
Optional.of(WindowInfo.of(windowType, windowSize))
);
}

Expand All @@ -68,7 +68,7 @@ public static KeyFormat windowed(
) {
return new KeyFormat(
FormatInfo.of(format, avroSchemaName),
Optional.of(new WindowInfo(windowType, windowSize))
Optional.of(WindowInfo.of(windowType, windowSize))
);
}

Expand All @@ -92,6 +92,10 @@ public boolean isWindowed() {
return window.isPresent();
}

public Optional<WindowInfo> getWindowInfo() {
return window;
}

public Optional<WindowType> getWindowType() {
return window.map(WindowInfo::getType);
}
Expand Down Expand Up @@ -126,56 +130,4 @@ public String toString() {
+ '}';
}

private static final class WindowInfo {

private final WindowType type;
private final Optional<Duration> size;

private WindowInfo(final WindowType type, final Optional<Duration> size) {
this.type = Objects.requireNonNull(type, "type");
this.size = Objects.requireNonNull(size, "size");

if (type.requiresWindowSize() && !size.isPresent()) {
throw new IllegalArgumentException("Size required");
}

if (!type.requiresWindowSize() && size.isPresent()) {
throw new IllegalArgumentException("Size not required");
}
}

public WindowType getType() {
return type;
}

public Optional<Duration> getSize() {
return size;
}

@Override
public boolean equals(final Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
final WindowInfo that = (WindowInfo) o;
return type == that.type
&& Objects.equals(size, that.size);
}

@Override
public int hashCode() {
return Objects.hash(type, size);
}

@Override
public String toString() {
return "WindowInfo{"
+ "type=" + type
+ ", size=" + size.map(Duration::toMillis)
+ '}';
}
}
}
87 changes: 87 additions & 0 deletions ksql-common/src/main/java/io/confluent/ksql/serde/WindowInfo.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
/*
* Copyright 2019 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.serde;

import com.google.errorprone.annotations.Immutable;
import io.confluent.ksql.model.WindowType;
import java.time.Duration;
import java.util.Objects;
import java.util.Optional;

/**
* Immutable pojo for storing info about a window.
*/
@Immutable
public final class WindowInfo {

private final WindowType type;
private final Optional<Duration> size;

public static WindowInfo of(final WindowType type, final Optional<Duration> size) {
return new WindowInfo(type, size);
}

private WindowInfo(final WindowType type, final Optional<Duration> size) {
this.type = Objects.requireNonNull(type, "type");
this.size = Objects.requireNonNull(size, "size");

if (type.requiresWindowSize() && !size.isPresent()) {
throw new IllegalArgumentException("Size required");
}

if (!type.requiresWindowSize() && size.isPresent()) {
throw new IllegalArgumentException("Size not required");
}

if (size.isPresent() && (size.get().isZero() || size.get().isNegative())) {
throw new IllegalArgumentException("Size must be positive");
}
}

public WindowType getType() {
return type;
}

public Optional<Duration> getSize() {
return size;
}

@Override
public boolean equals(final Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
final WindowInfo that = (WindowInfo) o;
return type == that.type
&& Objects.equals(size, that.size);
}

@Override
public int hashCode() {
return Objects.hash(type, size);
}

@Override
public String toString() {
return "WindowInfo{"
+ "type=" + type
+ ", size=" + size.map(Duration::toMillis)
+ '}';
}
}
116 changes: 116 additions & 0 deletions ksql-common/src/test/java/io/confluent/ksql/serde/WindowInfoTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
/*
* Copyright 2019 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.serde;

import static io.confluent.ksql.model.WindowType.HOPPING;
import static io.confluent.ksql.model.WindowType.SESSION;
import static io.confluent.ksql.model.WindowType.TUMBLING;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.is;

import com.google.common.testing.EqualsTester;
import com.google.common.testing.NullPointerTester;
import io.confluent.ksql.model.WindowType;
import java.time.Duration;
import java.util.Optional;
import org.junit.Test;

public class WindowInfoTest {
@Test
public void shouldThrowNPEs() {
new NullPointerTester()
.testAllPublicStaticMethods(WindowInfo.class);
}

@Test
public void shouldImplementEquals() {
new EqualsTester()
.addEqualityGroup(
WindowInfo.of(SESSION, Optional.empty()),
WindowInfo.of(SESSION, Optional.empty())
)
.addEqualityGroup(
WindowInfo.of(TUMBLING, Optional.of(Duration.ofMillis(19))),
WindowInfo.of(TUMBLING, Optional.of(Duration.ofMillis(19)))
)
.addEqualityGroup(
WindowInfo.of(HOPPING, Optional.of(Duration.ofMillis(19))),
WindowInfo.of(HOPPING, Optional.of(Duration.ofMillis(19)))
)
.addEqualityGroup(
WindowInfo.of(TUMBLING, Optional.of(Duration.ofMillis(1010)))
)
.testEquals();
}

@Test
public void shouldImplementToString() {
// Given:
final WindowInfo windowInfo = WindowInfo.of(TUMBLING, Optional.of(Duration.ofMillis(19)));

// When:
final String result = windowInfo.toString();

// Then:
assertThat(result, containsString("TUMBLING"));
assertThat(result, containsString("19"));
}

@Test
public void shouldGetType() {
// Given:
final WindowInfo windowInfo = WindowInfo.of(SESSION, Optional.empty());

// When:
final WindowType result = windowInfo.getType();

// Then:
assertThat(result, is(SESSION));
}

@Test
public void shouldGetFormatInfo() {
// Given:
final WindowInfo windowInfo = WindowInfo.of(HOPPING, Optional.of(Duration.ofSeconds(10)));

// When:
final Optional<Duration> result = windowInfo.getSize();

// Then:
assertThat(result, is(Optional.of(Duration.ofSeconds(10))));
}

@Test(expected = IllegalArgumentException.class)
public void shouldThrowIfSizeProvidedButNotRequired() {
WindowInfo.of(SESSION, Optional.of(Duration.ofSeconds(10)));
}

@Test(expected = IllegalArgumentException.class)
public void shouldThrowIfSizeRequiredButNotProvided() {
WindowInfo.of(TUMBLING, Optional.empty());
}

@Test(expected = IllegalArgumentException.class)
public void shouldThrowIfSizeZero() {
WindowInfo.of(TUMBLING, Optional.of(Duration.ZERO));
}

@Test(expected = IllegalArgumentException.class)
public void shouldThrowIfSizeNegative() {
WindowInfo.of(TUMBLING, Optional.of(Duration.ofSeconds(-1)));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ private CreateStreamCommand handleCreateStream(
callInfo.sqlExpression,
statement,
callInfo.ksqlConfig,
serviceContext.getTopicClient());
serviceContext);
}

private CreateTableCommand handleCreateTable(
Expand All @@ -92,7 +92,7 @@ private CreateTableCommand handleCreateTable(
callInfo.sqlExpression,
statement,
callInfo.ksqlConfig,
serviceContext.getTopicClient());
serviceContext);
}

@SuppressWarnings("MethodMayBeStatic")
Expand Down
Loading

0 comments on commit d5d2791

Please sign in to comment.