feat: add serialization exceptions to processing logger (#6084)
vcrfxia authored Aug 25, 2020
1 parent 6ef7653 commit 8ab98a5
Showing 12 changed files with 607 additions and 49 deletions.
70 changes: 57 additions & 13 deletions docs/developer-guide/test-and-debug/processing-log.md

… writing the processing log to {{ site.ak }} and consuming it as a ksqlDB stream.

!!! important
The processing log is not for server logging, but rather for per-record
logging on ksqlDB applications. If you want to configure an {{ site.ak }} appender
for the server logs, assign the `log4j.appender.kafka_appender.Topic`
and `log4j.logger.io.confluent.ksql` configuration settings in the ksqlDB
Server config file. For more information, see
message.type (INT)

: An int that describes the type of the log message. Currently, the
following types are defined: 0 (DESERIALIZATION_ERROR), 1
(RECORD_PROCESSING_ERROR), 2 (PRODUCTION_ERROR), 3 (SERIALIZATION_ERROR).

message.deserializationError (STRUCT)

: The contents of a message with type 0 (DESERIALIZATION_ERROR).
Logged when a deserializer fails to deserialize an {{ site.ak }} record.

message.deserializationError.errorMessage (STRING)

: A string containing a human-readable error message detailing the
error encountered.

message.deserializationError.recordB64 (STRING)

: The {{ site.ak }} record, encoded in Base64.

message.deserializationError.cause (LIST<STRING>)

: A list of strings containing human-readable error messages
for the chain of exceptions that caused the main error.

message.deserializationError.topic (STRING)

: The {{ site.ak }} topic of the record for which deserialization failed.

message.recordProcessingError (STRUCT)

: The contents of a message with type 1 (RECORD_PROCESSING_ERROR).
Logged when an error occurs while processing a record.

message.recordProcessingError.errorMessage (STRING)

: A string containing a human-readable error message detailing the
error encountered.

message.recordProcessingError.record (STRING)

: The SQL record, serialized as a JSON string.

message.recordProcessingError.cause (LIST<STRING>)

: A list of strings containing human-readable error messages
for the chain of exceptions that caused the main error.

message.productionError (STRUCT)

: The contents of a message with type 2 (PRODUCTION_ERROR). Logged
when a producer fails to publish a Kafka record.
when a producer fails to publish an {{ site.ak }} record.

message.productionError.errorMessage (STRING)

: A string containing a human-readable error message detailing the
error encountered.

message.serializationError (STRUCT)

: The contents of a message with type 3 (SERIALIZATION_ERROR).
Logged when a serializer fails to serialize a ksqlDB row.

message.serializationError.errorMessage (STRING)

: A string containing a human-readable error message detailing the
error encountered.

message.serializationError.record (STRING)

: The ksqlDB row, as a human-readable string.

message.serializationError.cause (LIST<STRING>)

: A list of strings containing human-readable error messages
for the chain of exceptions that caused the main error.

message.serializationError.topic (STRING)

: The {{ site.ak }} topic to which the ksqlDB row that failed to serialize
would have been produced.
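
Putting these fields together, a serialization error consumed from the log
topic looks roughly like the following record (a hypothetical example: the
logger name, timestamp, and messages depend on the query and the failing row):

```json
{
  "level": "ERROR",
  "logger": "processing.CSAS_OUTPUT_0.KsqlTopic.Sink.serializer",
  "time": 1598313600123,
  "message": {
    "type": 3,
    "serializationError": {
      "errorMessage": "Error serializing row to topic OUTPUT",
      "record": "[ 'foo' | 42 ]",
      "cause": [
        "Failed to serialize field COL1"
      ],
      "topic": "OUTPUT"
    }
  }
}
```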

Log Stream
----------

We recommend configuring the query processing log to write entries back
to {{ site.ak }}. This way, you can configure ksqlDB to set up a stream over the
topic automatically.

To log to Kafka, set up a Kafka appender and a special layout for formatting the log entries:
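
A typical appender configuration looks roughly like the following sketch; the
appender and layout classes are the standard Kafka log4j appender and Confluent
structured JSON layout, while the broker list and topic name here are
placeholders for your deployment:

```properties
log4j.appender.kafka_appender=org.apache.kafka.log4jappender.KafkaLog4jAppender
log4j.appender.kafka_appender.layout=io.confluent.common.logging.log4j.StructuredJsonLayout
log4j.appender.kafka_appender.BrokerList=localhost:9092
log4j.appender.kafka_appender.Topic=processing_log_topic
log4j.logger.processing=ERROR, kafka_appender
```

When `ksql.logging.processing.stream.auto.create` is enabled, ksqlDB sets up a
stream over the log topic on startup, and you can inspect it: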
Expand Down Expand Up @@ -223,12 +261,12 @@ ksql> describe PROCESSING_LOG;
Name : PROCESSING_LOG
Field | Type
-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
LOGGER | VARCHAR(STRING)
LEVEL | VARCHAR(STRING)
TIME | BIGINT
MESSAGE | STRUCT<type INTEGER, deserializationError STRUCT<errorMessage VARCHAR(STRING), recordB64 VARCHAR(STRING), cause ARRAY<VARCHAR(STRING)>, topic VARCHAR(STRING)>, ...>
-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
```

You can query the stream just like you would any other ksqlDB stream.
```
ksql> CREATE STREAM PROCESSING_LOG_STREAM (
  logger VARCHAR,
  level VARCHAR,
  time BIGINT,
  message STRUCT<
`TYPE` INTEGER,
deserializationError STRUCT<
errorMessage STRING,
recordB64 STRING,
cause ARRAY<STRING>,
`topic` STRING>,
recordProcessingError STRUCT<
errorMessage STRING,
record STRING,
cause ARRAY<STRING>>,
productionError STRUCT<
errorMessage STRING>,
serializationError STRUCT<
errorMessage STRING,
record STRING,
cause ARRAY<STRING>,
`topic` STRING>>)
WITH (KAFKA_TOPIC='processing_log_topic', VALUE_FORMAT='JSON');
```
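
For example, to watch serialization errors as they happen, you can run a push
query against the stream (a sketch, using the stream name and type id defined
above):

```sql
SELECT time,
       message->serializationError->errorMessage AS error,
       message->serializationError->`topic` AS sink_topic
FROM PROCESSING_LOG_STREAM
WHERE message->`TYPE` = 3
EMIT CHANGES;
```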


ProcessingLogMessageSchema.java

.optional()
.build();

public static final String SERIALIZATION_ERROR_FIELD_MESSAGE = "errorMessage";
public static final String SERIALIZATION_ERROR_FIELD_RECORD = "record";
public static final String SERIALIZATION_ERROR_FIELD_CAUSE = "cause";
public static final String SERIALIZATION_ERROR_FIELD_TOPIC = "topic";

private static final Schema SERIALIZATION_ERROR_SCHEMA = SchemaBuilder.struct()
.name(NAMESPACE + "SerializationError")
.field(SERIALIZATION_ERROR_FIELD_MESSAGE, Schema.OPTIONAL_STRING_SCHEMA)
.field(SERIALIZATION_ERROR_FIELD_RECORD, Schema.OPTIONAL_STRING_SCHEMA)
.field(SERIALIZATION_ERROR_FIELD_CAUSE, CAUSE_SCHEMA)
.field(SERIALIZATION_ERROR_FIELD_TOPIC, Schema.OPTIONAL_STRING_SCHEMA)
.optional()
.build();

public enum MessageType {
DESERIALIZATION_ERROR(0, DESERIALIZATION_ERROR_SCHEMA),
RECORD_PROCESSING_ERROR(1, RECORD_PROCESSING_ERROR_SCHEMA),
PRODUCTION_ERROR(2, PRODUCTION_ERROR_SCHEMA),
SERIALIZATION_ERROR(3, SERIALIZATION_ERROR_SCHEMA);

private final int typeId;
private final Schema schema;
// ... enum constructor and getTypeId()/getSchema() accessors unchanged ...
public static final String DESERIALIZATION_ERROR = "deserializationError";
public static final String RECORD_PROCESSING_ERROR = "recordProcessingError";
public static final String PRODUCTION_ERROR = "productionError";
public static final String SERIALIZATION_ERROR = "serializationError";

public static final Schema PROCESSING_LOG_SCHEMA = SchemaBuilder.struct()
.name(NAMESPACE + "ProcessingLogRecord")
.field(TYPE, Schema.OPTIONAL_INT32_SCHEMA)
.field(DESERIALIZATION_ERROR, DESERIALIZATION_ERROR_SCHEMA)
.field(RECORD_PROCESSING_ERROR, RECORD_PROCESSING_ERROR_SCHEMA)
.field(PRODUCTION_ERROR, PRODUCTION_ERROR_SCHEMA)
.field(SERIALIZATION_ERROR, SERIALIZATION_ERROR_SCHEMA)
.optional()
.build();


ProcessingLogServerUtilsTest.java

// in shouldBuildCorrectStreamCreateDDL(), the expected CREATE STREAM DDL:
+ "type INT, "
+ "deserializationError STRUCT<errorMessage VARCHAR, recordB64 VARCHAR, cause ARRAY<VARCHAR>, `topic` VARCHAR>, "
+ "recordProcessingError STRUCT<errorMessage VARCHAR, record VARCHAR, cause ARRAY<VARCHAR>>, "
+ "productionError STRUCT<errorMessage VARCHAR>"
+ "productionError STRUCT<errorMessage VARCHAR>, "
+ "serializationError STRUCT<errorMessage VARCHAR, record VARCHAR, cause ARRAY<VARCHAR>, `topic` VARCHAR>"
+ ">"
+ ") WITH(KAFKA_TOPIC='processing_log_topic', VALUE_FORMAT='JSON');"));
}

LoggingSerializer.java (new file)

/*
* Copyright 2020 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.logging.processing;

import static java.util.Objects.requireNonNull;

import java.util.Map;
import java.util.Optional;
import org.apache.kafka.common.serialization.Serializer;

public final class LoggingSerializer<T> implements Serializer<T> {

private final Serializer<T> delegate;
private final ProcessingLogger processingLogger;

public LoggingSerializer(
final Serializer<T> delegate,
final ProcessingLogger processingLogger
) {
this.delegate = requireNonNull(delegate, "delegate");
this.processingLogger = requireNonNull(processingLogger, "processingLogger");
}

@Override
public void configure(final Map<String, ?> configs, final boolean isKey) {
delegate.configure(configs, isKey);
}

@Override
public byte[] serialize(final String topic, final T data) {
try {
return delegate.serialize(topic, data);
} catch (final RuntimeException e) {
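      // Record the failure in the processing log before rethrowing, so the
      // bad row is visible per-record without the error being swallowed.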
processingLogger.error(new SerializationError<>(e, Optional.of(data), topic));
throw e;
}
}

@Override
public void close() {
delegate.close();
}

}
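
A minimal usage sketch (not part of this commit), assuming `ProcessingLogger`
is a single-method interface, as its use in `LoggingSerializer` suggests; a
printing lambda stands in for it here, whereas production code obtains loggers
from `ProcessingLogContext.getLoggerFactory()`, as `GenericKeySerDe` does below.

```java
import io.confluent.ksql.logging.processing.LoggingSerializer;
import io.confluent.ksql.logging.processing.ProcessingLogger;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringSerializer;

public final class LoggingSerializerExample {
  public static void main(final String[] args) {
    // Stand-in logger that just prints; real loggers write structured
    // records to the processing log topic.
    final ProcessingLogger logger =
        errorMessage -> System.err.println("processing-log error: " + errorMessage);

    final Serializer<String> serializer =
        new LoggingSerializer<>(new StringSerializer(), logger);

    // On success the call simply delegates; on failure LoggingSerializer
    // logs a SerializationError via the logger and rethrows.
    final byte[] bytes = serializer.serialize("some-topic", "hello");
    System.out.println(bytes.length);
  }
}
```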
SerializationError.java (new file)

/*
* Copyright 2020 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.logging.processing;

import static java.util.Objects.requireNonNull;

import io.confluent.ksql.logging.processing.ProcessingLogMessageSchema.MessageType;
import io.confluent.ksql.util.ErrorMessageUtil;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.data.Struct;

public class SerializationError<T> implements ProcessingLogger.ErrorMessage {

private final Throwable exception;
private final Optional<T> record;
private final String topic;

public SerializationError(
final Throwable exception,
final Optional<T> record,
final String topic
) {
this.exception = requireNonNull(exception, "exception");
this.record = requireNonNull(record, "record");
this.topic = requireNonNull(topic, "topic");
}

@Override
public SchemaAndValue get(final ProcessingLogConfig config) {
final Struct struct = new Struct(ProcessingLogMessageSchema.PROCESSING_LOG_SCHEMA)
.put(ProcessingLogMessageSchema.TYPE, MessageType.SERIALIZATION_ERROR.getTypeId())
.put(ProcessingLogMessageSchema.SERIALIZATION_ERROR, serializationError(config));

return new SchemaAndValue(ProcessingLogMessageSchema.PROCESSING_LOG_SCHEMA, struct);
}

@Override
public boolean equals(final Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
final SerializationError<?> that = (SerializationError) o;
return Objects.equals(exception, that.exception)
&& Objects.equals(record, that.record)
&& Objects.equals(topic, that.topic);
}

@Override
public int hashCode() {
return Objects.hash(exception, record, topic);
}

private Struct serializationError(final ProcessingLogConfig config) {
final Struct serializationError = new Struct(MessageType.SERIALIZATION_ERROR.getSchema())
.put(
ProcessingLogMessageSchema.SERIALIZATION_ERROR_FIELD_MESSAGE,
exception.getMessage())
.put(
ProcessingLogMessageSchema.SERIALIZATION_ERROR_FIELD_CAUSE,
getCause()
)
.put(
ProcessingLogMessageSchema.SERIALIZATION_ERROR_FIELD_TOPIC,
topic
);

if (config.getBoolean(ProcessingLogConfig.INCLUDE_ROWS)) {
serializationError.put(
ProcessingLogMessageSchema.SERIALIZATION_ERROR_FIELD_RECORD,
record.map(Object::toString).orElse(null)
);
}

return serializationError;
}

private List<String> getCause() {
final List<String> cause = ErrorMessageUtil.getErrorMessages(exception);
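    // Drop the first message, which is the top-level exception's own
    // message and is already captured in the errorMessage field.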
cause.remove(0);
return cause;
}
}
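
A quick sketch of the resulting log message (assuming `ProcessingLogConfig`
accepts a plain property map; `INCLUDE_ROWS` is the same row-inclusion flag
checked above):

```java
import io.confluent.ksql.logging.processing.ProcessingLogConfig;
import io.confluent.ksql.logging.processing.SerializationError;
import java.util.Collections;
import java.util.Optional;
import org.apache.kafka.connect.data.Struct;

public final class SerializationErrorExample {
  public static void main(final String[] args) {
    final SerializationError<String> error = new SerializationError<>(
        new RuntimeException("boom", new IllegalStateException("root cause")),
        Optional.of("key-row"),
        "OUTPUT_TOPIC");

    // Opt in to row logging so the record field is populated.
    final ProcessingLogConfig config = new ProcessingLogConfig(
        Collections.singletonMap(ProcessingLogConfig.INCLUDE_ROWS, true));

    final Struct message = (Struct) error.get(config).value();
    final Struct details = message.getStruct("serializationError");

    System.out.println(message.getInt32("type"));           // 3
    System.out.println(details.getString("errorMessage"));  // boom
    System.out.println(details.getArray("cause"));          // [root cause]
    System.out.println(details.getString("topic"));         // OUTPUT_TOPIC
    System.out.println(details.getString("record"));        // key-row
  }
}
```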
GenericKeySerDe.java

import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.ksql.SchemaNotSupportedException;
import io.confluent.ksql.logging.processing.LoggingDeserializer;
import io.confluent.ksql.logging.processing.LoggingSerializer;
import io.confluent.ksql.logging.processing.ProcessingLogContext;
import io.confluent.ksql.logging.processing.ProcessingLogger;
import io.confluent.ksql.schema.ksql.PersistenceSchema;
// ...

public final class GenericKeySerDe implements KeySerdeFactory {

static final String SERIALIZER_LOGGER_NAME = "serializer";
static final String DESERIALIZER_LOGGER_NAME = "deserializer";

private final SerdeFactories serdeFactories;
// in createInner(...):
final Serde<T> serde = serdeFactories
.create(format, schema, ksqlConfig, schemaRegistryClientFactory, targetType);

final ProcessingLogger serializerProcessingLogger = processingLogContext.getLoggerFactory()
.getLogger(join(loggerNamePrefix, SERIALIZER_LOGGER_NAME));
final ProcessingLogger deserializerProcessingLogger = processingLogContext.getLoggerFactory()
.getLogger(join(loggerNamePrefix, DESERIALIZER_LOGGER_NAME));

final Serde<Struct> inner = schema.isUnwrapped()
? unwrapped(serde, schema)
: wrapped(serde, targetType);

final Serde<Struct> result = Serdes.serdeFrom(
new LoggingSerializer<>(inner.serializer(), serializerProcessingLogger),
new LoggingDeserializer<>(inner.deserializer(), deserializerProcessingLogger)
);

result.configure(Collections.emptyMap(), true);
private static <T> Serde<Struct> wrapped(
    final Serde<T> innerSerde,
final Class<T> type
) {
if (type != Struct.class) {
throw new IllegalArgumentException("Unwrapped must be of type Struct");
throw new IllegalArgumentException("Wrapped must be of type Struct");
}

return (Serde) innerSerde;