Skip to content

Commit

Permalink
Instrument consumer
Browse files Browse the repository at this point in the history
  • Loading branch information
labianchin committed Jan 10, 2020
1 parent 32f7c7c commit fbb2924
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,15 @@

import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Takes ByteBuffer datums from a BlockingQueue and writes to a DataFileWriter.
*/
public class AvroWriter implements Runnable {

private static final Logger LOGGER = LoggerFactory.getLogger(AvroWriter.class);
private final DataFileWriter<GenericRecord> dataFileWriter;
private final JdbcAvroMetering metering;
private final BlockingQueue<ByteBuffer> queue;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -216,12 +216,15 @@ public void write(String query) throws Exception {
logger.info("jdbcavroio : Starting write...");
final ExecutorService executorService = Executors.newSingleThreadExecutor();
try (ResultSet resultSet = executeQuery(query)) {
final Future<?> future = executorService.submit(new AvroWriter(dataFileWriter, metering, queue));
final long startMs = metering.startWriteMeter();
final Future<?> future = executorService.submit(new AvroWriter(dataFileWriter, queue));
metering.startWriteMeter();
convertAllResultSet(resultSet, JdbcAvroRecordConverter.create(resultSet));
queue.put(ByteBuffer.allocate(0)); // write final record, so that consumer stops
final long startTime2 = System.nanoTime();
future.get();
executorService.shutdown();
LOGGER.info(String.format("jdbcavroio : Waited %5.2f seconds for finishing write operation",
(System.nanoTime() - startTime2) / (1000000000.0)));
this.dataFileWriter.flush();
this.metering.exposeWriteElapsed();
this.metering.exposeWrittenBytes(this.countingOutputStream.getCount());
Expand Down

0 comments on commit fbb2924

Please sign in to comment.