Zum Problem verlorener Elemente in der sperrenfreien Warteschlange aus „C++ Concurrency in Action“ – C++

Programme in C++. Entwicklerforum
Guest
 Zum Problem verlorener Elemente in der sperrenfreien Warteschlange aus „C++ Concurrency in Action“

Post by Guest »

Ich habe Kapitel 7 von C++ Concurrency in Action (2. Auflage) gelesen. Beim Testen der im Buch bereitgestellten Implementierung einer sperrenfreien Warteschlange habe ich gelegentlich eine Situation beobachtet, in der in der Warteschlange eingereihte Elemente verloren gingen. Unten ist die Implementierung der sperrenfreien Warteschlange:

Code: Select all

template 
class lock_free_queue
{
private:
struct node;
struct counted_node_ptr
{
int external_count;
node *ptr;
uint8_t padding[sizeof(void *) - sizeof(int)];
};

std::atomic _head;
std::atomic _tail;

struct node_counter
{
unsigned internal_count : 30;
unsigned external_counters : 2;
};

struct node
{
std::atomic data;
std::atomic count;
std::atomic next;

node()
{
data.store(nullptr);
node_counter new_count;
new_count.internal_count = 0;
new_count.external_counters = 2;
count.store(new_count);
counted_node_ptr new_next{0, nullptr, {0}};
next.store(new_next);
}

void release_ref()
{
node_counter old_counter = count.load(std::memory_order_relaxed);
node_counter new_counter;
do
{
new_counter = old_counter;
--new_counter.internal_count;
} while (!count.compare_exchange_strong(old_counter, new_counter,
std::memory_order_acquire,
std::memory_order_relaxed));
if (!new_counter.internal_count && !new_counter.external_counters)
{
delete this;
}
}
};

static void increase_external_count(std::atomic &counter,
counted_node_ptr &old_counter)
{
counted_node_ptr new_counter;
do
{
new_counter = old_counter;
++new_counter.external_count;
} while (!counter.compare_exchange_strong(old_counter, new_counter,
std::memory_order_acquire,
std::memory_order_relaxed));
old_counter.external_count = new_counter.external_count;
}

static void free_external_counter(counted_node_ptr &old_node_ptr)
{
node *const ptr = old_node_ptr.ptr;
int const count_increase = old_node_ptr.external_count - 2;
node_counter old_counter = ptr->count.load(std::memory_order_relaxed);
node_counter new_counter;
do
{
new_counter = old_counter;
--new_counter.external_counters;
new_counter.internal_count += count_increase;
} while (!ptr->count.compare_exchange_strong(old_counter, new_counter,
std::memory_order_acquire,
std::memory_order_relaxed));
if (!new_counter.internal_count && !new_counter.external_counters)
{
delete ptr;
}
}

void set_new_tail(counted_node_ptr &old_tail,
counted_node_ptr const &new_tail)
{
node *const current_tail_ptr = old_tail.ptr;
while (!_tail.compare_exchange_weak(old_tail, new_tail) && old_tail.ptr == current_tail_ptr)
;
if (old_tail.ptr == current_tail_ptr)
{
free_external_counter(old_tail);
}
else
{
current_tail_ptr->release_ref();
}
}

public:
lock_free_queue()
{
counted_node_ptr dummy{1, new node, {0}};
_head.store(dummy);
_tail.store(dummy);
}

void push(T new_value)
{
std::unique_ptr  new_data(new T(new_value));
counted_node_ptr new_next;
new_next.ptr = new node;
new_next.external_count = 1;
counted_node_ptr old_tail = _tail.load();
for (;;)
{
increase_external_count(_tail, old_tail);
T *old_data = nullptr;
if (old_tail.ptr->data.compare_exchange_strong(
old_data, new_data.get()))
{
counted_node_ptr old_next = {0, nullptr, {0}};
if (!old_tail.ptr->next.compare_exchange_strong(
old_next, new_next))
{
delete new_next.ptr;
new_next = old_next;
}
set_new_tail(old_tail, new_next);
new_data.release();
break;
}
else
{
counted_node_ptr old_next = {0, nullptr, {0}};
if (old_tail.ptr->next.compare_exchange_strong(
old_next, new_next))
{
old_next = new_next;
new_next.ptr = new node;
}
set_new_tail(old_tail, old_next);
}
}
}

std::unique_ptr pop()
{
counted_node_ptr old_head = _head.load(std::memory_order_relaxed);
for (;;)
{
increase_external_count(_head, old_head);
node *const ptr = old_head.ptr;
if (ptr == _tail.load().ptr)
{
ptr->release_ref(); //
return std::unique_ptr();
}
counted_node_ptr next = ptr->next.load();
if (_head.compare_exchange_strong(old_head, next))
{
T *const res = ptr->data.exchange(nullptr);
free_external_counter(old_head);
return std::unique_ptr(res);
}
ptr->release_ref();
}
}
};
Testcode

Code: Select all

/// Producer thread body for the stress test: pushes the integers
/// start, start + 1, ..., start + count - 1 onto `queue` in ascending order.
///
/// @param queue queue to push into.
/// @param start first value to push.
/// @param count number of consecutive values to push.
void t1_producer(lock_free_queue<int> &queue, int start, int count)
{
    for (int i = start; i < start + count; ++i)
    {
        queue.push(i);
    }
}

void t1_consumer(lock_free_queue &queue, std::atomic &sum, int consume_count)
{
for (int i = 0; i < consume_count;)
{
auto value = queue.pop();
if (value)
{
sum += *value;
++i;
}
}
}

int main()
{
for (int i = 0; i < 100; i++)
{
lock_free_queue queue;
std::atomic sum(0);

int num_producers = 4;
int num_consumers = 4;
int items_per_producer = 1000;

std::vector producers;
std::vector consumers;

for (int i = 0; i < num_producers; ++i)
{
producers.emplace_back(t1_producer, std::ref(queue), i * items_per_producer, items_per_producer);
}

for (int i = 0; i < num_consumers; ++i)
{
consumers.emplace_back(t1_consumer, std::ref(queue), std::ref(sum), (num_producers * items_per_producer) / num_consumers);
}

for (auto &t : producers)
{
t.join();
}

for (auto &t : consumers)
{
t.join();
}

int expected_sum = (num_producers * items_per_producer * (items_per_producer * num_producers - 1)) / 2;
std::cout

Quick Reply

Change Text Case: 
   
  • Similar Topics
    Replies
    Views
    Last post