Warum führt das Warten auf einen Async-Generator in asyncio.gather zu nichtdeterministischen Hängen und wie kann ich ihnPython

Python-Programme
Anonymous
 Warum führt das Warten auf einen Async-Generator in asyncio.gather zu nichtdeterministischen Hängen und wie kann ich ihn

Post by Anonymous »

Ich stoße auf ein seltsames Problem mit asyncio, bei dem die Verwendung von asyncio.gather() für Funktionen, die denselben Async-Generator verwenden, dazu führt, dass das Programm manchmal auf unbestimmte Zeit hängen bleibt. Das Verhalten ist nicht deterministisch – manchmal wird es sofort abgeschlossen, manchmal friert es für immer ein.
Hier ist ein Minimalbeispiel:

Code: Select all

import asyncio

async def source():
for i in range(5):
print(f"yielding {i}")
await asyncio.sleep(0.1)
yield i

async def consumer(name, agen):
async for x in agen:
print(f"{name} got {x}")
await asyncio.sleep(0.05)
return f"{name} done"

async def main():
agen = source()

# Run two consumers concurrently on the same async generator
results = await asyncio.gather(
consumer("A", agen),
consumer("B", agen)
)

print("Results:", results)

asyncio.run(main())
Beobachtetes Verhalten:
  • Manchmal erhält nur ein Verbraucher Artikel.
  • Manchmal erhält der zweite Verbraucher nie etwas.
  • Manchmal bleibt das Programm für immer hängen, nachdem es 4 zurückgegeben hat.
  • In seltenen Fällen werden beide Verbraucher teilweise, aber nicht vollständig ausgeführt.
Ich erwartete, dass beide Verbraucher unabhängig voneinander den Generator durchlaufen würden – ähnlich wie zwei normale Funktionen dieselbe Liste durchlaufen können – aber das ist eindeutig nicht der Fall.

Meine Fragen:
  • Warum führt das Warten auf einen asynchronen Generator von mehreren Aufgaben zu nichtdeterministischen Hängen?
  • Welcher genau Teil des asynchronen Iterationsmodells von Python macht dies unsicher oder undefiniert?
  • Gibt es ein korrektes Muster für das „Multicasting“ eines asynchronen Generators an mehrere Verbraucher gleichzeitig, ohne alle Ergebnisse im Speicher zu speichern zuerst?
  • Ist es sicher, den Generator mit etwas wie asyncio.Queue zu umschließen, oder würde das zu Gegendruck-/Reihenfolgeproblemen führen?
  • Was ist die Lösung mit dem geringsten Overhead, die das Streaming-Verhalten beibehält und Deadlocks vermeidet?

Quick Reply

Change Text Case: 
   
  • Similar Topics
    Replies
    Views
    Last post