Skip to content

Commit

Permalink
stack/spsc_queue - introduce move semantics
Browse files Browse the repository at this point in the history
  • Loading branch information
timblechmann committed Nov 4, 2023
1 parent e145cd1 commit 362cab7
Show file tree
Hide file tree
Showing 7 changed files with 220 additions and 39 deletions.
29 changes: 25 additions & 4 deletions include/boost/lockfree/detail/freelist.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -85,16 +85,25 @@ class freelist_stack : Alloc
}

template < bool ThreadSafe, bool Bounded, typename ArgumentType >
T* construct( ArgumentType const& arg )
T* construct( const ArgumentType& arg )
{
T* node = allocate< ThreadSafe, Bounded >();
if ( node )
new ( node ) T( arg );
return node;
}

template < bool ThreadSafe, bool Bounded, typename ArgumentType >
T* construct( ArgumentType&& arg )
{
T* node = allocate< ThreadSafe, Bounded >();
if ( node )
new ( node ) T( std::forward< ArgumentType >( arg ) );
return node;
}

template < bool ThreadSafe, bool Bounded, typename ArgumentType1, typename ArgumentType2 >
T* construct( ArgumentType1 const& arg1, ArgumentType2 const& arg2 )
T* construct( ArgumentType1&& arg1, ArgumentType2&& arg2 )
{
T* node = allocate< ThreadSafe, Bounded >();
if ( node )
Expand Down Expand Up @@ -447,7 +456,7 @@ class fixed_size_freelist : NodeStorage
}

template < bool ThreadSafe, bool Bounded, typename ArgumentType >
T* construct( ArgumentType const& arg )
T* construct( const ArgumentType& arg )
{
index_t node_index = allocate< ThreadSafe >();
if ( node_index == null_handle() )
Expand All @@ -458,8 +467,20 @@ class fixed_size_freelist : NodeStorage
return node;
}

template < bool ThreadSafe, bool Bounded, typename ArgumentType >
T* construct( ArgumentType&& arg )
{
index_t node_index = allocate< ThreadSafe >();
if ( node_index == null_handle() )
return NULL;

T* node = NodeStorage::nodes() + node_index;
new ( node ) T( std::forward< ArgumentType >( arg ) );
return node;
}

template < bool ThreadSafe, bool Bounded, typename ArgumentType1, typename ArgumentType2 >
T* construct( ArgumentType1 const& arg1, ArgumentType2 const& arg2 )
T* construct( const ArgumentType1& arg1, const ArgumentType2& arg2 )
{
index_t node_index = allocate< ThreadSafe >();
if ( node_index == null_handle() )
Expand Down
37 changes: 31 additions & 6 deletions include/boost/lockfree/queue.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -295,11 +295,17 @@ class queue
* \note Thread-safe. If internal memory pool is exhausted and the memory pool is not fixed-sized, a new node will
* be allocated from the OS. This may not be lock-free.
* */
bool push( T const& t )
bool push( const T& t )
{
return do_push< false >( t );
}

/// \copydoc boost::lockfree::queue::push(const T & t)
bool push( T&& t )
{
return do_push< false >( std::forward< T >( t ) );
}

/** Pushes object t to the queue.
*
* \post object will be pushed to the queue, if internal node can be allocated
Expand All @@ -308,18 +314,36 @@ class queue
* \note Thread-safe and non-blocking. If internal memory pool is exhausted, operation will fail
* \throws if memory allocator throws
* */
bool bounded_push( T const& t )
bool bounded_push( const T& t )
{
return do_push< true >( t );
}

/// \copydoc boost::lockfree::queue::bounded_push(const T & t)
bool bounded_push( T&& t )
{
return do_push< true >( std::forward< T >( t ) );
}


private:
#ifndef BOOST_DOXYGEN_INVOKED
template < bool Bounded >
bool do_push( T&& t )
{
node* n = pool.template construct< true, Bounded >( std::forward< T >( t ), pool.null_handle() );
return do_push_node( n );
}

template < bool Bounded >
bool do_push( T const& t )
{
node* n = pool.template construct< true, Bounded >( t, pool.null_handle() );
node* n = pool.template construct< true, Bounded >( t, pool.null_handle() );
return do_push_node( n );
}

bool do_push_node( node* n )
{
handle_type node_handle = pool.get_handle( n );

if ( n == NULL )
Expand Down Expand Up @@ -347,6 +371,7 @@ class queue
}
}
}

#endif

public:
Expand All @@ -358,9 +383,9 @@ class queue
* \note Not Thread-safe. If internal memory pool is exhausted and the memory pool is not fixed-sized, a new node
* will be allocated from the OS. This may not be lock-free. \throws if memory allocator throws
* */
bool unsynchronized_push( T const& t )
bool unsynchronized_push( T&& t )
{
node* n = pool.template construct< false, false >( t, pool.null_handle() );
node* n = pool.template construct< false, false >( std::forward< T >( t ), pool.null_handle() );

if ( n == NULL )
return false;
Expand Down Expand Up @@ -509,7 +534,7 @@ class queue
T element;
bool success = pop( element );
if ( success )
f( element );
f( std::move( element ) );

return success;
}
Expand Down
64 changes: 48 additions & 16 deletions include/boost/lockfree/spsc_queue.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,8 @@ class ringbuffer_base
return write_available( write_index, read_index, max_size );
}

bool push( T const& t, T* buffer, size_t max_size )

bool push( const T& t, T* buffer, size_t max_size )
{
const size_t write_index = write_index_.load( memory_order_relaxed ); // only written from push thread
const size_t next = next_index( write_index, max_size );
Expand All @@ -107,6 +108,21 @@ class ringbuffer_base
return true;
}

bool push( T&& t, T* buffer, size_t max_size )
{
const size_t write_index = write_index_.load( memory_order_relaxed ); // only written from push thread
const size_t next = next_index( write_index, max_size );

if ( next == read_index_.load( memory_order_acquire ) )
return false; /* ringbuffer is full */

new ( buffer + write_index ) T( std::forward< T >( t ) ); // move-construct

write_index_.store( next, memory_order_release );

return true;
}

size_t push( const T* input_buffer, size_t input_count, T* internal_buffer, size_t max_size )
{
return push( input_buffer, input_buffer + input_count, internal_buffer, max_size ) - input_buffer;
Expand Down Expand Up @@ -159,7 +175,7 @@ class ringbuffer_base
return false;

T& object_to_consume = buffer[ read_index ];
functor( object_to_consume );
functor( std::move( object_to_consume ) );
object_to_consume.~T();

size_t next = next_index( read_index, max_size );
Expand Down Expand Up @@ -221,12 +237,12 @@ class ringbuffer_base
const size_t count0 = max_size - read_index;
const size_t count1 = output_count - count0;

copy_and_delete( internal_buffer + read_index, internal_buffer + max_size, output_buffer );
copy_and_delete( internal_buffer, internal_buffer + count1, output_buffer + count0 );
move_and_delete( internal_buffer + read_index, internal_buffer + max_size, output_buffer );
move_and_delete( internal_buffer, internal_buffer + count1, output_buffer + count0 );

new_read_index -= max_size;
} else {
copy_and_delete( internal_buffer + read_index, internal_buffer + read_index + output_count, output_buffer );
move_and_delete( internal_buffer + read_index, internal_buffer + read_index + output_count, output_buffer );
if ( new_read_index == max_size )
new_read_index = 0;
}
Expand All @@ -252,12 +268,12 @@ class ringbuffer_base
const size_t count0 = max_size - read_index;
const size_t count1 = avail - count0;

it = copy_and_delete( internal_buffer + read_index, internal_buffer + max_size, it );
copy_and_delete( internal_buffer, internal_buffer + count1, it );
it = move_and_delete( internal_buffer + read_index, internal_buffer + max_size, it );
move_and_delete( internal_buffer, internal_buffer + count1, it );

new_read_index -= max_size;
} else {
copy_and_delete( internal_buffer + read_index, internal_buffer + read_index + avail, it );
move_and_delete( internal_buffer + read_index, internal_buffer + read_index + avail, it );
if ( new_read_index == max_size )
new_read_index = 0;
}
Expand Down Expand Up @@ -322,13 +338,13 @@ class ringbuffer_base
}

template < class OutputIterator >
OutputIterator copy_and_delete( T* first, T* last, OutputIterator out )
OutputIterator move_and_delete( T* first, T* last, OutputIterator out )
{
if ( std::is_trivially_destructible< T >::value ) {
return std::copy( first, last, out ); // will use memcpy if possible
} else {
for ( ; first != last; ++first, ++out ) {
*out = *first;
*out = std::move( *first );
first->~T();
}
return out;
Expand All @@ -339,7 +355,7 @@ class ringbuffer_base
void run_functor_and_delete( T* first, T* last, Functor&& functor )
{
for ( ; first != last; ++first ) {
functor( *first );
functor( std::move( *first ) );
first->~T();
}
}
Expand Down Expand Up @@ -379,11 +395,16 @@ class compile_time_sized_ringbuffer : public ringbuffer_base< T >
}

public:
bool push( T const& t )
bool push( const T& t )
{
return ringbuffer_base< T >::push( t, data(), max_size );
}

bool push( T&& t )
{
return ringbuffer_base< T >::push( std::forward< T >( t ), data(), max_size );
}

template < typename Functor >
bool consume_one( Functor&& f )
{
Expand Down Expand Up @@ -484,11 +505,16 @@ class runtime_sized_ringbuffer : public ringbuffer_base< T >, private Alloc
allocator_traits::deallocate( allocator, array_, max_elements_ );
}

bool push( T const& t )
bool push( const T& t )
{
return ringbuffer_base< T >::push( t, &*array_, max_elements_ );
}

bool push( T&& t )
{
return ringbuffer_base< T >::push( std::forward< T >( t ), &*array_, max_elements_ );
}

template < typename Functor >
bool consume_one( Functor&& f )
{
Expand Down Expand Up @@ -676,11 +702,17 @@ class spsc_queue : public detail::make_ringbuffer< T, Options... >::ringbuffer_t
*
* \note Thread-safe and wait-free
* */
bool push( T const& t )
bool push( const T& t )
{
return base_type::push( t );
}

/// \copydoc boost::lockfree::spsc_queue::push(const T& t)
bool push( T&& t )
{
return base_type::push( std::forward< T >( t ) );
}

/** Pops one object from ringbuffer.
*
* \pre only one thread is allowed to pop data from the spsc_queue
Expand All @@ -705,8 +737,8 @@ class spsc_queue : public detail::make_ringbuffer< T, Options... >::ringbuffer_t
template < typename U, typename Enabler = std::enable_if< std::is_convertible< T, U >::value > >
bool pop( U& ret )
{
return consume_one( [ & ]( const T& t ) {
ret = std::move( t );
return consume_one( [ & ]( T&& t ) {
ret = std::forward< T >( t );
} );
}

Expand Down
Loading

0 comments on commit 362cab7

Please sign in to comment.