Commit

change
annie-mac committed Feb 28, 2024
1 parent 6c8bcf5 commit cb89e69
Showing 1 changed file with 15 additions and 11 deletions.
@@ -63,11 +63,11 @@ public String version() {

@Override
public void start(Map<String, String> map) {
logger.info("Starting CosmosDBSourceTask.");
logger.info("Worker {} Starting CosmosDBSourceTask.", this.config.getWorkerName());
config = new CosmosDBSourceConfig(map);
this.queue = new LinkedTransferQueue<>();

logger.info("Creating the client.");
logger.info("Worker {} Creating the client.", this.config.getWorkerName());
client = getCosmosClient(config);

// Initialize the database, feed and lease containers
@@ -102,7 +102,7 @@ public void start(Map<String, String> map) {
}
} // Wait for ChangeFeedProcessor to start.

logger.info("Started CosmosDB source task.");
logger.info("Worker {} Started CosmosDB source task.", this.config.getWorkerName());
}

private String getItemLsn(JsonNode item) {
@@ -172,9 +172,6 @@ private void fillRecords(List<SourceRecord> records, String topic) throws Interr
// Get the latest lsn and record as offset
Map<String, Object> sourceOffset = singletonMap(OFFSET_KEY, getItemLsn(node));

- if (logger.isDebugEnabled()) {
-     logger.debug("Latest offset is {}.", sourceOffset.get(OFFSET_KEY));
- }
// Convert JSON to Kafka Connect struct and JSON schema
SchemaAndValue schemaAndValue = jsonToStruct.recordToSchemaAndValue(node);

@@ -193,21 +190,24 @@ private void fillRecords(List<SourceRecord> records, String topic) throws Interr
} else {
// If the buffer Size exceeds then do not remove the node.
if (logger.isDebugEnabled()) {
logger.debug("Adding record back to the queue since adding it exceeds the allowed buffer size {}", config.getTaskBufferSize());
logger.debug(
"Worker {} Adding record back to the queue since adding it exceeds the allowed buffer size {}",
this.config.getWorkerName(),
config.getTaskBufferSize());
}
this.queue.add(node);
break;
}
} catch (Exception e) {
logger.error("Failed to fill Source Records for Topic {}", topic);
logger.error("Worker {} Failed to fill Source Records for Topic {}", this.config.getWorkerName(), topic);
throw e;
}
}
}

@Override
public void stop() {
logger.info("Stopping CosmosDB source task.");
logger.info("Worker {} Stopping CosmosDB source task.", this.config.getWorkerName());
// NOTE: poll() method and stop() method are both called from the same thread,
// so it is important not to include any changes which may block both places forever
running.set(false);
@@ -217,10 +217,14 @@ public void stop() {
changeFeedProcessor.stop().block();
changeFeedProcessor = null;
}

+ if (this.client != null) {
+     this.client.close();
+ }
}

private CosmosAsyncClient getCosmosClient(CosmosDBSourceConfig config) {
logger.info("Creating Cosmos Client.");
logger.info("Worker {} Creating Cosmos Client.", this.config.getWorkerName());

CosmosClientBuilder cosmosClientBuilder = new CosmosClientBuilder()
.endpoint(config.getConnEndpoint())
@@ -271,7 +275,7 @@ protected void handleCosmosDbChanges(List<JsonNode> docs) {
.map(jsonNode -> jsonNode.get("id").asText())
.collect(Collectors.toList());
logger.debug(
"handleCosmosDbChanges - Worker {}, docIds {}, Details [{}].",
"handleCosmosDbChanges - Worker {}, total docs {}, Details [{}].",
this.config.getWorkerName(),
docIds.size(),
docIds);
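
For reference, a minimal, self-contained sketch of the "Worker {} ..." logging pattern this commit applies across the hunks above. The CosmosDBSourceConfig#getWorkerName() accessor is taken from the diff itself; the class name WorkerLoggingSketch, the hard-coded workerName, and the topic value are illustrative assumptions, not part of the connector.

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Sketch only: illustrates the worker-name-prefixed SLF4J logging used in the diff above.
// In the connector the worker name comes from CosmosDBSourceConfig#getWorkerName();
// here it is a hard-coded placeholder so the example runs on its own.
public class WorkerLoggingSketch {
    private static final Logger logger = LoggerFactory.getLogger(WorkerLoggingSketch.class);

    public static void main(String[] args) {
        String workerName = "worker-0"; // placeholder for config.getWorkerName()
        String topic = "orders";        // placeholder topic name

        // Same parameterized style as the diff: the worker name is the first argument,
        // so every message can be traced back to the task instance that emitted it.
        logger.info("Worker {} Starting CosmosDBSourceTask.", workerName);
        logger.error("Worker {} Failed to fill Source Records for Topic {}", workerName, topic);
    }
}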
