Entwerfen einer schlossfreien Kreisqueue mit CASC++

Programme in C++. Entwicklerforum
Anonymous
 Entwerfen einer schlossfreien Kreisqueue mit CAS

Post by Anonymous »

Ich habe versucht, eine schlossfreie kreisförmige Warteschlange (Ringpuffer) für einen Multi-Producer, Multi-Consumer (MPMC) -Anutzungsfall zu schreiben. Jetzt, bevor die Hasser von Radreinigung auf mich herabsteigen, möchte ich dies vorab, indem ich meine akademische Neugier bei der Behebung dieses Problems feststellen möchte und nicht nur die gut getestete Warteschlange eines anderen von einem Github Repo.

Code: Select all

front
, zurück sind vom Typ std :: atomic und stellen den Index der Vorder- und Rückseite der kreisförmigen Warteschlange dar.

[*]

Code: Select all

container
ist ein reguläres altes Array von Zeigern der Größe _kapazität , um den Datentyp T Objekte im Speicher zu unterschreiben

[*]

Code: Select all

meta_data
ist ein Array von Typ

Code: Select all

std::atomic
wobei metadatum ein Strukturdatentyp ist wie folgt:

Code: Select all

struct MetaDatum{
bool occupied;
unsigned int counter;
};
[*] Der Zweck von besetzt besteht darin, anzugeben, ob der entsprechende Array -Standort Daten hat oder nicht und der Zweck von Zähler das Problem "ABA" durch Versioning -Änderungen am entsprechenden Array -Ort verhindern. is_always_lock_free und der Assert wird am Anfang von Haupt erstellt, um diese Garantie sicherzustellen.

Code: Select all

_capacity
ist nichtatomisch nicht signiert int , die die Anzahl der Elemente darstellt, die wir zu einem bestimmten Zeitpunkt plus eins unterbringen möchten. Die Auswahl von "Plus One" wird kurz erklärt. Pop und if (zurück+1)%_ Kapazität Equals Front dürfen nicht drücken. Die letztere Eigenart besteht darin, das problematische Szenario der Unterscheidung zwischen einer "vollständig gefüllten" und "vollständig leeren" Warteschlange zu vermeiden, ohne dass eine Größe Variable in beiden Szenarien vorne und zurück zusammenfällt. Stattdessen lassen wir nie einen Thread drücken, wenn der Rückindex eines hinter dem vorderen Index ist. Dies ist auch der Grund, warum wir auch _kapazität auf die Benutzer -Set -Kapazität plus einsetzt werden, um dem Benutzer der Warteschlange den Wert des Geldes zu geben. unendliche Schleife für andere. Dies ist der Teil, den ich trotz umfassender Fehlerbehebung nicht debuggen konnte. Hier ist der vollständige Code: < /p>
'mpmcQueue.h'

Code: Select all

    #include 
#include 
#include 
#include 
#include 
#include 

struct MetaDatum{
unsigned int counter;
bool occupied;
};

template 
struct MPMCQueue{
private:
T** container;
std::atomic* meta_data;
std::atomic front;
std::atomic back;
size_t _capacity;

public:
MPMCQueue(size_t capacity);
bool try_push(T* ptr);
T* pop();
};
< /code>
'mpmcQueue.cpp'
#include "mpmcqueue.h"

/* Public member functions */

template 
MPMCQueue::MPMCQueue(size_t capacity): _capacity(capacity+1), front(0), back(0){
container = (T**) new T*[_capacity];
meta_data = new std::atomic[_capacity];
for (int i=0; itrue gets through
MetaDatum comparison_meta_datum = old_meta_datum;
comparison_meta_datum.occupied = false;
MetaDatum new_meta_datum = MetaDatum{old_counter+1, true};
if (!meta_data[old_index].compare_exchange_strong(comparison_meta_datum, new_meta_datum)){
continue;
}
// push the value non-atomically because there is no other thread competing here
container[old_index] = ptr;
// update the back index without CAS because only the thread that succeeded in the above CAS should reach here
back.store((old_index+1)%_capacity);
return true;
}
}

template 
T* MPMCQueue::pop(){
// only pop if front index is not back index
while (true){
// get front index
unsigned int old_index = front.load();
MetaDatum old_meta_datum = meta_data[old_index].load();
unsigned int old_counter = old_meta_datum.counter;

// verify that the queue is not empty
if (back.load() == old_index){
return nullptr;
}
// perform a CAS on the array location such that only the thread that succeeds in changing true->false gets through
MetaDatum comparison_meta_datum = old_meta_datum;
comparison_meta_datum.occupied = true;
MetaDatum new_meta_datum = MetaDatum{old_counter+1, false};
if (!meta_data[old_index].compare_exchange_strong(comparison_meta_datum, new_meta_datum)){
continue;
}
// pop the value non-atomically because there is no other thread competing here
T* ptr = container[old_index];
// update the front index without CAS because only the thread that succeeded in the above CAS should reach here
front.store((old_index+1)%_capacity);
return ptr;
}
}

/* Testing */

std::atomic counter{1};
std::atomic sum{0};
std::atomic out_sum{0};
size_t max_counter = 1000;
std::atomic done{false};

void keep_popping(MPMCQueue& MPMCQueue){
int* next_ptr = MPMCQueue.pop();
while (!done || next_ptr!=nullptr){
if (next_ptr == nullptr){
next_ptr = MPMCQueue.pop();
continue;
}
out_sum.fetch_add(*next_ptr);
delete next_ptr;
next_ptr = MPMCQueue.pop();
}
}

void keep_pushing(MPMCQueue& MPMCQueue){
while (true){
size_t old_counter = counter.fetch_add(1);
if (old_counter >  max_counter){
done.store(true);
break;
}
sum.fetch_add(old_counter);
int* x = new int(old_counter);
while(!MPMCQueue.try_push(x));
}
}

int main(){
assert(std::atomic{}.is_always_lock_free == true); // must be guaranteed lock free on this system for the ring buffer to work
assert(std::atomic{}.is_always_lock_free == true);
int num_threads = 8;
size_t capacity = 10;
MPMCQueue MPMCQueue{capacity};
std::vector threadpool{};

for (int i=0; i

Quick Reply

Change Text Case: 
   
  • Similar Topics
    Replies
    Views
    Last post