From e3ac7c9a85bb9dada72e5697649c716ef008c83c Mon Sep 17 00:00:00 2001 From: Luis Bianchin Date: Mon, 15 Apr 2019 16:16:30 +0200 Subject: [PATCH] Refactor instrumentation --- .../src/main/java/com/spotify/dbeam/avro/AvroWriter.java | 8 +++----- .../src/main/java/com/spotify/dbeam/avro/JdbcAvroIO.java | 2 +- 2 files changed, 4 insertions(+), 6 deletions(-) diff --git a/dbeam-core/src/main/java/com/spotify/dbeam/avro/AvroWriter.java b/dbeam-core/src/main/java/com/spotify/dbeam/avro/AvroWriter.java index b3c84a0e..fff647ae 100644 --- a/dbeam-core/src/main/java/com/spotify/dbeam/avro/AvroWriter.java +++ b/dbeam-core/src/main/java/com/spotify/dbeam/avro/AvroWriter.java @@ -29,6 +29,9 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +/** + * Takes ByteBuffer datums from a BlockingQueue and writes to a DataFileWriter. + */ public class AvroWriter implements Runnable { private static final Logger LOGGER = LoggerFactory.getLogger(AvroWriter.class); @@ -46,7 +49,6 @@ public AvroWriter( public void run() { LOGGER.debug("AvroWriter started"); try { - int c = 0; while (true) { final ByteBuffer datum = queue.take(); if (datum.capacity() == 0) { @@ -54,10 +56,6 @@ public void run() { return; } else { this.dataFileWriter.appendEncoded(datum); - c++; - } - if ((c % 100000) == 0) { - LOGGER.info("Size={} remainingCapacity={}", queue.size(), queue.remainingCapacity()); } } } catch (InterruptedException ex) { diff --git a/dbeam-core/src/main/java/com/spotify/dbeam/avro/JdbcAvroIO.java b/dbeam-core/src/main/java/com/spotify/dbeam/avro/JdbcAvroIO.java index ed7825d7..f4fcc2aa 100644 --- a/dbeam-core/src/main/java/com/spotify/dbeam/avro/JdbcAvroIO.java +++ b/dbeam-core/src/main/java/com/spotify/dbeam/avro/JdbcAvroIO.java @@ -210,7 +210,7 @@ public void write(String query) throws Exception { final long startMs = metering.startWriteMeter(); convertAllResultSet(resultSet, JdbcAvroRecordConverter.create(resultSet)); queue.put(ByteBuffer.allocate(0)); // write final record, so that consumer stops - final long startMs2 = metering.startWriteMeter(); + final long startMs2 = System.currentTimeMillis(); future.get(); executorService.shutdown(); LOGGER.info(String.format("jdbcavroio : Waited %5.2f seconds for finishing write operation",