Skip to content

Commit

Permalink
Refactor CRDT merge logic to support contextual rules
Browse files Browse the repository at this point in the history
Refactor the CRDT implementation to enhance merge rule handling by introducing support for context-based rules. Updated the `MergeRule` concept to accommodate both void and non-void contexts, with specialized implementations in `DefaultMergeRule`. Added a `merge_context_` member for storing context and adjusted the CRDT constructor and copy/move operations accordingly. Introduced a helper function `should_accept_change` to streamline merge rule evaluations. Updated `sync_nodes` and other CRDT-related functions to align with the new design.
  • Loading branch information
sinkingsugar committed Nov 18, 2024
1 parent 139ae4a commit 2886165
Showing 1 changed file with 66 additions and 29 deletions.
95 changes: 66 additions & 29 deletions crdt.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -78,14 +78,22 @@ template <typename K, typename V> struct Change {
node_id(nid), local_db_version(ldb_ver), flags(f) {}
};

// Define a concept for a custom merge rule
template <typename Rule, typename K, typename V>
concept MergeRule = requires(Rule r, const Change<K, V> &local, const Change<K, V> &remote) {
{ r(local, remote) } -> std::convertible_to<bool>;
};

// Default merge rule (current behavior)
template <typename K, typename V> struct DefaultMergeRule {
// Update the MergeRule concept to properly handle void context
template <typename Rule, typename K, typename V, typename Context = void>
concept MergeRule =
// Case 1: No context (void)
(std::is_void_v<Context> &&
requires(Rule r, const Change<K, V> &local, const Change<K, V> &remote) {
{ r(local, remote) } -> std::convertible_to<bool>;
}) ||
// Case 2: With context
(!std::is_void_v<Context> && requires(Rule r, const Change<K, V> &local, const Change<K, V> &remote, const Context &ctx) {
{ r(local, remote, ctx) } -> std::convertible_to<bool>;
});

// Default merge rule with proper void handling
template <typename K, typename V, typename Context = void> struct DefaultMergeRule {
// Primary version without context
constexpr bool operator()(const Change<K, V> &local, const Change<K, V> &remote) const {
if (remote.col_version > local.col_version) {
return true;
Expand All @@ -103,6 +111,16 @@ template <typename K, typename V> struct DefaultMergeRule {
}
};

// Specialization for non-void context
template <typename K, typename V, typename Context>
requires(!std::is_void_v<Context>)
struct DefaultMergeRule<K, V, Context> {
constexpr bool operator()(const Change<K, V> &local, const Change<K, V> &remote, const Context &) const {
DefaultMergeRule<K, V, void> default_rule;
return default_rule(local, remote);
}
};

// Define a concept for a custom change comparator
template <typename Comparator, typename K, typename V>
concept ChangeComparator = requires(Comparator c, const Change<K, V> &a, const Change<K, V> &b) {
Expand Down Expand Up @@ -202,24 +220,27 @@ template <typename V> constexpr bool operator==(const Record<V> &lhs, const Reco
}

/// Represents the CRDT structure, generic over key (`K`) and value (`V`) types.
template <typename K, typename V, MergeRule<K, V> MergeRuleType = DefaultMergeRule<K, V>,
template <typename K, typename V, typename MergeContext = void,
MergeRule<K, V, MergeContext> MergeRuleType = DefaultMergeRule<K, V, MergeContext>,
ChangeComparator<K, V> ChangeComparatorType = DefaultChangeComparator<K, V>, typename SortFunctionType = DefaultSort>
class CRDT : public std::enable_shared_from_this<CRDT<K, V, MergeRuleType, ChangeComparatorType, SortFunctionType>> {
class CRDT
: public std::enable_shared_from_this<CRDT<K, V, MergeContext, MergeRuleType, ChangeComparatorType, SortFunctionType>> {
public:
// Create a new empty CRDT
// Complexity: O(1)
CRDT(CrdtNodeId node_id, std::shared_ptr<CRDT<K, V, MergeRuleType, ChangeComparatorType, SortFunctionType>> parent = nullptr,
MergeRuleType merge_rule = MergeRuleType(), ChangeComparatorType change_comparator = ChangeComparatorType(),
SortFunctionType sort_func = SortFunctionType())
CRDT(CrdtNodeId node_id, std::shared_ptr<CRDT> parent = nullptr, MergeRuleType merge_rule = MergeRuleType(),
ChangeComparatorType change_comparator = ChangeComparatorType(), SortFunctionType sort_func = SortFunctionType(),
std::conditional_t<std::is_void_v<MergeContext>,
std::monostate, // Use monostate for void case
MergeContext>
context = {})
: node_id_(node_id), clock_(), data_(), tombstones_(), parent_(parent), merge_rule_(std::move(merge_rule)),
change_comparator_(std::move(change_comparator)), sort_func_(std::move(sort_func)) {
change_comparator_(std::move(change_comparator)), sort_func_(std::move(sort_func)), merge_context_(std::move(context)) {
if (parent_) {
// Set clock to parent's clock
clock_ = parent_->clock_;
// Capture the base version from the parent
base_version_ = parent_->clock_.current_time();
} else {
base_version_ = 0; // No parent, base_version_ is 0
base_version_ = 0;
}
}

Expand Down Expand Up @@ -287,7 +308,8 @@ class CRDT : public std::enable_shared_from_this<CRDT<K, V, MergeRuleType, Chang
/// # Complexity
///
/// O(c), where c is the number of changes since the common ancestor
constexpr CrdtVector<Change<K, V>> diff(const CRDT<K, V, MergeRuleType, ChangeComparatorType, SortFunctionType> &other) const {
constexpr CrdtVector<Change<K, V>>
diff(const CRDT<K, V, MergeContext, MergeRuleType, ChangeComparatorType, SortFunctionType> &other) const {
// Find the common ancestor (lowest common db_version)
uint64_t common_version = std::min(clock_.current_time(), other.clock_.current_time());

Expand Down Expand Up @@ -557,12 +579,11 @@ class CRDT : public std::enable_shared_from_this<CRDT<K, V, MergeRuleType, Chang
bool should_accept = false;

if (local_col_info == nullptr) {
// No local version exists; accept the remote change
should_accept = true;
} else {
Change<K, V> local_change(record_id, col_name ? *col_name : "", std::nullopt, local_col_info->col_version,
local_col_info->db_version, local_col_info->node_id, flags);
should_accept = merge_rule_(local_change, change);
should_accept = should_accept_change(local_change, change);
}

if (should_accept) {
Expand All @@ -573,8 +594,7 @@ class CRDT : public std::enable_shared_from_this<CRDT<K, V, MergeRuleType, Chang

// Update deletion clock info
CrdtMap<CrdtString, ColumnVersion> deletion_clock;
deletion_clock.emplace("",
ColumnVersion(remote_col_version, remote_db_version, remote_node_id, new_local_db_version));
deletion_clock.emplace("", ColumnVersion(remote_col_version, remote_db_version, remote_node_id, new_local_db_version));

// Store deletion info in the data map
data_.emplace(record_id, Record<V>(CrdtMap<CrdtString, V>(), std::move(deletion_clock)));
Expand Down Expand Up @@ -770,7 +790,7 @@ class CRDT : public std::enable_shared_from_this<CRDT<K, V, MergeRuleType, Chang
CRDT(const CRDT &other)
: node_id_(other.node_id_), clock_(other.clock_), data_(other.data_), tombstones_(other.tombstones_),
parent_(other.parent_), base_version_(other.base_version_), merge_rule_(other.merge_rule_),
change_comparator_(other.change_comparator_), sort_func_(other.sort_func_) {
change_comparator_(other.change_comparator_), sort_func_(other.sort_func_), merge_context_(other.merge_context_) {
// Note: This creates a shallow copy of the parent pointer
}

Expand All @@ -785,6 +805,7 @@ class CRDT : public std::enable_shared_from_this<CRDT<K, V, MergeRuleType, Chang
merge_rule_ = other.merge_rule_;
change_comparator_ = other.change_comparator_;
sort_func_ = other.sort_func_;
merge_context_ = other.merge_context_;
}
return *this;
}
Expand All @@ -794,7 +815,7 @@ class CRDT : public std::enable_shared_from_this<CRDT<K, V, MergeRuleType, Chang
: node_id_(other.node_id_), clock_(std::move(other.clock_)), data_(std::move(other.data_)),
tombstones_(std::move(other.tombstones_)), parent_(std::move(other.parent_)), base_version_(other.base_version_),
merge_rule_(std::move(other.merge_rule_)), change_comparator_(std::move(other.change_comparator_)),
sort_func_(std::move(other.sort_func_)) {}
sort_func_(std::move(other.sort_func_)), merge_context_(std::move(other.merge_context_)) {}

// Move assignment operator
CRDT &operator=(CRDT &&other) noexcept {
Expand All @@ -808,6 +829,7 @@ class CRDT : public std::enable_shared_from_this<CRDT<K, V, MergeRuleType, Chang
merge_rule_ = std::move(other.merge_rule_);
change_comparator_ = std::move(other.change_comparator_);
sort_func_ = std::move(other.sort_func_);
merge_context_ = std::move(other.merge_context_);
}
return *this;
}
Expand All @@ -820,11 +842,15 @@ class CRDT : public std::enable_shared_from_this<CRDT<K, V, MergeRuleType, Chang

// our clock won't be shared with the parent
// we optionally allow to merge from the parent or push to the parent
std::shared_ptr<CRDT<K, V, MergeRuleType, ChangeComparatorType, SortFunctionType>> parent_;
std::shared_ptr<CRDT<K, V, MergeContext, MergeRuleType, ChangeComparatorType, SortFunctionType>> parent_;
uint64_t base_version_; // Tracks the parent's db_version at the time of child creation
MergeRuleType merge_rule_;
ChangeComparatorType change_comparator_;
SortFunctionType sort_func_;
std::conditional_t<std::is_void_v<MergeContext>,
std::monostate, // Use monostate for void case
MergeContext>
merge_context_;

// Helper function to print values
template <typename T> static void print_value(const T &value) {
Expand Down Expand Up @@ -952,7 +978,7 @@ class CRDT : public std::enable_shared_from_this<CRDT<K, V, MergeRuleType, Chang
/// A vector of inverse `Change` objects.
CrdtVector<Change<K, V>>
invert_changes(const CrdtVector<Change<K, V>> &changes,
const CRDT<K, V, MergeRuleType, ChangeComparatorType, SortFunctionType> &reference_crdt) const {
const CRDT<K, V, MergeContext, MergeRuleType, ChangeComparatorType, SortFunctionType> &reference_crdt) const {
CrdtVector<Change<K, V>> inverse_changes;

for (const auto &change : changes) {
Expand Down Expand Up @@ -1106,6 +1132,15 @@ class CRDT : public std::enable_shared_from_this<CRDT<K, V, MergeRuleType, Chang
add_to_container(*changes, Change<K, V>(record_id, std::nullopt, std::nullopt, 1, db_version, node_id_, db_version, flags));
}
}

// Helper to handle merge rule calls with/without context
constexpr bool should_accept_change(const Change<K, V> &local, const Change<K, V> &remote) {
if constexpr (std::is_void_v<MergeContext>) {
return merge_rule_(local, remote);
} else {
return merge_rule_(local, remote, merge_context_);
}
}
};

/// Synchronizes two CRDT nodes.
Expand All @@ -1115,10 +1150,12 @@ class CRDT : public std::enable_shared_from_this<CRDT<K, V, MergeRuleType, Chang
///
/// Complexity: O(c + m), where c is the number of changes since last_db_version,
/// and m is the complexity of merge_changes
template <typename K, typename V, MergeRule<K, V> MergeRuleType = DefaultMergeRule<K, V>,
template <typename K, typename V, typename MergeContext = void,
MergeRule<K, V, MergeContext> MergeRuleType = DefaultMergeRule<K, V, MergeContext>,
ChangeComparator<K, V> ChangeComparatorType = DefaultChangeComparator<K, V>, typename SortFunctionType = DefaultSort>
constexpr void sync_nodes(CRDT<K, V, MergeRuleType, ChangeComparatorType, SortFunctionType> &source,
CRDT<K, V, MergeRuleType, ChangeComparatorType, SortFunctionType> &target, uint64_t &last_db_version) {
constexpr void sync_nodes(CRDT<K, V, MergeContext, MergeRuleType, ChangeComparatorType, SortFunctionType> &source,
CRDT<K, V, MergeContext, MergeRuleType, ChangeComparatorType, SortFunctionType> &target,
uint64_t &last_db_version) {
auto changes = source.get_changes_since(last_db_version);

// Update last_db_version to the current max db_version in source
Expand Down

0 comments on commit 2886165

Please sign in to comment.