Skip to content

Commit

Permalink
Better logs
Browse files Browse the repository at this point in the history
  • Loading branch information
labianchin committed Oct 15, 2019
1 parent 9fb66a7 commit 0499368
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,20 +36,18 @@ public class AvroWriter implements Runnable {

private static final Logger LOGGER = LoggerFactory.getLogger(AvroWriter.class);
private final DataFileWriter<GenericRecord> dataFileWriter;
private final JdbcAvroMetering metering;
private final BlockingQueue<ByteBuffer> queue;

public AvroWriter(
DataFileWriter<GenericRecord> dataFileWriter,
JdbcAvroMetering metering,
BlockingQueue<ByteBuffer> queue) {
this.dataFileWriter = dataFileWriter;
this.metering = metering;
this.queue = queue;
}

@Override
public void run() {
LOGGER.debug("AvroWriter started");
try {
while (true) {
final ByteBuffer datum = queue.take();
Expand All @@ -61,10 +59,9 @@ public void run() {
}
}
} catch (InterruptedException ex) {
System.out.println("CONSUMER INTERRUPTED");
LOGGER.warn("AvroWriter interrupted");
} catch (IOException e) {
e.printStackTrace();
LOGGER.error("Error on AvroWriter", e);
}

}
}
14 changes: 7 additions & 7 deletions dbeam-core/src/main/java/com/spotify/dbeam/avro/JdbcAvroIO.java
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ public FileBasedSink.Writer<Void, String> createWriter() {
}

private static class JdbcAvroWriter extends FileBasedSink.Writer<Void, String> {
private final Logger logger = LoggerFactory.getLogger(JdbcAvroWriter.class);
private static final Logger LOGGER = LoggerFactory.getLogger(JdbcAvroWriter.class);
private final int syncInterval = DataFileConstants.DEFAULT_SYNC_INTERVAL * 16; // 1 MB
private final DynamicAvroDestinations<?, Void, String> dynamicDestinations;
private final JdbcAvroArgs jdbcAvroArgs;
Expand All @@ -165,7 +165,7 @@ public Void getDestination() {
@SuppressWarnings("deprecation") // uses internal test functionality.
@Override
protected void prepareWrite(WritableByteChannel channel) throws Exception {
logger.info("jdbcavroio : Preparing write...");
LOGGER.debug("jdbcavroio : Preparing write...");
connection = jdbcAvroArgs.jdbcConnectionConfiguration().createConnection();
Void destination = getDestination();
Schema schema = dynamicDestinations.getSchema(destination);
Expand All @@ -175,7 +175,7 @@ protected void prepareWrite(WritableByteChannel channel) throws Exception {
dataFileWriter.setMeta("created_by", this.getClass().getCanonicalName());
this.countingOutputStream = new CountingOutputStream(Channels.newOutputStream(channel));
dataFileWriter.create(schema, this.countingOutputStream);
logger.info("jdbcavroio : Write prepared");
LOGGER.debug("jdbcavroio : Write prepared");
}

private ResultSet executeQuery(String query) throws Exception {
Expand All @@ -191,7 +191,7 @@ private ResultSet executeQuery(String query) throws Exception {
}

long startTime = System.nanoTime();
logger.info(
LOGGER.info(
"jdbcavroio : Executing query with fetchSize={} (this might take a few minutes) ...",
statement.getFetchSize());
ResultSet resultSet = statement.executeQuery();
Expand All @@ -205,7 +205,7 @@ private ResultSet executeQuery(String query) throws Exception {
public void write(String query) throws Exception {
checkArgument(dataFileWriter != null,
"Avro DataFileWriter was not properly created");
logger.info("jdbcavroio : Starting write...");
LOGGER.debug("jdbcavroio : Starting write...");
final ExecutorService executorService = Executors.newSingleThreadExecutor();
try (ResultSet resultSet = executeQuery(query)) {
final Future<?> future = executorService.submit(new AvroWriter(dataFileWriter, queue));
Expand Down Expand Up @@ -233,14 +233,14 @@ private void convertAllResultSet(ResultSet resultSet, JdbcAvroRecordConverter co

@Override
protected void finishWrite() throws Exception {
logger.info("jdbcavroio : Closing connection, flushing writer...");
LOGGER.debug("jdbcavroio : Closing connection, flushing writer...");
if (connection != null) {
connection.close();
}
if (dataFileWriter != null) {
dataFileWriter.close();
}
logger.info("jdbcavroio : Write finished");
LOGGER.info("jdbcavroio : Write finished");
}
}

Expand Down

0 comments on commit 0499368

Please sign in to comment.