Das ist mir bei mehreren Projekten passiert, die stark auf asynchroner Parallelität basieren, aber ich konnte es bisher noch nie in einem kleinen Beispiel reproduzieren.
Hier ist ein minimales Beispiel, das schließlich das Einfrieren zeigt:
Code: Select all
import asyncio
from concurrent.futures import ThreadPoolExecutor
executor = ThreadPoolExecutor(max_workers=10)
async def dummy_task(i):
await asyncio.sleep(0.1)
return i
async def batch_eval(trades):
results = await asyncio.gather(*(dummy_task(t) for t in trades))
return results
def sync_batch_eval(trades):
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
results = loop.run_until_complete(batch_eval(trades))
loop.close()
return results
async def main():
i = 0
while True:
trades = list(range(10))
res = await asyncio.get_running_loop().run_in_executor(
executor, sync_batch_eval, trades
)
if i % 50 == 0:
print("iteration", i, "ok")
i += 1
await asyncio.sleep(0.05)
asyncio.run(main())
Die CPU-Auslastung sinkt auf nahezu Null, aber der Prozess bleibt bestehen.
Mein echter Code (siehe dieses Repository, Einstiegspunkt hier) folgt einem ähnlichen Muster, bei dem ich eine Funktion wie diese aufrufe:
Code: Select all
def sync_batch_eval(trades, settings, allow_edge):
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
async def run_batch():
tasks = [batch_evaluate_trade(trade, settings, allow_edge) for trade in trades]
return await asyncio.gather(*tasks)
results = loop.run_until_complete(run_batch())
loop.close()
return results
Mobile version