From 1d24259e527d11d67ce0a06db0c2c8025da0ead9 Mon Sep 17 00:00:00 2001 From: Luis Bianchin Date: Mon, 15 Apr 2019 16:01:07 +0200 Subject: [PATCH] Instrument consumer --- .../src/main/java/com/spotify/dbeam/avro/AvroWriter.java | 6 ++++++ .../src/main/java/com/spotify/dbeam/avro/JdbcAvroIO.java | 7 +++++-- 2 files changed, 11 insertions(+), 2 deletions(-) diff --git a/dbeam-core/src/main/java/com/spotify/dbeam/avro/AvroWriter.java b/dbeam-core/src/main/java/com/spotify/dbeam/avro/AvroWriter.java index e1d3d7e1..285b1bf8 100644 --- a/dbeam-core/src/main/java/com/spotify/dbeam/avro/AvroWriter.java +++ b/dbeam-core/src/main/java/com/spotify/dbeam/avro/AvroWriter.java @@ -26,9 +26,15 @@ import org.apache.avro.file.DataFileWriter; import org.apache.avro.generic.GenericRecord; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +/** + * Takes ByteBuffer datums from a BlockingQueue and writes to a DataFileWriter. + */ public class AvroWriter implements Runnable { + private static final Logger LOGGER = LoggerFactory.getLogger(AvroWriter.class); private final DataFileWriter dataFileWriter; private final JdbcAvroMetering metering; private final BlockingQueue queue; diff --git a/dbeam-core/src/main/java/com/spotify/dbeam/avro/JdbcAvroIO.java b/dbeam-core/src/main/java/com/spotify/dbeam/avro/JdbcAvroIO.java index d20cdb0e..301b655a 100644 --- a/dbeam-core/src/main/java/com/spotify/dbeam/avro/JdbcAvroIO.java +++ b/dbeam-core/src/main/java/com/spotify/dbeam/avro/JdbcAvroIO.java @@ -213,12 +213,15 @@ public void write(final String query) throws Exception { LOGGER.info("jdbcavroio : Starting write..."); final ExecutorService executorService = Executors.newSingleThreadExecutor(); try (ResultSet resultSet = executeQuery(query)) { - final Future future = executorService.submit(new AvroWriter(dataFileWriter, metering, queue)); - final long startMs = metering.startWriteMeter(); + final Future future = executorService.submit(new AvroWriter(dataFileWriter, queue)); + metering.startWriteMeter(); convertAllResultSet(resultSet, JdbcAvroRecordConverter.create(resultSet)); queue.put(ByteBuffer.allocate(0)); // write final record, so that consumer stops + final long startTime2 = System.nanoTime(); future.get(); executorService.shutdown(); + LOGGER.info(String.format("jdbcavroio : Waited %5.2f seconds for finishing write operation", + (System.nanoTime() - startTime2) / (1000000000.0))); this.dataFileWriter.flush(); this.metering.exposeWriteElapsed(); this.metering.exposeWrittenBytes(this.countingOutputStream.getCount());