Ich versuche, einen asynchronen Iterator über einem std :: vektor mit einer festen Zuordnungsgröße zu erstellen (bekannt vor dem Starten eines Threads). Ziel ist es, dass mehrere Hersteller die verfügbaren Daten innerhalb des Vektors inkrementieren, während mehrere Verbraucher die erzeugten Daten sammeln. Insbesondere bin ich besorgt darüber, ob ich korrekt verwende Speicherauftragsmechanismen, um eine ordnungsgemäße Synchronisation zwischen atomarem und nichtatomem Datenzugriff zu gewährleisten. Alle Anleitungen oder Verbesserungen wären sehr geschätzt. Zusätzlich habe ich Erklärungen für Klarheit hinzugefügt. Der Code besteht aus den folgenden Komponenten: < /p>
: Eine thread-sichere Funktion, die von einem einzelnen Produzenten-Thread aufgerufen werden soll (ich bin ziemlich sicher, dass diese Funktion Thread-Safe ist. Nun, ich hoffe es: D) .
: Für mehrere Produzenten beabsichtigt, bin ich jedoch ungewiss, ob meine Implementierung Thread-safe ist. />
Implementiert ein Produzenten-Consumer-Setup, bei dem jeder Produzent in seinem dedizierten async_vector .
die Anzahl von ASync_Vector
Beispiel 2: example_multi_camera_producer_one_shared_buffer
implementiert ein Produzenten-Konsumenten-Setup, bei dem mehrere Produzenten dieselbe Async_vector . Ein gemeinsamer Puffer.
#include
#include
#include
#include
#include
#include
#include
#include
#include
template
struct async_iterator
{
async_iterator() = default;
async_iterator(T capacity) : _capacity(capacity) {}
async_iterator(const async_iterator&) = delete;
async_iterator& operator=(const async_iterator&) = delete;
async_iterator(async_iterator&&) = delete;
async_iterator& operator=(async_iterator&&) = delete;
using value_type = T;
using atomic_value_type = std::atomic;
using optional_value_type = std::optional;
static constexpr value_type ABORT_FLAG = std::numeric_limits::max();
value_type _capacity{0}; // Maximum container size
atomic_value_type _filled_size{0}; // Current container size (dynamic in function of the producer)
atomic_value_type _index{0}; // Current iterator index (used by the consumer)
optional_value_type next()
{
auto _next_index = _index.fetch_add(1, std::memory_order_relaxed);
if (_next_index >= _capacity)
return std::nullopt; // End of iteration
while (true) {
auto _current_size = _filled_size.load(std::memory_order_acquire);
if (_current_size == ABORT_FLAG) return std::nullopt; // Abort check
if (_next_index < _current_size) return _next_index; // The element is available
// Wait for the producer to fill the container
_filled_size.wait(_current_size, std::memory_order_acquire);
}
}
void notify_all() { _filled_size.notify_all(); }
void abort()
{
_filled_size.store(ABORT_FLAG, std::memory_order_release);
_index.store(_capacity, std::memory_order_release);
_filled_size.notify_all();
}
// Check if the iterator is still valid
operator bool() const { return _index.load(std::memory_order_relaxed) < _capacity; }
auto unprocessed_latency() const { return _filled_size.load(std::memory_order_relaxed) - _index.load(std::memory_order_relaxed); }
auto filled_size() const { return _filled_size.load(std::memory_order_relaxed); }
auto remaining_capacity() const { return _capacity - _filled_size.load(std::memory_order_relaxed); }
auto is_aborted() const { return _filled_size.load(std::memory_order_relaxed) == ABORT_FLAG; }
auto is_finished() const { return !(_index.load(std::memory_order_relaxed) < _capacity); }
};
template
struct async_vector : public async_iterator
{
std::vector _data;
async_vector() = default;
async_vector(auto && ... args) : _data(std::forward(args)...) {
_capacity = _data.size();
_filled_size.store(_capacity, std::memory_order_release);
}
async_vector(size_t fixed_size) : _data(fixed_size) {
_capacity = fixed_size;
}
void resize(size_t size)
{
assert((_filled_size == ABORT_FLAG) || (_filled_size >= _capacity && _index >= _capacity) && "Threads are still running on the old vector");
_filled_size.store(0, std::memory_order_release);
_index.store(0, std::memory_order_release);
_data.resize(size);
_data.shrink_to_fit();
_capacity = size;
}
auto capacity() const { assert(_data.size() == _capacity); return _data.size(); }
#ifdef _DEBUG
std::atomic _thread_id;
bool is_same_thread() {
if (_thread_id == std::thread::id{}) {
_thread_id = std::this_thread::get_id();
return true;
}
return (_thread_id == std::this_thread::get_id());
}
#endif
// This function is only safe to call from the same thread to fill the vector
bool emplace_back_by_unique_thread(auto&& elem)
{
#ifdef _DEBUG
assert(is_same_thread() && "emplace_back_by_unique_thread must be called from the same thread");
#endif
auto next_index = _filled_size.load(std::memory_order_relaxed);
if (next_index >= _data.size()) return false; // On abort or complete filled vector
_data[next_index] = std::forward(elem);
_filled_size.fetch_add(1, std::memory_order_release);
_filled_size.notify_all();
return true;
}
// Can be called from multiple threads
bool pop_front(T& elem)
{
if (auto next_index_opt = next())
{
elem = _data[*next_index_opt];
return true;
}
return false; // Abort or empty queue
}
// Well, i am not sure if this is thread safe.
// It is safe to call this function from multiple threads ?
bool emplace_back(auto&& elem)
{
auto next_index = _filled_size.fetch_add(1, std::memory_order_acquire);
// If the vector is full or aborted, revert the increment and return false
if (next_index >= _data.size()) {
_filled_size.fetch_sub(1, std::memory_order_release);
return false;
}
_data[next_index] = std::forward(elem);
std::atomic_thread_fence(std::memory_order_release); // Ensure the data is written before the size is incremented
_filled_size.notify_all(); // Notify all consumers
return true;
}
};
// Variables used inside examples
//--------------------------------
static constexpr int camera_count = 4; // Number of cameras to simulate as producers
static constexpr int frame_count = 25; // Number of frames to store per camera
static constexpr int camera_ti = 50; // camera interval in ms, i.e. camera integration time
static constexpr int consumer_by_camera = 3; // Number of consumers per camera (the consumer processing time is 3 times the camera integration time)
static constexpr int consumer_count = camera_count * consumer_by_camera; // Number of consumers to simulate as executors
static constexpr int consumer_processing_time = camera_ti * consumer_by_camera; // Consumer processing time in ms (3 times the camera integration time)
using mat = std::string; // For example to avoid OpenCV dependency
using async_buffer = async_vector;
// Example 1
//--------------------------------
// The number of producers (cameras) is the same as the number of buffers (async_vector).
void example_one_camera_producer_per_buffer()
{
std::vector vFrameBuffer(camera_count); // Async vector of buffers, one per camera
for(auto & buffer : vFrameBuffer) buffer.resize(frame_count); // Resize each buffer with the frame count
// Producer function to simulate camera frame acquisition and store them in the async vector buffer
auto producer = [&](int buffer_id, int camera_id)
{
for(int i = 0; i < frame_count; ++i)
{
std::this_thread::sleep_for(std::chrono::milliseconds(camera_ti));
auto frame = std::format("Hello from camera {} - Frame {}", camera_id, i);
if (!vFrameBuffer[buffer_id].emplace_back_by_unique_thread(std::move(frame))) // Always the same thread for the same camera
break; // Queue is full, or abort the producer
}
};
std::atomic_int processing_count = 0; // To count the number of frames processed by the consumers
// Consumer function to simulate frame processing
auto consumer = [&](int consumer_id, int buffer_id)
{
auto & async_buffer = vFrameBuffer[buffer_id];
mat frame;
while(async_buffer.pop_front(frame)) {
++processing_count;
std::this_thread::sleep_for(std::chrono::milliseconds(consumer_processing_time));
std::osyncstream(std::cout)
Ich versuche, einen asynchronen Iterator über einem std :: vektor mit einer festen Zuordnungsgröße zu erstellen (bekannt vor dem Starten eines Threads). Ziel ist es, dass mehrere Hersteller die verfügbaren Daten innerhalb des Vektors inkrementieren, während mehrere Verbraucher die erzeugten Daten sammeln. Insbesondere bin ich besorgt darüber, ob ich korrekt verwende Speicherauftragsmechanismen, um eine ordnungsgemäße Synchronisation zwischen atomarem und nichtatomem Datenzugriff zu gewährleisten. Alle Anleitungen oder Verbesserungen wären sehr geschätzt. Zusätzlich habe ich Erklärungen für Klarheit hinzugefügt. Der Code besteht aus den folgenden Komponenten: < /p>
[*] [b][code]async_iterator[/code] [/b]: [list] verwaltet einen atomaren Iterator ([code]_index[/code]) über einem vorallozierten Container (z. B. ein nicht leerer Vektor oder ein Array).[code]_filled_size[/code]). [*] Enthält eine Nächste () Funktion zum Inkrementieren von _index , wobei Sie bei Bedarf basierend auf _FILLE_SIZE .[code]async_vector[/code] (abgeleitet von async_iterator ):
Wickelt einen std :: vector mit einem async_iterator .[code]emplace_back_by_unique_thread()[/code]: Eine thread-sichere Funktion, die von einem einzelnen Produzenten-Thread [b] aufgerufen werden soll (ich bin ziemlich sicher, dass diese Funktion Thread-Safe ist. Nun, ich hoffe es: D) [/b]. [*][code]emplace_back()[/code]: Für mehrere Produzenten beabsichtigt, [b] bin ich jedoch ungewiss, ob meine Implementierung Thread-safe ist. /> Implementiert ein Produzenten-Consumer-Setup, bei dem jeder Produzent in seinem dedizierten async_vector . [*] die Anzahl von ASync_Vector [/list] Beispiel 2: example_multi_camera_producer_one_shared_buffer [/b]
implementiert ein Produzenten-Konsumenten-Setup, bei dem mehrere Produzenten dieselbe Async_vector . Ein gemeinsamer Puffer.[code]main()[/code]
Läuft beide Beispiele aus. /> Einblicke oder Empfehlungen wären sehr geschätzt.[code]#include #include #include #include #include #include #include #include #include
value_type _capacity{0}; // Maximum container size atomic_value_type _filled_size{0}; // Current container size (dynamic in function of the producer) atomic_value_type _index{0}; // Current iterator index (used by the consumer)
optional_value_type next() { auto _next_index = _index.fetch_add(1, std::memory_order_relaxed); if (_next_index >= _capacity) return std::nullopt; // End of iteration
while (true) { auto _current_size = _filled_size.load(std::memory_order_acquire);
if (_current_size == ABORT_FLAG) return std::nullopt; // Abort check if (_next_index < _current_size) return _next_index; // The element is available
// Wait for the producer to fill the container _filled_size.wait(_current_size, std::memory_order_acquire); } }
// This function is only safe to call from the same thread to fill the vector bool emplace_back_by_unique_thread(auto&& elem) { #ifdef _DEBUG assert(is_same_thread() && "emplace_back_by_unique_thread must be called from the same thread"); #endif
auto next_index = _filled_size.load(std::memory_order_relaxed); if (next_index >= _data.size()) return false; // On abort or complete filled vector
// Can be called from multiple threads bool pop_front(T& elem) { if (auto next_index_opt = next()) { elem = _data[*next_index_opt]; return true; } return false; // Abort or empty queue }
// Well, i am not sure if this is thread safe. // It is safe to call this function from multiple threads ? bool emplace_back(auto&& elem) { auto next_index = _filled_size.fetch_add(1, std::memory_order_acquire);
// If the vector is full or aborted, revert the increment and return false if (next_index >= _data.size()) { _filled_size.fetch_sub(1, std::memory_order_release); return false; }
_data[next_index] = std::forward(elem); std::atomic_thread_fence(std::memory_order_release); // Ensure the data is written before the size is incremented _filled_size.notify_all(); // Notify all consumers return true; } };
// Variables used inside examples //--------------------------------
static constexpr int camera_count = 4; // Number of cameras to simulate as producers static constexpr int frame_count = 25; // Number of frames to store per camera static constexpr int camera_ti = 50; // camera interval in ms, i.e. camera integration time
static constexpr int consumer_by_camera = 3; // Number of consumers per camera (the consumer processing time is 3 times the camera integration time) static constexpr int consumer_count = camera_count * consumer_by_camera; // Number of consumers to simulate as executors static constexpr int consumer_processing_time = camera_ti * consumer_by_camera; // Consumer processing time in ms (3 times the camera integration time)
using mat = std::string; // For example to avoid OpenCV dependency using async_buffer = async_vector;
// Example 1 //-------------------------------- // The number of producers (cameras) is the same as the number of buffers (async_vector).
void example_one_camera_producer_per_buffer() { std::vector vFrameBuffer(camera_count); // Async vector of buffers, one per camera for(auto & buffer : vFrameBuffer) buffer.resize(frame_count); // Resize each buffer with the frame count
// Producer function to simulate camera frame acquisition and store them in the async vector buffer auto producer = [&](int buffer_id, int camera_id) { for(int i = 0; i < frame_count; ++i) { std::this_thread::sleep_for(std::chrono::milliseconds(camera_ti)); auto frame = std::format("Hello from camera {} - Frame {}", camera_id, i); if (!vFrameBuffer[buffer_id].emplace_back_by_unique_thread(std::move(frame))) // Always the same thread for the same camera break; // Queue is full, or abort the producer } };
std::atomic_int processing_count = 0; // To count the number of frames processed by the consumers
// Consumer function to simulate frame processing auto consumer = [&](int consumer_id, int buffer_id) { auto & async_buffer = vFrameBuffer[buffer_id]; mat frame; while(async_buffer.pop_front(frame)) { ++processing_count; std::this_thread::sleep_for(std::chrono::milliseconds(consumer_processing_time)); std::osyncstream(std::cout)
Ich habe über diese Frage nachgedacht. Das OP dieser Frage wollte dem Ergebnis von Collection .Iterator () einem Iterator zuweisen. Wenn die iterable Schnittstelle so definiert wurde:
interface...
Wenn SubClass SuperClass erweitert, kann Iterator nicht in Iterator umgewandelt werden. Allerdings habe ich die Situation, dass die Schnittstellen verlangen Iterator
Ich mache also ein Tutorial, das einen Teil über asynchrones JavaScript enthält. Ich habe das Gefühl, dass ich es ziemlich gut verstanden habe, aber es enthält diesen Abschnitt, der meiner Meinung...
Ich arbeite derzeit in einem Gruppenprojekt, bei dem wir einen Code -Injektionsangriff auf ein simuliertes intelligentes Gerät, insbesondere eine Glühbirne, simulieren. Die Art und Weise, wie dies...
Ich versuche, den Python aus der Quelle zu kompilieren, da unser Sysadminerium es sauber halten möchte. Das Problem ist, dass, wenn ich Python3.11.11 aus dem Quellcode zusammenstelle, so viele Pakete...