From 2bc27f4ad53f5f62f3aba3d99b891a85354953c5 Mon Sep 17 00:00:00 2001 From: juancarlosgg Date: Mon, 2 Oct 2023 18:26:33 +0200 Subject: [PATCH] [DSR/API] fix issues in join_delta_node and join_delta_edge handling unordered reception --- api/dsr_api.cpp | 42 ++++++++++++++++++++++++------------------ 1 file changed, 24 insertions(+), 18 deletions(-) diff --git a/api/dsr_api.cpp b/api/dsr_api.cpp index 842c5d9..be76b97 100644 --- a/api/dsr_api.cpp +++ b/api/dsr_api.cpp @@ -3,6 +3,7 @@ // #include +#include #include #include #include @@ -949,11 +950,13 @@ void DSRGraph::join_delta_node(IDL::MvregNode &&mvreg) decltype(unprocessed_delta_edge_from)::node_type node_handle = std::move(unprocessed_delta_edge_from.extract(id)); while (!node_handle.empty()) { - unprocessed_delta_edge_att.erase(std::tuple{id, std::get<0>(node_handle.mapped()), std::get<1>(node_handle.mapped())}); node_handle = std::move(unprocessed_delta_edge_from.extract(id)); } + std::erase_if(unprocessed_delta_edge_to, [&](auto &it){ return std::get<0>(it.second) == id;}); + std::erase_if(unprocessed_delta_edge_att, + [&](auto &it){ return std::get<0>(it.first) == id or std::get<1>(it.first) == id;}); }; @@ -977,7 +980,6 @@ void DSRGraph::join_delta_node(IDL::MvregNode &&mvreg) auto att_key = std::tuple{id, to, type}; //std::cout << "[OLD] Procesando delta edge almacenado (unprocessed_delta_edge_from) " << id<< ", " <{to, type}); } if (nodes.contains(id) and nodes.at(id).read_reg().fano().contains({to, type})) { @@ -992,13 +994,11 @@ void DSRGraph::join_delta_node(IDL::MvregNode &&mvreg) node_handle_edge_att = std::move(unprocessed_delta_edge_att.extract(att_key)); } } - //TODO: Check std::erase_if(unprocessed_delta_edge_to, [to = to, id = id, type = type](auto& it) { return it.first == to && std::get<0>(it.second) == id && std::get<1>(it.second) == type;}); node_handle_edge = std::move(unprocessed_delta_edge_from.extract(id)); } - //TODO: Check node_handle_edge = std::move(unprocessed_delta_edge_to.extract(id)); while (!node_handle_edge.empty()) { auto &[from, type, delta, timestamp_edge] = node_handle_edge.mapped(); @@ -1114,15 +1114,22 @@ void DSRGraph::join_delta_edge(IDL::MvregEdge &&mvreg) uint64_t timestamp = mvreg.timestamp(); + //Clean remaining delta edges. auto delete_unprocessed_deltas = [&](){ - unprocessed_delta_edge_from.erase(from); + //1. Delete all the delta attr for the edge. + //2. Delete all unprocessed delta edges with the same key (from and to) + //unprocessed_delta_edge_from.erase(from); // This should not be needed. unprocessed_delta_edge_att.erase(std::tuple{from, to, type}); std::erase_if(unprocessed_delta_edge_to, - [&](auto &it){ return it.first == to && std::get<0>(it.second) == from;}); + [&](auto &it){ return it.first == to && std::get<0>(it.second) == from && type == std::get<1>(it.second);}); + std::erase_if(unprocessed_delta_edge_to, + [&](auto &it){ return it.first == from && std::get<0>(it.second) == to && type == std::get<1>(it.second);}); }; - //Consumes all delta attributes and deletes a possible previous delta from unprocessed map. + //Consumes all delta attributes and deletes a possible previous delta from unprocessed map. auto consume_unprocessed_deltas = [&](){ + //1. Consume all the delta attributes for the edge key + //2. Delete all unprocessed delta edges with the same key (from and to) auto att_key = std::tuple{from, to, type}; decltype(unprocessed_delta_edge_att)::node_type node_handle_edge_att = std::move(unprocessed_delta_edge_att.extract(att_key)); while (!node_handle_edge_att.empty()) @@ -1134,11 +1141,10 @@ void DSRGraph::join_delta_edge(IDL::MvregEdge &&mvreg) node_handle_edge_att = std::move(unprocessed_delta_edge_att.extract(att_key)); } - //auto [begin, end] = unprocessed_delta_edge.equal_range(from); std::erase_if(unprocessed_delta_edge_to, [&](auto &it){ return it.first == to && std::get<0>(it.second) == from && std::get<1>(it.second) == type; }); std::erase_if(unprocessed_delta_edge_from, - [&](auto &it){ return std::get<0>(it.second) == to && std::get<1>(it.second) == type; }); + [&](auto &it){ return it.first == from && std::get<0>(it.second) == to && std::get<1>(it.second) == type; }); }; @@ -1159,12 +1165,12 @@ void DSRGraph::join_delta_edge(IDL::MvregEdge &&mvreg) } } else if (!dfrom and !dto) { - //We should receive the node later. + //We should receive the node later. To avoid storing more than one element on the unprocessed cache we use the mvreg join. bool find = false; for (auto [begin, end] = unprocessed_delta_edge_from.equal_range(from); begin != end; ++begin) { //There should not be many elements in this iteration if (std::get<0>(begin->second) == to && std::get<1>(begin->second) == type){ - std::get<2>(begin->second).join(std::move(crdt_delta)); - std::cout << "JOIN_DELTA_EDGE ID(" << from<< ", "<< to <<", "<< type << ") JOIN UNPROCESSED" << std::endl; + std::get<2>(begin->second).join(::mvreg(crdt_delta)); + //std::cout << "JOIN_DELTA_EDGE ID(" << from<< ", "<< to <<", "<< type << ") JOIN UNPROCESSED" << std::endl; find = true; break; } } @@ -1172,28 +1178,28 @@ void DSRGraph::join_delta_edge(IDL::MvregEdge &&mvreg) for (auto [begin, end] = unprocessed_delta_edge_from.equal_range(to); begin != end; ++begin) { //There should not be many elements in this iteration if (std::get<0>(begin->second) == from && std::get<1>(begin->second) == type){ std::get<2>(begin->second).join(std::move(crdt_delta)); - std::cout << "JOIN_DELTA_EDGE ID(" << from<< ", "<< to <<", "<< type << ") JOIN UNPROCESSED" << std::endl; + //std::cout << "JOIN_DELTA_EDGE ID(" << from<< ", "<< to <<", "<< type << ") JOIN UNPROCESSED" << std::endl; find = true; break; } } if (!find) { - std::cout << "JOIN_DELTA_EDGE ID(" << from<< ", "<< to <<", "<< type << ") INSERT UNPROCESSED " ; + //std::cout << "JOIN_DELTA_EDGE ID(" << from<< ", "<< to <<", "<< type << ") INSERT UNPROCESSED " ; if (!cfrom) { - std::cout << "No existe from (" << from << ") unprocessed_delta_edge_from" << std::endl; + //std::cout << "No existe from (" << from << ") unprocessed_delta_edge_from" << std::endl; unprocessed_delta_edge_from.emplace(from, std::tuple{to, type, crdt_delta, timestamp}); } if (cfrom && !cto) { - std::cout << "No existe to (" << to << ") unprocessed_delta_edge_to" << std::endl; + //std::cout << "No existe to (" << to << ") unprocessed_delta_edge_to" << std::endl; unprocessed_delta_edge_to.emplace(to, std::tuple{from, type, std::move(crdt_delta), timestamp}); } } else { // We should sync the deleted_nodes set too... - std::cout <<"TODO: Unhandled" << std::endl; + std::cout <<"TODO: Unhandled, we should sync the deleted_nodes set" << std::endl; } } else { - std::cout << "ELIMINADO, BORRAR DE UNPROCESSED" << std::endl; + //std::cout << "THE EDGE IS PART OF A DELETED NODE, CLEAN FROM UNPROCESSED" << std::endl; auto node_deleted = [&](uint64_t id){ unprocessed_delta_edge_from.erase(id); unprocessed_delta_node_att.erase(id);