diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/Command.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/Command.java index d95d234aa89f..c89a0f8246d3 100644 --- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/Command.java +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/Command.java @@ -20,10 +20,12 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonSubTypes; +import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableMap; import io.confluent.ksql.engine.KsqlPlan; import io.confluent.ksql.planner.plan.ConfiguredKsqlPlan; import io.confluent.ksql.statement.ConfiguredStatement; +import io.confluent.ksql.util.KsqlException; import java.util.Collections; import java.util.Map; import java.util.Objects; @@ -31,31 +33,59 @@ @JsonSubTypes({}) public class Command { + + @VisibleForTesting + static final int VERSION = 0; + private final String statement; private final Map overwriteProperties; private final Map originalProperties; private final Optional plan; + private final Optional version; @JsonCreator public Command( @JsonProperty(value = "statement", required = true) final String statement, @JsonProperty("streamsProperties") final Optional> overwriteProperties, @JsonProperty("originalProperties") final Optional> originalProperties, - @JsonProperty("plan") final Optional plan + @JsonProperty("plan") final Optional plan, + @JsonProperty("version") final Optional version ) { this( statement, overwriteProperties.orElseGet(ImmutableMap::of), originalProperties.orElseGet(ImmutableMap::of), - plan + plan, + version, + VERSION ); } + @VisibleForTesting public Command( final String statement, final Map overwriteProperties, final Map originalProperties, final Optional plan + ) { + this( + statement, + overwriteProperties, + originalProperties, + plan, + Optional.of(VERSION), + VERSION + ); + } + + @VisibleForTesting + Command( + final String statement, + final Map overwriteProperties, + final Map originalProperties, + final Optional plan, + final Optional version, + final int expectedVersion ) { this.statement = requireNonNull(statement, "statement"); this.overwriteProperties = Collections.unmodifiableMap( @@ -63,6 +93,13 @@ public Command( this.originalProperties = Collections.unmodifiableMap( requireNonNull(originalProperties, "originalProperties")); this.plan = requireNonNull(plan, "plan"); + this.version = requireNonNull(version, "version"); + + if (expectedVersion != version.orElse(0)) { + throw new KsqlException( + "Received a command from an incompatible command topic version. " + + "Expected " + expectedVersion + " but got " + version.orElse(0)); + } } public String getStatement() { @@ -82,12 +119,18 @@ public Optional getPlan() { return plan; } + public Optional getVersion() { + return version; + } + public static Command of(final ConfiguredKsqlPlan configuredPlan) { return new Command( configuredPlan.getPlan().getStatementText(), configuredPlan.getOverrides(), configuredPlan.getConfig().getAllConfigPropsWithSecretsObfuscated(), - Optional.of(configuredPlan.getPlan()) + Optional.of(configuredPlan.getPlan()), + Optional.of(VERSION), + VERSION ); } @@ -96,7 +139,9 @@ public static Command of(final ConfiguredStatement configuredStatement) { configuredStatement.getStatementText(), configuredStatement.getConfigOverrides(), configuredStatement.getConfig().getAllConfigPropsWithSecretsObfuscated(), - Optional.empty() + Optional.empty(), + Optional.of(VERSION), + VERSION ); } @@ -119,6 +164,8 @@ public String toString() { return "Command{" + "statement='" + statement + '\'' + ", overwriteProperties=" + overwriteProperties + + ", version=" + version + '}'; } + } diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/RestoreCommandsCompactor.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/RestoreCommandsCompactor.java index 6b0ffb88e7c3..bb930876e795 100644 --- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/RestoreCommandsCompactor.java +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/RestoreCommandsCompactor.java @@ -120,9 +120,10 @@ private static Optional buildNewCmdWithoutQuery(final QueuedComma final Command newCommand = new Command( command.getStatement(), - command.getOverwriteProperties(), - command.getOriginalProperties(), - command.getPlan().map(KsqlPlan::withoutQuery) + Optional.of(command.getOverwriteProperties()), + Optional.of(command.getOriginalProperties()), + command.getPlan().map(KsqlPlan::withoutQuery), + command.getVersion() ); return Optional.of(new QueuedCommand( diff --git a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/CommandTest.java b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/CommandTest.java index f80607dde42c..78ef448c61fe 100644 --- a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/CommandTest.java +++ b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/CommandTest.java @@ -21,10 +21,12 @@ import com.fasterxml.jackson.databind.ObjectMapper; import io.confluent.ksql.execution.json.PlanJsonMapper; +import io.confluent.ksql.util.Version; import java.io.IOException; import java.util.Collections; import java.util.Map; import java.util.Optional; +import org.junit.Ignore; import org.junit.Test; public class CommandTest { @@ -46,6 +48,26 @@ public void shouldDeserializeCorrectly() throws IOException { assertThat(command.getOriginalProperties(), equalTo(expectedOriginalProperties)); } + @Test + public void shouldDeserializeCorrectlyWithVersion() throws IOException { + final String commandStr = "{" + + "\"statement\": \"test statement;\", " + + "\"streamsProperties\": {\"foo\": \"bar\"}, " + + "\"originalProperties\": {\"biz\": \"baz\"}, " + + "\"version\": " + Command.VERSION + + "}"; + final ObjectMapper mapper = PlanJsonMapper.INSTANCE.get(); + final Command command = mapper.readValue(commandStr, Command.class); + assertThat(command.getStatement(), equalTo("test statement;")); + final Map expecteOverwriteProperties + = Collections.singletonMap("foo", "bar"); + assertThat(command.getOverwriteProperties(), equalTo(expecteOverwriteProperties)); + final Map expectedOriginalProperties + = Collections.singletonMap("biz", "baz"); + assertThat(command.getOriginalProperties(), equalTo(expectedOriginalProperties)); + assertThat(command.getVersion(), is(Optional.of(Command.VERSION))); + } + private void grep(final String string, final String regex) { assertThat(string.matches(regex), is(true)); } @@ -61,6 +83,7 @@ public void shouldSerializeDeserializeCorrectly() throws IOException { grep(serialized, ".*\"streamsProperties\" *: *\\{ *\"foo\" *: *\"bar\" *\\}.*"); grep(serialized, ".*\"statement\" *: *\"test statement;\".*"); grep(serialized, ".*\"originalProperties\" *: *\\{ *\"biz\" *: *\"baz\" *\\}.*"); + grep(serialized, ".*\"version\" *: *" + Command.VERSION + ".*"); final Command deserialized = mapper.readValue(serialized, Command.class); assertThat(deserialized, equalTo(command)); }