diff --git a/Makefile.am b/Makefile.am index 47fdd92b..99a02876 100644 --- a/Makefile.am +++ b/Makefile.am @@ -139,6 +139,7 @@ mavlink_routerd_SOURCES = \ src/common/conf_file.cpp \ src/common/conf_file.h \ src/common/dbg.h \ + src/mavlink-router/dedup.cpp \ src/common/mavlink.h \ src/mavlink-router/endpoint.cpp \ src/mavlink-router/endpoint.h \ @@ -213,6 +214,8 @@ mainloop_test_SOURCES = \ src/mavlink-router/binlog.cpp \ src/mavlink-router/binlog.h \ src/mavlink-router/comm.h \ + src/mavlink-router/dedup.h \ + src/mavlink-router/dedup.cpp \ src/mavlink-router/endpoint.h \ src/mavlink-router/endpoint.cpp \ src/mavlink-router/endpoint.h \ diff --git a/src/mavlink-router/dedup.cpp b/src/mavlink-router/dedup.cpp new file mode 100644 index 00000000..031f9bbf --- /dev/null +++ b/src/mavlink-router/dedup.cpp @@ -0,0 +1,94 @@ +/* + * This file is part of the MAVLink Router project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +#include "dedup.h" + +#include +#include +#include +#include + +class DedupImpl { + using hash_t = uint64_t; + using time_t = uint32_t; +public: + DedupImpl() : _start_time(std::chrono::system_clock::now()) + { + } + + bool add_check_packet(const uint8_t* buffer, uint32_t size, uint32_t dedup_period_ms) + { + using namespace std::chrono; + time_t timestamp = duration_cast(std::chrono::system_clock::now() - _start_time).count(); + // pop data from front queue, delete corresponding data from multiset + while (_time_hash_queue.size() > 0 && timestamp > _time_hash_queue.front().first + dedup_period_ms) { + hash_t hash_to_delete = _time_hash_queue.front().second; + _packet_hash_set.erase(_packet_hash_set.find(hash_to_delete)); + _time_hash_queue.pop(); + } + + // hash buffer + // TODO: with C++17 use a string_view instead, or use a custom hash function + _hash_buffer.assign((const char*)buffer, (uint64_t)size); + hash_t hash = std::hash{}(_hash_buffer); + + bool new_packet_hash = true; + if (_packet_hash_set.find(hash) == _packet_hash_set.end()) { + // add hash and timestamp to back of queue, and add hash to multiset + _packet_hash_set.insert(hash); + _time_hash_queue.emplace(timestamp, hash); + } else { + new_packet_hash = false; + } + + + return new_packet_hash; + } + +private: + const std::chrono::time_point _start_time; + + std::queue> _time_hash_queue; + std::unordered_set _packet_hash_set; + std::string _hash_buffer; +}; + +Dedup::Dedup(uint32_t dedup_period_ms) : _dedup_period_ms(dedup_period_ms), _impl(new DedupImpl()) +{ +} + +Dedup::~Dedup() +{ +} + +void Dedup::set_dedup_period(uint32_t dedup_period_ms) +{ + _dedup_period_ms = dedup_period_ms; +} + +Dedup::PacketStatus Dedup::add_check_packet(const uint8_t* buffer, uint32_t size) +{ + // short circuit if disabled + if (_dedup_period_ms == 0) { + return PacketStatus::NEW_PACKET_OR_TIMED_OUT; + } + + if (_impl->add_check_packet(buffer, size, _dedup_period_ms)) { + return PacketStatus::NEW_PACKET_OR_TIMED_OUT; + } + return PacketStatus::ALREADY_EXISTS_IN_BUFFER; +} diff --git a/src/mavlink-router/dedup.h b/src/mavlink-router/dedup.h new file mode 100644 index 00000000..692073f5 --- /dev/null +++ b/src/mavlink-router/dedup.h @@ -0,0 +1,42 @@ +/* + * This file is part of the MAVLink Router project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + + +#pragma once + +#include + +class DedupImpl; +class Dedup { +public: + + enum class PacketStatus { + NEW_PACKET_OR_TIMED_OUT, + ALREADY_EXISTS_IN_BUFFER + }; + + Dedup(uint32_t dedup_period_ms = 0); + ~Dedup(); + + void set_dedup_period(uint32_t dedup_period_ms); + + PacketStatus add_check_packet(const uint8_t* buffer, uint32_t size); +private: + + uint32_t _dedup_period_ms; + std::unique_ptr _impl; +}; diff --git a/src/mavlink-router/endpoint.cpp b/src/mavlink-router/endpoint.cpp index f2845f4d..e6d3d2d9 100644 --- a/src/mavlink-router/endpoint.cpp +++ b/src/mavlink-router/endpoint.cpp @@ -81,9 +81,13 @@ int Endpoint::handle_read() while ((r = read_msg(&buf, &target_sysid, &target_compid, &src_sysid, &src_compid, &msg_id)) > 0) { - if (allowed_by_filter(msg_id) && allowed_by_dropout()) + if (r != CrcError && allowed_by_filter(msg_id) && allowed_by_dropout() && allowed_by_dedup(&buf)) { + if (r == ReadOk) { + _add_sys_comp_id(((uint16_t)src_sysid << 8) | src_compid); + } Mainloop::get_instance().route_msg(&buf, target_sysid, target_compid, src_sysid, src_compid, msg_id); + } } return r; @@ -232,9 +236,8 @@ int Endpoint::read_msg(struct buffer *pbuf, int *target_sysid, int *target_compi if (!_check_crc(msg_entry)) { _stat.read.crc_error++; _stat.read.crc_error_bytes += expected_size; - return 0; + return CrcError; } - _add_sys_comp_id(((uint16_t)*src_sysid << 8) | *src_compid); } _stat.read.handled++; @@ -374,6 +377,10 @@ bool Endpoint::allowed_by_dropout() return true; } +bool Endpoint::allowed_by_dedup(const buffer* buf) +{ + return Mainloop::get_instance().add_check_dedup(buf); +} void Endpoint::postprocess_msg(int target_sysid, int target_compid, uint8_t src_sysid, uint8_t src_compid, uint32_t msg_id) diff --git a/src/mavlink-router/endpoint.h b/src/mavlink-router/endpoint.h index 496f9713..710f3287 100644 --- a/src/mavlink-router/endpoint.h +++ b/src/mavlink-router/endpoint.h @@ -74,6 +74,7 @@ class Endpoint : public Pollable { enum read_msg_result { ReadOk = 1, ReadUnkownMsg, + CrcError }; Endpoint(const std::string& name); @@ -103,9 +104,12 @@ class Endpoint : public Pollable { bool allowed_by_filter(uint32_t msg_id); void add_message_to_filter(uint32_t msg_id) { _message_filter.push_back(msg_id); } void add_message_to_nodelay(uint32_t msg_id) { _message_nodelay.push_back(msg_id); } + bool allowed_by_dropout(); void set_dropout_percentage (uint32_t dropout_percentage) { _dropout_percentage = dropout_percentage; } + bool allowed_by_dedup(const buffer* buf); + void start_expire_timer(); void reset_expire_timer(); diff --git a/src/mavlink-router/mainloop.cpp b/src/mavlink-router/mainloop.cpp index d3aeb37c..7e30bb99 100644 --- a/src/mavlink-router/mainloop.cpp +++ b/src/mavlink-router/mainloop.cpp @@ -394,6 +394,12 @@ static bool _print_statistics_timeout_cb(void *data) return true; } +bool Mainloop::add_check_dedup(const buffer* buf) +{ + return _dedup.add_check_packet(buf->data, buf->len) == Dedup::PacketStatus::NEW_PACKET_OR_TIMED_OUT; +} + + bool Mainloop::remove_dynamic_endpoint(Endpoint *endpoint) { if (!endpoint) { diff --git a/src/mavlink-router/mainloop.h b/src/mavlink-router/mainloop.h index 308eb501..c742899d 100644 --- a/src/mavlink-router/mainloop.h +++ b/src/mavlink-router/mainloop.h @@ -25,6 +25,7 @@ #include "binlog.h" #include "comm.h" +#include "dedup.h" #include "endpoint.h" #include "timeout.h" #include "ulog.h" @@ -86,6 +87,8 @@ class Mainloop { bool remove_dynamic_endpoint(const dynamic_command& command); bool remove_dynamic_endpoint(Endpoint *endpoint); + bool add_check_dedup(const buffer* buf); + void print_statistics(); int epollfd = -1; @@ -135,6 +138,8 @@ class Mainloop { Timeout *_timeouts = nullptr; + Dedup _dedup; + std::atomic _should_exit {false}; struct {