Add Error Handling to Kafka IO #29546

Merged · 105 commits · Dec 7, 2023

Commits
c22b8c1
Update 2.50 release notes to include new Kafka topicPattern feature
johnjcasey Jul 5, 2023
f7cf5de
Merge remote-tracking branch 'origin/master'
johnjcasey Jul 24, 2023
ab68ecb
Merge remote-tracking branch 'origin/master'
johnjcasey Aug 9, 2023
40ad1d5
Merge remote-tracking branch 'origin/master'
johnjcasey Aug 31, 2023
6c9c28d
Create groovy class for io performance tests
johnjcasey Aug 31, 2023
520c9d1
delete unnecessary class
johnjcasey Aug 31, 2023
062de23
fix env call
johnjcasey Aug 31, 2023
01fc25b
Merge pull request #181 from johnjcasey/feature/automate-performance-…
johnjcasey Aug 31, 2023
9c9f86b
fix call to gradle
johnjcasey Aug 31, 2023
92306fa
Merge pull request #182 from johnjcasey/feature/automate-performance-…
johnjcasey Aug 31, 2023
925ce55
run on hosted runner for testing
johnjcasey Aug 31, 2023
2dcfb70
Merge pull request #183 from johnjcasey/feature/automate-performance-…
johnjcasey Aug 31, 2023
117ef8b
add additional checkout
johnjcasey Aug 31, 2023
1f73cda
Merge pull request #184 from johnjcasey/feature/automate-performance-…
johnjcasey Aug 31, 2023
cb6e01b
add destination for triggered tests
johnjcasey Aug 31, 2023
a9e86aa
Merge pull request #185 from johnjcasey/feature/automate-performance-…
johnjcasey Aug 31, 2023
8ea6c51
move env variables to correct location
johnjcasey Sep 1, 2023
d8822d7
Merge pull request #186 from johnjcasey/feature/automate-performance-…
johnjcasey Sep 1, 2023
320a4cc
try uploading against separate dataset
johnjcasey Sep 1, 2023
e89b59e
Merge pull request #187 from johnjcasey/feature/automate-performance-…
johnjcasey Sep 1, 2023
1cd4e55
try without a user
johnjcasey Sep 1, 2023
4473f17
Merge pull request #188 from johnjcasey/feature/automate-performance-…
johnjcasey Sep 1, 2023
4fc5b8e
update branch checkout, try to view the failure log
johnjcasey Sep 5, 2023
706650d
Merge pull request #189 from johnjcasey/feature/automate-performance-…
johnjcasey Sep 5, 2023
59069f2
run on failure
johnjcasey Sep 5, 2023
7f79b62
Merge pull request #190 from johnjcasey/feature/automate-performance-…
johnjcasey Sep 5, 2023
6f51976
update to use correct BigQuery instance
johnjcasey Sep 5, 2023
e95d920
Merge pull request #191 from johnjcasey/feature/automate-performance-…
johnjcasey Sep 5, 2023
df716cb
convert to matrix
johnjcasey Sep 5, 2023
4d8eded
Merge pull request #192 from johnjcasey/feature/automate-performance-…
johnjcasey Sep 5, 2023
4bf0826
add result reporting
johnjcasey Sep 5, 2023
403f054
Merge pull request #193 from johnjcasey/feature/automate-performance-…
johnjcasey Sep 5, 2023
d40d04b
add failure clause
johnjcasey Sep 5, 2023
aca4b2e
Merge pull request #194 from johnjcasey/feature/automate-performance-…
johnjcasey Sep 5, 2023
2739e92
remove failure clause, update to run on self-hosted
johnjcasey Sep 5, 2023
bd6efeb
address comments, clean up build
johnjcasey Sep 6, 2023
226a655
clarify branching
johnjcasey Sep 6, 2023
9c7286b
Merge pull request #195 from johnjcasey/feature/automate-performance-…
johnjcasey Sep 6, 2023
5b1b2c2
Merge branch 'apache:master' into master
johnjcasey Sep 15, 2023
f8c2b06
Merge remote-tracking branch 'origin/master'
johnjcasey Oct 24, 2023
d058ac9
Add error handling base implementation & test DLQ enabled class
johnjcasey Oct 27, 2023
8c9dd94
Add test cases
johnjcasey Oct 27, 2023
4d23fe8
apply spotless
johnjcasey Oct 27, 2023
31432b7
Fix Checkstyles
johnjcasey Oct 30, 2023
a9dae91
Fix Checkstyles
johnjcasey Oct 30, 2023
64dde49
make DLH serializable
johnjcasey Oct 30, 2023
c82185d
rename dead letter to bad record
johnjcasey Oct 30, 2023
78d45a8
make DLH serializable
johnjcasey Oct 31, 2023
6c36549
Change bad record router name, and use multioutputreceiver instead of…
johnjcasey Oct 31, 2023
44036be
Refactor BadRecord to be nested
johnjcasey Oct 31, 2023
06ca166
clean up checkstyle
johnjcasey Oct 31, 2023
a26d605
Update error handler test
johnjcasey Oct 31, 2023
5a6e8d0
Add metrics for counting error records, and for measuring feature usage
johnjcasey Nov 1, 2023
70c8991
apply spotless
johnjcasey Nov 1, 2023
36baf98
fix checkstyle
johnjcasey Nov 1, 2023
2119c76
make metric reporting static
johnjcasey Nov 1, 2023
798cfc3
spotless
johnjcasey Nov 1, 2023
c03bb2b
Rework annotations to be an explicit label on a PTransform, instead o…
johnjcasey Nov 2, 2023
bf99363
fix checkstyle
johnjcasey Nov 2, 2023
881f9d8
Address comments
johnjcasey Nov 8, 2023
f8c6d8c
Address comments
johnjcasey Nov 14, 2023
a1b112c
Fix test cases, spotless
johnjcasey Nov 15, 2023
ad1684a
remove flatting without error collections
johnjcasey Nov 15, 2023
074faf2
fix nullness
johnjcasey Nov 16, 2023
17bf295
spotless + encoding issues
johnjcasey Nov 16, 2023
e2ec57f
spotless
johnjcasey Nov 16, 2023
8b3f052
throw error when error handler isn't used
johnjcasey Nov 21, 2023
525d912
add concrete bad record error handler class
johnjcasey Nov 21, 2023
9b4a348
spotless, fix test category
johnjcasey Nov 22, 2023
d6f4097
fix checkstyle
johnjcasey Nov 22, 2023
a067238
clean up comments
johnjcasey Nov 27, 2023
408bc26
fix test case
johnjcasey Nov 27, 2023
9766ea0
initial wiring of error handler into KafkaIO Read
johnjcasey Nov 27, 2023
0246a80
Merge remote-tracking branch 'upstream/master' into feature/dead-lett…
johnjcasey Nov 27, 2023
ec50bff
remove "failing transform" field on bad record, add note to CHANGES.md
johnjcasey Nov 27, 2023
b092709
fix failing test cases
johnjcasey Nov 27, 2023
3f1e97c
fix failing test cases
johnjcasey Nov 27, 2023
4356f27
apply spotless
johnjcasey Nov 28, 2023
8be71b5
Merge branch 'feature/dead-letter-queue-core' into feature/kafka-read…
johnjcasey Nov 28, 2023
c106698
Add tests
johnjcasey Nov 28, 2023
2724a66
Add tests
johnjcasey Nov 28, 2023
686a5e7
fix test case
johnjcasey Nov 28, 2023
7b11704
add documentation
johnjcasey Nov 28, 2023
4957b8a
wire error handler into kafka write
johnjcasey Nov 29, 2023
7e3135e
fix failing test case
johnjcasey Nov 29, 2023
4759f72
Add tests for writing to kafka with exception handling
johnjcasey Nov 30, 2023
d7b48f5
fix sdf testing
johnjcasey Nov 30, 2023
e978f0c
fix sdf testing
johnjcasey Nov 30, 2023
2d76070
spotless
johnjcasey Nov 30, 2023
a9eb5af
deflake tests
johnjcasey Nov 30, 2023
edd725d
add error handling to kafka streaming example
johnjcasey Dec 1, 2023
5067bd1
Merge remote-tracking branch 'upstream/master' into feature/dead-lett…
johnjcasey Dec 4, 2023
864c429
apply final comments
johnjcasey Dec 5, 2023
8307b63
apply final comments
johnjcasey Dec 5, 2023
fad9d56
apply final comments
johnjcasey Dec 5, 2023
8b02c86
Merge branch 'feature/dead-letter-queue-core' into feature/kafka-read…
johnjcasey Dec 5, 2023
ad37dda
add line to CHANGES.md
johnjcasey Dec 5, 2023
70437c1
fix spotless
johnjcasey Dec 5, 2023
6a2801f
fix checkstyle
johnjcasey Dec 5, 2023
0c1ab35
make sink transform static for serialization
johnjcasey Dec 5, 2023
a0781c4
Merge remote-tracking branch 'upstream/master' into feature/kafka-rea…
johnjcasey Dec 5, 2023
c589fd2
spotless
johnjcasey Dec 5, 2023
4b1a671
fix typo
johnjcasey Dec 6, 2023
edb419b
fix typo
johnjcasey Dec 6, 2023
58f6ca2
fix spotbugs
johnjcasey Dec 6, 2023
1 change: 1 addition & 0 deletions CHANGES.md
@@ -66,6 +66,7 @@
* TextIO now supports skipping multiple header lines (Java) ([#17990](https://github.com/apache/beam/issues/17990)).
* Python GCSIO is now implemented with GCP GCS Client instead of apitools ([#25676](https://github.com/apache/beam/issues/25676))
* Adding support for LowCardinality DataType in ClickHouse (Java) ([#29533](https://github.com/apache/beam/pull/29533)).
+* Added support for handling bad records to KafkaIO (Java) ([#29546](https://github.com/apache/beam/pull/29546))

## New Features / Improvements

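Together with the commit history ("wire error handler into kafka write"), this release note covers both reads and writes. The diffs below show the read side in full; as a complement, here is a minimal hedged sketch of the write side. The broker address, topic name, input data, and the `LogBadRecords` sink are placeholders, and the write-side builder method names are assumed to mirror the read-side API shown later on this page:

```java
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.errorhandling.BadRecord;
import org.apache.beam.sdk.transforms.errorhandling.ErrorHandler.BadRecordErrorHandler;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.kafka.common.serialization.StringSerializer;

public class KafkaWriteWithBadRecordsSketch {

  public static void main(String[] args) throws Exception {
    Pipeline pipeline = Pipeline.create();
    PCollection<KV<String, String>> records =
        pipeline.apply(Create.of(KV.of("k1", "v1"), KV.of("k2", "v2")));

    // Closing the handler (end of the try block) is what hands the collected
    // bad records to the sink transform.
    try (BadRecordErrorHandler<PCollection<BadRecord>> errorHandler =
        pipeline.registerBadRecordErrorHandler(new LogBadRecords())) {
      records.apply(
          KafkaIO.<String, String>write()
              .withBootstrapServers("localhost:9092") // placeholder broker
              .withTopic("events") // placeholder topic
              .withKeySerializer(StringSerializer.class)
              .withValueSerializer(StringSerializer.class)
              .withBadRecordErrorHandler(errorHandler));
    }
    pipeline.run().waitUntilFinish();
  }

  // Minimal stand-in for a real DLQ sink: log each bad record and pass it on.
  static class LogBadRecords
      extends PTransform<PCollection<BadRecord>, PCollection<BadRecord>> {
    @Override
    public PCollection<BadRecord> expand(PCollection<BadRecord> input) {
      return input.apply(
          ParDo.of(
              new DoFn<BadRecord, BadRecord>() {
                @ProcessElement
                public void processElement(
                    @Element BadRecord record, OutputReceiver<BadRecord> receiver) {
                  System.out.println(record);
                  receiver.output(record);
                }
              }));
    }
  }
}
```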
@@ -40,7 +40,7 @@ class KafkaTestUtilities {
'"keySizeBytes": "10",' +
'"valueSizeBytes": "90"' +
'}',
-"--readTimeout=120",
+"--readTimeout=60",
"--kafkaTopic=beam",
"--withTestcontainers=true",
"--kafkaContainerVersion=5.5.2",
@@ -56,6 +56,7 @@ class KafkaTestUtilities {
excludeTestsMatching "*SDFResumesCorrectly" //Kafka SDF does not work for kafka versions <2.0.1
excludeTestsMatching "*StopReadingFunction" //Kafka SDF does not work for kafka versions <2.0.1
excludeTestsMatching "*WatermarkUpdateWithSparseMessages" //Kafka SDF does not work for kafka versions <2.0.1
+excludeTestsMatching "*KafkaIOSDFReadWithErrorHandler" //New test uses the SDF read path, so it cannot run for kafka versions <2.0.1
}
}
}
@@ -49,8 +49,11 @@
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Sum;
+import org.apache.beam.sdk.transforms.errorhandling.BadRecord;
+import org.apache.beam.sdk.transforms.errorhandling.ErrorHandler.BadRecordErrorHandler;
import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
@@ -60,6 +63,8 @@
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
+import org.apache.kafka.common.errors.SerializationException;
+import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.StringDeserializer;
@@ -97,7 +102,7 @@ public interface KafkaStreamingOptions extends PipelineOptions {
* to use your own Kafka server.
*/
@Description("Kafka server host")
-@Default.String("kafka_server:9092")
+@Default.String("localhost:9092")
String getKafkaHost();

void setKafkaHost(String value);
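With this change the example runs against a local broker out of the box; a different cluster can still be supplied through the kafkaHost pipeline option (for example --kafkaHost=mybroker:9092, an illustrative address), which Beam derives from the getKafkaHost/setKafkaHost pair above.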
@@ -208,15 +213,22 @@ public void run() {
// Start reading form Kafka with the latest offset
consumerConfig.put("auto.offset.reset", "latest");

-PCollection<KV<String, Integer>> pCollection =
-pipeline.apply(
-KafkaIO.<String, Integer>read()
-.withBootstrapServers(options.getKafkaHost())
-.withTopic(TOPIC_NAME)
-.withKeyDeserializer(StringDeserializer.class)
-.withValueDeserializer(IntegerDeserializer.class)
-.withConsumerConfigUpdates(consumerConfig)
-.withoutMetadata());
+// Register an error handler for any deserialization errors.
+// Errors are simulated with an intentionally failing deserializer
+PCollection<KV<String, Integer>> pCollection;
+try (BadRecordErrorHandler<PCollection<BadRecord>> errorHandler =
+pipeline.registerBadRecordErrorHandler(new LogErrors())) {
+pCollection =
+pipeline.apply(
+KafkaIO.<String, Integer>read()
+.withBootstrapServers(options.getKafkaHost())
+.withTopic(TOPIC_NAME)
+.withKeyDeserializer(StringDeserializer.class)
+.withValueDeserializer(IntermittentlyFailingIntegerDeserializer.class)
+.withConsumerConfigUpdates(consumerConfig)
+.withBadRecordErrorHandler(errorHandler)
+.withoutMetadata());
+}

pCollection
// Apply a window and a trigger output repeatedly.
@@ -317,4 +329,39 @@ public void processElement(ProcessContext c, IntervalWindow w) throws Exception
c.output(c.element());
}
}

+// Simple PTransform to log Error information
+static class LogErrors extends PTransform<PCollection<BadRecord>, PCollection<BadRecord>> {
+
+@Override
+public PCollection<BadRecord> expand(PCollection<BadRecord> input) {
+return input.apply("Log Errors", ParDo.of(new LogErrorFn()));
+}
+
+static class LogErrorFn extends DoFn<BadRecord, BadRecord> {
+@ProcessElement
+public void processElement(@Element BadRecord record, OutputReceiver<BadRecord> receiver) {
+System.out.println(record);
+receiver.output(record);
+}
+}
+}
+
+// Intentionally failing deserializer to simulate bad data from Kafka
+public static class IntermittentlyFailingIntegerDeserializer implements Deserializer<Integer> {
+
+public static final IntegerDeserializer INTEGER_DESERIALIZER = new IntegerDeserializer();
+public int deserializeCount = 0;
+
+public IntermittentlyFailingIntegerDeserializer() {}
+
+@Override
+public Integer deserialize(String topic, byte[] data) {
+deserializeCount++;
+if (deserializeCount % 10 == 0) {
+throw new SerializationException("Expected Serialization Exception");
+}
+return INTEGER_DESERIALIZER.deserialize(topic, data);
+}
+}
}
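Two details of this example are worth calling out. First, the try-with-resources block is load-bearing: closing the BadRecordErrorHandler is what hands the accumulated bad-record collections to the sink transform (LogErrors here), and per the commit history ("throw error when error handler isn't used") a registered handler that is never closed causes an error at pipeline construction. Second, IntermittentlyFailingIntegerDeserializer throws on every tenth record, so roughly ten percent of the stream is routed to the handler instead of crashing the read, which is exactly the failure mode the new withBadRecordErrorHandler hook is meant to absorb.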
@@ -17,6 +17,9 @@
*/
package org.apache.beam.sdk.transforms.errorhandling;

+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
@@ -49,22 +52,24 @@
* <p>Simple usage with one DLQ
* <pre>{@code
* PCollection<?> records = ...;
-* try (ErrorHandler<E,T> errorHandler = pipeline.registerErrorHandler(SomeSink.write())) {
-* PCollection<?> results = records.apply(SomeIO.write().withDeadLetterQueue(errorHandler));
+* try (BadRecordErrorHandler<T> errorHandler = pipeline.registerBadRecordErrorHandler(SomeSink.write())) {
+* PCollection<?> results = records.apply(SomeIO.write().withErrorHandler(errorHandler));
* }
* results.apply(SomeOtherTransform);
* }</pre>
* Usage with multiple DLQ stages
* <pre>{@code
* PCollection<?> records = ...;
-* try (ErrorHandler<E,T> errorHandler = pipeline.registerErrorHandler(SomeSink.write())) {
-* PCollection<?> results = records.apply(SomeIO.write().withDeadLetterQueue(errorHandler))
-* .apply(OtherTransform.builder().withDeadLetterQueue(errorHandler));
+* try (BadRecordErrorHandler<T> errorHandler = pipeline.registerBadRecordErrorHandler(SomeSink.write())) {
+* PCollection<?> results = records.apply(SomeIO.write().withErrorHandler(errorHandler))
+* .apply(OtherTransform.builder().withErrorHandler(errorHandler));
* }
* results.apply(SomeOtherTransform);
* }</pre>
+* This is marked as serializable despite never being needed on the runner, to enable it to be a
+* parameter of an Autovalue configured PTransform.
*/
-public interface ErrorHandler<ErrorT, OutputT extends POutput> extends AutoCloseable {
+public interface ErrorHandler<ErrorT, OutputT extends POutput> extends AutoCloseable, Serializable {

void addErrorCollection(PCollection<ErrorT> errorCollection);

@@ -79,13 +84,16 @@ class PTransformErrorHandler<ErrorT, OutputT extends POutput>
private static final Logger LOG = LoggerFactory.getLogger(PTransformErrorHandler.class);
private final PTransform<PCollection<ErrorT>, OutputT> sinkTransform;

-private final Pipeline pipeline;
+// transient as Pipelines are not serializable
+private final transient Pipeline pipeline;

private final Coder<ErrorT> coder;

-private final List<PCollection<ErrorT>> errorCollections = new ArrayList<>();
+// transient as PCollections are not serializable
+private transient List<PCollection<ErrorT>> errorCollections = new ArrayList<>();

-private @Nullable OutputT sinkOutput = null;
+// transient as PCollections are not serializable
+private transient @Nullable OutputT sinkOutput = null;

private boolean closed = false;

@@ -103,6 +111,12 @@ public PTransformErrorHandler(
this.coder = coder;
}

+private void readObject(ObjectInputStream aInputStream)
+throws ClassNotFoundException, IOException {
+aInputStream.defaultReadObject();
+errorCollections = new ArrayList<>();
+}

@Override
public void addErrorCollection(PCollection<ErrorT> errorCollection) {
errorCollections.add(errorCollection);
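The serialization changes above follow a standard Java idiom that is easy to miss: fields holding non-serializable pipeline state are marked transient, and a custom readObject restores safe defaults, since transient fields deserialize to null. A minimal self-contained sketch of the same pattern (illustrative only, not Beam code):

```java
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

public class TransientStateHolder implements Serializable {

  private final String name; // plain serializable configuration travels as-is

  // Runtime-only state: skipped by Java serialization, so it comes back null.
  private transient List<String> collected = new ArrayList<>();

  public TransientStateHolder(String name) {
    this.name = name;
  }

  // Restore a safe default for the transient field after deserialization,
  // mirroring PTransformErrorHandler.readObject above.
  private void readObject(ObjectInputStream in) throws ClassNotFoundException, IOException {
    in.defaultReadObject();
    collected = new ArrayList<>();
  }

  public void collect(String value) {
    collected.add(value);
  }

  public String name() {
    return name;
  }
}
```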
1 change: 1 addition & 0 deletions sdks/java/io/kafka/kafka-01103/build.gradle
@@ -18,6 +18,7 @@
project.ext {
delimited="0.11.0.3"
undelimited="01103"
+sdfCompatible=false
}

apply from: "../kafka-integration-test.gradle"
3 changes: 2 additions & 1 deletion sdks/java/io/kafka/kafka-100/build.gradle
@@ -18,6 +18,7 @@
project.ext {
delimited="1.0.0"
undelimited="100"
+sdfCompatible=false
}

-apply from: "../kafka-integration-test.gradle"
+apply from: "../kafka-integration-test.gradle"
1 change: 1 addition & 0 deletions sdks/java/io/kafka/kafka-111/build.gradle
@@ -18,6 +18,7 @@
project.ext {
delimited="1.1.1"
undelimited="111"
+sdfCompatible=false
}

apply from: "../kafka-integration-test.gradle"
1 change: 1 addition & 0 deletions sdks/java/io/kafka/kafka-201/build.gradle
@@ -18,6 +18,7 @@
project.ext {
delimited="2.0.1"
undelimited="201"
+sdfCompatible=true
}

apply from: "../kafka-integration-test.gradle"
1 change: 1 addition & 0 deletions sdks/java/io/kafka/kafka-211/build.gradle
@@ -18,6 +18,7 @@
project.ext {
delimited="2.1.1"
undelimited="211"
+sdfCompatible=true
}

apply from: "../kafka-integration-test.gradle"
1 change: 1 addition & 0 deletions sdks/java/io/kafka/kafka-222/build.gradle
@@ -18,6 +18,7 @@
project.ext {
delimited="2.2.2"
undelimited="222"
+sdfCompatible=true
}

apply from: "../kafka-integration-test.gradle"
1 change: 1 addition & 0 deletions sdks/java/io/kafka/kafka-231/build.gradle
@@ -18,6 +18,7 @@
project.ext {
delimited="2.3.1"
undelimited="231"
+sdfCompatible=true
}

apply from: "../kafka-integration-test.gradle"
1 change: 1 addition & 0 deletions sdks/java/io/kafka/kafka-241/build.gradle
@@ -18,6 +18,7 @@
project.ext {
delimited="2.4.1"
undelimited="241"
+sdfCompatible=true
}

apply from: "../kafka-integration-test.gradle"
1 change: 1 addition & 0 deletions sdks/java/io/kafka/kafka-251/build.gradle
@@ -18,6 +18,7 @@
project.ext {
delimited="2.5.1"
undelimited="251"
+sdfCompatible=true
}

apply from: "../kafka-integration-test.gradle"
2 changes: 1 addition & 1 deletion sdks/java/io/kafka/kafka-integration-test.gradle
@@ -39,4 +39,4 @@ dependencies {

configurations.create("kafkaVersion$undelimited")

-tasks.register("kafkaVersion${undelimited}BatchIT",KafkaTestUtilities.KafkaBatchIT, project.ext.delimited, project.ext.undelimited, false, configurations, project)
+tasks.register("kafkaVersion${undelimited}BatchIT",KafkaTestUtilities.KafkaBatchIT, project.ext.delimited, project.ext.undelimited, project.ext.sdfCompatible, configurations, project)
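Each per-version build script now declares whether its Kafka client supports Beam's SDF read path, and this shared integration-test script threads that flag into the task registration in place of the previous hard-coded false. The receiving side is not part of this diff, so the following Groovy sketch of how KafkaTestUtilities.KafkaBatchIT might consume the flag is an assumption about its shape, not the actual class:

```groovy
// Hypothetical sketch only: the real KafkaBatchIT lives in
// KafkaTestUtilities.groovy and its constructor is not shown on this page.
import javax.inject.Inject
import org.gradle.api.Project
import org.gradle.api.tasks.testing.Test

class KafkaBatchIT extends Test {
  @Inject
  KafkaBatchIT(String delimited, String undelimited, Boolean sdfCompatible,
               Object configurations, Project runningProject) {
    group = "Verification"
    description = "Runs KafkaIO batch tests against Kafka ${delimited}"
    if (!sdfCompatible) {
      filter {
        // SDF-based tests, including the new error-handler read test,
        // only pass on Kafka clients >= 2.0.1.
        excludeTestsMatching "*KafkaIOSDFReadWithErrorHandler"
      }
    }
  }
}
```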