Skip to content

Commit

Permalink
refactor(crdt): Improve CRDT operations and add diff method
Browse files Browse the repository at this point in the history
Simplify revert() method, move invert_changes() to private, add diff()
method to compute differences between CRDTs, and make minor adjustments
to copy constructor and assignment operator.
  • Loading branch information
sinkingsugar committed Oct 10, 2024
1 parent e68d948 commit 4046d65
Show file tree
Hide file tree
Showing 2 changed files with 163 additions and 117 deletions.
185 changes: 111 additions & 74 deletions crdt.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -225,98 +225,63 @@ class CRDT : public std::enable_shared_from_this<CRDT<K, V, MergeRuleType, Chang
apply_changes(std::move(changes));
}

/// Generates inverse changes for a given set of changes based on a reference CRDT state.
/// Reverts all changes made by this CRDT since it was created from the parent.
///
/// # Arguments
/// # Returns
///
/// * `changes` - A vector of changes to invert.
/// * `reference_crdt` - A reference CRDT to use as the base state for inversion.
/// A vector of `Change` objects representing the inverse changes needed to undo the child's changes.
///
/// # Returns
/// # Complexity
///
/// A vector of inverse `Change` objects.
CrdtVector<Change<K, V>>
invert_changes(const CrdtVector<Change<K, V>> &changes,
const CRDT<K, V, MergeRuleType, ChangeComparatorType, SortFunctionType> &reference_crdt) {
CrdtVector<Change<K, V>> inverse_changes;

for (const auto &change : changes) {
const K &record_id = change.record_id;
const std::optional<CrdtString> &col_name = change.col_name;
const std::optional<V> &value = change.value;

if (!col_name.has_value()) {
// The change was a record deletion (tombstone)
// To revert, restore the record's state from the reference CRDT
auto record_ptr = reference_crdt.get_record(record_id);
if (record_ptr) {
// Restore all fields from the record
for (const auto &[ref_col, ref_val] : record_ptr->fields) {
inverse_changes.emplace_back(Change<K, V>(record_id, ref_col, ref_val,
record_ptr->column_versions.at(ref_col).col_version,
record_ptr->column_versions.at(ref_col).db_version, node_id_,
record_ptr->column_versions.at(ref_col).local_db_version));
}
// Remove the tombstone
inverse_changes.emplace_back(Change<K, V>(record_id, std::nullopt, std::nullopt,
0, // Column version 0 signifies removal of tombstone
clock_.current_time(), node_id_));
}
} else {
// The change was an insertion or update of a column
CrdtString col = *col_name;
auto record_ptr = reference_crdt.get_record(record_id);
if (record_ptr) {
auto field_it = record_ptr->fields.find(col);
if (field_it != record_ptr->fields.end()) {
// The record has a value for this column in the reference; set it back to the reference's value
inverse_changes.emplace_back(Change<K, V>(
record_id, col, field_it->second, record_ptr->column_versions.at(col).col_version,
record_ptr->column_versions.at(col).db_version, node_id_, record_ptr->column_versions.at(col).local_db_version));
} else {
// The record does not have this column in the reference; delete it to revert
inverse_changes.emplace_back(Change<K, V>(record_id, col,
std::nullopt, // Indicates deletion
0, // Column version 0 signifies deletion
clock_.current_time(), node_id_));
}
} else {
// The record does not exist in the reference; remove the entire record to revert
inverse_changes.emplace_back(Change<K, V>(record_id, std::nullopt, std::nullopt,
0, // Column version 0 signifies a tombstone
clock_.current_time(), node_id_));
}
}
/// O(c), where c is the number of changes since `base_version_`
constexpr CrdtVector<Change<K, V>> revert() {
if (!parent_) {
throw std::runtime_error("Cannot revert without a parent CRDT.");
}

return inverse_changes;
// Step 1: Retrieve all changes made by the child since base_version_
CrdtVector<Change<K, V>> child_changes = this->get_changes_since(base_version_);

// Step 2: Generate inverse changes using the parent CRDT
return invert_changes(child_changes, *parent_);
}

/// Reverts all changes made by this CRDT since it was created from the parent.
/// Computes the difference between this CRDT and another CRDT.
///
/// # Arguments
///
/// * `other` - The CRDT to compare against.
///
/// # Returns
///
/// A vector of `Change` objects representing the inverse changes needed to undo the child's changes.
/// A vector of `Change` objects representing the changes needed to transform this CRDT into the other CRDT.
///
/// # Complexity
///
/// O(c), where c is the number of changes since `base_version_`
constexpr CrdtVector<Change<K, V>>
revert(const CRDT<K, V, MergeRuleType, ChangeComparatorType, SortFunctionType> *override_parent = nullptr) {
const CRDT<K, V, MergeRuleType, ChangeComparatorType, SortFunctionType> *reference_crdt =
override_parent ? override_parent : parent_;
/// O(c), where c is the number of changes since the common ancestor
constexpr CrdtVector<Change<K, V>> diff(const CRDT<K, V, MergeRuleType, ChangeComparatorType, SortFunctionType> &other) const {
// Find the common ancestor (lowest common db_version)
uint64_t common_version = std::min(clock_.current_time(), other.clock_.current_time());

if (!reference_crdt) {
throw std::runtime_error("Cannot revert without a parent CRDT or override parent.");
}
// Get changes from this CRDT since the common ancestor
CrdtVector<Change<K, V>> this_changes = this->get_changes_since(common_version);

// Step 1: Retrieve all changes made by the child since base_version_
CrdtVector<Change<K, V>> child_changes = this->get_changes_since(base_version_);
// Get changes from the other CRDT since the common ancestor
CrdtVector<Change<K, V>> other_changes = other.get_changes_since(common_version);

// Step 2: Generate inverse changes using the reference CRDT
CrdtVector<Change<K, V>> inverse_changes = invert_changes(child_changes, *reference_crdt);
// Invert the changes from this CRDT
CrdtVector<Change<K, V>> inverted_this_changes = invert_changes(this_changes, other);

return inverse_changes;
// Combine the inverted changes from this CRDT with the changes from the other CRDT
CrdtVector<Change<K, V>> diff_changes;
diff_changes.reserve(inverted_this_changes.size() + other_changes.size());
diff_changes.insert(diff_changes.end(), inverted_this_changes.begin(), inverted_this_changes.end());
diff_changes.insert(diff_changes.end(), other_changes.begin(), other_changes.end());

// Compress the changes to remove redundant operations
compress_changes(diff_changes);

return diff_changes;
}

/// Inserts a new record or updates an existing record in the CRDT.
Expand Down Expand Up @@ -355,6 +320,7 @@ class CRDT : public std::enable_shared_from_this<CRDT<K, V, MergeRuleType, Chang
col_version = ++col_it->second.col_version;
col_it->second.db_version = db_version;
col_it->second.node_id = node_id_;
col_it->second.local_db_version = db_version;
} else {
col_version = 1;
record.column_versions.emplace(col_name, ColumnVersion(col_version, db_version, node_id_, db_version));
Expand Down Expand Up @@ -875,6 +841,77 @@ class CRDT : public std::enable_shared_from_this<CRDT<K, V, MergeRuleType, Chang
return parent_ ? parent_->get_record_ptr(record_id) : nullptr;
}
}

/// Generates inverse changes for a given set of changes based on a reference CRDT state.
///
/// Each entry of `changes` is looked up in `reference_crdt` and translated as follows:
///
/// * Record-level change (no `col_name`), record present in the reference:
///   one change per reference field, ordered by ascending `db_version` and carrying the
///   reference's column/db/local versions, followed by a record-level marker with column version 0.
/// * Record-level change, record absent from the reference: nothing is emitted.
/// * Column-level change, column present in the reference: one change carrying the reference's
///   value and versions for that column.
/// * Column-level change, record present but column absent: a value-less change for that column
///   with column version 0.
/// * Column-level change, record absent from the reference: a record-level marker with column version 0.
///
/// Every emitted change is stamped with this CRDT's `node_id_`; the column-version-0 markers use
/// `clock_.current_time()` of this CRDT as their db version.
///
/// NOTE(review): the "remove tombstone" marker and the "remove entire record" marker are built
/// with identical arguments; presumably the consumer tells them apart from context — confirm.
///
/// # Arguments
///
/// * `changes` - A vector of changes to invert.
/// * `reference_crdt` - A reference CRDT to use as the base state for inversion.
///
/// # Returns
///
/// A vector of inverse `Change` objects, grouped in the same order as `changes`.
CrdtVector<Change<K, V>>
invert_changes(const CrdtVector<Change<K, V>> &changes,
               const CRDT<K, V, MergeRuleType, ChangeComparatorType, SortFunctionType> &reference_crdt) const {
  CrdtVector<Change<K, V>> inverse_changes;
  // Lower bound only: restoring a deleted record can emit several changes per input change.
  inverse_changes.reserve(changes.size());

  for (const auto &change : changes) {
    const K &record_id = change.record_id;
    const std::optional<CrdtString> &col_name = change.col_name;

    // Both kinds of change are inverted against the reference's view of the same record.
    auto record_ptr = reference_crdt.get_record(record_id);

    if (!col_name.has_value()) {
      // The change was a record deletion (tombstone).
      // To revert, restore the record's state from the reference CRDT.
      if (!record_ptr) {
        // The reference does not know the record either: nothing to restore.
        continue;
      }

      const auto &ref_versions = record_ptr->column_versions;

      // Restore all fields from the record, sorted by db_version
      std::vector<std::pair<CrdtString, V>> sorted_fields(record_ptr->fields.begin(), record_ptr->fields.end());
      std::sort(sorted_fields.begin(), sorted_fields.end(), [&ref_versions](const auto &a, const auto &b) {
        return ref_versions.at(a.first).db_version < ref_versions.at(b.first).db_version;
      });

      for (const auto &[ref_col, ref_val] : sorted_fields) {
        // Single lookup of the column's version info instead of one per constructor argument.
        const auto &ref_version = ref_versions.at(ref_col);
        inverse_changes.emplace_back(Change<K, V>(record_id, ref_col, ref_val, ref_version.col_version,
                                                  ref_version.db_version, node_id_,
                                                  ref_version.local_db_version));
      }

      // Remove the tombstone
      inverse_changes.emplace_back(Change<K, V>(record_id, std::nullopt, std::nullopt,
                                                0, // Column version 0 signifies removal of tombstone
                                                clock_.current_time(), node_id_));
      continue;
    }

    // The change was an insertion or update of a column
    const CrdtString &col = *col_name;

    if (!record_ptr) {
      // The record does not exist in the reference; remove the entire record to revert
      inverse_changes.emplace_back(Change<K, V>(record_id, std::nullopt, std::nullopt,
                                                0, // Column version 0 signifies a tombstone
                                                clock_.current_time(), node_id_));
      continue;
    }

    auto field_it = record_ptr->fields.find(col);
    if (field_it != record_ptr->fields.end()) {
      // The record has a value for this column in the reference; set it back to the reference's value
      const auto &ref_version = record_ptr->column_versions.at(col);
      inverse_changes.emplace_back(Change<K, V>(record_id, col, field_it->second, ref_version.col_version,
                                                ref_version.db_version, node_id_,
                                                ref_version.local_db_version));
    } else {
      // The record does not have this column in the reference; delete it to revert
      inverse_changes.emplace_back(Change<K, V>(record_id, col,
                                                std::nullopt, // Indicates deletion
                                                0,            // Column version 0 signifies deletion
                                                clock_.current_time(), node_id_));
    }
  }

  return inverse_changes;
}
};

/// Synchronizes two CRDT nodes.
Expand Down
95 changes: 52 additions & 43 deletions tests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1108,49 +1108,6 @@ int main() {
std::cout << "Test 'Parent Deletion Prevents Child Insertions' passed." << std::endl;
}

// Test Case 1: Reverting a Child CRDT Restores Parent's State
//
// Only the setup and the "child has extra fields" checks are active. The revert/validate steps
// (4 and 5) are disabled below, so this scope prints no pass message.
{
// Step 1: Initialize Parent CRDT with a single record holding "id" and "parent_field"
CRDT<CrdtString, CrdtString> parent_crdt(1);
CrdtString record_id_parent = generate_uuid();
CrdtMap<CrdtString, CrdtString> parent_fields = {{"id", record_id_parent}, {"parent_field", "parent_value"}};
parent_crdt.insert_or_update(record_id_parent, std::move(parent_fields));

// Step 2: Initialize Child CRDT with Parent
// The child is attached to `parent_ptr`, a heap-allocated copy of `parent_crdt` taken here,
// not to `parent_crdt` itself.
auto parent_ptr = std::make_shared<CRDT<CrdtString, CrdtString>>(parent_crdt);
CRDT<CrdtString, CrdtString> child_crdt(2, parent_ptr);

// Step 3: Modify Child CRDT by adding two fields to the parent's record
CrdtMap<CrdtString, CrdtString> child_fields = {{"child_field1", "child_value1"}, {"child_field2", "child_value2"}};
child_crdt.insert_or_update(record_id_parent, std::move(child_fields));

// Verify Child has additional fields
assert_true(child_crdt.get_data().at(record_id_parent).fields.at("child_field1") == "child_value1",
"Revert Test 1: Child should have 'child_field1' with 'child_value1'");
assert_true(child_crdt.get_data().at(record_id_parent).fields.at("child_field2") == "child_value2",
"Revert Test 1: Child should have 'child_field2' with 'child_value2'");

// ---- DISABLED: Steps 4 and 5 ----
// Cannot work because inverse_changes is in a special format that cannot be simply merged back
// into the CRDT; it is meant to be used by the application layer to revert changes, not by the
// CRDT itself for now.
//
// // Step 4: Revert Child CRDT
// CrdtVector<Change<CrdtString, CrdtString>> inverse_changes = child_crdt.revert();

// // Apply inverse changes to child CRDT to undo modifications
// child_crdt.merge_changes(std::move(inverse_changes), true);

// // Step 5: Validate States
// // Child should now match the parent
// assert_true(child_crdt.get_data().at(record_id_parent).fields == parent_crdt.get_data().at(record_id_parent).fields,
// "Revert Test 1: Child's fields should match parent's fields after revert");

// // Parent remains unchanged
// assert_true(parent_crdt.get_data().at(record_id_parent).fields.at("parent_field") == "parent_value",
// "Revert Test 1: Parent's 'parent_field' should remain 'parent_value'");

// std::cout << "Test 'Reverting a Child CRDT Restores Parent's State' passed." << std::endl;
}

// Test Case 1: Compress with No Changes
{
CrdtVector<Change<CrdtString, CrdtString>> changes;
Expand Down Expand Up @@ -1337,6 +1294,58 @@ int main() {
std::cout << "Test 'Mixed Inserts, Updates, and Deletions Across Multiple Records' passed." << std::endl;
}

// Test Case: Diffing between CRDTs
//
// A CRDT is copy-constructed from another, modified, and then diffed against the original via
// diff(). The printed test name below still says "using revert" and is kept verbatim.
{
  // Arrange: the original CRDT holds one record with "id" and "parent_field".
  CRDT<CrdtString, CrdtString> parent_crdt(1);
  CrdtString rec_id = generate_uuid();
  CrdtMap<CrdtString, CrdtString> base_fields = {{"id", rec_id}, {"parent_field", "parent_value"}};
  parent_crdt.insert_or_update(rec_id, std::move(base_fields));

  // The second CRDT starts out as a copy of the original.
  CRDT<CrdtString, CrdtString> child_crdt(parent_crdt);

  // Act (part 1): add two new fields to the copy and overwrite the existing "parent_field".
  CrdtMap<CrdtString, CrdtString> edited_fields = {
      {"child_field1", "child_value1"},
      {"child_field2", "child_value2"},
      {"parent_field", "updated_parent_value"}
  };
  child_crdt.insert_or_update(rec_id, std::move(edited_fields));

  // Act (part 2): compute the changes that lead from the modified copy back to the original.
  auto delta = child_crdt.diff(parent_crdt);

  // Assert: exactly one change per touched column.
  assert_true(delta.size() == 3, "Diff should contain 3 changes");

  // Returns the first change in `delta` that targets `column`, or nullptr when there is none.
  auto change_for = [&delta](const CrdtString &column) -> const Change<CrdtString, CrdtString> * {
    for (const auto &entry : delta) {
      if (entry.col_name.has_value() && *entry.col_name == column) {
        return &entry;
      }
    }
    return nullptr;
  };

  // "child_field1" exists only in the copy, so the diff must drop it.
  auto first_added = change_for("child_field1");
  assert_true(first_added != nullptr, "Diff should contain change for child_field1");
  assert_true(!first_added->value.has_value(), "child_field1 should be marked for deletion");

  // Same expectation for "child_field2".
  auto second_added = change_for("child_field2");
  assert_true(second_added != nullptr, "Diff should contain change for child_field2");
  assert_true(!second_added->value.has_value(), "child_field2 should be marked for deletion");

  // "parent_field" was overwritten in the copy, so the diff must carry the original value.
  auto overwritten = change_for("parent_field");
  assert_true(overwritten != nullptr, "Diff should contain change for parent_field");
  assert_true(overwritten->value.has_value() && *overwritten->value == "parent_value",
              "parent_field should be reverted to 'parent_value'");

  std::cout << "Test 'Diffing between CRDTs using revert' passed." << std::endl;
}

// Test Case 10: Compression Order Verification
{
CrdtVector<Change<CrdtString, CrdtString>> changes;
Expand Down

0 comments on commit 4046d65

Please sign in to comment.