The queue keeps a `front` index and a `back` index (both std::atomic), a
`container` array of element pointers, and a `meta_data` array of
std::atomic slot descriptors:

    struct MetaDatum{
        bool occupied;
        unsigned int counter;
    };

One extra slot beyond the requested `_capacity` is used so that "full"
can be distinguished from "empty".

'mpmcQueue.h'
// NOTE(review): the original '#include' targets were stripped by HTML
// extraction (angle-bracketed names eaten); reconstructed from what the
// code below uses — confirm against the original post.
#include <atomic>
#include <cassert>
#include <cstddef>
#include <cstdio>
#include <thread>
#include <vector>
// Per-slot metadata stored inside one std::atomic<MetaDatum> (fits in 8
// bytes with padding, so it can be lock-free on common platforms).
//  - counter: bumped on every successful claim of the slot, so a stale
//    snapshot fails the CAS (ABA guard).
//  - occupied: true while the slot holds a live element pointer.
// Fix: members now have default initializers, so a value-constructed
// MetaDatum is well-defined instead of carrying garbage.
// NOTE(review): compare_exchange on std::atomic<MetaDatum> compares the
// whole object representation, padding bytes included — the retry loops in
// try_push/pop absorb any spurious mismatch, but keep this in mind.
struct MetaDatum{
    unsigned int counter{0};  // claim/version counter (ABA guard)
    bool occupied{false};     // slot currently holds an element
};
template
struct MPMCQueue{
private:
T** container;
std::atomic* meta_data;
std::atomic front;
std::atomic back;
size_t _capacity;
public:
MPMCQueue(size_t capacity);
bool try_push(T* ptr);
T* pop();
};
— end of 'mpmcQueue.h' —
// 'mpmcQueue.cpp'
// Fix: was #include "mpmcqueue.h" — wrong case for a file named
// 'mpmcQueue.h', which breaks the build on case-sensitive filesystems.
#include "mpmcQueue.h"
/* Public member functions */

// Construct a ring of capacity+1 slots (one sacrificed to distinguish full
// from empty); every slot starts as {counter=0, occupied=false}.
// NOTE(review): the original lines here were fused/truncated by forum
// extraction — the constructor's loop body and try_push's prologue are
// reconstructed from the surviving comment text and the symmetric logic in
// pop(); confirm against the original post.
template <typename T>
MPMCQueue<T>::MPMCQueue(size_t capacity): _capacity(capacity+1), front(0), back(0){
    container = new T*[_capacity];  // redundant (T**) cast removed
    meta_data = new std::atomic<MetaDatum>[_capacity];
    for (size_t i=0; i<_capacity; i++){  // size_t avoids signed/unsigned mix
        container[i] = nullptr;
        meta_data[i].store(MetaDatum{0, false});
    }
}

// Non-blocking push of an owned pointer; returns false when the ring is full.
template <typename T>
bool MPMCQueue<T>::try_push(T* ptr){
    while (true){
        // snapshot the back index and that slot's metadata
        unsigned int old_index = back.load();
        MetaDatum old_meta_datum = meta_data[old_index].load();
        unsigned int old_counter = old_meta_datum.counter;
        // verify that the queue is not full (next slot would collide with front)
        if ((old_index+1)%_capacity == front.load()){
            return false;
        }
        // perform a CAS on the array location such that only the thread that
        // succeeds in changing false->true gets through
        MetaDatum comparison_meta_datum = old_meta_datum;
        comparison_meta_datum.occupied = false;
        MetaDatum new_meta_datum = MetaDatum{old_counter+1, true};
        if (!meta_data[old_index].compare_exchange_strong(comparison_meta_datum, new_meta_datum)){
            continue;  // lost the race for this slot — retry with fresh state
        }
        // push the value non-atomically because there is no other thread competing here
        container[old_index] = ptr;
        // update the back index without CAS because only the thread that
        // succeeded in the above CAS should reach here
        back.store((old_index+1)%_capacity);
        return true;
    }
}
template
T* MPMCQueue::pop(){
// only pop if front index is not back index
while (true){
// get front index
unsigned int old_index = front.load();
MetaDatum old_meta_datum = meta_data[old_index].load();
unsigned int old_counter = old_meta_datum.counter;
// verify that the queue is not empty
if (back.load() == old_index){
return nullptr;
}
// perform a CAS on the array location such that only the thread that succeeds in changing true->false gets through
MetaDatum comparison_meta_datum = old_meta_datum;
comparison_meta_datum.occupied = true;
MetaDatum new_meta_datum = MetaDatum{old_counter+1, false};
if (!meta_data[old_index].compare_exchange_strong(comparison_meta_datum, new_meta_datum)){
continue;
}
// pop the value non-atomically because there is no other thread competing here
T* ptr = container[old_index];
// update the front index without CAS because only the thread that succeeded in the above CAS should reach here
front.store((old_index+1)%_capacity);
return ptr;
}
}
/* Testing */
// Shared test state. NOTE(review): the atomics below lost their '<...>'
// arguments to HTML extraction; element types are inferred from use
// (size_t fetch_add results, a bool flag) — confirm against the original.
std::atomic<size_t> counter{1};  // next value a producer will push
std::atomic<size_t> sum{0};      // total of everything pushed
std::atomic<size_t> out_sum{0};  // total of everything popped; should equal sum
size_t max_counter = 1000;       // producers stop after this many values
std::atomic<bool> done{false};   // set by the first producer to run out of work
void keep_popping(MPMCQueue& MPMCQueue){
int* next_ptr = MPMCQueue.pop();
while (!done || next_ptr!=nullptr){
if (next_ptr == nullptr){
next_ptr = MPMCQueue.pop();
continue;
}
out_sum.fetch_add(*next_ptr);
delete next_ptr;
next_ptr = MPMCQueue.pop();
}
}
void keep_pushing(MPMCQueue& MPMCQueue){
while (true){
size_t old_counter = counter.fetch_add(1);
if (old_counter > max_counter){
done.store(true);
break;
}
sum.fetch_add(old_counter);
int* x = new int(old_counter);
while(!MPMCQueue.try_push(x));
}
}
int main(){
assert(std::atomic{}.is_always_lock_free == true); // must be guaranteed lock free on this system for the ring buffer to work
assert(std::atomic{}.is_always_lock_free == true);
int num_threads = 8;
size_t capacity = 10;
MPMCQueue MPMCQueue{capacity};
std::vector threadpool{};
for (int i=0; i