Code: Select all
async def run_task(...):
...
semaphore = asyncio.Semaphore(cfg.concurrency_limit)
async def run_single_sample(task_sample: TaskSample):
async with semaphore:
await run_agent(cfg, task_sample, cfg.output_dir / task.value)
samples = [run_single_sample(task_sample) for task_sample in sliced_samples]
await tqdm.gather(*samples, desc=f"Task: {task.value}")
Während die asynchronen/blockierenden Dinge für die API-Aufrufe sinnvoll sind Ich bin mir nicht sicher, ob es für die CPU-Prozesse Sinn macht. Denn nach meinem Verständnis ist alles immer noch sequentiell. Z.B. Während Beispiel 1 möglicherweise auf einen CPU-Aufruf wartet, beginnt Beispiel 2, aber dann wird Beispiel 1 erneut fortgesetzt.
Wenn ich mir meine htop-Ausgabe anschaue, sieht es so aus, als würde sie 1 CPU zu 100 % nutzen, also dachte ich mir Möglicherweise stellt die CPU aufgrund der lokalen Ausführung des Codes innerhalb der Agentenausführung hier einen Engpass dar.
Daher habe ich versucht, es mit etwas wie ThreadPoolExecutor, asyncio.to_thread, asyncio.run_in_executor, ...
aber anscheinend sind alle für die Ausführung von Synchronisierungsmethoden gedacht.
Wie könnte ich es richtig machen? Oder ist mein Anwendungsfall irgendwie fehlerhaft?