SHM transport: the longer it runs, the more latency increases (Buffer Full exception) #789

Open
fujitatomoya opened this issue Nov 20, 2024 · 21 comments

@fujitatomoya
Collaborator

fujitatomoya commented Nov 20, 2024

Description

tf / tf_static topics start generating the message-dropping warning "the timestamp on the message is earlier than all the data in the transform cache".
once it starts happening, the frequency of the warning increases linearly over time.
this significantly increases the delay and latency between publisher and subscribers.

the reproduction rate is approximately 30%.

System Information

XML configuration

<?xml version="1.0" encoding="UTF-8"?>
<dds xmlns="http://www.eprosima.com/XMLSchemas/fastRTPS_Profiles">
    <profiles>
        <transport_descriptors>
            <!-- Create a descriptor for the new transport -->
            <transport_descriptor>
                <transport_id>shm_transport</transport_id>
                <type>SHM</type>
                <segment_size>10485760</segment_size> 
                <port_queue_capacity>256</port_queue_capacity>
            </transport_descriptor>
        </transport_descriptors>
        <participant profile_name="SHMParticipant" is_default_profile="true">
            <rtps>
                <!-- Link the Transport Layer to the Participant -->
                <userTransports>
                  <transport_id>shm_transport</transport_id>
                </userTransports>
                <!-- <useBuiltinTransports>false</useBuiltinTransports> -->
            </rtps>
        </participant>
        <publisher profile_name="service">
            <qos>
                <reliability>
                    <max_blocking_time>
                        <sec>10</sec>
                    </max_blocking_time>
                </reliability>
            </qos>
        </publisher>
    </profiles>
</dds>

Additional Information

@fujitatomoya
Collaborator Author

@MiguelCompany we have detected this issue in a production environment. we will give it a shot with falling back to udp_transport to see if the problem still happens. do you happen to know of any known issue related to this?

@Barry-Xu-2018
Contributor

I have some questions about the code in class MultiProducerConsumerRingBuffer. If my understanding is incorrect, please correct me.

Assuming there is currently only 1 free cell (that is, free_cells is 1):

At this moment, pushing data to a cell will throw a "Buffer full" exception. At that point, the value of free_cells is changed from 1 to 0. So, even though no cell was actually used, free_cells is still decreased by 1.

https://github.com/eProsima/Fast-DDS/blob/b1e4707ad3cfe4cad7e5100318402206e0bd5e78/src/cpp/rtps/transport/shared_mem/MultiProducerConsumerRingBuffer.hpp#L219-L232

    bool push(
            const T& data)
    {
        // If no listeners the buffer is dropped
        if (node_->registered_listeners_ == 0)
        {
            return false;
        }

        auto pointer = node_->pointer_.load(std::memory_order_relaxed);

        // if free cells, increase the write pointer and decrease the free cells
        while (pointer.ptr.free_cells > 0 &&
                !node_->pointer_.compare_exchange_weak(pointer,
                { { inc_pointer(pointer.ptr.write_p), pointer.ptr.free_cells - 1 } },
                std::memory_order_release,
                std::memory_order_relaxed))
        {
        }

        if (pointer.ptr.free_cells == 0)
        {
            throw std::runtime_error("Buffer full");
        }
  ...
 }

If a listener pops a cell and it is the last listener on this cell, the cell can be reused (free_cells should be increased).
At this point, free_cells becomes 1 again.
https://github.com/eProsima/Fast-DDS/blob/b1e4707ad3cfe4cad7e5100318402206e0bd5e78/src/cpp/rtps/transport/shared_mem/MultiProducerConsumerRingBuffer.hpp#L120-L131

        bool pop()
        {
            ...

            // If all the listeners have read the cell
            if (counter == 1)
            {
                // Increase the free cells => increase the global read pointer
                auto pointer = buffer_.node_->pointer_.load(std::memory_order_relaxed);
                while (!buffer_.node_->pointer_.compare_exchange_weak(pointer,
                        { { pointer.ptr.write_p, pointer.ptr.free_cells + 1 } },
                        std::memory_order_release,
                        std::memory_order_relaxed))
                {
                }
            }

Then, when new data is pushed to a cell, we face the same issue --- the push does not succeed, but free_cells is reduced to 0 again.
This causes the available cells never to be used, and the buffer will always be in a buffer full state.

@MiguelCompany
Collaborator

At this moment, pushing data to a cell will throw a "Buffer full" exception.

The exception would never be thrown if the current thread successfully updated the counter from 1 to 0.
In that case, the local value of pointer.ptr.free_cells would be 1.

You might want to try upgrading all node_->pointer_.load() calls to std::memory_order_acquire, and check whether the situation improves

@fujitatomoya
Collaborator Author

@Barry-Xu-2018 are you saying this if statement should be moved above the CAS loop, like this?

diff --git a/src/cpp/rtps/transport/shared_mem/MultiProducerConsumerRingBuffer.hpp b/src/cpp/rtps/transport/shared_mem/MultiProducerConsumerRingBuffer.hpp
index 51e559155..52e2be7f4 100644
--- a/src/cpp/rtps/transport/shared_mem/MultiProducerConsumerRingBuffer.hpp
+++ b/src/cpp/rtps/transport/shared_mem/MultiProducerConsumerRingBuffer.hpp
@@ -216,6 +216,11 @@ public:

         auto pointer = node_->pointer_.load(std::memory_order_relaxed);

+        if (pointer.ptr.free_cells == 0)
+        {
+            throw std::runtime_error("Buffer full");
+        }
+
         // if free cells, increase the write pointer and decrease the free cells
         while (pointer.ptr.free_cells > 0 &&
                 !node_->pointer_.compare_exchange_weak(pointer,
@@ -225,11 +230,6 @@ public:
         {
         }

-        if (pointer.ptr.free_cells == 0)
-        {
-            throw std::runtime_error("Buffer full");
-        }
-
         auto& cell = cells_[get_pointer_value(pointer.ptr.write_p)];

         cell.data(data);

@fujitatomoya
Collaborator Author

You might want to try upgrading all node_->pointer_.load() calls to std::memory_order_acquire, and check whether the situation improves

okay, that guarantees that write operations performed by other threads before their release stores are visible when this thread loads the std::atomic<PtrType>. the expectation is that other threads have already written to this atomic field but the update is not yet visible here, and that could lead to the buffer full situation when many threads are running?

diff --git a/src/cpp/rtps/transport/shared_mem/MultiProducerConsumerRingBuffer.hpp b/src/cpp/rtps/transport/shared_mem/MultiProducerConsumerRingBuffer.hpp
index 51e559155..d2be85ada 100644
--- a/src/cpp/rtps/transport/shared_mem/MultiProducerConsumerRingBuffer.hpp
+++ b/src/cpp/rtps/transport/shared_mem/MultiProducerConsumerRingBuffer.hpp
@@ -85,7 +85,7 @@ public:
          */
         Cell* head()
         {
-            auto pointer = buffer_.node_->pointer_.load(std::memory_order_relaxed);
+            auto pointer = buffer_.node_->pointer_.load(std::memory_order_acquire);

             // If local read_pointer and write_pointer are equal => buffer is empty for this listener
             if (read_p_ == pointer.ptr.write_p )
@@ -121,7 +121,7 @@ public:
             if (counter == 1)
             {
                 // Increase the free cells => increase the global read pointer
-                auto pointer = buffer_.node_->pointer_.load(std::memory_order_relaxed);
+                auto pointer = buffer_.node_->pointer_.load(std::memory_order_acquire);
                 while (!buffer_.node_->pointer_.compare_exchange_weak(pointer,
                         { { pointer.ptr.write_p, pointer.ptr.free_cells + 1 } },
                         std::memory_order_release,
@@ -214,7 +214,7 @@ public:
             return false;
         }

-        auto pointer = node_->pointer_.load(std::memory_order_relaxed);
+        auto pointer = node_->pointer_.load(std::memory_order_acquire);

         // if free cells, increase the write pointer and decrease the free cells
         while (pointer.ptr.free_cells > 0 &&
@@ -240,12 +240,12 @@ public:

     bool is_buffer_full()
     {
-        return (node_->pointer_.load(std::memory_order_relaxed).ptr.free_cells == 0);
+        return (node_->pointer_.load(std::memory_order_acquire).ptr.free_cells == 0);
     }

     bool is_buffer_empty()
     {
-        return (node_->pointer_.load(std::memory_order_relaxed).ptr.free_cells == node_->total_cells_);
+        return (node_->pointer_.load(std::memory_order_acquire).ptr.free_cells == node_->total_cells_);
     }

     /**
@@ -261,7 +261,7 @@ public:
         // The new listener's read pointer is the current write pointer
         auto listener = std::unique_ptr<Listener>(
             new Listener(
-                *this, node_->pointer_.load(std::memory_order_relaxed).ptr.write_p));
+                *this, node_->pointer_.load(std::memory_order_acquire).ptr.write_p));

         node_->registered_listeners_++;

@@ -293,7 +293,7 @@ public:
     {
         if (node_->registered_listeners_ > 0)
         {
-            auto pointer = node_->pointer_.load(std::memory_order_relaxed);
+            auto pointer = node_->pointer_.load(std::memory_order_acquire);

             uint32_t p = pointer_to_head(pointer);

@Barry-Xu-2018
Contributor

Barry-Xu-2018 commented Nov 21, 2024

@MiguelCompany

The exception would never be thrown if the current thread successfully updated the counter from 1 to 0.
In that case, the local value of pointer.ptr.free_cells would be 1.

Thanks. I misunderstood.

You might want to try upgrading all node_->pointer_.load() calls to std::memory_order_acquire, and check whether the situation improves

        // if free cells, increase the write pointer and decrease the free cells
        while (pointer.ptr.free_cells > 0 &&
                !node_->pointer_.compare_exchange_weak(pointer,
                { { inc_pointer(pointer.ptr.write_p), pointer.ptr.free_cells - 1 } },
                std::memory_order_release,
                std::memory_order_relaxed))
        {
        }

        if (pointer.ptr.free_cells == 0)
        {
            throw std::runtime_error("Buffer full");
        }

No. I think std::memory_order_relaxed is unsuitable as the failure ordering when compare_exchange_weak fails.
When compare_exchange_weak fails, node_->pointer_ is reloaded into pointer, and the following comparisons "pointer.ptr.free_cells > 0" and "pointer.ptr.free_cells == 0" depend on that updated pointer (I am not sure whether the surrounding reads are guaranteed to happen after the reload). So perhaps the failure memory order of compare_exchange_weak should be memory_order_acquire.
That is:

        while (pointer.ptr.free_cells > 0 &&
                !node_->pointer_.compare_exchange_weak(pointer,
                { { inc_pointer(pointer.ptr.write_p), pointer.ptr.free_cells - 1 } },
                std::memory_order_release,
                std::memory_order_acquire))

CC: @fujitatomoya This is my thought.

@MiguelCompany
Collaborator

Looking at the following example from the std::memory_order documentation makes me think that we only need the changes suggested by @fujitatomoya in #789 (comment)

#include <atomic>
#include <cassert>
#include <thread>
#include <vector>
 
std::vector<int> data;
std::atomic<int> flag = {0};
 
void thread_1()
{
    data.push_back(42);
    flag.store(1, std::memory_order_release);
}
 
void thread_2()
{
    int expected = 1;
    // memory_order_relaxed is okay because this is an RMW,
    // and RMWs (with any ordering) following a release form a release sequence
    while (!flag.compare_exchange_strong(expected, 2, std::memory_order_relaxed))
    {
        expected = 1;
    }
}
 
void thread_3()
{
    while (flag.load(std::memory_order_acquire) < 2)
        ;
    // if we read the value 2 from the atomic flag, we see 42 in the vector
    assert(data.at(0) == 42); // will never fire
}
 
int main()
{
    std::thread a(thread_1);
    std::thread b(thread_2);
    std::thread c(thread_3);
    a.join(); b.join(); c.join();
}

@Barry-Xu-2018
Contributor

        Cell* head()
        {
            auto pointer = buffer_.node_->pointer_.load(std::memory_order_relaxed);

            // If local read_pointer and write_pointer are equal => buffer is empty for this listener
            if (read_p_ == pointer.ptr.write_p )
            {
                return nullptr;
            }

            auto cell = &buffer_.cells_[get_pointer_value(read_p_)];

            return cell->ref_counter() != 0 ? cell : nullptr;
        }

If the memory order is memory_order_acquire, it makes sure that read operations (such as read_p_ == pointer.ptr.write_p) must happen after auto pointer = buffer_.node_->pointer_.load(std::memory_order_relaxed);

If the memory order is memory_order_relaxed, read operations may be reordered to before auto pointer = buffer_.node_->pointer_.load(std::memory_order_relaxed);
But I think read_p_ == pointer.ptr.write_p has to be located after auto pointer = buffer_.node_->pointer_.load(std::memory_order_relaxed); anyway, since pointer is a local temporary produced by that load. However, auto cell = &buffer_.cells_[get_pointer_value(read_p_)]; may be executed before the load. Even if it is executed in that order, the final output is not affected.

Of course, to ensure the ordering, I agree with changing to memory_order_acquire.

@fujitatomoya
Collaborator Author

@Barry-Xu-2018

i may be mistaken on some points...

If the memory order is memory_order_acquire, it makes sure that read operations (such as read_p_ == pointer.ptr.write_p) must happen after auto pointer = buffer_.node_->pointer_.load(std::memory_order_relaxed);

not really sure what this means here.
i think it just makes sure that all the store operations performed by other threads before their release stores are visible to this thread here.

If the memory order is memory_order_relaxed, read operations may be reordered to before auto pointer = buffer_.node_->pointer_.load(std::memory_order_relaxed);

this should not be happening, and the compiler should not do that. if it did, it would break the program sequence.

@Barry-Xu-2018
Contributor

@fujitatomoya

If the memory order is memory_order_acquire, it makes sure that read operations (such as read_p_ == pointer.ptr.write_p) must happen after auto pointer = buffer_.node_->pointer_.load(std::memory_order_relaxed);

not really sure what this means here.
i think it just makes sure that all the store operations performed by other threads before their release stores are visible to this thread here.

I want to express that no reads or writes in the current thread can be reordered before this load.
Refer to the explanation of memory_order_acquire in https://en.cppreference.com/w/cpp/atomic/memory_order.

If the memory order is memory_order_relaxed, read operations may be reordered to before auto pointer = buffer_.node_->pointer_.load(std::memory_order_relaxed);

this should not be happening, and the compiler should not do that. if it did, it would break the program sequence.

memory_order_relaxed only guarantees that the operation itself is atomic. Due to compiler or CPU reordering, it is possible that "auto cell = &buffer_.cells_[get_pointer_value(read_p_)];" is moved ahead of "buffer_.node_->pointer_.load()".
memory_order_acquire prevents this reordering.

        Cell* head()
        {
            auto pointer = buffer_.node_->pointer_.load(std::memory_order_relaxed);

            // If local read_pointer and write_pointer are equal => buffer is empty for this listener
            if (read_p_ == pointer.ptr.write_p )
            {
                return nullptr;
            }

            auto cell = &buffer_.cells_[get_pointer_value(read_p_)];

            return cell->ref_counter() != 0 ? cell : nullptr;
        }

@MiguelCompany
Collaborator

@fujitatomoya Are you going to try the patch proposed in #789 (comment)?

If you see improvements with that, I would then think of a regression test, and prepare a PR for Fast DDS with that

@fujitatomoya
Collaborator Author

@MiguelCompany we are still checking how the situation changes with the patch and with the UDP transport; let me have a couple of days. thanks for checking.

@fujitatomoya
Collaborator Author

@MiguelCompany

  • UDP transport (w/o SHM transport)

    • the problem (tf message dropping) can be reproduced, which tells us this problem is not specific to the SHM transport.
    • when the problem starts happening, eprosima::fastrtps::rtps::StatefulReader::processGapMsg consumes much more CPU time than in the normal situation. This indicates that the publisher skips or drops certain messages in the HistoryCache, preventing the subscriber from requesting their retransmission, for efficiency.
  • patch applied with SHM transport

    • it mitigates the situation until the spike: w/o the patch the spike came after about 10 hours, but with the patch it can take more than 35 hours.
    • even before the spike, Buffer Full situations constantly appear, on the order of 10~100. this is different from the without-patch condition.
    • once it hits the spike, hundreds of thousands of Buffer Full situations appear... almost the same as the without-patch situation.

i believe the Buffer Full situation should not happen in the 1st place...

@fujitatomoya
Collaborator Author

CC: @Barry-Xu-2018

@MiguelCompany
Collaborator

@fujitatomoya

From your description on #789 (comment), this seems like a slowly growing snowball issue.

It seems that from time to time, independently of the transport used, there are packet drops due to the transport buffers (UDP or SHM) being full. That would make the writer resend some data, making the snowball grow and leading to the buffers filling more quickly (since it now has to send both old and new data).

The presence of gap messages is evidence that the history of the writer became full, and some samples were removed from the history before they were acknowledged by the readers.

In this situation, increasing the buffers in UDP, or increasing both segment size and port capacity in SHM would improve the situation.

My suggestion would be to start with UDP only and increase the buffers till you make the situation stable. Then add SHM (with the patch), and use the same size for the segments as you used for the UDP buffers
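
For reference, a minimal sketch of what the combined configuration could look like in XML once SHM is added back in, following the same profile layout as the configuration at the top of this issue (profile name and buffer sizes below are placeholders, not tuned recommendations; large UDP buffers may also require raising the OS socket buffer limits):

<?xml version="1.0" encoding="UTF-8"?>
<dds xmlns="http://www.eprosima.com/XMLSchemas/fastRTPS_Profiles">
    <profiles>
        <transport_descriptors>
            <!-- UDP transport with enlarged send/receive socket buffers -->
            <transport_descriptor>
                <transport_id>udp_transport</transport_id>
                <type>UDPv4</type>
                <sendBufferSize>10485760</sendBufferSize>
                <receiveBufferSize>10485760</receiveBufferSize>
            </transport_descriptor>
            <!-- SHM transport with a segment size matching the UDP buffers -->
            <transport_descriptor>
                <transport_id>shm_transport</transport_id>
                <type>SHM</type>
                <segment_size>10485760</segment_size>
                <port_queue_capacity>512</port_queue_capacity>
            </transport_descriptor>
        </transport_descriptors>
        <participant profile_name="BigBuffersParticipant" is_default_profile="true">
            <rtps>
                <userTransports>
                    <transport_id>udp_transport</transport_id>
                    <transport_id>shm_transport</transport_id>
                </userTransports>
                <useBuiltinTransports>false</useBuiltinTransports>
            </rtps>
        </participant>
    </profiles>
</dds>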

@fujitatomoya
Collaborator Author

@MiguelCompany

It seems that from time to time, independently of the transport used, there are packet drops due to the transport buffers (UDP or SHM) being full. That would make the writer resend some data, making the snowball grow and leading to the buffers filling more quickly (since it now has to send both old and new data).

totally agree.

My suggestion would be to start with UDP only and increase the buffers till you make the situation stable. Then add SHM (with the patch), and use the same size for the segments as you used for the UDP buffers

we kinda already tried with port_queue_number.
the bigger port_queue_number is, the longer it takes until we hit this snowball issue. (e.g. with 128 it takes 10 hours, with 512 (the default) it takes 37 hours)
but eventually, we still hit this issue...

the original requirement is that robots on site should run 24/7 without power-off.
i think that is a common requirement for factory and logistics robot automation... but with this issue, we cannot support that requirement.

but we have not tried the UDP send/receive buffer sizes and the SHM transport segment size yet. we will give it a shot and see how that changes the situation.

by the way, there is a general question about Data Sharing delivery and SHM transport.

with the SHM transport, descriptors are stored in the port (protected by a mutex), and each descriptor describes where to access the data, i.e. the shared memory address within the segment of the participant that sent the data.
this shared memory segment is allocated based on the configuration file; by default it is 512*1024 bytes.
if data sharing delivery is not enabled, it sounds reasonable that the configuration of this segment size affects the buffer full situation.

what if data sharing delivery is enabled? i mean, in this case the DataWriter allocates the shared memory segment based on the QoS configuration (history and depth, as in the HistoryCache), and that is coupled with the DataReader.
so with data sharing delivery, is this DataWriter's HistoryCache shared memory segment also used as the SHM transport segment to achieve zero-copy delivery?
that said, if data sharing delivery is enabled, do we need to configure the QoS for the DataWriter rather than the SHM transport segment size?

i am trying to comprehend the memory management with data sharing delivery and SHM transport both enabled.

@fujitatomoya
Collaborator Author

but we have not tried the UDP send/receive buffer sizes and the SHM transport segment size yet. we will give it a shot and see how that changes the situation.

we have tried increasing the SHM segment size, but it does not solve the root cause... it just extends the time window until the problem happens. (the current segment size is 10MB, but we can still hit the problem.)

@MiguelCompany
Collaborator

@fujitatomoya One thing you could try is reducing the heartbeat period on the publishers. You can do that by XML, see this section of the documentation.

Reducing the heartbeat_period would make the reliability mechanism more responsive, thus (hopefully) reducing the possibility of old and new data to be sent at the same time. I would start with something like heartbeat_period = 0.7 * publication_period.
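
As a sketch, reusing the "service" publisher profile from the XML at the top of this issue and assuming a 100 ms publication period (so roughly a 70 ms heartbeat period; the values are placeholders and the element names should be double-checked against the XML profiles documentation):

<publisher profile_name="service">
    <qos>
        <reliability>
            <max_blocking_time>
                <sec>10</sec>
            </max_blocking_time>
        </reliability>
    </qos>
    <times>
        <heartbeatPeriod>
            <sec>0</sec>
            <nanosec>70000000</nanosec>
        </heartbeatPeriod>
    </times>
</publisher>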

@fujitatomoya
Collaborator Author

@MiguelCompany

could you share your thoughts on #789 (comment)

by the way, there is a general question about Data Sharing delivery and SHM transport.

i am not sure if this is related, but would like to know how Fast-DDS deals with this configuration.

@fujitatomoya
Collaborator Author

@MiguelCompany thanks!

Reducing the heartbeat_period would make the reliability mechanism more responsive, thus (hopefully) reducing the possibility of old and new data to be sent at the same time. I would start with something like heartbeat_period = 0.7 * publication_period.

will give it a shot.

@MiguelCompany
Collaborator

@MiguelCompany

could you share your thoughts on #789 (comment)

The data-sharing mechanism is independent of the transports.
A data-sharing-enabled writer will store the serialized payload of the samples on the shared-memory-mapped file.
When communicating with a reader on a remote host it will use the relevant transport (UDP in the default configuration).
When communicating with a non-data-sharing reader on the same host it will use the relevant transport (SHM in the default configuration).
When communicating with a data-sharing-enabled reader on the same host it will use an interprocess condition variable.
A data-sharing-enabled reader will use the relevant transport to send acknowledgments.
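
Regarding the sizing question: as I understand it, when data sharing is used the segment holding the payloads is sized from the writer's history and resource-limits QoS, while the SHM transport's <segment_size> only affects the transport path (readers on the same host that do not use data sharing, plus acknowledgments from data-sharing readers). A rough sketch of where each knob lives, assuming the element names and layout of the XML profiles documentation (profile name and depth below are placeholders):

<publisher profile_name="datasharing_writer">
    <topic>
        <historyQos>
            <kind>KEEP_LAST</kind>
            <depth>10</depth>
        </historyQos>
    </topic>
    <qos>
        <data_sharing>
            <!-- AUTOMATIC falls back to the regular transports when data sharing cannot be used -->
            <kind>AUTOMATIC</kind>
        </data_sharing>
    </qos>
</publisher>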
