In unserem Projekt gibt es eine Reihe von Funktionen in 2 Geschmacksrichtungen: asynchron und synchron. Wir schreiben die asynchrone Version und geben einen Runa () um die Funktion asyncio.run () Funktion:
Code: Select all
async def read_sensor_async():
print("Reading sensor...")
# Actual code
return 9 # Simulate sensor reading
def read_sensor():
return runa(read_sensor_async())
def runa(func):
# Prepare steps...
return asyncio.run(func)
< /code>
Die Dinge scheinen bis zu diesem Punkt in Ordnung zu sein. Wenn wir jedoch read_sensor ()
in einem Jupyter -Notizbuch ausführen (was in einer Ereignisschleife bedeutet). Wir erhalten einen RunTimeError für das Aufrufen von asyncio.run () . Wenn Sie die Dokumentation lesen, ist dies das erwartete Verhalten. Um mit dieser Situation umzugehen, müssen wir Runa sowohl innerhalb als auch außerhalb einer Ereignisschleife vorbereiten. Hier ist unsere Version 2: < /p>
Code: Select all
def runa(func):
# Prepare steps...
with contextlib.suppress(RuntimeError):
return asyncio.run(func)
# Gets here means we are inside an event loop
return await func
Das
Problem mit dieser Version ist, da runa () ein Synchron ist. Wir können nicht erwarten verwenden. Was können wir tun, um Func ?>