Skip to content

Commit

Permalink
always increase query id
Browse files Browse the repository at this point in the history
  • Loading branch information
vpapavas committed Sep 23, 2020
1 parent 1b6b3dd commit 90366a1
Show file tree
Hide file tree
Showing 6 changed files with 23 additions and 56 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -862,7 +862,7 @@ public void shouldListQueries() {

// Then
assertThat(queries.get(0).getQueryType(), is(QueryType.PERSISTENT));
assertThat(queries.get(0).getId(), is("CTAS_" + AGG_TABLE + "_0"));
assertThat(queries.get(0).getId(), is("CTAS_" + AGG_TABLE + "_5"));
assertThat(queries.get(0).getSql(), is(
"CREATE TABLE " + AGG_TABLE + " WITH (KAFKA_TOPIC='" + AGG_TABLE + "', PARTITIONS=1, REPLICAS=1) AS SELECT\n"
+ " " + TEST_STREAM + ".STR STR,\n"
Expand Down Expand Up @@ -894,7 +894,7 @@ public void shouldDescribeSource() throws Exception {
assertThat(description.valueFormat(), is("JSON"));
assertThat(description.readQueries(), hasSize(1));
assertThat(description.readQueries().get(0).getQueryType(), is(QueryType.PERSISTENT));
assertThat(description.readQueries().get(0).getId(), is("CTAS_" + AGG_TABLE + "_0"));
assertThat(description.readQueries().get(0).getId(), is("CTAS_" + AGG_TABLE + "_5"));
assertThat(description.readQueries().get(0).getSql(), is(
"CREATE TABLE " + AGG_TABLE + " WITH (KAFKA_TOPIC='" + AGG_TABLE + "', PARTITIONS=1, REPLICAS=1) AS SELECT\n"
+ " " + TEST_STREAM + ".STR STR,\n"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -254,15 +254,14 @@ public void processPriorCommands() {
}

final List<QueuedCommand> compacted = compactor.apply(compatibleCommands);
final QueryId greatestQueryId = RestoreCommandsCompactor.greatestQueryId;
compacted.forEach(
command -> {
currentCommandRef.set(new Pair<>(command, clock.instant()));
RetryUtil.retryWithBackoff(
maxRetries,
STATEMENT_RETRY_MS,
MAX_STATEMENT_RETRY_MS,
() -> statementExecutor.handleRestore(command, greatestQueryId),
() -> statementExecutor.handleRestore(command),
WakeupException.class
);
currentCommandRef.set(null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,23 +125,19 @@ void handleStatement(final QueuedCommand queuedCommand) {
queuedCommand.getAndDeserializeCommandId(),
queuedCommand.getStatus(),
Mode.EXECUTE,
queuedCommand.getOffset(),
Optional.empty()
queuedCommand.getOffset()
);
}

void handleRestore(final QueuedCommand queuedCommand, final QueryId greatestQueryId) {
void handleRestore(final QueuedCommand queuedCommand) {
throwIfNotConfigured();

final Optional<QueryId> queryId = greatestQueryId == null
? Optional.empty() : Optional.of(greatestQueryId);
handleStatementWithTerminatedQueries(
queuedCommand.getAndDeserializeCommand(commandDeserializer),
queuedCommand.getAndDeserializeCommandId(),
queuedCommand.getStatus(),
Mode.RESTORE,
queuedCommand.getOffset(),
queryId
queuedCommand.getOffset()
);
}

Expand Down Expand Up @@ -195,8 +191,7 @@ private void handleStatementWithTerminatedQueries(
final CommandId commandId,
final Optional<CommandStatusFuture> commandStatusFuture,
final Mode mode,
final long offset,
final Optional<QueryId> greatestQueryId
final long offset
) {
try {
if (command.getPlan().isPresent()) {
Expand All @@ -206,8 +201,7 @@ private void handleStatementWithTerminatedQueries(
commandStatusFuture,
command.getPlan().get(),
mode,
offset,
greatestQueryId);
offset);
return;
}
final String statementString = command.getStatement();
Expand Down Expand Up @@ -240,8 +234,7 @@ private void executePlan(
final Optional<CommandStatusFuture> commandStatusFuture,
final KsqlPlan plan,
final Mode mode,
final long offset,
final Optional<QueryId> greatestQueryId
final long offset
) {
final KsqlConfig mergedConfig = buildMergedConfig(command);
final ConfiguredKsqlPlan configured = ConfiguredKsqlPlan.of(
Expand All @@ -254,15 +247,8 @@ private void executePlan(
new CommandStatus(CommandStatus.Status.EXECUTING, "Executing statement")
);
final ExecuteResult result = ksqlEngine.execute(serviceContext, configured);
long queryID = Long.MIN_VALUE;
if (greatestQueryId.isPresent() && mode == Mode.RESTORE) {
final String ltq = greatestQueryId.get().toString();
final int lastIndex = ltq.lastIndexOf("_");
queryID = Long.parseLong(ltq.substring(lastIndex + 1));
queryIdGenerator.setNextId(queryID + 1);
}
queryIdGenerator.setNextId(offset + 1);
if (result.getQuery().isPresent()) {
queryIdGenerator.setNextId(offset + 1);
if (mode == Mode.EXECUTE) {
result.getQuery().get().start();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,17 +24,12 @@
import java.util.Map;
import java.util.Optional;
import javax.annotation.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Util for compacting the restore commands
*/
public final class RestoreCommandsCompactor {

static QueryId greatestQueryId;
private static final Logger LOG = LoggerFactory.getLogger(RestoreCommandsCompactor.class);

private RestoreCommandsCompactor() {
}

Expand Down Expand Up @@ -91,8 +86,6 @@ public static CompactedNode maybeAppend(
if (queued.getAndDeserializeCommandId().getType() == Type.TERMINATE) {
final QueryId queryId = new QueryId(queued.getAndDeserializeCommandId().getEntity());
markShouldSkip(queryId, latestNodeWithId);
//keep track of the last terminate command
//lastTerminateQueryId = queryId;

// terminate commands don't get added to the list of commands to execute
// because we "execute" them in this class by removing query plans from
Expand Down Expand Up @@ -135,12 +128,6 @@ private CompactedNode(
private static Optional<QueuedCommand> compact(final CompactedNode node) {
final Command command = node.command;

if (command.getPlan().get().getQueryPlan().isPresent()) {
if (greatestQueryId == null) {
greatestQueryId = command.getPlan().get().getQueryPlan().get().getQueryId();
}
}

if (!node.shouldSkip) {
return Optional.of(node.queued);
}
Expand All @@ -165,9 +152,4 @@ private static Optional<QueuedCommand> compact(final CompactedNode node) {
node.queued.getOffset()
));
}

static QueryId getGreatestQueryId() {
return greatestQueryId;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -960,7 +960,7 @@ private void handleStatement(

private void terminateQueries() {
final Command terminateCommand1 = new Command(
"TERMINATE CSAS_USER1PV_0;",
"TERMINATE CSAS_USER1PV_1;",
emptyMap(),
ksqlConfig.getAllConfigPropsWithSecretsObfuscated(),
Optional.empty()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -580,7 +580,7 @@ public void shouldRecoverRecreates() {
server1.submitCommands(
"CREATE STREAM A (ROWKEY STRING KEY, C1 STRING, C2 INT) WITH (KAFKA_TOPIC='A', VALUE_FORMAT='JSON');",
"CREATE STREAM B AS SELECT ROWKEY, C1 FROM A;",
"TERMINATE CsAs_b_0;",
"TERMINATE CsAs_b_1;",
"DROP STREAM B;",
"CREATE STREAM B AS SELECT ROWKEY, C2 FROM A;"
);
Expand All @@ -603,7 +603,7 @@ public void shouldRecoverReplacesWithTerminates() {
"CREATE STREAM A (ROWKEY STRING KEY, C1 STRING, C2 INT) WITH (KAFKA_TOPIC='A', VALUE_FORMAT='JSON');",
"CREATE STREAM B AS SELECT ROWKEY, C1 FROM A;",
"CREATE OR REPLACE STREAM B AS SELECT ROWKEY, C1, C2 FROM A;",
"TERMINATE CSAS_B_0;",
"TERMINATE CSAS_B_1;",
"DROP STREAM B;",
"CREATE STREAM B AS SELECT ROWKEY, C1 FROM A;"
);
Expand All @@ -627,7 +627,7 @@ public void shouldRecoverInsertIntosRecreates() {
"CREATE STREAM A (COLUMN STRING) WITH (KAFKA_TOPIC='A', VALUE_FORMAT='JSON');",
"CREATE STREAM B (COLUMN STRING) WITH (KAFKA_TOPIC='B', VALUE_FORMAT='JSON', PARTITIONS=1);",
"INSERT INTO B SELECT * FROM A;",
"TERMINATE InsertQuery_0;",
"TERMINATE InsertQuery_2;",
"INSERT INTO B SELECT * FROM A;"
);
shouldRecover(commands);
Expand All @@ -639,7 +639,7 @@ public void shouldRecoverTerminates() {
"CREATE STREAM A (COLUMN STRING) WITH (KAFKA_TOPIC='A', VALUE_FORMAT='JSON');",
"CREATE STREAM B AS SELECT * FROM A;",
"INSERT INTO B SELECT * FROM A;",
"TERMINATE CSAS_B_0;",
"TERMINATE CSAS_B_1;",
"TERMINATE InsertQuery_2;"
);
shouldRecover(commands);
Expand All @@ -650,7 +650,7 @@ public void shouldRecoverDrop() {
server1.submitCommands(
"CREATE STREAM A (COLUMN STRING) WITH (KAFKA_TOPIC='A', VALUE_FORMAT='JSON');",
"CREATE STREAM B AS SELECT * FROM A;",
"TERMINATE CSAS_B_0;",
"TERMINATE CSAS_B_1;",
"DROP STREAM B;"
);
shouldRecover(commands);
Expand All @@ -661,7 +661,7 @@ public void shouldRecoverWithDuplicateTerminateAndDrop() {
server1.submitCommands(
"CREATE STREAM A (COLUMN STRING) WITH (KAFKA_TOPIC='A', VALUE_FORMAT='JSON');",
"CREATE STREAM B AS SELECT * FROM A;",
"TERMINATE CSAS_B_0;"
"TERMINATE CSAS_B_1;"
);

addDuplicateOfLastCommand(); // Add duplicate of "TERMINATE CSAS_B_0;"
Expand All @@ -681,7 +681,7 @@ public void shouldNotDeleteTopicsOnRecovery() {
server1.submitCommands(
"CREATE STREAM A (COLUMN STRING) WITH (KAFKA_TOPIC='A', VALUE_FORMAT='JSON');",
"CREATE STREAM B AS SELECT * FROM A;",
"TERMINATE CSAS_B_0;",
"TERMINATE CSAS_B_1;",
"DROP STREAM B DELETE TOPIC;"
);

Expand All @@ -704,39 +704,39 @@ public void shouldRecoverQueryIDs() {
final Set<QueryId> queryIdNames = queriesById(server.ksqlEngine.getPersistentQueries())
.keySet();

assertThat(queryIdNames, contains(new QueryId("CSAS_C_0")));
assertThat(queryIdNames, contains(new QueryId("CSAS_C_1")));
}

@Test
public void shouldIncrementQueryIDsNoPlans() {
server1.submitCommands(
"CREATE STREAM A (COLUMN STRING) WITH (KAFKA_TOPIC='A', VALUE_FORMAT='JSON');",
"CREATE STREAM B AS SELECT * FROM A;",
"TERMINATE CSAS_B_0;");
"TERMINATE CSAS_B_1;");

final KsqlServer server = new KsqlServer(commands);
server.recover();
server.submitCommands("CREATE STREAM C AS SELECT * FROM A;");
final Set<QueryId> queryIdNames = queriesById(server.ksqlEngine.getPersistentQueries())
.keySet();

assertThat(queryIdNames, contains(new QueryId("CSAS_C_1")));
assertThat(queryIdNames, contains(new QueryId("CSAS_C_2")));
}

@Test
public void shouldIncrementQueryIDsWithPlan() {
server1.submitCommands(
"CREATE STREAM A (COLUMN STRING) WITH (KAFKA_TOPIC='A', VALUE_FORMAT='JSON');",
"CREATE STREAM B AS SELECT * FROM A;",
"TERMINATE CSAS_B_0;");
"TERMINATE CSAS_B_1;");

final KsqlServer server = new KsqlServer(commands);
server.recover();
server.submitCommands("CREATE STREAM C AS SELECT * FROM A;");
final Set<QueryId> queryIdNames = queriesById(server.ksqlEngine.getPersistentQueries())
.keySet();

assertThat(queryIdNames, contains(new QueryId("CSAS_C_1")));
assertThat(queryIdNames, contains(new QueryId("CSAS_C_2")));
}


Expand Down

0 comments on commit 90366a1

Please sign in to comment.