Skip to content

Commit

Permalink
[DSR/API] fix issues in join_delta_node and join_delta_edge handling …
Browse files Browse the repository at this point in the history
…unordered reception
  • Loading branch information
JuanCarlosgg committed Oct 2, 2023
1 parent decd148 commit 2bc27f4
Showing 1 changed file with 24 additions and 18 deletions.
42 changes: 24 additions & 18 deletions api/dsr_api.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
//

#include <dsr/api/dsr_api.h>
#include <dsr/core/types/crdt_types.h>
#include <iostream>
#include <unistd.h>
#include <algorithm>
Expand Down Expand Up @@ -949,11 +950,13 @@ void DSRGraph::join_delta_node(IDL::MvregNode &&mvreg)
decltype(unprocessed_delta_edge_from)::node_type node_handle = std::move(unprocessed_delta_edge_from.extract(id));
while (!node_handle.empty())
{
unprocessed_delta_edge_att.erase(std::tuple{id, std::get<0>(node_handle.mapped()), std::get<1>(node_handle.mapped())});
node_handle = std::move(unprocessed_delta_edge_from.extract(id));
}

std::erase_if(unprocessed_delta_edge_to,
[&](auto &it){ return std::get<0>(it.second) == id;});
std::erase_if(unprocessed_delta_edge_att,
[&](auto &it){ return std::get<0>(it.first) == id or std::get<1>(it.first) == id;});
};


Expand All @@ -977,7 +980,6 @@ void DSRGraph::join_delta_node(IDL::MvregNode &&mvreg)
auto att_key = std::tuple{id, to, type};
//std::cout << "[OLD] Procesando delta edge almacenado (unprocessed_delta_edge_from) " << id<< ", " <<to << ", " <<type <<", "<<std::boolalpha << (timestamp < timestamp_edge) << std::endl;
if (timestamp < timestamp_edge) {
//TODO: este se debería hacer después de insertar el nodo?
if (process_delta_edge(id, to, type, std::move(delta))) map_new_to_edges.emplace(std::pair<uint64_t, std::string>{to, type});
}
if (nodes.contains(id) and nodes.at(id).read_reg().fano().contains({to, type})) {
Expand All @@ -992,13 +994,11 @@ void DSRGraph::join_delta_node(IDL::MvregNode &&mvreg)
node_handle_edge_att = std::move(unprocessed_delta_edge_att.extract(att_key));
}
}
//TODO: Check
std::erase_if(unprocessed_delta_edge_to,
[to = to, id = id, type = type](auto& it) { return it.first == to && std::get<0>(it.second) == id && std::get<1>(it.second) == type;});
node_handle_edge = std::move(unprocessed_delta_edge_from.extract(id));
}

//TODO: Check
node_handle_edge = std::move(unprocessed_delta_edge_to.extract(id));
while (!node_handle_edge.empty()) {
auto &[from, type, delta, timestamp_edge] = node_handle_edge.mapped();
Expand Down Expand Up @@ -1114,15 +1114,22 @@ void DSRGraph::join_delta_edge(IDL::MvregEdge &&mvreg)

uint64_t timestamp = mvreg.timestamp();

//Clean remaining delta edges.
auto delete_unprocessed_deltas = [&](){
unprocessed_delta_edge_from.erase(from);
//1. Delete all the delta attr for the edge.
//2. Delete all unprocessed delta edges with the same key (from and to)
//unprocessed_delta_edge_from.erase(from); // This should not be needed.
unprocessed_delta_edge_att.erase(std::tuple{from, to, type});
std::erase_if(unprocessed_delta_edge_to,
[&](auto &it){ return it.first == to && std::get<0>(it.second) == from;});
[&](auto &it){ return it.first == to && std::get<0>(it.second) == from && type == std::get<1>(it.second);});
std::erase_if(unprocessed_delta_edge_to,
[&](auto &it){ return it.first == from && std::get<0>(it.second) == to && type == std::get<1>(it.second);});
};

//Consumes all delta attributes and deletes a possible previous delta from unprocessed map.
//Consumes all delta attributes and deletes a possible previous delta from unprocessed map.
auto consume_unprocessed_deltas = [&](){
//1. Consume all the delta attributes for the edge key
//2. Delete all unprocessed delta edges with the same key (from and to)
auto att_key = std::tuple{from, to, type};
decltype(unprocessed_delta_edge_att)::node_type node_handle_edge_att = std::move(unprocessed_delta_edge_att.extract(att_key));
while (!node_handle_edge_att.empty())
Expand All @@ -1134,11 +1141,10 @@ void DSRGraph::join_delta_edge(IDL::MvregEdge &&mvreg)
node_handle_edge_att = std::move(unprocessed_delta_edge_att.extract(att_key));
}

//auto [begin, end] = unprocessed_delta_edge.equal_range(from);
std::erase_if(unprocessed_delta_edge_to,
[&](auto &it){ return it.first == to && std::get<0>(it.second) == from && std::get<1>(it.second) == type; });
std::erase_if(unprocessed_delta_edge_from,
[&](auto &it){ return std::get<0>(it.second) == to && std::get<1>(it.second) == type; });
[&](auto &it){ return it.first == from && std::get<0>(it.second) == to && std::get<1>(it.second) == type; });
};


Expand All @@ -1159,41 +1165,41 @@ void DSRGraph::join_delta_edge(IDL::MvregEdge &&mvreg)
}

} else if (!dfrom and !dto) {
//We should receive the node later.
//We should receive the node later. To avoid storing more than one element on the unprocessed cache we use the mvreg join.
bool find = false;
for (auto [begin, end] = unprocessed_delta_edge_from.equal_range(from); begin != end; ++begin) { //There should not be many elements in this iteration
if (std::get<0>(begin->second) == to && std::get<1>(begin->second) == type){
std::get<2>(begin->second).join(std::move(crdt_delta));
std::cout << "JOIN_DELTA_EDGE ID(" << from<< ", "<< to <<", "<< type << ") JOIN UNPROCESSED" << std::endl;
std::get<2>(begin->second).join(::mvreg<CRDTEdge>(crdt_delta));
//std::cout << "JOIN_DELTA_EDGE ID(" << from<< ", "<< to <<", "<< type << ") JOIN UNPROCESSED" << std::endl;
find = true; break;
}
}

for (auto [begin, end] = unprocessed_delta_edge_from.equal_range(to); begin != end; ++begin) { //There should not be many elements in this iteration
if (std::get<0>(begin->second) == from && std::get<1>(begin->second) == type){
std::get<2>(begin->second).join(std::move(crdt_delta));
std::cout << "JOIN_DELTA_EDGE ID(" << from<< ", "<< to <<", "<< type << ") JOIN UNPROCESSED" << std::endl;
//std::cout << "JOIN_DELTA_EDGE ID(" << from<< ", "<< to <<", "<< type << ") JOIN UNPROCESSED" << std::endl;
find = true; break;
}
}

if (!find) {
std::cout << "JOIN_DELTA_EDGE ID(" << from<< ", "<< to <<", "<< type << ") INSERT UNPROCESSED " ;
//std::cout << "JOIN_DELTA_EDGE ID(" << from<< ", "<< to <<", "<< type << ") INSERT UNPROCESSED " ;
if (!cfrom) {
std::cout << "No existe from (" << from << ") unprocessed_delta_edge_from" << std::endl;
//std::cout << "No existe from (" << from << ") unprocessed_delta_edge_from" << std::endl;
unprocessed_delta_edge_from.emplace(from, std::tuple{to, type, crdt_delta, timestamp});
}
if (cfrom && !cto)
{
std::cout << "No existe to (" << to << ") unprocessed_delta_edge_to" << std::endl;
//std::cout << "No existe to (" << to << ") unprocessed_delta_edge_to" << std::endl;
unprocessed_delta_edge_to.emplace(to, std::tuple{from, type, std::move(crdt_delta), timestamp});
}
} else {
// We should sync the deleted_nodes set too...
std::cout <<"TODO: Unhandled" << std::endl;
std::cout <<"TODO: Unhandled, we should sync the deleted_nodes set" << std::endl;
}
} else {
std::cout << "ELIMINADO, BORRAR DE UNPROCESSED" << std::endl;
//std::cout << "THE EDGE IS PART OF A DELETED NODE, CLEAN FROM UNPROCESSED" << std::endl;
auto node_deleted = [&](uint64_t id){
unprocessed_delta_edge_from.erase(id);
unprocessed_delta_node_att.erase(id);
Expand Down

0 comments on commit 2bc27f4

Please sign in to comment.