Code: Select all
template
struct Packet{
Queue queue;
atomic cnt;
void send(t: T) {
if self.cnt.load(memory_order::seq_cst) < DISCONNECTED + FUDGE {
throw "Err"
}
this->queue.push(t);
if(this->cnt.fetch_add(1, memory_order:seq_cst) == -1) {
this->take_to_wake().signal();
}
}
T recv(Option deadline) {
try{
auto t = this->try_recv();
return t;
}catch(e){
}
auto [wait_token, signal_token] = blocking::tokens();
if this->decrement(signal_token) == Installed {
wait_token.wait();
}
try{
auto t = this->try_recv();
this->steals.get() -= 1;
return t;
}catch(...){
}
}
fn decrement(SignalToken token) -> StartResult {
unsafe {
assert_eq(this->to_wake.load(memory_order:seq_cst), 0);
auto ptr = token.cast_to_usize();
this->to_wake.store(ptr, memory_order:seq_cst);
auto steals = ptr::replace(self.steals.get(), 0);
auto r = self.cnt.fetch_sub(1 + steals, memory_order:seq_cst);
if(r==DISCONNECTED ) {
this->cnt.store(DISCONNECTED, memory_order:seq_cst);
}else{
assert(n >= 0);
if n - steals to_wake.store(0, memory_order:seq_cst);
drop(SignalToken::cast_from_usize(ptr));
return Abort;
}
}
T try_recv() {
while(true){
try{
auto t = this->queue.pop();
return t;
}catch(e){
if(e=="Empty"){
throw e;
}
if(e=="Inconsistent"){
thread::yield_now();
}
}
}
}
};
Code: Select all
template
struct Node{
std::atomic next;
T value;
Node(T v):value(v),next(){}
};
template
struct Queue {
std::atomic head;
Node* tail;
Queue(){
auto h = new Node{T{}};
head.store(h);
tail = h;
}
void push(T t){
auto node = new Node{t};
auto pre = this->head.exchange(node,std::memory_order::acq_rel);
pre->next.store(node,std::memory_order::release);
}
T pop(){
auto tail = this->tail;
auto next = tail->next.load(std::memory_order::acquire);
if(next){
this->tail = next;
auto ret = next->value;
delete tail;
return ret;
}
if (this->head.load(std::memory_order::acquire) == tail){ // #1
throw "empty";
}else{
throw "Inconsistent";
}
}
};
Angenommen, drei Threads senden die Daten. Gemäß [intro.races] p14
Wenn ein Nebeneffekt Push-Threads, da kein Vorhergehen die Sichtbarkeit erzwingt; das heißt, diese Modifikationen nach dem anfänglichen Nebeneffekt in der Mod-Reihenfolge zu sehen. Es ist jedoch garantiert, dass der Anfangswert head und sein Feld, das als nächstes null sein soll, sichtbar sind.
Gemäß [atomic.order] p10
Atomere Lese-, Änderungs- und Schreiboperationen müssen immer den letzten Wert (in der Änderungsreihenfolge) lesen, der vor dem mit der Lese-, Änderungs- und Schreiboperation verknüpften Schreibvorgang geschrieben wurde.
Wenn einer RMW davon liest den Anfangswert 0, andere RMW-Operationen müssen die spätere Modifikation in der Modifikationsreihenfolge lesen und keine zwei RMW-Operationen können dieselbe Modifikation lesen. Alle Operationen auf cnt sind RMW-Operationen; Dadurch wird gewährleistet, dass der Ladeteil des RMW-Vorgangs immer voranschreiten kann und nicht auf einen anderen überspringen kann (d. h. alle Vorgänge werden serialisiert und können sich nicht überlappen). Mit der durch die serialisierten RMW-Operationen (d. h. cnt.fetch_xxx) eingerichteten Synchronisierung ist gewährleistet, dass die Ladevorgänge zu next und head die späteren Werte in ihren Änderungsreihenfolgen sehen, da es Vorher-Beziehungen zwischen Nebenwirkungen zu next oder head und den Ladevorgängen gibt.
Jeder Austausch in push kann eine Synchronisierung mit einem anderen herstellen. Da fetch_add(1,...) jedoch nach dem Aufruf von push sequenziert wird, gibt es durch die Synchronisierung in push für zwei beliebige fetch_adds kein „Vorhergehen vor“ und somit keine Einschränkung für diese fetch_add(1,...)s hinsichtlich ihrer Reihenfolge in der Änderungsreihenfolge; Es kann vorkommen, dass fetch_add, der dem ersten Knoten außer head entspricht, später in seiner Änderungsreihenfolge angeordnet werden kann. Konkret habe ich versucht, diesen Fall im folgenden Bild zu veranschaulichen:

Der erste Pop beginnt bei diesem Kopf und head->next ist null, der cnt.fetch_sub(1) synchronisiert sich mit M1, dies kann nur garantieren, dass node2->next = node3 vor try_recv() passiert, das nach cnt.fetch_sub(1) sequenziert wird. Wenn ich in meinem Bild nicht falsch argumentiert habe, ist head->next = node1 immer noch ungeordnet nach head->next.load(...) in try_recv (d. h. in queue.pop()), jedoch wird self.head.load garantiert, dass node3 aufgrund der durch RMW-Operationen auf cnt hergestellten Synchronisierung angezeigt wird. Daher sollte try_recv in den Zweig „Inconsistent“ wechseln, um eine Schleife zu drehen und auf die Aktualisierung zu warten, auch wenn dies immer noch nicht garantieren kann, dass der neue Wert angezeigt wird, gemäß [atomics].order] p12
Die Implementierung sollte Atomspeicher für Atomlasten sichtbar machen, und Atomlasten sollten Atomspeicher innerhalb einer angemessenen Zeitspanne beobachten.
Das heißt, die Schleife kann technisch gesehen eine Endlosschleife sein (auch wenn dies in einer realen Implementierung nicht möglich ist).
Nicht nur in diesem Fall, wenn wir die Reihenfolge von M1 und cnt.fetch_sub(1) im Bild vertauschen, können wir in diesem Fall immer noch in einen inkonsistenten Status geraten? Ist meine Argumentation richtig?
Mobile version