#ifndef LIBPMEMOBJ_MPSC_QUEUE_HPP
#define LIBPMEMOBJ_MPSC_QUEUE_HPP

#include <libpmemobj++/container/vector.hpp>
#include <libpmemobj++/detail/common.hpp>
#include <libpmemobj++/detail/enumerable_thread_specific.hpp>
#include <libpmemobj++/detail/ringbuf.hpp>
#include <libpmemobj++/make_persistent.hpp>
#include <libpmemobj++/p.hpp>
#include <libpmemobj++/persistent_ptr.hpp>
#include <libpmemobj++/pool.hpp>
#include <libpmemobj++/string_view.hpp>
#include <libpmemobj++/transaction.hpp>

#include <algorithm>
#include <cassert>
#include <cstddef>
#include <memory>

namespace pmem
{
namespace obj
{
namespace experimental
{
/*
 * Persistent memory aware implementation of a multi-producer,
 * single-consumer (MPSC) queue.
 */
class mpsc_queue {
public:
	class batch_type;
	class pmem_log_type;
	class worker;

	mpsc_queue(pmem_log_type &pmem, size_t max_workers = 1);

	worker register_worker();

	/* Evaluates callback f for the data which is ready to be consumed. */
	template <typename Function>
	bool try_consume_batch(Function &&f);

private:
	/* Header kept in the first 8 bytes of an element's first cacheline. */
	struct first_block {
		static constexpr size_t CAPACITY =
			pmem::detail::CACHELINE_SIZE - sizeof(size_t);
		static constexpr size_t DIRTY_FLAG =
			(1ULL << (sizeof(size_t) * 8 - 1));

		pmem::obj::p<size_t> size;
		char data[CAPACITY];
	};
	class iterator {
	public:
		iterator(char *data, char *end);

		iterator &operator++();
		bool operator==(const iterator &rhs) const;
		bool operator!=(const iterator &rhs) const;
		pmem::obj::string_view operator*() const;

	private:
		/* Skip cachelines which do not hold a complete element. */
		first_block *seek_next(first_block *);

		char *data;
		char *end;
	};

	void clear_cachelines(first_block *block, size_t size);
	void restore_offsets();
	size_t consume_cachelines(size_t *offset);
	void release_cachelines(size_t len);
	detail::id_manager &get_id_manager();

	std::unique_ptr<ringbuf::ringbuf_t> ring_buffer;
	char *buf;
	size_t buf_size;
	pmem::obj::pool_base pop;
	pmem_log_type *pmem;

	/* Consumer-side state: the range currently being consumed. */
	size_t consume_offset = 0;
	size_t consume_len = 0;
public:
	/* Type representing the range of the mpsc_queue elements. */
	class batch_type {
	public:
		batch_type(iterator begin_, iterator end_);

		/* Iterators to the accessed range of the mpsc_queue. */
		iterator begin() const;
		iterator end() const;

	private:
		iterator begin_;
		iterator end_;
	};

	/* mpsc_queue producer worker class. */
	class worker {
	public:
		worker(mpsc_queue *q);
		worker(worker &&other);
		worker &operator=(worker &&other);
		~worker();

		/* Copies data from pmem::obj::string_view into the queue. */
		template <typename Function = void (*)(pmem::obj::string_view)>
		bool try_produce(pmem::obj::string_view data,
				 Function &&on_produce =
					 [](pmem::obj::string_view target) {});

	private:
		mpsc_queue *queue;
		ringbuf::ringbuf_worker_t *w;
		size_t id;

		ptrdiff_t acquire_cachelines(size_t len);
		void produce_cachelines();
		void store_to_log(pmem::obj::string_view data, char *log_data);

		friend class mpsc_queue;
	};

	/* Type representing persistent data managed by the mpsc_queue. */
	class pmem_log_type {
	public:
		pmem_log_type(size_t size);

		/* Read-only view of the underlying (aligned) buffer. */
		pmem::obj::string_view data();

	private:
		pmem::obj::vector<char> data_;
		pmem::obj::p<size_t> written;

		friend class mpsc_queue;
	};
};
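/*
 * A minimal usage sketch of the API declared above. The pool path, layout
 * name, log size and the root structure are illustrative assumptions, not
 * part of this header:
 *
 *	struct root {
 *		pmem::obj::persistent_ptr<mpsc_queue::pmem_log_type> log;
 *	};
 *
 *	auto pop = pmem::obj::pool<root>::open("/path/to/pool", "layout");
 *	auto r = pop.root();
 *	if (r->log == nullptr)
 *		pmem::obj::transaction::run(pop, [&] {
 *			r->log = pmem::obj::make_persistent<
 *				mpsc_queue::pmem_log_type>(10240);
 *		});
 *
 *	mpsc_queue queue(*r->log, 1); // max_workers == 1
 *
 *	auto w = queue.register_worker();
 *	w.try_produce("Hello, world!");
 *
 *	queue.try_consume_batch([&](mpsc_queue::batch_type batch) {
 *		for (pmem::obj::string_view entry : batch) {
 *			// process entry
 *		}
 *	});
 */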
inline mpsc_queue::mpsc_queue(pmem_log_type &pmem, size_t max_workers)
{
	auto buf_data = pmem.data();

	buf = const_cast<char *>(buf_data.data());
	buf_size = buf_data.size();

	/* pmem_log_type::data() returns a cacheline-aligned window. */
	assert(buf_size % pmem::detail::CACHELINE_SIZE == 0);

	ring_buffer = std::unique_ptr<ringbuf::ringbuf_t>(
		new ringbuf::ringbuf_t(
			max_workers, buf_size / pmem::detail::CACHELINE_SIZE));

	this->pop = pmem::obj::pool_by_vptr(&pmem);
	this->pmem = &pmem;

	restore_offsets();
}
inline ptrdiff_t
mpsc_queue::worker::acquire_cachelines(size_t len)
{
	assert(len % pmem::detail::CACHELINE_SIZE == 0);
	auto ret = ringbuf_acquire(queue->ring_buffer.get(), w,
				   len / pmem::detail::CACHELINE_SIZE);

	if (ret < 0)
		return ret;

	return ret * static_cast<ptrdiff_t>(pmem::detail::CACHELINE_SIZE);
}
inline void
mpsc_queue::worker::produce_cachelines()
{
	ringbuf_produce(queue->ring_buffer.get(), w);
}
inline size_t
mpsc_queue::consume_cachelines(size_t *offset)
{
	auto ret = ringbuf_consume(ring_buffer.get(), offset);
	if (ret) {
		*offset *= pmem::detail::CACHELINE_SIZE;
		return ret * pmem::detail::CACHELINE_SIZE;
	}

	return 0;
}
inline void
mpsc_queue::release_cachelines(size_t len)
{
	assert(len % pmem::detail::CACHELINE_SIZE == 0);
	ringbuf_release(ring_buffer.get(), len / pmem::detail::CACHELINE_SIZE);
}
inline void
mpsc_queue::restore_offsets()
{
	/* Invariant: the persisted watermark never reaches the log's end. */
	assert(pmem->written < buf_size);

	auto w = register_worker();

	if (!pmem->written) {
		/* Nothing was consumed yet: the consumer must scan the whole
		 * log, so mark (almost) all of it as produced to keep the
		 * producers from overwriting not-yet-consumed data. */
		auto acq = w.acquire_cachelines(buf_size -
						pmem::detail::CACHELINE_SIZE);
		assert(acq == 0);
		(void)acq;

		w.produce_cachelines();

		return;
	}

	/* Advance the ring buffer to the persisted consumer position:
	 * produce and immediately consume the first pmem->written bytes. */
	auto acq = w.acquire_cachelines(pmem->written);
	assert(acq == 0);
	w.produce_cachelines();

	size_t offset;
	auto len = consume_cachelines(&offset);
	assert(len == pmem->written);
	release_cachelines(len);

	assert(len == pmem->written);

	/* Mark the not-yet-consumed remainder of the log as produced: first
	 * the part after the watermark, then the wrapped-around part. */
	acq = w.acquire_cachelines(buf_size - pmem->written);
	assert(static_cast<size_t>(acq) == pmem->written);
	w.produce_cachelines();

	acq = w.acquire_cachelines(pmem->written -
				   pmem::detail::CACHELINE_SIZE);
	assert(acq >= 0);
	(void)acq;
	w.produce_cachelines();
}
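/*
 * A worked example of the recovery above, with illustrative numbers: a log
 * of 8 cachelines and pmem->written == 3 * CACHELINE_SIZE.
 *   1. acquire(3) + produce  -> cachelines [0, 3) marked produced
 *   2. consume + release     -> read head advances to cacheline 3
 *   3. acquire(5) + produce  -> [3, 8) marked produced (unconsumed data)
 *   4. acquire(2) + produce  -> [0, 2) marked produced (wrapped-around part)
 * The consumer then resumes scanning at the persisted watermark (cacheline
 * 3), and producers may only write once those ranges have been consumed.
 */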
inline mpsc_queue::pmem_log_type::pmem_log_type(size_t size)
    : data_(size, 0), written(0)
{
}

inline pmem::obj::string_view
mpsc_queue::pmem_log_type::data()
{
	auto addr = reinterpret_cast<uintptr_t>(&data_[0]);
	auto aligned_addr =
		pmem::detail::align_up(addr, pmem::detail::CACHELINE_SIZE);

	auto size = data_.size() - (aligned_addr - addr);
	auto aligned_size =
		pmem::detail::align_down(size, pmem::detail::CACHELINE_SIZE);

	return pmem::obj::string_view(
		reinterpret_cast<const char *>(aligned_addr), aligned_size);
}
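/*
 * Illustrative arithmetic for data() above, assuming 64-byte cachelines:
 * if data_ happens to start at address 0x1010 with size 1024, then
 *   aligned_addr = align_up(0x1010, 64) = 0x1040 (48 bytes skipped),
 *   size         = 1024 - 48            = 976,
 *   aligned_size = align_down(976, 64)  = 960 (15 whole cachelines).
 * The queue therefore always operates on a cacheline-aligned window of the
 * underlying buffer, which is what the mpsc_queue constructor asserts.
 */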
inline detail::id_manager &
mpsc_queue::get_id_manager()
{
	static detail::id_manager manager;
	return manager;
}

/* Registers the producer worker. */
inline mpsc_queue::worker
mpsc_queue::register_worker()
{
	return worker(this);
}
template <typename Function>
inline bool
mpsc_queue::try_consume_batch(Function &&f)
{
	if (pmemobj_tx_stage() != TX_STAGE_NONE)
		throw pmem::transaction_scope_error(
			"Function called inside a transaction scope.");

	bool consumed = false;

	/* Consume up to two ranges: the data may wrap around the end of the
	 * ring buffer, and the two parts are returned separately. */
	for (int i = 0; i < 2; i++) {
		/* If there is no consume in progress, it's safe to call
		 * ringbuf_consume. */
		if (!ring_buffer->consume_in_progress) {
			size_t offset;
			auto len = consume_cachelines(&offset);
			if (!len)
				return consumed;

			consume_offset = offset;
			consume_len = len;
		} else {
			assert(consume_len != 0);
		}

#if LIBPMEMOBJ_CPP_VG_HELGRIND_ENABLED
		ANNOTATE_HAPPENS_AFTER(ring_buffer.get());
#endif

		auto data = buf + consume_offset;
		auto begin = iterator(data, data + consume_len);
		auto end = iterator(data + consume_len, data + consume_len);

		/* f() and the cleanup below run inside one transaction, so
		 * the elements are removed atomically with whatever f()
		 * persists. */
		pmem::obj::transaction::run(pop, [&] {
			if (begin != end) {
				consumed = true;
				f(batch_type(begin, end));
			}

			auto b = reinterpret_cast<first_block *>(data);
			clear_cachelines(b, consume_len);

			if (consume_offset + consume_len < buf_size)
				pmem->written = consume_offset + consume_len;
			else if (consume_offset + consume_len == buf_size)
				pmem->written = 0;
		});

#if LIBPMEMOBJ_CPP_VG_HELGRIND_ENABLED
		ANNOTATE_HAPPENS_BEFORE(ring_buffer.get());
#endif

		release_cachelines(consume_len);

		assert(!ring_buffer->consume_in_progress);
	}

	return consumed;
}
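/*
 * A hedged consumer-side sketch; the helper name "drain_queue" is
 * illustrative, not part of this header. It loops until try_consume_batch()
 * reports that no data is ready to be consumed.
 */
inline void
drain_queue(mpsc_queue &queue)
{
	while (queue.try_consume_batch([](mpsc_queue::batch_type batch) {
		for (pmem::obj::string_view entry : batch) {
			(void)entry; /* process one produced element */
		}
	}))
		;
}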
inline mpsc_queue::worker::worker(mpsc_queue *q)
{
	queue = q;
	auto &manager = queue->get_id_manager();

#if LIBPMEMOBJ_CPP_VG_DRD_ENABLED
	ANNOTATE_BENIGN_RACE_SIZED(
		&manager, sizeof(std::mutex),
		"https://bugs.kde.org/show_bug.cgi?id=416286");
#endif

	id = manager.get();
	assert(id < q->ring_buffer->nworkers);

	w = ringbuf_register(queue->ring_buffer.get(), id);
}
inline mpsc_queue::worker::worker(mpsc_queue::worker &&other)
{
	*this = std::move(other);
}

inline mpsc_queue::worker &
mpsc_queue::worker::operator=(worker &&other)
{
	if (this != &other) {
		queue = other.queue;
		w = other.w;
		id = other.id;

		other.queue = nullptr;
		other.w = nullptr;
	}
	return *this;
}
inline mpsc_queue::worker::~worker()
{
	if (queue) {
		ringbuf_unregister(queue->ring_buffer.get(), w);
		auto &manager = queue->get_id_manager();
		manager.release(id);
	}
}
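/*
 * A hedged sketch of the intended worker lifetime: one worker per producer
 * thread, registered on entry and unregistered by the destructor on exit.
 * The helper name "produce_from_threads" and the payload are illustrative;
 * <thread> and <vector> are assumed available, and the queue must have been
 * constructed with max_workers >= nthreads.
 */
inline void
produce_from_threads(mpsc_queue &queue, size_t nthreads)
{
	std::vector<std::thread> threads;
	for (size_t i = 0; i < nthreads; i++) {
		threads.emplace_back([&queue] {
			auto w = queue.register_worker();
			while (!w.try_produce("payload")) {
				/* Log full: retry until the consumer frees
				 * space. */
			}
		});
	}
	for (auto &t : threads)
		t.join();
}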
template <typename Function>
bool
mpsc_queue::worker::try_produce(pmem::obj::string_view data,
				Function &&on_produce)
{
	auto req_size =
		pmem::detail::align_up(data.size() + sizeof(first_block::size),
				       pmem::detail::CACHELINE_SIZE);
	auto offset = acquire_cachelines(req_size);

#if LIBPMEMOBJ_CPP_VG_HELGRIND_ENABLED
	ANNOTATE_HAPPENS_AFTER(queue->ring_buffer.get());
#endif

	if (offset == -1)
		return false;

	store_to_log(data, queue->buf + offset);

#if LIBPMEMOBJ_CPP_VG_HELGRIND_ENABLED
	ANNOTATE_HAPPENS_BEFORE(queue->ring_buffer.get());
#endif

	on_produce(pmem::obj::string_view(
		queue->buf + offset + sizeof(first_block::size), data.size()));

	produce_cachelines();

	return true;
}
inline void
mpsc_queue::worker::store_to_log(pmem::obj::string_view data, char *log_data)
{
	assert(reinterpret_cast<uintptr_t>(log_data) %
		       pmem::detail::CACHELINE_SIZE ==
	       0);

/* Invariant: a producer may only write to cachelines whose headers are
 * zeroed. */
#ifndef NDEBUG
	auto b = reinterpret_cast<first_block *>(log_data);
	auto s = pmem::detail::align_up(data.size() + sizeof(first_block::size),
					pmem::detail::CACHELINE_SIZE);
	auto e = b + s / pmem::detail::CACHELINE_SIZE;
	while (b < e) {
		assert(b->size == 0);
		b++;
	}
#endif /* NDEBUG */

	assert(reinterpret_cast<first_block *>(log_data)->size == 0);

	/* Build the header: element size with the DIRTY flag set. */
	first_block fblock;
	fblock.size = data.size() | size_t(first_block::DIRTY_FLAG);

	/* Store the first cacheline: header plus up to CAPACITY bytes. */
	size_t ncopy = (std::min)(data.size(), size_t(first_block::CAPACITY));
	std::copy_n(data.data(), ncopy, fblock.data);

	pmemobj_memcpy(queue->pop.handle(), log_data,
		       reinterpret_cast<char *>(&fblock),
		       pmem::detail::CACHELINE_SIZE, PMEMOBJ_F_MEM_NONTEMPORAL);

	size_t remaining_size = ncopy > data.size() ? 0 : data.size() - ncopy;

	const char *srcof = data.data() + ncopy;
	size_t rcopy = pmem::detail::align_down(remaining_size,
						pmem::detail::CACHELINE_SIZE);
	size_t lcopy = remaining_size - rcopy;

	/* Stage the partially filled last cacheline on the stack. */
	char last_cacheline[pmem::detail::CACHELINE_SIZE];
	if (lcopy != 0)
		std::copy_n(srcof + rcopy, lcopy, last_cacheline);

	if (rcopy != 0) {
		char *dest = log_data + pmem::detail::CACHELINE_SIZE;

		pmemobj_memcpy(queue->pop.handle(), dest, srcof, rcopy,
			       PMEMOBJ_F_MEM_NODRAIN |
				       PMEMOBJ_F_MEM_NONTEMPORAL);
	}

	if (lcopy != 0) {
		void *dest = log_data + pmem::detail::CACHELINE_SIZE + rcopy;

		pmemobj_memcpy(queue->pop.handle(), dest, last_cacheline,
			       pmem::detail::CACHELINE_SIZE,
			       PMEMOBJ_F_MEM_NODRAIN |
				       PMEMOBJ_F_MEM_NONTEMPORAL);
	}

	/* Wait for all payload stores to reach the medium, then clear the
	 * DIRTY flag to mark the element as complete. */
	pmemobj_drain(queue->pop.handle());

	fblock.size &= (~size_t(first_block::DIRTY_FLAG));

	pmemobj_memcpy(queue->pop.handle(), log_data,
		       reinterpret_cast<char *>(&fblock),
		       pmem::detail::CACHELINE_SIZE, PMEMOBJ_F_MEM_NONTEMPORAL);
}
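/*
 * Worked example for store_to_log() with 64-byte cachelines (so
 * CAPACITY = 56) and data.size() == 100 (illustrative numbers):
 *   req_size       = align_up(100 + 8, 64) = 128 -> 2 cachelines acquired
 *   ncopy          = min(100, 56)          = 56  -> stored next to the header
 *   remaining_size = 100 - 56              = 44
 *   rcopy          = align_down(44, 64)    = 0   -> no full trailing cacheline
 *   lcopy          = 44                          -> staged in last_cacheline[]
 * The header is first persisted with DIRTY_FLAG set, and only after all
 * payload cachelines are drained is the flag cleared; a crash in between
 * leaves a block that seek_next() recognizes as incomplete and skips.
 */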
inline mpsc_queue::batch_type::batch_type(iterator begin_, iterator end_)
    : begin_(begin_), end_(end_)
{
}

inline mpsc_queue::iterator
mpsc_queue::batch_type::begin() const
{
	return begin_;
}

inline mpsc_queue::iterator
mpsc_queue::batch_type::end() const
{
	return end_;
}
inline mpsc_queue::iterator::iterator(char *data, char *end)
    : data(data), end(end)
{
	auto b = reinterpret_cast<first_block *>(data);
	auto next = seek_next(b);
	assert(next >= b);
	this->data = reinterpret_cast<char *>(next);
}
inline void
mpsc_queue::clear_cachelines(first_block *block, size_t size)
{
	assert(size % pmem::detail::CACHELINE_SIZE == 0);
	assert(pmemobj_tx_stage() == TX_STAGE_WORK);

	auto end = block +
		static_cast<ptrdiff_t>(size / pmem::detail::CACHELINE_SIZE);

	while (block < end) {
		/* Add the header to the transaction and zero it. */
		detail::conditional_add_to_tx(&block->size, 1,
					      POBJ_XADD_ASSUME_INITIALIZED);
		block->size = 0;

		block++;
	}

	assert(end <= reinterpret_cast<first_block *>(buf + buf_size));
}
inline mpsc_queue::iterator &
mpsc_queue::iterator::operator++()
{
	auto block = reinterpret_cast<first_block *>(data);
	assert(block->size != 0);

	auto element_size =
		pmem::detail::align_up(block->size + sizeof(block->size),
				       pmem::detail::CACHELINE_SIZE);

	block += element_size / pmem::detail::CACHELINE_SIZE;

	auto next = seek_next(block);
	assert(next >= block);

	data = reinterpret_cast<char *>(next);

	return *this;
}

inline bool
mpsc_queue::iterator::operator==(const mpsc_queue::iterator &rhs) const
{
	return data == rhs.data;
}

inline bool
mpsc_queue::iterator::operator!=(const mpsc_queue::iterator &rhs) const
{
	return data != rhs.data;
}
inline pmem::obj::string_view mpsc_queue::iterator::operator*() const
{
	auto b = reinterpret_cast<first_block *>(data);
	return pmem::obj::string_view(b->data, b->size);
}

inline mpsc_queue::first_block *
mpsc_queue::iterator::seek_next(mpsc_queue::first_block *b)
{
	auto e = reinterpret_cast<first_block *>(end);

	/* Advance to the first complete element, skipping zeroed (consumed
	 * or padding) cachelines and blocks whose write did not complete
	 * before a crash. */
	while (b < e) {
		if (b->size == 0) {
			b++;
		} else if (b->size & size_t(first_block::DIRTY_FLAG)) {
			auto size =
				b->size & (~size_t(first_block::DIRTY_FLAG));
			auto aligned_size = pmem::detail::align_up(
				size + sizeof(b->size),
				pmem::detail::CACHELINE_SIZE);

			b += aligned_size / pmem::detail::CACHELINE_SIZE;
		} else {
			break;
		}
	}

	return b;
}

} /* namespace experimental */
} /* namespace obj */
} /* namespace pmem */

#endif /* LIBPMEMOBJ_MPSC_QUEUE_HPP */