Unique(ish) query Id per pull query (#3479)
* feat(static): unique id per static query

Add a unique(ish) id for each static query, based on the current time.

Any entries in the processing log will be tagged with this query id, which will aid debugging.

There is obviously still the potential for two queries to end up with the same id, but this is unlikely, and a collision would cause no problems beyond making an issue slightly harder to debug.
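
For illustration only, a minimal sketch of what a time-based generator along these lines could look like; the class name PullQueryIdGenerator and the exact id format are assumptions for this sketch and are not taken from this commit:

package io.confluent.ksql.query;

/**
 * Hypothetical sketch of a "unique(ish)" time-based id generator.
 * Two pull queries issued in the same millisecond can still collide,
 * which the commit message above accepts as harmless.
 */
public final class PullQueryIdGenerator {

  private PullQueryIdGenerator() {
  }

  public static QueryId next() {
    return new QueryId("query_" + System.currentTimeMillis());
  }
}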

* feat(static): expose generated id over rest API
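
A rough sketch of how the generated id then surfaces over the REST API, inferred from the TableRowsEntity constructor changes and the new QueryId JSON tests in the diffs below; everything other than TableRowsEntity and QueryId is a placeholder:

// Sketch only: statementText, logicalSchema and rows are placeholders.
final QueryId queryId = new QueryId("query_1570000000000");

final TableRowsEntity entity = new TableRowsEntity(
    statementText,   // the original pull query SQL
    queryId,         // the generated id, now returned to the client
    logicalSchema,
    rows
);

// QueryId serializes to a bare JSON string (e.g. "query_1570000000000"),
// as verified by the new QueryIdTest below.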
big-andy-coates authored Oct 10, 2019
1 parent be2bdcc commit 804c578
Showing 40 changed files with 164 additions and 123 deletions.
@@ -24,6 +24,7 @@
import com.google.common.collect.ImmutableList;
import io.confluent.ksql.cli.console.table.Table;
import io.confluent.ksql.name.ColumnName;
import io.confluent.ksql.query.QueryId;
import io.confluent.ksql.rest.entity.TableRowsEntity;
import io.confluent.ksql.schema.ksql.LogicalSchema;
import io.confluent.ksql.schema.ksql.types.SqlTypes;
@@ -74,6 +75,8 @@ public class TableRowsTableBuilderTest {
10L, 5.1D, 123456L, 23456L, "x", 5
);

private static final QueryId QUERY_ID = new QueryId("bob");

private TableRowsTableBuilder builder;

@Before
@@ -86,6 +89,7 @@ public void shouldBuildTableHeadings() {
// Given:
final TableRowsEntity entity = new TableRowsEntity(
SOME_SQL,
QUERY_ID,
SCHEMA,
ImmutableList.of(VALUES)
);
@@ -107,6 +111,7 @@ public void shouldBuildTimeWindowedTableHeadings() {
// Given:
final TableRowsEntity entity = new TableRowsEntity(
SOME_SQL,
QUERY_ID,
TIME_WINDOW_SCHEMA,
ImmutableList.of(TIME_WINDOW_VALUES)
);
@@ -129,6 +134,7 @@ public void shouldBuildSessionWindowedTableHeadings() {
// Given:
final TableRowsEntity entity = new TableRowsEntity(
SOME_SQL,
QUERY_ID,
SESSION_WINDOW_SCHEMA,
ImmutableList.of(SESSION_WINDOW_VALUES)
);
@@ -152,6 +158,7 @@ public void shouldBuildTableRows() {
// Given:
final TableRowsEntity entity = new TableRowsEntity(
SOME_SQL,
QUERY_ID,
SCHEMA,
ImmutableList.of(VALUES)
);
@@ -169,6 +176,7 @@ public void shouldBuildTimeWindowedTableRows() {
// Given:
final TableRowsEntity entity = new TableRowsEntity(
SOME_SQL,
QUERY_ID,
TIME_WINDOW_SCHEMA,
ImmutableList.of(TIME_WINDOW_VALUES)
);
@@ -186,6 +194,7 @@ public void shouldBuildSessionWindowedTableRows() {
// Given:
final TableRowsEntity entity = new TableRowsEntity(
SOME_SQL,
QUERY_ID,
SESSION_WINDOW_SCHEMA,
ImmutableList.of(SESSION_WINDOW_VALUES)
);
@@ -203,6 +212,7 @@ public void shouldHandleNullFields() {
// Given:
final TableRowsEntity entity = new TableRowsEntity(
SOME_SQL,
QUERY_ID,
SCHEMA,
ImmutableList.of(Arrays.asList(10L, null, "x", null))
);
@@ -20,18 +20,19 @@
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.mockito.ArgumentMatchers.any;

import com.google.common.collect.ImmutableMap;
import io.confluent.ksql.util.KsqlConfig;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import io.confluent.ksql.util.KsqlConfig;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
51 changes: 51 additions & 0 deletions ksql-common/src/test/java/io/confluent/ksql/query/QueryIdTest.java
@@ -0,0 +1,51 @@
/*
* Copyright 2019 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.query;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;

import com.fasterxml.jackson.databind.ObjectMapper;
import io.confluent.ksql.json.JsonMapper;
import org.junit.Test;

public class QueryIdTest {

private final ObjectMapper objectMapper = JsonMapper.INSTANCE.mapper;

@Test
public void shouldSerializeCorrectly() throws Exception {
// Given:
final QueryId id = new QueryId("query-id");

// When:
final String serialized = objectMapper.writeValueAsString(id);

// Then:
assertThat(serialized, is("\"query-id\""));
}

@Test
public void shouldDeserializeCorrectly() throws Exception {
// Given:
final String serialized = "\"an-id\"";

// When:
final QueryId deserialized = objectMapper.readValue(serialized, QueryId.class);

// Then:
assertThat(deserialized, is(new QueryId("an-id")));
}
}
@@ -18,7 +18,6 @@
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.notNullValue;

import com.google.common.collect.ImmutableMap;
import io.confluent.ksql.function.GenericsUtil;
@@ -48,7 +48,6 @@
import io.confluent.ksql.util.KsqlConfig;
import io.confluent.ksql.util.KsqlException;
import io.confluent.ksql.util.KsqlStatementException;

import io.confluent.ksql.util.PersistentQueryMetadata;
import io.confluent.ksql.util.TransientQueryMetadata;
import java.util.Map;
@@ -26,28 +26,23 @@
*/
public class HybridQueryIdGenerator implements QueryIdGenerator {

private final SequentialQueryIdGenerator legacyGenerator;
private final SpecificQueryIdGenerator newGenerator;
private AtomicReference<QueryIdGenerator> activeGenerator;
private final AtomicReference<QueryIdGenerator> activeGenerator;

public HybridQueryIdGenerator() {
this(0L);
}

public HybridQueryIdGenerator(final long initialValue) {
this.legacyGenerator = new SequentialQueryIdGenerator(initialValue);
this.newGenerator = new SpecificQueryIdGenerator();
this.activeGenerator = new AtomicReference<>(this.legacyGenerator);
this(
new SequentialQueryIdGenerator(),
new SpecificQueryIdGenerator()
);
}

@VisibleForTesting
HybridQueryIdGenerator(
final SequentialQueryIdGenerator sequentialQueryIdGenerator,
final SpecificQueryIdGenerator specificQueryIdGenerator
) {
this.legacyGenerator = sequentialQueryIdGenerator;
this.newGenerator = specificQueryIdGenerator;
this.activeGenerator = new AtomicReference<>(this.legacyGenerator);
this.activeGenerator = new AtomicReference<>(sequentialQueryIdGenerator);
}

public void activateNewGenerator(final long nextId) {
@@ -37,7 +37,6 @@
import io.confluent.ksql.test.util.EmbeddedSingleNodeKafkaCluster;
import io.confluent.ksql.util.KsqlConfig;
import io.confluent.ksql.util.KsqlConstants;
import io.confluent.ksql.util.SchemaUtil;
import io.confluent.ksql.util.TestDataProvider;
import java.time.Duration;
import java.util.Arrays;
@@ -26,15 +26,15 @@
import com.google.common.collect.Range;
import io.confluent.ksql.GenericRow;
import io.confluent.ksql.execution.context.QueryContext;
import io.confluent.ksql.integration.IntegrationTestHarness;
import io.confluent.ksql.integration.Retry;
import io.confluent.ksql.integration.TestKsqlContext;
import io.confluent.ksql.execution.streams.materialization.Materialization;
import io.confluent.ksql.execution.streams.materialization.MaterializedTable;
import io.confluent.ksql.execution.streams.materialization.MaterializedWindowedTable;
import io.confluent.ksql.execution.streams.materialization.Row;
import io.confluent.ksql.execution.streams.materialization.Window;
import io.confluent.ksql.execution.streams.materialization.WindowedRow;
import io.confluent.ksql.integration.IntegrationTestHarness;
import io.confluent.ksql.integration.Retry;
import io.confluent.ksql.integration.TestKsqlContext;
import io.confluent.ksql.model.WindowType;
import io.confluent.ksql.name.ColumnName;
import io.confluent.ksql.query.QueryId;
@@ -52,10 +52,8 @@
import io.confluent.ksql.name.ColumnName;
import io.confluent.ksql.query.QueryId;
import io.confluent.ksql.schema.ksql.Column;
import io.confluent.ksql.schema.ksql.PersistenceSchema;
import io.confluent.ksql.schema.ksql.types.SqlTypes;
import io.confluent.ksql.serde.KeySerde;
import io.confluent.ksql.serde.WindowInfo;
import io.confluent.ksql.structured.SchemaKStream;
import io.confluent.ksql.structured.SchemaKTable;
import io.confluent.ksql.testutils.AnalysisTestUtil;
@@ -87,7 +85,6 @@
import org.apache.kafka.streams.kstream.ValueMapperWithKey;
import org.apache.kafka.streams.kstream.ValueTransformerSupplier;
import org.apache.kafka.streams.kstream.Windowed;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.ArgumentCaptor;
@@ -20,7 +20,6 @@
import static io.confluent.ksql.planner.plan.PlanTestUtil.verifyProcessorNode;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.startsWith;
import static org.junit.Assert.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
@@ -29,7 +28,6 @@
import static org.mockito.ArgumentMatchers.same;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

@@ -38,7 +36,6 @@
import io.confluent.ksql.GenericRow;
import io.confluent.ksql.execution.builder.KsqlQueryBuilder;
import io.confluent.ksql.execution.context.QueryContext;
import io.confluent.ksql.execution.context.QueryLoggerUtil;
import io.confluent.ksql.execution.ddl.commands.KsqlTopic;
import io.confluent.ksql.execution.plan.StreamSource;
import io.confluent.ksql.execution.streams.KSPlanBuilder;
@@ -47,11 +44,9 @@
import io.confluent.ksql.metastore.model.KeyField;
import io.confluent.ksql.metastore.model.KsqlStream;
import io.confluent.ksql.metastore.model.KsqlTable;
import io.confluent.ksql.model.WindowType;
import io.confluent.ksql.name.ColumnName;
import io.confluent.ksql.name.SourceName;
import io.confluent.ksql.query.QueryId;
import io.confluent.ksql.schema.ksql.Column;
import io.confluent.ksql.schema.ksql.ColumnRef;
import io.confluent.ksql.schema.ksql.LogicalSchema;
import io.confluent.ksql.schema.ksql.PhysicalSchema;
@@ -62,37 +57,28 @@
import io.confluent.ksql.serde.KeySerde;
import io.confluent.ksql.serde.SerdeOption;
import io.confluent.ksql.serde.ValueFormat;
import io.confluent.ksql.serde.WindowInfo;
import io.confluent.ksql.structured.SchemaKStream;
import io.confluent.ksql.structured.SchemaKTable;
import io.confluent.ksql.util.KsqlConfig;
import io.confluent.ksql.util.SchemaUtil;
import io.confluent.ksql.util.timestamp.LongColumnTimestampExtractionPolicy;
import io.confluent.ksql.util.timestamp.TimestampExtractionPolicy;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.ListIterator;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.Topology.AutoOffsetReset;
import org.apache.kafka.streams.TopologyDescription;
import org.apache.kafka.streams.kstream.KGroupedStream;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.ValueMapper;
import org.apache.kafka.streams.kstream.ValueMapperWithKey;
import org.apache.kafka.streams.kstream.ValueTransformerSupplier;
import org.apache.kafka.streams.processor.TimestampExtractor;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.ArgumentCaptor;
@@ -22,7 +22,6 @@
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.ArgumentMatchers.same;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@@ -1,9 +1,9 @@
package io.confluent.ksql.query;

import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertThat;
import static org.hamcrest.Matchers.equalTo;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.ArgumentMatchers.same;
@@ -19,18 +19,18 @@
import io.confluent.ksql.execution.context.QueryContext;
import io.confluent.ksql.execution.context.QueryContext.Stacker;
import io.confluent.ksql.execution.ddl.commands.KsqlTopic;
import io.confluent.ksql.execution.materialization.MaterializationInfo;
import io.confluent.ksql.execution.plan.ExecutionStep;
import io.confluent.ksql.execution.plan.KStreamHolder;
import io.confluent.ksql.execution.plan.KTableHolder;
import io.confluent.ksql.function.FunctionRegistry;
import io.confluent.ksql.logging.processing.ProcessingLogContext;
import io.confluent.ksql.logging.processing.ProcessingLogger;
import io.confluent.ksql.logging.processing.ProcessingLoggerFactory;
import io.confluent.ksql.execution.streams.materialization.KsqlMaterializationFactory;
import io.confluent.ksql.execution.streams.materialization.Materialization;
import io.confluent.ksql.execution.materialization.MaterializationInfo;
import io.confluent.ksql.execution.streams.materialization.ks.KsMaterialization;
import io.confluent.ksql.execution.streams.materialization.ks.KsMaterializationFactory;
import io.confluent.ksql.function.FunctionRegistry;
import io.confluent.ksql.logging.processing.ProcessingLogContext;
import io.confluent.ksql.logging.processing.ProcessingLogger;
import io.confluent.ksql.logging.processing.ProcessingLoggerFactory;
import io.confluent.ksql.metastore.model.DataSource;
import io.confluent.ksql.metastore.model.DataSource.DataSourceType;
import io.confluent.ksql.metrics.ConsumerCollector;
@@ -17,13 +17,11 @@

import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasSize;
import static org.junit.Assert.assertThat;

import com.google.common.collect.ImmutableSet;
import io.confluent.ksql.execution.context.QueryContext;
import io.confluent.ksql.query.QueryId;
import org.junit.Test;

public class QueryContextTest {