package com.ontotext.kafka.processor;

import com.ontotext.kafka.GraphDBSinkConfig;
import com.ontotext.kafka.error.LogErrorHandler;
import com.ontotext.kafka.processor.record.handler.RecordHandler;
import com.ontotext.kafka.processor.retry.GraphDBRetryWithToleranceOperator;
import com.ontotext.kafka.rdf.repository.RepositoryManager;
import com.ontotext.kafka.util.ValueUtil;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.errors.RetriableException;
import org.apache.kafka.connect.runtime.errors.ProcessingContext;
import org.apache.kafka.connect.runtime.errors.Stage;
import org.apache.kafka.connect.sink.SinkRecord;
import org.eclipse.rdf4j.repository.RepositoryConnection;
import org.eclipse.rdf4j.repository.RepositoryException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;

import java.io.IOException;
import java.util.*;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

/**
 * A processor which batches sink records and flushes the updates to a given
 * GraphDB {@link org.eclipse.rdf4j.repository.http.HTTPRepository}.
 * <p>
 * Batches that do not reach {@link com.ontotext.kafka.GraphDBSinkConfig#BATCH_SIZE} are flushed
 * after the {@link com.ontotext.kafka.GraphDBSinkConfig#RECORD_POLL_TIMEOUT} threshold has elapsed.
 *
 * @author Tomas Kovachev tomas.kovachev@ontotext.com
 */
public final class SinkRecordsProcessor implements Runnable {

    private static final Logger LOG = LoggerFactory.getLogger(SinkRecordsProcessor.class);

    private final UUID id = UUID.randomUUID();
    private final LinkedBlockingQueue<Collection<SinkRecord>> sinkRecords;
    private final Queue<SinkRecord> recordsBatch;
    private final RepositoryManager repositoryManager;
    private final int batchSize;
    private final long timeoutCommitMs;
    private final LogErrorHandler errorHandler;
    private final RecordHandler recordHandler;
    private final GraphDBSinkConfig.TransactionType transactionType;
    private final GraphDBSinkConfig config;
    private final AtomicBoolean backOff = new AtomicBoolean(false);
    private GraphDBRetryWithToleranceOperator<SinkRecord> singleRecordRetryOperator;
    private GraphDBRetryWithToleranceOperator<Queue<SinkRecord>> batchRetryOperator;

    public SinkRecordsProcessor(GraphDBSinkConfig config) {
        this(config, new LinkedBlockingQueue<>(), new RepositoryManager(config));
    }

    public SinkRecordsProcessor(GraphDBSinkConfig config, LinkedBlockingQueue<Collection<SinkRecord>> sinkRecords, RepositoryManager repositoryManager) {
        this.config = config;
        this.sinkRecords = sinkRecords;
        this.recordsBatch = new LinkedList<>();
        this.repositoryManager = repositoryManager;
        this.batchSize = config.getBatchSize();
        this.timeoutCommitMs = config.getProcessorRecordPollTimeoutMs();
        this.errorHandler = new LogErrorHandler(config);
        this.transactionType = config.getTransactionType();
        this.recordHandler = RecordHandler.getRecordHandler(config);
        this.initOperators(config);
        // repositoryUrl is used for logging purposes
        MDC.put("RepositoryURL", repositoryManager.getRepositoryURL());
        MDC.put("Connector", config.getConnectorName());
    }

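    /*
     * Wiring sketch (illustrative only; the actual connector task wiring lives outside this class).
     * A sink task would typically run the processor on its own thread and feed it batches of records
     * through the queue exposed by getQueue():
     *
     *   GraphDBSinkConfig config = new GraphDBSinkConfig(props);   // assumed: config built from the task properties
     *   SinkRecordsProcessor processor = new SinkRecordsProcessor(config);
     *   Thread worker = new Thread(processor, "sink-records-processor-" + processor.getId());
     *   worker.start();
     *
     *   // inside SinkTask#put(Collection<SinkRecord> records):
     *   processor.getQueue().add(records);
     *
     *   // on task stop: interrupting the worker makes run() drain the queue and shut down
     *   worker.interrupt();
     */
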
    void initOperators(GraphDBSinkConfig config) {
        this.batchRetryOperator = new GraphDBRetryWithToleranceOperator<Queue<SinkRecord>>(config);
        this.singleRecordRetryOperator = new GraphDBRetryWithToleranceOperator<SinkRecord>(config);
    }

    @Override
    public void run() {
        while (shouldRun()) {
            try {
                if (backOff.getAndSet(false)) {
                    LOG.info("Retrying flush");
                    flushUpdates(this.recordsBatch);
                    LOG.info("Flush (on retry) successful");
                }
                Collection<SinkRecord> messages = pollForMessages();
                if (messages != null) {
                    consumeRecords(messages);
                } else {
                    LOG.trace("Did not receive any records (waited {} {}) for repository {}. Flushing all records in batch.", timeoutCommitMs,
                        TimeUnit.MILLISECONDS, repositoryManager);
                    flushUpdates(this.recordsBatch);
                }
            } catch (RetriableException e) {
                long backoffTimeoutMs = config.getBackOffTimeoutMs();
                backOff.set(true);
                LOG.warn("Caught a retriable exception while flushing the current batch. " +
                    "Will sleep for {}ms and will try to flush the records again", backoffTimeoutMs);
                try {
                    Thread.sleep(backoffTimeoutMs);
                } catch (InterruptedException iex) {
                    LOG.info("Thread was interrupted during backoff. Shutting down");
                    break;
                }
            } catch (Exception e) {
                if (e instanceof InterruptedException) {
                    LOG.info("Thread was interrupted. Shutting down processor");
                } else {
                    LOG.error("Caught an exception, cannot recover. Shutting down", e);
                }
                break;
            }
        }
        Thread.interrupted();
        shutdown();
    }

    boolean shouldRun() {
        return !Thread.currentThread().isInterrupted();
    }

    Collection<SinkRecord> pollForMessages() throws InterruptedException {
        return sinkRecords.poll(timeoutCommitMs, TimeUnit.MILLISECONDS);
    }

    void shutdown() {
        // Commit any records left before shutdown. Bypass batch checking, just add all records and flush downstream
        LOG.info("Committing any records left before shutdown");
        Collection<SinkRecord> records;
        while ((records = sinkRecords.poll()) != null) {
            recordsBatch.addAll(records);
        }
        try {
            flushUpdates(this.recordsBatch);
        } catch (Exception e) {
            // We did our best. Just log the exception and shut down
            LOG.warn("While shutting down, failed to flush updates due to exception", e);
        }
        repositoryManager.shutDownRepository();
    }

    void consumeRecords(Collection<SinkRecord> messages) {
        for (SinkRecord message : messages) {
            recordsBatch.add(message);
            if (batchSize <= recordsBatch.size()) {
                flushUpdates(recordsBatch);
            }
        }
    }

    /**
     * Flush all records in the batch downstream.
     *
     * @param recordsBatch The batch of records to flush downstream
     *
     * @throws RetriableException if the repository connection has failed, either during initialization or commit
     * @throws ConnectException   if the flush has failed, but there is no tolerance for error
     */
    void flushUpdates(Queue<SinkRecord> recordsBatch) {
        if (!recordsBatch.isEmpty()) {
            ProcessingContext<Queue<SinkRecord>> ctx = new ProcessingContext<>(recordsBatch);
            batchRetryOperator.execute(ctx, () -> doFlush(recordsBatch), Stage.KAFKA_CONSUME, getClass());
            if (ctx.failed()) {
                LOG.error("Failed to flush batch updates. Underlying exception - {}", ctx.error().getMessage());
                if (!batchRetryOperator.withinToleranceLimits()) {
                    throw new ConnectException("Error tolerance exceeded.");
                }
                LOG.warn("Errors are tolerated (tolerance = {}).", config.getTolerance());
                throw new RetriableException("Failed to flush updates", ctx.error());
            }
        }
    }

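    /*
     * Failure flow, in brief (a sketch of the interplay with run(), assuming the configured error
     * tolerance allows the failure):
     *
     *   doFlush(batch) throws          -> ctx.failed() == true
     *   within tolerance limits        -> RetriableException propagates to run()
     *   run() sets backOff and sleeps  -> on the next iteration the same recordsBatch is flushed again
     *
     * If the tolerance is exceeded, a ConnectException is thrown instead and the processor shuts down.
     */
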
    /**
     * Perform the flush with a retry mechanism, both for the entire batch and for every single record.
     *
     * @param recordsBatch The batch of records to flush
     *
     * @throws RetriableException if the repository connection has failed, either during initialization or commit
     * @throws ConnectException   if the flush has failed, but there is no tolerance for error
     */
    Void doFlush(Queue<SinkRecord> recordsBatch) {
        // Keep a copy of all consumed records, so that records are not lost if the transaction fails to commit
        Collection<SinkRecord> consumedRecords = new ArrayList<>();
        long start = System.currentTimeMillis();
        try (RepositoryConnection connection = repositoryManager.newConnection()) {
            connection.begin();
            int recordsInCurrentBatch = recordsBatch.size();
            LOG.trace("Transaction started, batch size: {}, records in current batch: {}", batchSize, recordsInCurrentBatch);
            while (recordsBatch.peek() != null) {
                SinkRecord record = recordsBatch.peek();
                ProcessingContext<SinkRecord> ctx = new ProcessingContext<>(record);
                singleRecordRetryOperator.execute(ctx, () -> handleRecord(record, connection), Stage.KAFKA_CONSUME, getClass());
                if (ctx.failed()) {
                    LOG.warn("Failed to commit record. Will handle failure, and remove from the batch");
                    errorHandler.handleFailingRecord(record, ctx.error());
                    if (!singleRecordRetryOperator.withinToleranceLimits()) {
                        throw new ConnectException("Error tolerance exceeded.");
                    }
                    recordsBatch.poll();
                } else {
                    consumedRecords.add(recordsBatch.poll());
                }
            }
            try {
                connection.commit();
            } catch (RepositoryException e) {
                LOG.error(
                    "Failed to commit transaction due to exception. Restoring consumed records so that they can be flushed later, and rolling back the transaction",
                    e);
                recordsBatch.addAll(consumedRecords);
                if (connection.isActive()) {
                    connection.rollback();
                }
                throw e;
            }
            LOG.trace("Transaction committed, batch size: {}, records in current batch: {}", batchSize, recordsInCurrentBatch);
            if (LOG.isTraceEnabled()) {
                long finish = System.currentTimeMillis();
                LOG.trace("Finished batch processing in {} ms", finish - start);
            }
            return null;
        } catch (RepositoryException e) {
            throw new RetriableException(e);
        }
    }

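    /*
     * Note on per-record failures inside doFlush (descriptive, based on the logic above): a record that
     * fails is passed to the LogErrorHandler (which at minimum logs it; any further routing depends on
     * its configuration) and is then dropped from the batch, while the remaining records are still
     * committed. Only when the configured tolerance is exceeded does the whole flush abort with a
     * ConnectException.
     */
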
    Void handleRecord(SinkRecord record, RepositoryConnection connection) {
        long start = System.currentTimeMillis();
        try {
            LOG.trace("Executing {} operation...", transactionType.toString().toLowerCase());
            recordHandler.handle(record, connection, config);
            return null;
        } catch (IOException e) {
            throw new RetriableException(e.getMessage(), e);
        } finally {
            if (LOG.isTraceEnabled()) {
                LOG.trace("Record info: {}", ValueUtil.recordInfo(record));
                long finish = System.currentTimeMillis();
                LOG.trace("Converted the record and added it to the RDF4J connection in {} ms", finish - start);
            }
        }
    }

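    /*
     * For illustration, the body of a hypothetical handler for an ADD-style transaction could look
     * roughly like the sketch below. The real implementations are produced by
     * RecordHandler.getRecordHandler(config); the RDF format and value layout shown here are assumptions.
     *
     *   // inside a handler's handle(record, connection, config):
     *   byte[] rdf = (byte[]) record.value();   // assumed: the record value already holds serialized RDF
     *   connection.add(new ByteArrayInputStream(rdf), "", RDFFormat.TURTLE);   // RDF4J RepositoryConnection#add(InputStream, baseURI, format)
     */
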
    public UUID getId() {
        return id;
    }

    public LinkedBlockingQueue<Collection<SinkRecord>> getQueue() {
        return sinkRecords;
    }

    public boolean shouldBackOff() {
        return backOff.get();
    }

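    /*
     * shouldBackOff() lets the producing side apply backpressure. A sketch of how a Connect sink task
     * might use it (hypothetical usage; SinkTaskContext#timeout(long) is the standard Connect mechanism
     * for asking the framework to retry the current put() later):
     *
     *   // inside SinkTask#put(...)
     *   if (processor.shouldBackOff()) {
     *       context.timeout(backOffTimeoutMs);   // backOffTimeoutMs: same value the processor sleeps for
     *       throw new RetriableException("Processor is backing off, retrying put() later");
     *   }
     *   processor.getQueue().add(records);
     */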
}