Proper use of atomics for an asynchronous iterator with multiple producers and consumers in C++20

Programs in C++. Developer forum
Anonymous

Post by Anonymous »

I am trying to build an asynchronous iterator over a std::vector with a fixed allocation size (known before any thread is started). The goal is for multiple producers to incrementally fill the vector with data while multiple consumers collect the produced data. In particular, I am concerned about whether I am using the memory-ordering mechanisms correctly to guarantee proper synchronization between atomic and non-atomic data access. Any guidance or improvements would be greatly appreciated. I have also added explanations for clarity. The code consists of the following components:

async_iterator:
  • Manages an atomic iterator (_index) over a preallocated container (e.g. a non-empty vector or array), together with an atomic fill counter (_filled_size).
  • Provides a next() function that increments _index, waiting if necessary based on _filled_size.

async_vector (derived from async_iterator):
  • Wraps a std::vector together with an async_iterator.
  • emplace_back_by_unique_thread(): a thread-safe function meant to be called from a single producer thread (I am fairly sure this one is thread-safe. Well, I hope so :D).
  • emplace_back(): intended for multiple producers, but I am unsure whether my implementation is thread-safe.

Example 1: example_one_camera_producer_per_buffer
  Implements a producer-consumer setup in which each producer writes into its own dedicated async_vector; the number of async_vector instances equals the number of producers.

Example 2: example_multi_camera_producer_one_shared_buffer
  Implements a producer-consumer setup in which multiple producers write into the same async_vector, i.e. one shared buffer.

main()
  Runs both examples.

Any insights or recommendations would be greatly appreciated.

Code: Select all

#include <atomic>
#include <cassert>
#include <chrono>
#include <format>
#include <iostream>
#include <limits>
#include <optional>
#include <string>
#include <syncstream>
#include <thread>
#include <vector>

template <typename T>
struct async_iterator
{
    async_iterator() = default;
    async_iterator(T capacity) : _capacity(capacity) {}
    async_iterator(const async_iterator&) = delete;
    async_iterator& operator=(const async_iterator&) = delete;
    async_iterator(async_iterator&&) = delete;
    async_iterator& operator=(async_iterator&&) = delete;

    using value_type = T;
    using atomic_value_type = std::atomic<T>;
    using optional_value_type = std::optional<value_type>;

    static constexpr value_type ABORT_FLAG = std::numeric_limits<value_type>::max();

    value_type _capacity{0};                  // Maximum container size
    atomic_value_type _filled_size{0};        // Current container size (grows as the producer fills it)
    atomic_value_type _index{0};              // Current iterator index (used by the consumer)

    optional_value_type next()
    {
        auto _next_index = _index.fetch_add(1, std::memory_order_relaxed);
        if (_next_index >= _capacity)
            return std::nullopt; // End of iteration

        while (true) {
            auto _current_size = _filled_size.load(std::memory_order_acquire);

            if (_current_size == ABORT_FLAG) return std::nullopt; // Abort check
            if (_next_index < _current_size) return _next_index;  // The element is available

            // Wait for the producer to fill the container
            _filled_size.wait(_current_size, std::memory_order_acquire);
        }
    }

    void notify_all() { _filled_size.notify_all(); }

    void abort()
    {
        _filled_size.store(ABORT_FLAG, std::memory_order_release);
        _index.store(_capacity, std::memory_order_release);
        _filled_size.notify_all();
    }

    // Check if the iterator is still valid
    operator bool() const { return _index.load(std::memory_order_relaxed) < _capacity; }

    auto unprocessed_latency() const { return _filled_size.load(std::memory_order_relaxed) - _index.load(std::memory_order_relaxed); }
    auto filled_size() const { return _filled_size.load(std::memory_order_relaxed); }
    auto remaining_capacity() const { return _capacity - _filled_size.load(std::memory_order_relaxed); }
    auto is_aborted() const { return _filled_size.load(std::memory_order_relaxed) == ABORT_FLAG; }
    auto is_finished() const { return !(_index.load(std::memory_order_relaxed) < _capacity); }
};

template <typename T>
struct async_vector : public async_iterator<size_t>
{
    std::vector<T> _data;

    async_vector() = default;
    async_vector(auto&&... args) : _data(std::forward<decltype(args)>(args)...) {
        _capacity = _data.size();
        _filled_size.store(_capacity, std::memory_order_release);
    }

    async_vector(size_t fixed_size) : _data(fixed_size) {
        _capacity = fixed_size;
    }

    void resize(size_t size)
    {
        assert(((_filled_size == ABORT_FLAG) || (_filled_size >= _capacity && _index >= _capacity)) && "Threads are still running on the old vector");
        _filled_size.store(0, std::memory_order_release);
        _index.store(0, std::memory_order_release);
        _data.resize(size);
        _data.shrink_to_fit();
        _capacity = size;
    }

    auto capacity() const { assert(_data.size() == _capacity); return _data.size(); }

#ifdef _DEBUG
    std::atomic<std::thread::id> _thread_id{};

    bool is_same_thread() {
        if (_thread_id == std::thread::id{}) {
            _thread_id = std::this_thread::get_id();
            return true;
        }
        return (_thread_id == std::this_thread::get_id());
    }
#endif

    // This function is only safe to call from the same thread to fill the vector
    bool emplace_back_by_unique_thread(auto&& elem)
    {
#ifdef _DEBUG
        assert(is_same_thread() && "emplace_back_by_unique_thread must be called from the same thread");
#endif

        auto next_index = _filled_size.load(std::memory_order_relaxed);
        if (next_index >= _data.size()) return false; // On abort or completely filled vector

        _data[next_index] = std::forward<decltype(elem)>(elem);
        _filled_size.fetch_add(1, std::memory_order_release);
        _filled_size.notify_all();
        return true;
    }

    // Can be called from multiple threads
    bool pop_front(T& elem)
    {
        if (auto next_index_opt = next())
        {
            elem = _data[*next_index_opt];
            return true;
        }
        return false; // Abort or empty queue
    }

    // Well, I am not sure if this is thread-safe.
    // Is it safe to call this function from multiple threads?
    bool emplace_back(auto&& elem)
    {
        auto next_index = _filled_size.fetch_add(1, std::memory_order_acquire);

        // If the vector is full or aborted, revert the increment and return false
        if (next_index >= _data.size()) {
            _filled_size.fetch_sub(1, std::memory_order_release);
            return false;
        }

        _data[next_index] = std::forward<decltype(elem)>(elem);
        std::atomic_thread_fence(std::memory_order_release); // Ensure the data is written before the size is incremented
        _filled_size.notify_all(); // Notify all consumers
        return true;
    }
};

// Variables used inside examples
//--------------------------------

static constexpr int camera_count = 4;       // Number of cameras to simulate as producers
static constexpr int frame_count = 25;       // Number of frames to store per camera
static constexpr int camera_ti = 50;         // camera interval in ms, i.e.  camera integration time

static constexpr int consumer_by_camera = 3; // Number of consumers per camera (the consumer processing time is 3 times the camera integration time)
static constexpr int consumer_count = camera_count * consumer_by_camera;         // Number of consumers to simulate as executors
static constexpr int consumer_processing_time = camera_ti * consumer_by_camera;  // Consumer processing time in ms (3 times the camera integration time)

using mat = std::string;  // For example, to avoid an OpenCV dependency
using async_buffer = async_vector<mat>;

// Example 1
//--------------------------------
// The number of producers (cameras) is the same as the number of buffers (async_vector).

void example_one_camera_producer_per_buffer()
{
    std::vector<async_buffer> vFrameBuffer(camera_count);           // Vector of async buffers, one per camera
    for (auto& buffer : vFrameBuffer) buffer.resize(frame_count);   // Resize each buffer to the frame count

    // Producer function to simulate camera frame acquisition and store the frames in the async vector buffer
    auto producer = [&](int buffer_id, int camera_id)
    {
        for (int i = 0; i < frame_count; ++i)
        {
            std::this_thread::sleep_for(std::chrono::milliseconds(camera_ti));
            auto frame = std::format("Hello from camera {} - Frame {}", camera_id, i);
            if (!vFrameBuffer[buffer_id].emplace_back_by_unique_thread(std::move(frame))) // Always the same thread for the same camera
                break; // Queue is full, or the producer was aborted
        }
    };

    std::atomic_int processing_count = 0; // Counts the number of frames processed by the consumers

    // Consumer function to simulate frame processing
    auto consumer = [&](int consumer_id, int buffer_id)
    {
        auto& async_buffer = vFrameBuffer[buffer_id];
        mat frame;
        while (async_buffer.pop_front(frame)) {
            ++processing_count;
            std::this_thread::sleep_for(std::chrono::milliseconds(consumer_processing_time));
            std::osyncstream(std::cout) << std::format("Consumer {}: {}\n", consumer_id, frame);
        }
    };

    // Launch one producer per buffer and consumer_by_camera consumers per buffer, then join them all
    {
        std::vector<std::jthread> threads;
        for (int id = 0; id < camera_count; ++id)
            threads.emplace_back(producer, id, id);
        for (int id = 0; id < consumer_count; ++id)
            threads.emplace_back(consumer, id, id % camera_count);
    } // jthreads join on destruction

    std::osyncstream(std::cout) << std::format("Example 1: processed {} frames\n", processing_count.load());
}

// Example 2
//--------------------------------
// Multiple producers (cameras) push into the same buffer (async_vector), i.e. one shared buffer.

void example_multi_camera_producer_one_shared_buffer()
{
    async_buffer sharedBuffer;
    sharedBuffer.resize(camera_count * frame_count); // One buffer shared by all cameras

    // Producer function: multiple threads call emplace_back() on the shared buffer
    auto producer = [&](int camera_id)
    {
        for (int i = 0; i < frame_count; ++i)
        {
            std::this_thread::sleep_for(std::chrono::milliseconds(camera_ti));
            auto frame = std::format("Hello from camera {} - Frame {}", camera_id, i);
            if (!sharedBuffer.emplace_back(std::move(frame)))
                break; // Queue is full, or the producer was aborted
        }
    };

    std::atomic_int processing_count = 0;

    // Consumer function: multiple threads pop from the shared buffer
    auto consumer = [&](int consumer_id)
    {
        mat frame;
        while (sharedBuffer.pop_front(frame)) {
            ++processing_count;
            std::this_thread::sleep_for(std::chrono::milliseconds(consumer_processing_time));
            std::osyncstream(std::cout) << std::format("Consumer {}: {}\n", consumer_id, frame);
        }
    };

    {
        std::vector<std::jthread> threads;
        for (int id = 0; id < camera_count; ++id)
            threads.emplace_back(producer, id);
        for (int id = 0; id < consumer_count; ++id)
            threads.emplace_back(consumer, id);
    } // jthreads join on destruction

    std::osyncstream(std::cout) << std::format("Example 2: processed {} frames\n", processing_count.load());
}

int main()
{
    example_one_camera_producer_per_buffer();
    example_multi_camera_producer_one_shared_buffer();
}
