Code: Select all
from multiprocessing import Process, Queue
def f(q):
q.put('X' * 1000000)
print("f exits")
if __name__ == '__main__':
queue = Queue()
p = Process(target=f, args=(queue,))
p.start()
p.join() # this deadlocks
obj = queue.get()
Das erste, was ich nicht verstehe, ist: Warum gibt es eigentlich einen Deadlock?
Mein Verständnis (wahrscheinlich falsch) ist, dass, wenn die Zielfunktion f beendet wird, da wir q.cancel_join_thread() nicht aufgerufen haben, dann der Hintergrundthread dies tun wird verknüpft werden (d. h. q.join_thread() wird aufgerufen), damit der verbleibende gepufferte Inhalt in die eigentliche gemeinsame Warteschlange gestellt werden kann. Wenn dies erledigt ist, sollte der Prozess beendet werden und p.join() sollte zurückkehren.
Wenn wir andererseits das große Objekt „X“ * 1000000 durch ein kleineres „X“ * 100 ersetzen, wird es nicht blockiert. Warum macht die Größe des Put-Objekts den Unterschied im Blockierungsverhalten?
Ich weiß, dass das Vertauschen der letzten beiden Zeilen den Deadlock beseitigt. Aber ich würde erwarten, dass queue.get etwas im gemeinsam genutzten Objekt erstellt (Sachen abruft) und dass q.put() nicht direkt auf dieses gemeinsam genutzte Objekt (Warteschlange) einwirkt, sondern etwas in einen Puffer legt und der Hintergrundthread dafür sorgt, dass die Sachen aus dem Puffer in das eigentliche gemeinsam genutzte Objekt verschoben werden. Ich könnte eine Behauptung verstehen wie „
Code: Select all
join_thread BEARBEITEN 1
Meine zweite Frage betraf ein irgendwie unzuverlässiges Verhalten von get_nowait, das im folgenden MWE veranschaulicht wird:
Code: Select all
def f(q: Queue):
q.cancel_join_thread()
q.put('X' * 1000000)
print("f exits")
if __name__ == '__main__':
queue = Queue()
p = Process(target=f, args=(queue,))
p.start()
p.join() # this deadlocks
print("AFTER join")
obj = queue.get_nowait()
print("THE END", len(obj))
Vielen Dank an q.cancel_join_thread() am Anfang Mit dem Ziel f darf der untergeordnete Prozess beendet werden, ohne dem Thread beizutreten. Dies ist der Grund, warum die Ausführung des oben genannten MWE mindestens Folgendes ergibt:
Code: Select all
f exits
AFTER join
- entweder „THE END....“. Das bedeutet, dass der Thread das große Objekt übertragen hat und dieses von get_nowait()
- oder einer Ausnahme „Leer“ verbraucht wurde, was bedeutet, dass das große Objekt nicht in die Warteschlange übertragen wurde
Das ist wahrscheinlich der Grund für das scheinbar zufällige Verhalten, oder?
Meine letzte Frage lautet:
In der Python-Multiprocessing-Dokumentation wird empfohlen, mp.manager.queue() zu verwenden, um eine Warteschlange zu erstellen, die zwischen Prozessen gemeinsam genutzt werden soll. In der Tat, wenn ich meinen Code durch Folgendes ersetze:
Code: Select all
def f(q: Queue):
q.put('X' * 1000000)
print("f exits")
if __name__ == '__main__':
# queue = Queue()
man = Manager()
queue = man.Queue()
p = Process(target=f, args=(queue,))
p.start()
p.join()
print("AFTER join")
time.sleep(1) # even 1 second after child process exited
obj = queue.get_nowait()
print("THE END", len(obj))
Mobile version