From 288616513dddeb00cc8d99bf0c4e8767d08e4ae8 Mon Sep 17 00:00:00 2001 From: Giovanni Petrantoni <7008900+sinkingsugar@users.noreply.github.com> Date: Mon, 18 Nov 2024 09:33:30 +0800 Subject: [PATCH] Refactor CRDT merge logic to support contextual rules Refactor the CRDT implementation to enhance merge rule handling by introducing support for context-based rules. Updated the `MergeRule` concept to accommodate both void and non-void contexts, with specialized implementations in `DefaultMergeRule`. Added a `merge_context_` member for storing context and adjusted the CRDT constructor and copy/move operations accordingly. Introduced a helper function `should_accept_change` to streamline merge rule evaluations. Updated `sync_nodes` and other CRDT-related functions to align with the new design. --- crdt.hpp | 95 +++++++++++++++++++++++++++++++++++++++----------------- 1 file changed, 66 insertions(+), 29 deletions(-) diff --git a/crdt.hpp b/crdt.hpp index ca8dd02..cdc42c7 100644 --- a/crdt.hpp +++ b/crdt.hpp @@ -78,14 +78,22 @@ template struct Change { node_id(nid), local_db_version(ldb_ver), flags(f) {} }; -// Define a concept for a custom merge rule -template -concept MergeRule = requires(Rule r, const Change &local, const Change &remote) { - { r(local, remote) } -> std::convertible_to; -}; - -// Default merge rule (current behavior) -template struct DefaultMergeRule { +// Update the MergeRule concept to properly handle void context +template +concept MergeRule = + // Case 1: No context (void) + (std::is_void_v && + requires(Rule r, const Change &local, const Change &remote) { + { r(local, remote) } -> std::convertible_to; + }) || + // Case 2: With context + (!std::is_void_v && requires(Rule r, const Change &local, const Change &remote, const Context &ctx) { + { r(local, remote, ctx) } -> std::convertible_to; + }); + +// Default merge rule with proper void handling +template struct DefaultMergeRule { + // Primary version without context constexpr bool operator()(const Change &local, const Change &remote) const { if (remote.col_version > local.col_version) { return true; @@ -103,6 +111,16 @@ template struct DefaultMergeRule { } }; +// Specialization for non-void context +template + requires(!std::is_void_v) +struct DefaultMergeRule { + constexpr bool operator()(const Change &local, const Change &remote, const Context &) const { + DefaultMergeRule default_rule; + return default_rule(local, remote); + } +}; + // Define a concept for a custom change comparator template concept ChangeComparator = requires(Comparator c, const Change &a, const Change &b) { @@ -202,24 +220,27 @@ template constexpr bool operator==(const Record &lhs, const Reco } /// Represents the CRDT structure, generic over key (`K`) and value (`V`) types. -template MergeRuleType = DefaultMergeRule, +template MergeRuleType = DefaultMergeRule, ChangeComparator ChangeComparatorType = DefaultChangeComparator, typename SortFunctionType = DefaultSort> -class CRDT : public std::enable_shared_from_this> { +class CRDT + : public std::enable_shared_from_this> { public: // Create a new empty CRDT // Complexity: O(1) - CRDT(CrdtNodeId node_id, std::shared_ptr> parent = nullptr, - MergeRuleType merge_rule = MergeRuleType(), ChangeComparatorType change_comparator = ChangeComparatorType(), - SortFunctionType sort_func = SortFunctionType()) + CRDT(CrdtNodeId node_id, std::shared_ptr parent = nullptr, MergeRuleType merge_rule = MergeRuleType(), + ChangeComparatorType change_comparator = ChangeComparatorType(), SortFunctionType sort_func = SortFunctionType(), + std::conditional_t, + std::monostate, // Use monostate for void case + MergeContext> + context = {}) : node_id_(node_id), clock_(), data_(), tombstones_(), parent_(parent), merge_rule_(std::move(merge_rule)), - change_comparator_(std::move(change_comparator)), sort_func_(std::move(sort_func)) { + change_comparator_(std::move(change_comparator)), sort_func_(std::move(sort_func)), merge_context_(std::move(context)) { if (parent_) { - // Set clock to parent's clock clock_ = parent_->clock_; - // Capture the base version from the parent base_version_ = parent_->clock_.current_time(); } else { - base_version_ = 0; // No parent, base_version_ is 0 + base_version_ = 0; } } @@ -287,7 +308,8 @@ class CRDT : public std::enable_shared_from_this> diff(const CRDT &other) const { + constexpr CrdtVector> + diff(const CRDT &other) const { // Find the common ancestor (lowest common db_version) uint64_t common_version = std::min(clock_.current_time(), other.clock_.current_time()); @@ -557,12 +579,11 @@ class CRDT : public std::enable_shared_from_this local_change(record_id, col_name ? *col_name : "", std::nullopt, local_col_info->col_version, local_col_info->db_version, local_col_info->node_id, flags); - should_accept = merge_rule_(local_change, change); + should_accept = should_accept_change(local_change, change); } if (should_accept) { @@ -573,8 +594,7 @@ class CRDT : public std::enable_shared_from_this deletion_clock; - deletion_clock.emplace("", - ColumnVersion(remote_col_version, remote_db_version, remote_node_id, new_local_db_version)); + deletion_clock.emplace("", ColumnVersion(remote_col_version, remote_db_version, remote_node_id, new_local_db_version)); // Store deletion info in the data map data_.emplace(record_id, Record(CrdtMap(), std::move(deletion_clock))); @@ -770,7 +790,7 @@ class CRDT : public std::enable_shared_from_this> parent_; + std::shared_ptr> parent_; uint64_t base_version_; // Tracks the parent's db_version at the time of child creation MergeRuleType merge_rule_; ChangeComparatorType change_comparator_; SortFunctionType sort_func_; + std::conditional_t, + std::monostate, // Use monostate for void case + MergeContext> + merge_context_; // Helper function to print values template static void print_value(const T &value) { @@ -952,7 +978,7 @@ class CRDT : public std::enable_shared_from_this> invert_changes(const CrdtVector> &changes, - const CRDT &reference_crdt) const { + const CRDT &reference_crdt) const { CrdtVector> inverse_changes; for (const auto &change : changes) { @@ -1106,6 +1132,15 @@ class CRDT : public std::enable_shared_from_this(record_id, std::nullopt, std::nullopt, 1, db_version, node_id_, db_version, flags)); } } + + // Helper to handle merge rule calls with/without context + constexpr bool should_accept_change(const Change &local, const Change &remote) { + if constexpr (std::is_void_v) { + return merge_rule_(local, remote); + } else { + return merge_rule_(local, remote, merge_context_); + } + } }; /// Synchronizes two CRDT nodes. @@ -1115,10 +1150,12 @@ class CRDT : public std::enable_shared_from_this MergeRuleType = DefaultMergeRule, +template MergeRuleType = DefaultMergeRule, ChangeComparator ChangeComparatorType = DefaultChangeComparator, typename SortFunctionType = DefaultSort> -constexpr void sync_nodes(CRDT &source, - CRDT &target, uint64_t &last_db_version) { +constexpr void sync_nodes(CRDT &source, + CRDT &target, + uint64_t &last_db_version) { auto changes = source.get_changes_since(last_db_version); // Update last_db_version to the current max db_version in source