Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Release 1.15.0 #567

Merged
merged 11 commits
Apr 18, 2024
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
## Release History
### 1.15.0 (2024-04-18)
#### Key Bug Fixes
* Fixed an issue where using `CosmosDBSinkConnector` in bulk mode failed to write items for container with nested partition key path - [PR 565](https://github.com/microsoft/kafka-connect-cosmosdb/pull/565)

### 1.14.2 (2024-03-12)
#### Key Bug Fixes
Expand Down
4 changes: 2 additions & 2 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

<groupId>com.azure.cosmos.kafka</groupId>
<artifactId>kafka-connect-cosmos</artifactId>
<version>1.14.2</version>
<version>1.15.0</version>

<name> kafka-connect-cosmos</name>
<url>https://github.com/microsoft/kafka-connect-cosmosdb</url>
Expand Down Expand Up @@ -48,7 +48,7 @@
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-cosmos</artifactId>
<version>4.56.0</version>
<version>4.58.0</version>
</dependency>
<dependency>
<groupId>com.jayway.jsonpath</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,10 @@

import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.LinkedHashMap;

import static com.azure.cosmos.implementation.guava25.base.Preconditions.checkArgument;
import static com.azure.cosmos.implementation.guava25.base.Preconditions.checkNotNull;
Expand Down Expand Up @@ -120,16 +120,8 @@ protected SinkWriteResponse writeCore(List<SinkRecord> sinkRecords) {
/**
 * Extracts the partition key value for a sink record payload.
 *
 * <p>Delegates to {@code PartitionKey.fromItem} so that nested partition key paths
 * (e.g. {@code /location/city/zipCode}) are resolved correctly. The previous manual
 * lookup concatenated the path segments and performed a single top-level map lookup,
 * which failed for containers with nested partition key paths (see PR 565).
 *
 * @param recordValue the sink record value; must be a {@code Map<String, Object>}
 * @return the resolved {@link PartitionKey} for the record
 * @throws IllegalArgumentException if {@code recordValue} is not a {@link Map}
 */
@SuppressWarnings("unchecked") // guarded by the instanceof check above the cast
private PartitionKey getPartitionKeyValue(Object recordValue) {
    checkArgument(recordValue instanceof Map, "Argument 'recordValue' is not valid map format.");

    Map<String, Object> recordMap = (Map<String, Object>) recordValue;
    return PartitionKey.fromItem(recordMap, this.partitionKeyDefinition);
}

BulkOperationFailedException handleErrorStatusCode(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import com.azure.cosmos.implementation.BadRequestException;
import com.azure.cosmos.implementation.HttpConstants;
import com.azure.cosmos.implementation.RequestTimeoutException;
import com.azure.cosmos.implementation.Utils;
import com.azure.cosmos.models.CosmosBulkItemResponse;
import com.azure.cosmos.models.CosmosBulkOperationResponse;
import com.azure.cosmos.models.CosmosBulkOperations;
Expand All @@ -16,6 +17,9 @@
import com.azure.cosmos.models.CosmosItemOperation;
import com.azure.cosmos.models.PartitionKey;
import com.azure.cosmos.models.PartitionKeyDefinition;
import com.azure.cosmos.models.PartitionKind;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.connect.data.ConnectSchema;
import org.apache.kafka.connect.data.Schema;
Expand All @@ -31,11 +35,13 @@
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;

import static junit.framework.Assert.assertNotNull;
import static junit.framework.TestCase.assertEquals;
import static junit.framework.TestCase.assertFalse;
import static junit.framework.TestCase.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.times;
Expand All @@ -59,6 +65,7 @@ public void setup(){
PartitionKeyDefinition mockedPartitionKeyDefinition = Mockito.mock(PartitionKeyDefinition.class);
Mockito.when(mockedContainerProperties.getPartitionKeyDefinition()).thenReturn(mockedPartitionKeyDefinition);
Mockito.when(mockedPartitionKeyDefinition.getPaths()).thenReturn(Arrays.asList("/id"));
Mockito.when(mockedPartitionKeyDefinition.getKind()).thenReturn(PartitionKind.HASH);

bulkWriter = new BulkWriter(container, MAX_RETRY_COUNT, COMPRESSION_ENABLED);
}
Expand Down Expand Up @@ -207,6 +214,65 @@ public void testBulkWriteFailedWithTransientException() {
assertEquals(HttpConstants.StatusCodes.REQUEST_TIMEOUT, ((CosmosException)response.getFailedRecordResponses().get(0).getException()).getStatusCode());
}

@Test
public void testBulkWriteForContainerWithNestedPartitionKey() {
    // Verifies that BulkWriter resolves the partition key correctly when the
    // container uses a nested partition key path (regression test for PR 565).
    CosmosContainer containerWithNestedPartitionKey = Mockito.mock(CosmosContainer.class);

    CosmosContainerResponse mockedContainerResponse = Mockito.mock(CosmosContainerResponse.class);
    Mockito.when(containerWithNestedPartitionKey.read()).thenReturn(mockedContainerResponse);
    CosmosContainerProperties mockedContainerProperties = Mockito.mock(CosmosContainerProperties.class);
    Mockito.when(mockedContainerResponse.getProperties()).thenReturn(mockedContainerProperties);
    PartitionKeyDefinition mockedPartitionKeyDefinition = Mockito.mock(PartitionKeyDefinition.class);
    Mockito.when(mockedContainerProperties.getPartitionKeyDefinition()).thenReturn(mockedPartitionKeyDefinition);
    Mockito.when(mockedPartitionKeyDefinition.getPaths()).thenReturn(Arrays.asList("/location/city/zipCode"));
    Mockito.when(mockedPartitionKeyDefinition.getKind()).thenReturn(PartitionKind.HASH);

    BulkWriter testWriter = new BulkWriter(containerWithNestedPartitionKey, MAX_RETRY_COUNT, COMPRESSION_ENABLED);

    String itemId = UUID.randomUUID().toString();
    String pkValue = "1234";

    ObjectNode objectNode = Utils.getSimpleObjectMapper().createObjectNode();
    objectNode.put("id", itemId);

    // Build { "location": { "city": { "zipCode": pkValue } } } so the partition key
    // value lives two levels below the document root.
    ObjectNode locationNode = Utils.getSimpleObjectMapper().createObjectNode();
    ObjectNode cityNode = Utils.getSimpleObjectMapper().createObjectNode();
    cityNode.put("zipCode", pkValue);
    // use set(): ObjectNode.put(String, JsonNode) is deprecated in Jackson
    locationNode.set("city", cityNode);
    objectNode.set("location", locationNode);

    SinkRecord sinkRecord =
        new SinkRecord(
            TOPIC_NAME,
            1,
            new ConnectSchema(org.apache.kafka.connect.data.Schema.Type.STRING),
            objectNode.get("id"),
            new ConnectSchema(org.apache.kafka.connect.data.Schema.Type.MAP),
            Utils.getSimpleObjectMapper().convertValue(objectNode, new TypeReference<Map<String, Object>>() {}),
            0L);

    // setup successful item response
    List<CosmosBulkOperationResponse<Object>> mockedBulkOperationResponseList = new ArrayList<>();
    mockedBulkOperationResponseList.add(mockSuccessfulBulkOperationResponse(sinkRecord, itemId));

    ArgumentCaptor<Iterable<CosmosItemOperation>> parameters = ArgumentCaptor.forClass(Iterable.class);
    Mockito
        .when(containerWithNestedPartitionKey.executeBulkOperations(parameters.capture()))
        .thenReturn(() -> mockedBulkOperationResponseList.iterator());

    testWriter.write(Arrays.asList(sinkRecord));

    Iterator<CosmosItemOperation> bulkExecutionParameters = parameters.getValue().iterator();

    assertTrue(bulkExecutionParameters.hasNext());
    CosmosItemOperation bulkItemOperation = bulkExecutionParameters.next();
    assertNotNull(bulkItemOperation.getPartitionKeyValue());
    // expected value first so assertion failures report correctly
    assertEquals(new PartitionKey(pkValue), bulkItemOperation.getPartitionKeyValue());

    // there should only be 1 operation
    assertFalse(bulkExecutionParameters.hasNext());
}

private SinkRecord createSinkRecord(String id) {
Schema stringSchema = new ConnectSchema(Schema.Type.STRING);
Schema mapSchema = new ConnectSchema(Schema.Type.MAP);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,19 +7,24 @@
import com.azure.cosmos.CosmosClientBuilder;
import com.azure.cosmos.CosmosContainer;
import com.azure.cosmos.CosmosDatabase;
import com.azure.cosmos.implementation.Utils;
import com.azure.cosmos.kafka.connect.ConnectorTestConfigurations;
import com.azure.cosmos.kafka.connect.sink.BulkWriter;
import com.azure.cosmos.kafka.connect.sink.id.strategy.ProvidedInKeyStrategy;
import com.azure.cosmos.kafka.connect.sink.id.strategy.ProvidedInValueStrategy;
import com.azure.cosmos.kafka.connect.sink.id.strategy.TemplateStrategy;
import com.azure.cosmos.models.CosmosContainerProperties;
import com.azure.cosmos.models.CosmosQueryRequestOptions;
import com.azure.cosmos.models.PartitionKey;
import com.azure.cosmos.models.ThroughputProperties;
import com.azure.cosmos.util.CosmosPagedIterable;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.JsonMappingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
Expand All @@ -29,7 +34,9 @@
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.connect.data.ConnectSchema;
import org.apache.kafka.connect.json.JsonSerializer;
import org.apache.kafka.connect.sink.SinkRecord;
import org.sourcelab.kafka.connect.apiclient.Configuration;
import org.sourcelab.kafka.connect.apiclient.KafkaConnectClient;
import org.sourcelab.kafka.connect.apiclient.request.dto.NewConnectorDefinition;
Expand All @@ -48,10 +55,14 @@
import java.io.IOException;
import java.net.URISyntaxException;
import java.net.URL;
import java.util.Arrays;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.ExecutionException;

import static junit.framework.Assert.fail;
import static org.apache.kafka.common.utils.Utils.sleep;
import static org.sourcelab.kafka.connect.apiclient.request.dto.NewConnectorDefinition.Builder;

Expand Down Expand Up @@ -86,7 +97,7 @@ public class SinkConnectorIT {
* Create an embedded Kafka Connect cluster.
*/
@Before
public void before() throws URISyntaxException, IOException {
public void before() throws IOException {

// Load the sink.config.json config file
URL configFileUrl = SinkConnectorIT.class.getClassLoader().getResource("sink.config.json");
Expand Down Expand Up @@ -590,6 +601,60 @@ public void testPostJsonMessageWithTTL() throws InterruptedException, ExecutionE
Assert.assertFalse("Record still in DB", retrievedPerson.isPresent());
}

@Test
public void testBulkWriteForContainerWithNestedPartitionKey() {
    // verify bulk writer can create records successfully for container with nested partition key path
    CosmosDatabase database = null;
    try {
        // create a throwaway database and a container whose partition key path is nested
        database = cosmosClient.getDatabase(UUID.randomUUID().toString());
        cosmosClient.createDatabaseIfNotExists(database.getId());

        String containerWithNestedPartitionKey = UUID.randomUUID().toString();
        cosmosClient
            .getDatabase(database.getId())
            .createContainerIfNotExists(containerWithNestedPartitionKey, "/location/city/zipCode");
        CosmosContainer testContainer = cosmosClient.getDatabase(database.getId()).getContainer(containerWithNestedPartitionKey);

        String itemId = UUID.randomUUID().toString();
        String pkValue = "1234";

        // Build { "location": { "city": { "zipCode": pkValue } } } so the partition key
        // value lives two levels below the document root.
        ObjectNode objectNode = Utils.getSimpleObjectMapper().createObjectNode();
        objectNode.put("id", itemId);
        ObjectNode locationNode = Utils.getSimpleObjectMapper().createObjectNode();
        ObjectNode cityNode = Utils.getSimpleObjectMapper().createObjectNode();
        cityNode.put("zipCode", pkValue);
        // use set(): ObjectNode.put(String, JsonNode) is deprecated in Jackson
        locationNode.set("city", cityNode);
        objectNode.set("location", locationNode);

        SinkRecord sinkRecord =
            new SinkRecord(
                kafkaTopicJson,
                1,
                new ConnectSchema(org.apache.kafka.connect.data.Schema.Type.STRING),
                objectNode.get("id"),
                new ConnectSchema(org.apache.kafka.connect.data.Schema.Type.MAP),
                Utils.getSimpleObjectMapper().convertValue(objectNode, new TypeReference<Map<String, Object>>() {}),
                0L);

        BulkWriter testWriter = new BulkWriter(testContainer, 1, false);
        testWriter.write(Arrays.asList(sinkRecord));

        // verify the item is created successfully
        try {
            testContainer.readItem(itemId, new PartitionKey(pkValue), ObjectNode.class).getItem();
        } catch (Exception e) {
            fail("Should be able to read item " + e.getMessage());
        }
    } finally {
        // best-effort cleanup of the throwaway database
        if (cosmosClient != null && database != null) {
            database.delete();
        }
    }
}

/**
* A simple entity to serialize to/deserialize from JSON in tests.
*/
Expand Down
Loading