Better logs
labianchin committed Sep 17, 2019
1 parent 508eca0 commit d55eeaa
Showing 2 changed files with 12 additions and 15 deletions.
@@ -33,20 +33,18 @@ public class AvroWriter implements Runnable {

private static final Logger LOGGER = LoggerFactory.getLogger(AvroWriter.class);
private final DataFileWriter<GenericRecord> dataFileWriter;
- private final JdbcAvroMetering metering;
private final BlockingQueue<ByteBuffer> queue;

public AvroWriter(
DataFileWriter<GenericRecord> dataFileWriter,
- JdbcAvroMetering metering,
BlockingQueue<ByteBuffer> queue) {
this.dataFileWriter = dataFileWriter;
- this.metering = metering;
this.queue = queue;
}

@Override
public void run() {
LOGGER.debug("AvroWriter started");
try {
int c = 0;
while (true) {
@@ -63,10 +61,9 @@ public void run() {
}
}
} catch (InterruptedException ex) {
System.out.println("CONSUMER INTERRUPTED");
LOGGER.warn("AvroWriter interrupted");
} catch (IOException e) {
- e.printStackTrace();
+ LOGGER.error("Error on AvroWriter", e);
}

}
}
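The hunks above replace raw console output with SLF4J calls: the interrupt case goes to LOGGER.warn and the IOException is passed to LOGGER.error so the stack trace lands in the log instead of on stderr. Below is a minimal sketch of that pattern; the Example class and doWork() method are hypothetical stand-ins, not code from this commit.

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class Example implements Runnable {
  private static final Logger LOGGER = LoggerFactory.getLogger(Example.class);

  @Override
  public void run() {
    LOGGER.debug("Example started");
    try {
      doWork();
    } catch (InterruptedException ex) {
      // replaces System.out.println(...): goes through the logging backend instead of stdout
      LOGGER.warn("Example interrupted");
    } catch (Exception e) {
      // replaces e.printStackTrace(): the throwable argument makes SLF4J log the stack trace
      LOGGER.error("Error on Example", e);
    }
  }

  private void doWork() throws InterruptedException {
    // hypothetical work that may be interrupted
  }
}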
18 changes: 9 additions & 9 deletions dbeam-core/src/main/java/com/spotify/dbeam/avro/JdbcAvroIO.java
@@ -136,7 +136,7 @@ public FileBasedSink.Writer<Void, String> createWriter() {
}

private static class JdbcAvroWriter extends FileBasedSink.Writer<Void, String> {
- private final Logger logger = LoggerFactory.getLogger(JdbcAvroWriter.class);
+ private static final Logger LOGGER = LoggerFactory.getLogger(JdbcAvroWriter.class);
private final int syncInterval = DataFileConstants.DEFAULT_SYNC_INTERVAL * 16; // 1 MB
private final DynamicAvroDestinations<?, Void, String> dynamicDestinations;
private final JdbcAvroArgs jdbcAvroArgs;
@@ -163,7 +163,7 @@ public Void getDestination() {
@SuppressWarnings("deprecation") // uses internal test functionality.
@Override
protected void prepareWrite(WritableByteChannel channel) throws Exception {
logger.info("jdbcavroio : Preparing write...");
LOGGER.debug("jdbcavroio : Preparing write...");
connection = jdbcAvroArgs.jdbcConnectionConfiguration().createConnection();
Void destination = getDestination();
Schema schema = dynamicDestinations.getSchema(destination);
@@ -173,7 +173,7 @@ protected void prepareWrite(WritableByteChannel channel) throws Exception {
dataFileWriter.setMeta("created_by", this.getClass().getCanonicalName());
this.countingOutputStream = new CountingOutputStream(Channels.newOutputStream(channel));
dataFileWriter.create(schema, this.countingOutputStream);
logger.info("jdbcavroio : Write prepared");
LOGGER.debug("jdbcavroio : Write prepared");
}

private ResultSet executeQuery(String query) throws Exception {
@@ -189,7 +189,7 @@ private ResultSet executeQuery(String query) throws Exception {
}

long startTime = System.currentTimeMillis();
- logger.info(
+ LOGGER.info(
"jdbcavroio : Executing query with fetchSize={} (this might take a few minutes) ...",
statement.getFetchSize());
ResultSet resultSet = statement.executeQuery();
@@ -203,17 +203,17 @@
public void write(String query) throws Exception {
checkArgument(dataFileWriter != null,
"Avro DataFileWriter was not properly created");
logger.info("jdbcavroio : Starting write...");
LOGGER.debug("jdbcavroio : Starting write...");
final ExecutorService executorService = Executors.newSingleThreadExecutor();
try (ResultSet resultSet = executeQuery(query)) {
- final Future<?> future = executorService.submit(new AvroWriter(dataFileWriter, metering, queue));
+ final Future<?> future = executorService.submit(new AvroWriter(dataFileWriter, queue));
final long startMs = metering.startWriteMeter();
convertAllResultSet(resultSet, JdbcAvroRecordConverter.create(resultSet));
queue.put(ByteBuffer.allocate(0)); // write final record, so that consumer stops
final long startMs2 = metering.startWriteMeter();
future.get();
executorService.shutdown();
- logger.info(String.format("jdbcavroio : Waited %5.2f seconds for finishing write operation",
+ LOGGER.info(String.format("jdbcavroio : Waited %5.2f seconds for finishing write operation",
(System.currentTimeMillis() - startMs2) / 1000.0));
this.dataFileWriter.flush();
this.metering.exposeWriteElapsedMs(System.currentTimeMillis() - startMs);
@@ -231,14 +231,14 @@ private void convertAllResultSet(ResultSet resultSet, JdbcAvroRecordConverter co

@Override
protected void finishWrite() throws Exception {
logger.info("jdbcavroio : Closing connection, flushing writer...");
LOGGER.debug("jdbcavroio : Closing connection, flushing writer...");
if (connection != null) {
connection.close();
}
if (dataFileWriter != null) {
dataFileWriter.close();
}
logger.info("jdbcavroio : Write finished");
LOGGER.info("jdbcavroio : Write finished");
}
}

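The write() hunk above hands encoded rows to a single-thread executor through a BlockingQueue, and queue.put(ByteBuffer.allocate(0)) is the end-of-data marker ("write final record, so that consumer stops"). Below is a rough sketch of that handoff; the QueueHandoffSketch class, the LinkedBlockingQueue choice, and the remaining() == 0 check are illustrative assumptions, since the commit only shows the empty buffer being enqueued as the final record.

import java.nio.ByteBuffer;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;

public class QueueHandoffSketch {
  public static void main(String[] args) throws Exception {
    BlockingQueue<ByteBuffer> queue = new LinkedBlockingQueue<>();
    ExecutorService executor = Executors.newSingleThreadExecutor();

    // Consumer thread: drain buffers until the empty "end of data" buffer arrives.
    Future<?> consumer = executor.submit(() -> {
      try {
        while (true) {
          ByteBuffer buffer = queue.take();
          if (buffer.remaining() == 0) {
            break; // empty buffer signals that the producer is done
          }
          // the real code appends the buffer to the Avro DataFileWriter here
        }
      } catch (InterruptedException ex) {
        Thread.currentThread().interrupt();
      }
    });

    // Producer side: enqueue encoded records, then the empty buffer so the consumer stops.
    queue.put(ByteBuffer.wrap(new byte[] {1, 2, 3}));
    queue.put(ByteBuffer.allocate(0));

    consumer.get();      // like future.get() in the hunk: wait for the consumer to finish
    executor.shutdown();
  }
}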
