SHM transport: The longer it runs, The more latency increases. (Buffer Full Exception) #789
@MiguelCompany we have detected this issue in a production environment. we will give it a shot if falling back to …
I have some questions about the code in `MultiProducerConsumerRingBuffer.hpp`.

Assuming there's currently only 1 free cell (that is, `pointer.ptr.free_cells` is 1). At this moment, a data push will take that cell; a concurrent push will then throw a "Buffer full" exception, because the value of `pointer.ptr.free_cells` it observes is 0:

```cpp
bool push(
        const T& data)
{
    // If no listeners the buffer is dropped
    if (node_->registered_listeners_ == 0)
    {
        return false;
    }

    auto pointer = node_->pointer_.load(std::memory_order_relaxed);

    // if free cells, increase the write pointer and decrease the free cells
    while (pointer.ptr.free_cells > 0 &&
            !node_->pointer_.compare_exchange_weak(pointer,
            { { inc_pointer(pointer.ptr.write_p), pointer.ptr.free_cells - 1 } },
            std::memory_order_release,
            std::memory_order_relaxed))
    {
    }

    if (pointer.ptr.free_cells == 0)
    {
        throw std::runtime_error("Buffer full");
    }
    ...
}
```

If a listener pops a cell, and it is the last listener on this cell, the cell can be reused:

```cpp
bool pop()
{
    ...
    // If all the listeners have read the cell
    if (counter == 1)
    {
        // Increase the free cells => increase the global read pointer
        auto pointer = buffer_.node_->pointer_.load(std::memory_order_relaxed);
        while (!buffer_.node_->pointer_.compare_exchange_weak(pointer,
                { { pointer.ptr.write_p, pointer.ptr.free_cells + 1 } },
                std::memory_order_release,
                std::memory_order_relaxed))
        {
        }
    }
    ...
}
```

And then, a new data push to a cell will face the same issue: the push didn't succeed, but the …
The exception would never be thrown if the current thread successfully updated the counter from 1 to 0. You might want to try upgrading all …
@Barry-Xu-2018 are you saying this `if` statement should be moved above?

```diff
diff --git a/src/cpp/rtps/transport/shared_mem/MultiProducerConsumerRingBuffer.hpp b/src/cpp/rtps/transport/shared_mem/MultiProducerConsumerRingBuffer.hpp
index 51e559155..52e2be7f4 100644
--- a/src/cpp/rtps/transport/shared_mem/MultiProducerConsumerRingBuffer.hpp
+++ b/src/cpp/rtps/transport/shared_mem/MultiProducerConsumerRingBuffer.hpp
@@ -216,6 +216,11 @@ public:
         auto pointer = node_->pointer_.load(std::memory_order_relaxed);

+        if (pointer.ptr.free_cells == 0)
+        {
+            throw std::runtime_error("Buffer full");
+        }
+
         // if free cells, increase the write pointer and decrease the free cells
         while (pointer.ptr.free_cells > 0 &&
                 !node_->pointer_.compare_exchange_weak(pointer,
@@ -225,11 +230,6 @@ public:
         {
         }

-        if (pointer.ptr.free_cells == 0)
-        {
-            throw std::runtime_error("Buffer full");
-        }
-
         auto& cell = cells_[get_pointer_value(pointer.ptr.write_p)];
         cell.data(data);
```
okay, that guarantees that previous write operations by other threads before the load are visible:

```diff
diff --git a/src/cpp/rtps/transport/shared_mem/MultiProducerConsumerRingBuffer.hpp b/src/cpp/rtps/transport/shared_mem/MultiProducerConsumerRingBuffer.hpp
index 51e559155..d2be85ada 100644
--- a/src/cpp/rtps/transport/shared_mem/MultiProducerConsumerRingBuffer.hpp
+++ b/src/cpp/rtps/transport/shared_mem/MultiProducerConsumerRingBuffer.hpp
@@ -85,7 +85,7 @@ public:
      */
     Cell* head()
     {
-        auto pointer = buffer_.node_->pointer_.load(std::memory_order_relaxed);
+        auto pointer = buffer_.node_->pointer_.load(std::memory_order_acquire);

         // If local read_pointer and write_pointer are equal => buffer is empty for this listener
         if (read_p_ == pointer.ptr.write_p )
@@ -121,7 +121,7 @@ public:
         if (counter == 1)
         {
             // Increase the free cells => increase the global read pointer
-            auto pointer = buffer_.node_->pointer_.load(std::memory_order_relaxed);
+            auto pointer = buffer_.node_->pointer_.load(std::memory_order_acquire);
             while (!buffer_.node_->pointer_.compare_exchange_weak(pointer,
                     { { pointer.ptr.write_p, pointer.ptr.free_cells + 1 } },
                     std::memory_order_release,
@@ -214,7 +214,7 @@ public:
             return false;
         }

-        auto pointer = node_->pointer_.load(std::memory_order_relaxed);
+        auto pointer = node_->pointer_.load(std::memory_order_acquire);

         // if free cells, increase the write pointer and decrease the free cells
         while (pointer.ptr.free_cells > 0 &&
@@ -240,12 +240,12 @@ public:
     bool is_buffer_full()
     {
-        return (node_->pointer_.load(std::memory_order_relaxed).ptr.free_cells == 0);
+        return (node_->pointer_.load(std::memory_order_acquire).ptr.free_cells == 0);
     }

     bool is_buffer_empty()
     {
-        return (node_->pointer_.load(std::memory_order_relaxed).ptr.free_cells == node_->total_cells_);
+        return (node_->pointer_.load(std::memory_order_acquire).ptr.free_cells == node_->total_cells_);
     }

     /**
@@ -261,7 +261,7 @@ public:
         // The new listener's read pointer is the current write pointer
         auto listener = std::unique_ptr<Listener>(
             new Listener(
-                *this, node_->pointer_.load(std::memory_order_relaxed).ptr.write_p));
+                *this, node_->pointer_.load(std::memory_order_acquire).ptr.write_p));

         node_->registered_listeners_++;

@@ -293,7 +293,7 @@ public:
     {
         if (node_->registered_listeners_ > 0)
         {
-            auto pointer = node_->pointer_.load(std::memory_order_relaxed);
+            auto pointer = node_->pointer_.load(std::memory_order_acquire);

             uint32_t p = pointer_to_head(pointer);
```
Thanks. I misunderstood.

```cpp
// if free cells, increase the write pointer and decrease the free cells
while (pointer.ptr.free_cells > 0 &&
        !node_->pointer_.compare_exchange_weak(pointer,
        { { inc_pointer(pointer.ptr.write_p), pointer.ptr.free_cells - 1 } },
        std::memory_order_release,
        std::memory_order_relaxed))
{
}

if (pointer.ptr.free_cells == 0)
{
    throw std::runtime_error("Buffer full");
}
```

No. I think `std::memory_order_relaxed` is unsuitable for the case where `compare_exchange_weak` fails:

```cpp
while (pointer.ptr.free_cells > 0 &&
        !node_->pointer_.compare_exchange_weak(pointer,
        { { inc_pointer(pointer.ptr.write_p), pointer.ptr.free_cells - 1 } },
        std::memory_order_release,
        std::memory_order_acquire))
```

CC: @fujitatomoya This is my thought.
Looking at the following example from the std::memory_order documentation makes me think that we only need the changes suggested by @fujitatomoya in #789 (comment):

```cpp
#include <atomic>
#include <cassert>
#include <thread>
#include <vector>

std::vector<int> data;
std::atomic<int> flag = {0};

void thread_1()
{
    data.push_back(42);
    flag.store(1, std::memory_order_release);
}

void thread_2()
{
    int expected = 1;
    // memory_order_relaxed is okay because this is an RMW,
    // and RMWs (with any ordering) following a release form a release sequence
    while (!flag.compare_exchange_strong(expected, 2, std::memory_order_relaxed))
    {
        expected = 1;
    }
}

void thread_3()
{
    while (flag.load(std::memory_order_acquire) < 2)
        ;
    // if we read the value 2 from the atomic flag, we see 42 in the vector
    assert(data.at(0) == 42); // will never fire
}

int main()
{
    std::thread a(thread_1);
    std::thread b(thread_2);
    std::thread c(thread_3);
    a.join(); b.join(); c.join();
}
```
```cpp
Cell* head()
{
    auto pointer = buffer_.node_->pointer_.load(std::memory_order_relaxed);

    // If local read_pointer and write_pointer are equal => buffer is empty for this listener
    if (read_p_ == pointer.ptr.write_p )
    {
        return nullptr;
    }

    auto cell = &buffer_.cells_[get_pointer_value(read_p_)];
    return cell->ref_counter() != 0 ? cell : nullptr;
}
```

If memory order is `std::memory_order_relaxed`, … if memory order is `std::memory_order_acquire`, … Of course, to ensure order, I agree with changing to use `std::memory_order_acquire`.
i may be mistaken on some points...

not really sure what this means here.

this should not be happening, and the compiler should not do that. if it does, it breaks the program sequence.
I want to express that no reads or writes in the current thread can be reordered before this load.

```cpp
Cell* head()
{
    auto pointer = buffer_.node_->pointer_.load(std::memory_order_relaxed);

    // If local read_pointer and write_pointer are equal => buffer is empty for this listener
    if (read_p_ == pointer.ptr.write_p )
    {
        return nullptr;
    }

    auto cell = &buffer_.cells_[get_pointer_value(read_p_)];
    return cell->ref_counter() != 0 ? cell : nullptr;
}
```
@fujitatomoya Are you going to try the patch proposed in #789 (comment)? If you see improvements with that, I would then think of a regression test, and prepare a PR for Fast DDS with it.
@MiguelCompany we are still working on how the situation changes with the patch and UDP transport, let me have a couple of days. thanks for checking.
i believe that the Buffer Full situation should not be true in the 1st place...

CC: @Barry-Xu-2018
From your description on #789 (comment), this seems like a slowly growing snowball issue. It seems that from time to time, independently of the transport used, there are packet drops due to the transport buffers (UDP or SHM) being full. That makes the writer resend some data, making the snowball grow and leading to the buffers filling more quickly (since it now has to send both old and new data). The presence of gap messages is evidence that the history of the writer became full, and some samples were removed from the history before they were acknowledged by the readers. In this situation, increasing the buffers in UDP, or increasing both segment size and port capacity in SHM, would improve the situation. My suggestion would be to start with UDP only and increase the buffers until you make the situation stable. Then add SHM (with the patch), and use the same size for the segments as you used for the UDP buffers.
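As a rough sketch of that suggestion (the profile name and sizes below are hypothetical; check the Fast DDS XML profiles documentation for the authoritative schema), increasing the UDP transport buffers could look like:

```xml
<transport_descriptors>
    <transport_descriptor>
        <transport_id>udp_big_buffers</transport_id>
        <type>UDPv4</type>
        <!-- 10 MiB send/receive buffers; tune until the situation is stable -->
        <sendBufferSize>10485760</sendBufferSize>
        <receiveBufferSize>10485760</receiveBufferSize>
    </transport_descriptor>
</transport_descriptors>
```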
totally agree.

we kinda already tried that; the original requirement is that robots at the site should be running 24/7 without power-off, but we have not tried with UDP send/receive …

by the way, there is a general question about Data Sharing delivery and SHM transport. with SHM transport, descriptors are stored in the port (protected by a mutex), and each descriptor describes where to access the data, i.e. the shared memory address within the segment of the participant that sent the data. what if data sharing delivery is enabled? i mean, in this case the DataWriter allocates the shared memory segment based on the QoS configuration (history and depth, as in the HistoryCache), and that segment is coupled with the DataReader. i am trying to comprehend the memory management with data sharing delivery and SHM transport both enabled.
we have tried increasing the SHM segment size, but it does not solve the root cause... it just extends the time window until the problem happens. (current segment size is 10MB, but we can still meet the problem.)
@fujitatomoya One thing you could try is reducing the heartbeat period on the publishers. You can do that by XML, see this section of the documentation. Reducing the …
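A sketch of such an XML snippet (the profile name and period are illustrative; the exact element layout should be checked against the Fast DDS XML profiles documentation):

```xml
<publisher profile_name="fast_heartbeat_profile">
    <qos>
        <reliability>
            <kind>RELIABLE</kind>
        </reliability>
    </qos>
    <times>
        <!-- Send heartbeats every 100 ms so readers NACK lost samples sooner -->
        <heartbeatPeriod>
            <sec>0</sec>
            <nanosec>100000000</nanosec>
        </heartbeatPeriod>
    </times>
</publisher>
```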
could you share your thoughts on #789 (comment)?

i am not sure if this is related, but would like to know how Fast-DDS deals with this configuration.
@MiguelCompany thanks!
will give it a shot.
The data-sharing mechanism is independent of the transports.
Description

`tf` and `/tf_static` topics start generating the dropping message warning, "the timestamp on the message is earlier than all the data in the transform cache". once it starts happening, the frequency just increases linearly as time goes on.

this actually increases the delay and latency between publisher and subscribers significantly.

the reproduction rate is approximately 30%.

System Information

`tf` and `/tf_static` topics: 15 publishers and 30 subscribers for each topic.

XML configuration

Additional Information

making `port_queue_capacity` 4 times bigger makes it take 4 times longer until the problem starts (almost the same situation for `segment_size`). this does not avoid the original problem, it only buys some time based on the configuration.