Skip to content

Commit

Permalink
Use BlockingQueue to write.
Browse files Browse the repository at this point in the history
  • Loading branch information
labianchin committed Jan 10, 2020
1 parent c98adf0 commit 32f7c7c
Show file tree
Hide file tree
Showing 3 changed files with 90 additions and 7 deletions.
64 changes: 64 additions & 0 deletions dbeam-core/src/main/java/com/spotify/dbeam/avro/AvroWriter.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/*-
* -\-\-
* DBeam Core
* --
* Copyright (C) 2016 - 2019 Spotify AB
* --
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* -/-/-
*/

package com.spotify.dbeam.avro;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.util.concurrent.BlockingQueue;

import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericRecord;

public class AvroWriter implements Runnable {

private final DataFileWriter<GenericRecord> dataFileWriter;
private final JdbcAvroMetering metering;
private final BlockingQueue<ByteBuffer> queue;

public AvroWriter(
DataFileWriter<GenericRecord> dataFileWriter,
JdbcAvroMetering metering,
BlockingQueue<ByteBuffer> queue) {
this.dataFileWriter = dataFileWriter;
this.metering = metering;
this.queue = queue;
}

@Override
public void run() {
try {
while (true) {
final ByteBuffer datum = queue.take();
if (datum.capacity() == 0) {
this.dataFileWriter.sync();
return;
} else {
this.dataFileWriter.appendEncoded(datum);
}
}
} catch (InterruptedException ex) {
System.out.println("CONSUMER INTERRUPTED");
} catch (IOException e) {
e.printStackTrace();
}

}
}
31 changes: 25 additions & 6 deletions dbeam-core/src/main/java/com/spotify/dbeam/avro/JdbcAvroIO.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,20 @@

import com.spotify.dbeam.args.JdbcAvroArgs;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.WritableByteChannel;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.Statement;
import java.sql.SQLException;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingDeque;

import org.apache.avro.Schema;
import org.apache.avro.file.CodecFactory;
Expand Down Expand Up @@ -139,6 +147,7 @@ private static class JdbcAvroWriter extends FileBasedSink.Writer<Void, String> {
private Connection connection;
private JdbcAvroMetering metering;
private CountingOutputStream countingOutputStream;
private BlockingQueue<ByteBuffer> queue;

JdbcAvroWriter(FileBasedSink.WriteOperation<Void, String> writeOperation,
DynamicAvroDestinations<?, Void, String> dynamicDestinations,
Expand All @@ -147,6 +156,7 @@ private static class JdbcAvroWriter extends FileBasedSink.Writer<Void, String> {
this.dynamicDestinations = dynamicDestinations;
this.jdbcAvroArgs = jdbcAvroArgs;
this.metering = JdbcAvroMetering.create();
this.queue = new LinkedBlockingDeque<>(jdbcAvroArgs.fetchSize() * 4);
}

public Void getDestination() {
Expand Down Expand Up @@ -204,19 +214,28 @@ public void write(String query) throws Exception {
checkArgument(dataFileWriter != null,
"Avro DataFileWriter was not properly created");
logger.info("jdbcavroio : Starting write...");
final ExecutorService executorService = Executors.newSingleThreadExecutor();
try (ResultSet resultSet = executeQuery(query)) {
metering.startWriteMeter();
final JdbcAvroRecordConverter converter = JdbcAvroRecordConverter.create(resultSet);
while (resultSet.next()) {
dataFileWriter.appendEncoded(converter.convertResultSetIntoAvroBytes());
this.metering.incrementRecordCount();
}
final Future<?> future = executorService.submit(new AvroWriter(dataFileWriter, metering, queue));
final long startMs = metering.startWriteMeter();
convertAllResultSet(resultSet, JdbcAvroRecordConverter.create(resultSet));
queue.put(ByteBuffer.allocate(0)); // write final record, so that consumer stops
future.get();
executorService.shutdown();
this.dataFileWriter.flush();
this.metering.exposeWriteElapsed();
this.metering.exposeWrittenBytes(this.countingOutputStream.getCount());
}
}

private void convertAllResultSet(ResultSet resultSet, JdbcAvroRecordConverter converter)
throws SQLException, InterruptedException, IOException {
while (resultSet.next()) {
queue.put(converter.convertResultSetIntoAvroBytes());
this.metering.incrementRecordCount();
}
}

@Override
protected void finishWrite() throws Exception {
logger.info("jdbcavroio : Closing connection, flushing writer...");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ public static JdbcAvroRecordConverter create(ResultSet resultSet)
resultSet.getMetaData().getColumnCount(),
resultSet);
}

@SuppressWarnings("unchecked")
static JdbcAvroRecord.SqlFunction<ResultSet, Object>[] computeAllMappings(ResultSet resultSet)
throws SQLException {
Expand Down

0 comments on commit 32f7c7c

Please sign in to comment.