Wie starte ich mehrere Threads genau gleichzeitig?C++

Programme in C++. Entwicklerforum
Anonymous
 Wie starte ich mehrere Threads genau gleichzeitig?

Post by Anonymous »

Bei der Evaluierung von Thread-Pool-Bibliotheken für Aufgaben mit kurzer Laufzeit ist mir aufgefallen, dass sie alle deutlich schlechter abschneiden als OpenMP. Die Hauptursache scheint darin zu liegen, dass andere Bibliotheken Schwierigkeiten haben, mehrere Threads gleichzeitig zu starten, während OpenMP das irgendwie schafft.
Um das Problem zu veranschaulichen, habe ich ein vereinfachtes parallel_for-Beispiel erstellt. Ich starte 8 Threads und lasse sie dann entweder mit std::condition_variable oder mit einem Spin-Look mit std::atomic warten, bis ihr Start signalisiert wird. Dadurch soll der Overhead beim Starten von Threads ausgeschlossen werden. Start- und Endzeiten pro Thread werden im Speicher protokolliert und dann zur Visualisierung in eine Datei geschrieben. Den gleichen Arbeitsaufwand parallelisiere ich auch mit OpenMP.
Die Ergebnisse sind unten zu sehen. Die Threads beginnen ihre Arbeit nicht zur gleichen Zeit, wenn eine reguläre Sperre oder eine Spin-Sperre verwendet wird, aber alle Threads beginnen ungefähr zur gleichen Zeit, wenn OpenMP verwendet wird.
Ich habe mit g++ -O3 -fopenmp -lm -std=c++20 main.cpp -o main kompiliert und das Experiment auf einer i5-10300H-CPU mit 8 Kernen (4 davon „echt“) ausgeführt.
Image

main.cpp

Code: Select all

#include 
#include 
#include 
#include 
#include 
#include 
#include 
#include 

size_t num_threads = 8;

double sec(){
std::chrono::duration d = std::chrono::high_resolution_clock::now().time_since_epoch();
return d.count();
}

void work(){
volatile double accumulator = 0.0;
for (size_t i = 0; i < 10 * 1000; i++){
accumulator += std::sin(i);
}
}

struct LogItem {
double start, end;
size_t thread_id;
};

void write_log(const std::vector& log, const char* filename) {
FILE* f = fopen(filename, "w");
fprintf(f, "start,end,thread_id\n");
for (const auto& item : log) {
fprintf(f, "%f,%f,%zu\n", item.start, item.end, item.thread_id);
}
fclose(f);
}

void parallel_for_lock(){
std::vector threads;
std::vector log(num_threads);

std::mutex mtx;
std::condition_variable cv;
bool start_flag = false;

for (size_t thread_id = 0; thread_id < num_threads; thread_id++){
threads.emplace_back([thread_id, &log, &mtx, &cv, &start_flag]{
// wait until start
{
std::unique_lock lock(mtx);
cv.wait(lock, [&start_flag]{ return start_flag; });
}
// do work and log the time it takes
double start = sec();
work();
double end = sec();
log[thread_id] = LogItem{start, end, thread_id};
});
}

// (attempt to) start all threads at once
{
std::lock_guard lock(mtx);
start_flag = true;
}
cv.notify_all();

for (auto& thread : threads){
thread.join();
}

write_log(log, "log_lock.csv");
}

void parallel_for_spin_lock(){
std::vector threads;
std::vector log(num_threads);

std::atomic start_flag{false};

for (size_t thread_id = 0; thread_id < num_threads; thread_id++){
threads.emplace_back([thread_id, &log, &start_flag]{
while (!start_flag.load(std::memory_order_acquire));
double start = sec();
work();
double end = sec();
log[thread_id] = LogItem{start, end, thread_id};
});
}

start_flag.store(true, std::memory_order_release);

for (auto& thread : threads){
thread.join();
}

write_log(log, "log_spin_lock.csv");
}

void parallel_for_omp(){
std::vector log(num_threads);

#pragma omp parallel for
for (size_t thread_id = 0; thread_id < num_threads; thread_id++){
double start = sec();
work();
double end = sec();
log[thread_id] = LogItem{start, end, thread_id};
}

write_log(log, "log_omp.csv");
}

int main(){
// run a few times for warmup
for (size_t i = 0; i < 10;  i++){
parallel_for_lock();
parallel_for_spin_lock();
parallel_for_omp();
}

return 0;
}

plot_results.py

Code: Select all

import csv, matplotlib.pyplot as plt

def plot_log(filename):
with open(filename) as f:
rows = list(csv.DictReader(f))

thread_ids = [int(row["thread_id"]) for row in rows]

# convert to ms
start_times = [float(row["start"]) * 1e3 for row in rows]
end_times = [float(row["end"]) * 1e3 for row in rows]

# start time at 0
min_time = min(start_times)
start_times = [s - min_time for s in start_times]
end_times = [e - min_time for e in end_times]

for start, end, tid in zip(start_times, end_times, thread_ids):
plt.barh(tid, end - start, left=start)

plt.xlabel("Time [ms]")
plt.ylabel("Thread ID")
plt.yticks(range(len(thread_ids)))
plt.grid(axis="y", alpha=0.5)
plt.xlim([0, 2])

def main():
plt.figure(figsize=(10, 16))
for i, name in enumerate(["lock", "spin_lock", "omp"], 1):
plt.subplot(3, 1, i)
plot_log(f"log_{name}.csv")
plt.title(name)
plt.tight_layout()
plt.show()

if __name__ == "__main__":
main()

Quick Reply

Change Text Case: 
   
  • Similar Topics
    Replies
    Views
    Last post