Skip to content

Commit

Permalink
feat: fail startup if command contains incompatible version (#5104)
Browse files Browse the repository at this point in the history
  • Loading branch information
agavra authored Apr 23, 2020
1 parent e10a6c0 commit a1751b1
Show file tree
Hide file tree
Showing 3 changed files with 78 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,49 +20,86 @@
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableMap;
import io.confluent.ksql.engine.KsqlPlan;
import io.confluent.ksql.planner.plan.ConfiguredKsqlPlan;
import io.confluent.ksql.statement.ConfiguredStatement;
import io.confluent.ksql.util.KsqlException;
import java.util.Collections;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;

@JsonSubTypes({})
public class Command {

@VisibleForTesting
static final int VERSION = 0;

private final String statement;
private final Map<String, Object> overwriteProperties;
private final Map<String, String> originalProperties;
private final Optional<KsqlPlan> plan;
private final Optional<Integer> version;

@JsonCreator
public Command(
@JsonProperty(value = "statement", required = true) final String statement,
@JsonProperty("streamsProperties") final Optional<Map<String, Object>> overwriteProperties,
@JsonProperty("originalProperties") final Optional<Map<String, String>> originalProperties,
@JsonProperty("plan") final Optional<KsqlPlan> plan
@JsonProperty("plan") final Optional<KsqlPlan> plan,
@JsonProperty("version") final Optional<Integer> version
) {
this(
statement,
overwriteProperties.orElseGet(ImmutableMap::of),
originalProperties.orElseGet(ImmutableMap::of),
plan
plan,
version,
VERSION
);
}

@VisibleForTesting
public Command(
final String statement,
final Map<String, Object> overwriteProperties,
final Map<String, String> originalProperties,
final Optional<KsqlPlan> plan
) {
this(
statement,
overwriteProperties,
originalProperties,
plan,
Optional.of(VERSION),
VERSION
);
}

@VisibleForTesting
Command(
final String statement,
final Map<String, Object> overwriteProperties,
final Map<String, String> originalProperties,
final Optional<KsqlPlan> plan,
final Optional<Integer> version,
final int expectedVersion
) {
this.statement = requireNonNull(statement, "statement");
this.overwriteProperties = Collections.unmodifiableMap(
requireNonNull(overwriteProperties, "overwriteProperties"));
this.originalProperties = Collections.unmodifiableMap(
requireNonNull(originalProperties, "originalProperties"));
this.plan = requireNonNull(plan, "plan");
this.version = requireNonNull(version, "version");

if (expectedVersion != version.orElse(0)) {
throw new KsqlException(
"Received a command from an incompatible command topic version. "
+ "Expected " + expectedVersion + " but got " + version.orElse(0));
}
}

public String getStatement() {
Expand All @@ -82,12 +119,18 @@ public Optional<KsqlPlan> getPlan() {
return plan;
}

public Optional<Integer> getVersion() {
return version;
}

public static Command of(final ConfiguredKsqlPlan configuredPlan) {
return new Command(
configuredPlan.getPlan().getStatementText(),
configuredPlan.getOverrides(),
configuredPlan.getConfig().getAllConfigPropsWithSecretsObfuscated(),
Optional.of(configuredPlan.getPlan())
Optional.of(configuredPlan.getPlan()),
Optional.of(VERSION),
VERSION
);
}

Expand All @@ -96,7 +139,9 @@ public static Command of(final ConfiguredStatement<?> configuredStatement) {
configuredStatement.getStatementText(),
configuredStatement.getConfigOverrides(),
configuredStatement.getConfig().getAllConfigPropsWithSecretsObfuscated(),
Optional.empty()
Optional.empty(),
Optional.of(VERSION),
VERSION
);
}

Expand All @@ -119,6 +164,8 @@ public String toString() {
return "Command{"
+ "statement='" + statement + '\''
+ ", overwriteProperties=" + overwriteProperties
+ ", version=" + version
+ '}';
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -120,9 +120,10 @@ private static Optional<QueuedCommand> buildNewCmdWithoutQuery(final QueuedComma

final Command newCommand = new Command(
command.getStatement(),
command.getOverwriteProperties(),
command.getOriginalProperties(),
command.getPlan().map(KsqlPlan::withoutQuery)
Optional.of(command.getOverwriteProperties()),
Optional.of(command.getOriginalProperties()),
command.getPlan().map(KsqlPlan::withoutQuery),
command.getVersion()
);

return Optional.of(new QueuedCommand(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,12 @@

import com.fasterxml.jackson.databind.ObjectMapper;
import io.confluent.ksql.execution.json.PlanJsonMapper;
import io.confluent.ksql.util.Version;
import java.io.IOException;
import java.util.Collections;
import java.util.Map;
import java.util.Optional;
import org.junit.Ignore;
import org.junit.Test;

public class CommandTest {
Expand All @@ -46,6 +48,26 @@ public void shouldDeserializeCorrectly() throws IOException {
assertThat(command.getOriginalProperties(), equalTo(expectedOriginalProperties));
}

@Test
public void shouldDeserializeCorrectlyWithVersion() throws IOException {
final String commandStr = "{" +
"\"statement\": \"test statement;\", " +
"\"streamsProperties\": {\"foo\": \"bar\"}, " +
"\"originalProperties\": {\"biz\": \"baz\"}, " +
"\"version\": " + Command.VERSION +
"}";
final ObjectMapper mapper = PlanJsonMapper.INSTANCE.get();
final Command command = mapper.readValue(commandStr, Command.class);
assertThat(command.getStatement(), equalTo("test statement;"));
final Map<String, Object> expecteOverwriteProperties
= Collections.singletonMap("foo", "bar");
assertThat(command.getOverwriteProperties(), equalTo(expecteOverwriteProperties));
final Map<String, Object> expectedOriginalProperties
= Collections.singletonMap("biz", "baz");
assertThat(command.getOriginalProperties(), equalTo(expectedOriginalProperties));
assertThat(command.getVersion(), is(Optional.of(Command.VERSION)));
}

private void grep(final String string, final String regex) {
assertThat(string.matches(regex), is(true));
}
Expand All @@ -61,6 +83,7 @@ public void shouldSerializeDeserializeCorrectly() throws IOException {
grep(serialized, ".*\"streamsProperties\" *: *\\{ *\"foo\" *: *\"bar\" *\\}.*");
grep(serialized, ".*\"statement\" *: *\"test statement;\".*");
grep(serialized, ".*\"originalProperties\" *: *\\{ *\"biz\" *: *\"baz\" *\\}.*");
grep(serialized, ".*\"version\" *: *" + Command.VERSION + ".*");
final Command deserialized = mapper.readValue(serialized, Command.class);
assertThat(deserialized, equalTo(command));
}
Expand Down

0 comments on commit a1751b1

Please sign in to comment.