Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a deduplication utility for raw buffers #36

Draft
wants to merge 8 commits into
base: auterion-router-latest
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@ mavlink_routerd_SOURCES = \
src/common/conf_file.cpp \
src/common/conf_file.h \
src/common/dbg.h \
src/mavlink-router/dedup.cpp \
src/common/mavlink.h \
src/mavlink-router/endpoint.cpp \
src/mavlink-router/endpoint.h \
Expand Down Expand Up @@ -213,6 +214,8 @@ mainloop_test_SOURCES = \
src/mavlink-router/binlog.cpp \
src/mavlink-router/binlog.h \
src/mavlink-router/comm.h \
src/mavlink-router/dedup.h \
src/mavlink-router/dedup.cpp \
src/mavlink-router/endpoint.h \
src/mavlink-router/endpoint.cpp \
src/mavlink-router/endpoint.h \
Expand Down
94 changes: 94 additions & 0 deletions src/mavlink-router/dedup.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
/*
* This file is part of the MAVLink Router project
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/


#include "dedup.h"

#include <chrono>
#include <string>
#include <queue>
#include <unordered_set>

class DedupImpl {
using hash_t = uint64_t;
using time_t = uint32_t;
public:
DedupImpl() : _start_time(std::chrono::system_clock::now())
{
}

bool add_check_packet(const uint8_t* buffer, uint32_t size, uint32_t dedup_period_ms)
{
using namespace std::chrono;
time_t timestamp = duration_cast<milliseconds>(std::chrono::system_clock::now() - _start_time).count();
// pop data from front queue, delete corresponding data from multiset
while (_time_hash_queue.size() > 0 && timestamp > _time_hash_queue.front().first + dedup_period_ms) {
hash_t hash_to_delete = _time_hash_queue.front().second;
_packet_hash_set.erase(_packet_hash_set.find(hash_to_delete));
_time_hash_queue.pop();
}

// hash buffer
// TODO: with C++17 use a string_view instead, or use a custom hash function
_hash_buffer.assign((const char*)buffer, (uint64_t)size);
hash_t hash = std::hash<std::string>{}(_hash_buffer);

bool new_packet_hash = true;
if (_packet_hash_set.find(hash) == _packet_hash_set.end()) {
// add hash and timestamp to back of queue, and add hash to multiset
_packet_hash_set.insert(hash);
_time_hash_queue.emplace(timestamp, hash);
} else {
new_packet_hash = false;
}


return new_packet_hash;
}

private:
const std::chrono::time_point<std::chrono::system_clock> _start_time;
jkflying marked this conversation as resolved.
Show resolved Hide resolved

std::queue<std::pair<time_t, hash_t>> _time_hash_queue;
std::unordered_set<hash_t> _packet_hash_set;
std::string _hash_buffer;
};

Dedup::Dedup(uint32_t dedup_period_ms) : _dedup_period_ms(dedup_period_ms), _impl(new DedupImpl())
{
}

Dedup::~Dedup()
{
}

void Dedup::set_dedup_period(uint32_t dedup_period_ms)
{
_dedup_period_ms = dedup_period_ms;
}

Dedup::PacketStatus Dedup::add_check_packet(const uint8_t* buffer, uint32_t size)
{
// short circuit if disabled
if (_dedup_period_ms == 0) {
return PacketStatus::NEW_PACKET_OR_TIMED_OUT;
}

if (_impl->add_check_packet(buffer, size, _dedup_period_ms)) {
return PacketStatus::NEW_PACKET_OR_TIMED_OUT;
}
return PacketStatus::ALREADY_EXISTS_IN_BUFFER;
}
42 changes: 42 additions & 0 deletions src/mavlink-router/dedup.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* This file is part of the MAVLink Router project
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/



#pragma once

#include <memory>

class DedupImpl;
class Dedup {
public:

enum class PacketStatus {
NEW_PACKET_OR_TIMED_OUT,
ALREADY_EXISTS_IN_BUFFER
};

Dedup(uint32_t dedup_period_ms = 0);
~Dedup();

void set_dedup_period(uint32_t dedup_period_ms);

PacketStatus add_check_packet(const uint8_t* buffer, uint32_t size);
private:

uint32_t _dedup_period_ms;
std::unique_ptr<DedupImpl> _impl;
};
13 changes: 10 additions & 3 deletions src/mavlink-router/endpoint.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -81,9 +81,13 @@ int Endpoint::handle_read()

while ((r = read_msg(&buf, &target_sysid, &target_compid, &src_sysid,
&src_compid, &msg_id)) > 0) {
if (allowed_by_filter(msg_id) && allowed_by_dropout())
if (r != CrcError && allowed_by_filter(msg_id) && allowed_by_dropout() && allowed_by_dedup(&buf)) {
if (r == ReadOk) {
_add_sys_comp_id(((uint16_t)src_sysid << 8) | src_compid);
}
Mainloop::get_instance().route_msg(&buf, target_sysid, target_compid,
src_sysid, src_compid, msg_id);
}
}

return r;
Expand Down Expand Up @@ -232,9 +236,8 @@ int Endpoint::read_msg(struct buffer *pbuf, int *target_sysid, int *target_compi
if (!_check_crc(msg_entry)) {
_stat.read.crc_error++;
_stat.read.crc_error_bytes += expected_size;
return 0;
return CrcError;
}
_add_sys_comp_id(((uint16_t)*src_sysid << 8) | *src_compid);
}

_stat.read.handled++;
Expand Down Expand Up @@ -374,6 +377,10 @@ bool Endpoint::allowed_by_dropout()
return true;
}

bool Endpoint::allowed_by_dedup(const buffer* buf)
{
return Mainloop::get_instance().add_check_dedup(buf);
}

void Endpoint::postprocess_msg(int target_sysid, int target_compid, uint8_t src_sysid,
uint8_t src_compid, uint32_t msg_id)
Expand Down
4 changes: 4 additions & 0 deletions src/mavlink-router/endpoint.h
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ class Endpoint : public Pollable {
enum read_msg_result {
ReadOk = 1,
ReadUnkownMsg,
CrcError
};

Endpoint(const std::string& name);
Expand Down Expand Up @@ -103,9 +104,12 @@ class Endpoint : public Pollable {
bool allowed_by_filter(uint32_t msg_id);
void add_message_to_filter(uint32_t msg_id) { _message_filter.push_back(msg_id); }
void add_message_to_nodelay(uint32_t msg_id) { _message_nodelay.push_back(msg_id); }

bool allowed_by_dropout();
void set_dropout_percentage (uint32_t dropout_percentage) { _dropout_percentage = dropout_percentage; }

bool allowed_by_dedup(const buffer* buf);

void start_expire_timer();

void reset_expire_timer();
Expand Down
6 changes: 6 additions & 0 deletions src/mavlink-router/mainloop.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -394,6 +394,12 @@ static bool _print_statistics_timeout_cb(void *data)
return true;
}

bool Mainloop::add_check_dedup(const buffer* buf)
{
return _dedup.add_check_packet(buf->data, buf->len) == Dedup::PacketStatus::NEW_PACKET_OR_TIMED_OUT;
}


bool Mainloop::remove_dynamic_endpoint(Endpoint *endpoint)
{
if (!endpoint) {
Expand Down
5 changes: 5 additions & 0 deletions src/mavlink-router/mainloop.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@

#include "binlog.h"
#include "comm.h"
#include "dedup.h"
#include "endpoint.h"
#include "timeout.h"
#include "ulog.h"
Expand Down Expand Up @@ -86,6 +87,8 @@ class Mainloop {
bool remove_dynamic_endpoint(const dynamic_command& command);
bool remove_dynamic_endpoint(Endpoint *endpoint);

bool add_check_dedup(const buffer* buf);

void print_statistics();

int epollfd = -1;
Expand Down Expand Up @@ -135,6 +138,8 @@ class Mainloop {

Timeout *_timeouts = nullptr;

Dedup _dedup;

std::atomic<bool> _should_exit {false};

struct {
Expand Down