paho-mqtt-cpp
MQTT C++ Client for POSIX and Windows
thread_queue.h
Go to the documentation of this file.
1 
9 /*******************************************************************************
10  * Copyright (c) 2017-2022 Frank Pagliughi <fpagliughi@mindspring.com>
11  *
12  * All rights reserved. This program and the accompanying materials
13  * are made available under the terms of the Eclipse Public License v2.0
14  * and Eclipse Distribution License v1.0 which accompany this distribution.
15  *
16  * The Eclipse Public License is available at
17  * http://www.eclipse.org/legal/epl-v20.html
18  * and the Eclipse Distribution License is available at
19  * http://www.eclipse.org/org/documents/edl-v10.php.
20  *
21  * Contributors:
22  * Frank Pagliughi - initial implementation and documentation
23  *******************************************************************************/
24 
25 #ifndef __mqtt_thread_queue_h
26 #define __mqtt_thread_queue_h
27 
28 #include <thread>
29 #include <mutex>
30 #include <condition_variable>
31 #include <limits>
32 #include <deque>
33 #include <queue>
34 #include <algorithm>
35 
36 namespace mqtt {
37 
39 
/**
 * A bounded, first-in/first-out queue guarded by a mutex so that values can
 * be handed from producer threads to consumer threads.
 *
 * Every public member acquires the internal mutex before touching the queue
 * or its capacity. Producers block (or fail, for the `try_` variants) while
 * the queue holds `capacity()` items; consumers block (or fail) while it is
 * empty. Two condition variables are used: one signalled when an item is
 * added, the other when an item is removed or the capacity is raised.
 *
 * @tparam T The type of the items stored in the queue. The pointer-taking
 *           `get` overloads move-assign into the caller's object; the
 *           value-returning `get()` move-constructs its result.
 * @tparam Container The underlying container handed to `std::queue`.
 */
template <typename T, class Container=std::deque<T>>
class thread_queue
{
public:
    /** The underlying container type used to hold the items. */
    using container_type = Container;
    /** The type of the items held in the queue. */
    using value_type = T;
    /** The type used to express sizes and capacities. */
    using size_type = typename Container::size_type;

    /** The largest capacity that can be expressed; used as "unbounded". */
    static constexpr size_type MAX_CAPACITY = std::numeric_limits<size_type>::max();

private:
    /** Protects `cap_` and `que_`. Mutable so const accessors can lock. */
    mutable std::mutex lock_;
    /** Signalled after an item has been added. */
    std::condition_variable notEmptyCond_;
    /** Signalled after an item has been removed or the capacity raised. */
    std::condition_variable notFullCond_;
    /** Maximum number of items allowed in the queue; always at least 1. */
    size_type cap_;
    /** The items, oldest at the front. */
    std::queue<T,Container> que_;

    /** Scope lock for operations that never wait. */
    using guard = std::lock_guard<std::mutex>;
    /** Lock that can be released early and used with condition variables. */
    using unique_guard = std::unique_lock<std::mutex>;

    /**
     * Appends an item, releases the lock, then wakes one waiting consumer.
     * The lock is dropped before notifying so the woken thread does not
     * immediately block on the mutex.
     * @param g The lock on `lock_`; must be held on entry, released on exit.
     * @param val The item to move into the queue.
     */
    void push_and_release(unique_guard& g, value_type&& val) {
        que_.emplace(std::move(val));
        g.unlock();
        notEmptyCond_.notify_one();
    }

    /**
     * Moves the oldest item into `*val`, removes it, releases the lock,
     * then wakes one waiting producer.
     * @param g The lock on `lock_`; must be held on entry, released on exit.
     * @param val Non-null destination for the item. The queue must not be
     *            empty.
     */
    void pop_and_release(unique_guard& g, value_type* val) {
        *val = std::move(que_.front());
        que_.pop();
        g.unlock();
        notFullCond_.notify_one();
    }

public:
    /**
     * Constructs an empty queue whose capacity is `MAX_CAPACITY`.
     */
    thread_queue() : cap_(MAX_CAPACITY) {}

    /**
     * Constructs an empty queue with the given capacity.
     * @param cap The maximum number of items. A value of zero is raised to
     *            one so that the queue can always accept at least one item.
     */
    explicit thread_queue(size_t cap) : cap_(std::max<size_type>(cap, 1)) {}

    /**
     * Determines whether the queue currently holds no items.
     * @return `true` if the queue is empty, `false` otherwise.
     */
    bool empty() const {
        guard g(lock_);
        return que_.empty();
    }

    /**
     * Gets the current capacity.
     * @return The maximum number of items the queue will hold.
     */
    size_type capacity() const {
        guard g(lock_);
        return cap_;
    }

    /**
     * Sets the capacity.
     *
     * Items already queued are never discarded, so the queue may temporarily
     * hold more than the new capacity; producers simply cannot add more
     * until consumers drain it below the limit.
     *
     * @param cap The new maximum number of items. As with the constructor,
     *            zero is raised to one; a capacity of zero would make
     *            `put()` block forever and `try_put()` always fail.
     */
    void capacity(size_type cap) {
        bool room = false;
        {
            guard g(lock_);
            cap_ = std::max<size_type>(cap, 1);
            room = que_.size() < cap_;
        }
        // Producers blocked on a full queue re-check their predicate only
        // when signalled; wake them all if the new limit leaves space.
        if (room)
            notFullCond_.notify_all();
    }

    /**
     * Gets the number of items currently queued.
     * @return The item count at the moment the lock was held.
     */
    size_type size() const {
        guard g(lock_);
        return que_.size();
    }

    /**
     * Adds an item, blocking for as long as the queue is full.
     * @param val The item to add; it is moved into the queue.
     */
    void put(value_type val) {
        unique_guard g(lock_);
        notFullCond_.wait(g, [this]{ return que_.size() < cap_; });
        push_and_release(g, std::move(val));
    }

    /**
     * Adds an item only if there is room right now.
     * @param val The item to add; it is moved into the queue on success.
     * @return `true` if the item was queued, `false` if the queue was full.
     */
    bool try_put(value_type val) {
        unique_guard g(lock_);
        if (que_.size() >= cap_)
            return false;

        push_and_release(g, std::move(val));
        return true;
    }

    /**
     * Adds an item, waiting up to a maximum duration for room.
     * @param val The item to add; it is moved into the queue on success.
     * @param relTime The longest time to wait for space to become available.
     * @return `true` if the item was queued, `false` if the wait timed out
     *         with the queue still full.
     */
    template <typename Rep, class Period>
    bool try_put_for(value_type val, const std::chrono::duration<Rep, Period>& relTime) {
        unique_guard g(lock_);
        if (!notFullCond_.wait_for(g, relTime, [this]{ return que_.size() < cap_; }))
            return false;

        push_and_release(g, std::move(val));
        return true;
    }

    /**
     * Adds an item, waiting until an absolute deadline for room.
     * @param val The item to add; it is moved into the queue on success.
     * @param absTime The point in time at which to give up.
     * @return `true` if the item was queued, `false` if the deadline passed
     *         with the queue still full.
     */
    template <class Clock, class Duration>
    bool try_put_until(value_type val, const std::chrono::time_point<Clock,Duration>& absTime) {
        unique_guard g(lock_);
        if (!notFullCond_.wait_until(g, absTime, [this]{ return que_.size() < cap_; }))
            return false;

        push_and_release(g, std::move(val));
        return true;
    }

    /**
     * Removes the oldest item into caller-supplied storage, blocking for as
     * long as the queue is empty.
     * @param val Destination for the item. If null, the call returns
     *            immediately without waiting or removing anything.
     */
    void get(value_type* val) {
        if (!val)
            return;

        unique_guard g(lock_);
        notEmptyCond_.wait(g, [this]{ return !que_.empty(); });
        pop_and_release(g, val);
    }

    /**
     * Removes and returns the oldest item, blocking for as long as the
     * queue is empty.
     * @return The item that was at the front of the queue.
     */
    value_type get() {
        unique_guard g(lock_);
        notEmptyCond_.wait(g, [this]{ return !que_.empty(); });

        // Move-construct the result so T need not be default-constructible.
        value_type val = std::move(que_.front());
        que_.pop();
        g.unlock();
        notFullCond_.notify_one();
        return val;
    }

    /**
     * Removes the oldest item only if one is available right now.
     * @param val Destination for the item.
     * @return `true` if an item was stored in `*val`; `false` if `val` is
     *         null or the queue was empty.
     */
    bool try_get(value_type* val) {
        if (!val)
            return false;

        unique_guard g(lock_);
        if (que_.empty())
            return false;

        pop_and_release(g, val);
        return true;
    }

    /**
     * Removes the oldest item, waiting up to a maximum duration for one to
     * arrive.
     * @param val Destination for the item.
     * @param relTime The longest time to wait for an item.
     * @return `true` if an item was stored in `*val`; `false` if `val` is
     *         null or the wait timed out with the queue still empty.
     */
    template <typename Rep, class Period>
    bool try_get_for(value_type* val, const std::chrono::duration<Rep, Period>& relTime) {
        if (!val)
            return false;

        unique_guard g(lock_);
        if (!notEmptyCond_.wait_for(g, relTime, [this]{ return !que_.empty(); }))
            return false;

        pop_and_release(g, val);
        return true;
    }

    /**
     * Removes the oldest item, waiting until an absolute deadline for one
     * to arrive.
     * @param val Destination for the item.
     * @param absTime The point in time at which to give up.
     * @return `true` if an item was stored in `*val`; `false` if `val` is
     *         null or the deadline passed with the queue still empty.
     */
    template <class Clock, class Duration>
    bool try_get_until(value_type* val, const std::chrono::time_point<Clock,Duration>& absTime) {
        if (!val)
            return false;

        unique_guard g(lock_);
        if (!notEmptyCond_.wait_until(g, absTime, [this]{ return !que_.empty(); }))
            return false;

        pop_and_release(g, val);
        return true;
    }
};
323 
325 // end namespace mqtt
326 }
327 
328 #endif // __mqtt_thread_queue_h
329 
mqtt::thread_queue
Definition: thread_queue.h:70
typename Container::size_type size_type
Definition: thread_queue.h:77
T value_type
Definition: thread_queue.h:75
size_type size() const
Definition: thread_queue.h:141
void capacity(size_type cap)
Definition: thread_queue.h:133
Container container_type
Definition: thread_queue.h:73
bool try_put(value_type val)
Definition: thread_queue.h:165
bool try_get_for(value_type *val, const std::chrono::duration< Rep, Period > &relTime)
Definition: thread_queue.h:283
bool try_put_for(value_type val, const std::chrono::duration< Rep, Period > &relTime)
Definition: thread_queue.h:185
static constexpr size_type MAX_CAPACITY
Definition: thread_queue.h:80
bool try_get(value_type *val)
Definition: thread_queue.h:258
thread_queue()
Definition: thread_queue.h:103
bool try_put_until(value_type val, const std::chrono::time_point< Clock, Duration > &absTime)
Definition: thread_queue.h:206
void get(value_type *val)
Definition: thread_queue.h:222
bool empty() const
Definition: thread_queue.h:115
size_type capacity() const
Definition: thread_queue.h:123
thread_queue(size_t cap)
Definition: thread_queue.h:109
void put(value_type val)
Definition: thread_queue.h:151
value_type get()
Definition: thread_queue.h:240
bool try_get_until(value_type *val, const std::chrono::time_point< Clock, Duration > &absTime)
Definition: thread_queue.h:308
Definition: async_client.h:49