-
Notifications
You must be signed in to change notification settings - Fork 58
/
threadsafe_queue.h
152 lines (130 loc) · 3.23 KB
/
threadsafe_queue.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
#ifndef __THREADSAFE_QUEUE_H_
#define __THREADSAFE_QUEUE_H_
#include <queue>
#include <mutex>
#include <condition_variable>
#include <memory>
#include <atomic>
using namespace std;
/**
 * Mutex-protected FIFO queue with an upper bound on the number of stored
 * elements.
 *
 * Elements are stored as std::shared_ptr<T>. When a push makes the queue
 * larger than the configured limit, the oldest element is discarded and the
 * dropped-element counter is incremented.
 *
 * After termination() has been called, pushes are ignored, remaining elements
 * can still be popped, and wait_and_pop() no longer blocks on an empty queue.
 **/
template<typename T> class threadsafe_queue
{
public:
    /**
     * @param size_limit maximum number of elements kept in the queue; pushing
     *                   beyond it discards the oldest element.
     **/
    threadsafe_queue(std::size_t size_limit)
        : size_limit(size_limit), m_termination(false), m_dropped_count(0)
    {
    }

    ~threadsafe_queue() = default;

    /**
     * Blocks until an element is available or termination() has been called,
     * then removes and returns the front element.
     *
     * @return the dequeued element, or nullptr when the queue is terminated
     *         and empty.
     **/
    std::shared_ptr<T> wait_and_pop()
    {
        std::unique_lock<std::mutex> lk(mut);
        data_cond.wait(lk, [this] {
            return !data_queue.empty() ||
                   m_termination.load(std::memory_order_acquire);
        });
        return pop_locked();
    }

    /**
     * Blocking pop into a caller-supplied object.
     *
     * @param value receives the dequeued element (move-assigned) on success;
     *              left untouched on failure.
     * @return true when an element was dequeued, false when the queue is
     *         terminated and empty.
     **/
    bool wait_and_pop(T &value)
    {
        std::shared_ptr<T> res = wait_and_pop();
        if (res == nullptr)
            return false;
        // The queue held the only reference to this element, so it is safe
        // to move the payload out of it.
        value = std::move(*res);
        return true;
    }

    /**
     * Same as wait_and_pop(T &); kept so callers passing an rvalue reference
     * continue to compile.
     **/
    bool wait_and_pop(T &&value)
    {
        // A named rvalue reference is an lvalue, so this selects the T &
        // overload.
        return wait_and_pop(value);
    }

    /**
     * Non-blocking pop.
     *
     * @return the dequeued element, or nullptr when the queue is empty.
     **/
    std::shared_ptr<T> try_pop()
    {
        std::lock_guard<std::mutex> lk(mut);
        return pop_locked();
    }

    /**
     * Non-blocking pop into a caller-supplied object.
     *
     * @param value receives the dequeued element (move-assigned) on success;
     *              left untouched on failure.
     * @return true when an element was dequeued, false when the queue is
     *         empty.
     **/
    bool try_pop(T &value)
    {
        std::shared_ptr<T> res = try_pop();
        if (res == nullptr)
            return false;
        value = std::move(*res);
        return true;
    }

    /**
     * Same as try_pop(T &); kept so callers passing an rvalue reference
     * continue to compile.
     **/
    bool try_pop(T &&value)
    {
        return try_pop(value);
    }

    /**
     * Enqueues new_value by moving it into the queue. Ignored once the queue
     * has been terminated. If the queue then exceeds its size limit, the
     * oldest element is discarded and counted in dropped_count().
     **/
    void move_push(T &&new_value)
    {
        // Cheap early exit that avoids the allocation below.
        if (m_termination.load(std::memory_order_acquire))
            return;

        // Allocate outside the critical section.
        std::shared_ptr<T> data = std::make_shared<T>(std::move(new_value));

        std::lock_guard<std::mutex> lk(mut);
        // Re-check under the lock: termination() sets the flag while holding
        // the same mutex, so nothing is enqueued after it returns.
        if (m_termination.load(std::memory_order_acquire))
            return;

        data_queue.push(std::move(data));
        if (data_queue.size() > size_limit) {
            data_queue.pop();
            m_dropped_count.fetch_add(1, std::memory_order_relaxed);
        }
        data_cond.notify_one();
    }

    /**
     * Enqueues a copy of new_value; see move_push() for the size-limit and
     * termination behaviour.
     **/
    void push(T new_value)
    {
        move_push(std::move(new_value));
    }

    /** @return true when the queue currently holds no elements. */
    bool empty() const
    {
        std::lock_guard<std::mutex> lk(mut);
        return data_queue.empty();
    }

    /** @return the number of elements currently in the queue. */
    std::size_t size() const
    {
        std::lock_guard<std::mutex> lk(mut);
        return data_queue.size();
    }

    /** @return how many elements have been discarded due to the size limit. */
    std::size_t dropped_count() const
    {
        return m_dropped_count.load(std::memory_order_relaxed);
    }

    /**
     * Puts the queue into the terminated state and wakes every thread
     * blocked in wait_and_pop(). In this state pushes are ignored, pops still
     * work, and wait_and_pop() does not block when the queue is empty.
     **/
    void termination()
    {
        std::lock_guard<std::mutex> lk(mut);
        m_termination.store(true, std::memory_order_release);
        data_cond.notify_all();
    }

    /** @return true once termination() has been called. */
    bool is_termination() const
    {
        return m_termination.load(std::memory_order_acquire);
    }

private:
    // Removes and returns the front element, or nullptr when the queue is
    // empty. The caller must hold mut.
    std::shared_ptr<T> pop_locked()
    {
        if (data_queue.empty())
            return nullptr;
        std::shared_ptr<T> res = std::move(data_queue.front());
        data_queue.pop();
        return res;
    }

    mutable std::mutex mut;                     // guards data_queue
    std::queue<std::shared_ptr<T>> data_queue;
    const std::size_t size_limit;
    std::condition_variable data_cond;          // signalled on push and on termination
    std::atomic<bool> m_termination;
    std::atomic<std::size_t> m_dropped_count;
};
#endif