Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for vector collection split brain configuration [AI-146] #713

Merged
merged 5 commits into from
Dec 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 10 additions & 1 deletion hazelcast/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -384,6 +384,9 @@ def create_vector_collection_config(
indexes: typing.List[IndexConfig],
backup_count: int = 1,
async_backup_count: int = 0,
split_brain_protection_name: typing.Optional[str] = None,
merge_policy: str = "PutIfAbsentMergePolicy",
merge_batch_size: int = 100,
) -> None:
# check that indexes have different names
if indexes:
Expand All @@ -392,7 +395,13 @@ def create_vector_collection_config(
raise AssertionError("index names must be unique")

request = dynamic_config_add_vector_collection_config_codec.encode_request(
name, indexes, backup_count, async_backup_count
name,
indexes,
backup_count,
async_backup_count,
split_brain_protection_name,
merge_policy,
merge_batch_size,
)
invocation = Invocation(request, response_handler=lambda m: m)
self._invocation_service.invoke(invocation)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from hazelcast.protocol.builtin import StringCodec
from hazelcast.protocol.builtin import ListMultiFrameCodec
from hazelcast.protocol.codec.custom.vector_index_config_codec import VectorIndexConfigCodec
from hazelcast.protocol.builtin import CodecUtil

# hex: 0x1B1400
_REQUEST_MESSAGE_TYPE = 1774592
Expand All @@ -12,13 +13,17 @@

_REQUEST_BACKUP_COUNT_OFFSET = REQUEST_HEADER_SIZE
_REQUEST_ASYNC_BACKUP_COUNT_OFFSET = _REQUEST_BACKUP_COUNT_OFFSET + INT_SIZE_IN_BYTES
_REQUEST_INITIAL_FRAME_SIZE = _REQUEST_ASYNC_BACKUP_COUNT_OFFSET + INT_SIZE_IN_BYTES
_REQUEST_MERGE_BATCH_SIZE_OFFSET = _REQUEST_ASYNC_BACKUP_COUNT_OFFSET + INT_SIZE_IN_BYTES
_REQUEST_INITIAL_FRAME_SIZE = _REQUEST_MERGE_BATCH_SIZE_OFFSET + INT_SIZE_IN_BYTES


def encode_request(name, index_configs, backup_count, async_backup_count):
def encode_request(name, index_configs, backup_count, async_backup_count, split_brain_protection_name, merge_policy, merge_batch_size):
buf = create_initial_buffer(_REQUEST_INITIAL_FRAME_SIZE, _REQUEST_MESSAGE_TYPE)
FixSizedTypesCodec.encode_int(buf, _REQUEST_BACKUP_COUNT_OFFSET, backup_count)
FixSizedTypesCodec.encode_int(buf, _REQUEST_ASYNC_BACKUP_COUNT_OFFSET, async_backup_count)
FixSizedTypesCodec.encode_int(buf, _REQUEST_MERGE_BATCH_SIZE_OFFSET, merge_batch_size)
StringCodec.encode(buf, name)
ListMultiFrameCodec.encode(buf, index_configs, VectorIndexConfigCodec.encode, True)
ListMultiFrameCodec.encode(buf, index_configs, VectorIndexConfigCodec.encode)
CodecUtil.encode_nullable(buf, split_brain_protection_name, StringCodec.encode)
StringCodec.encode(buf, merge_policy, True)
return OutboundMessage(buf, False)
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,42 @@ def test_sync_and_async_backup_count_more_than_max_value_fail(self):
async_backup_count=3,
)

def test_merge_policy_can_be_sent(self):
skip_if_client_version_older_than(self, "6.0")
name = random_string()
self.client.create_vector_collection_config(
name,
[IndexConfig("vector", Metric.COSINE, 3)],
merge_policy="DiscardMergePolicy",
merge_batch_size=1000,
)
# validation happens when the collection proxy is created
self.client.get_vector_collection(name)

def test_wrong_merge_policy_fails(self):
skip_if_client_version_older_than(self, "6.0")
skip_if_server_version_older_than(self, self.client, "6.0")
name = random_string()
with self.assertRaises(hazelcast.errors.InvalidConfigurationError):
self.client.create_vector_collection_config(
name, [IndexConfig("vector", Metric.COSINE, 3)], merge_policy="non-existent"
)
# validation happens when the collection proxy is created
self.client.get_vector_collection(name)

def test_split_brain_name_can_be_sent(self):
skip_if_client_version_older_than(self, "6.0")
name = random_string()
self.client.create_vector_collection_config(
name,
[IndexConfig("vector", Metric.COSINE, 3)],
# wrong name will be ignored
split_brain_protection_name="non-existent",
)
col = self.client.get_vector_collection(name)
doc = Document("v1", Vector("vector", Type.DENSE, [0.1, 0.2, 0.3]))
col.set("k1", doc)

def assert_document_equal(self, doc1, doc2) -> None:
self.assertEqual(doc1.value, doc2.value)
self.assertEqual(len(doc1.vectors), len(doc2.vectors))
Expand Down
Loading