Python Zeromq Parallele Pipeline Mehrere Verbraucher (Arbeitnehmer)
Posted: 06 Sep 2025, 15:38
Hallo zusammen!
Ich möchte Daten aus verschiedenen Python -Programmen über Zeromq kombinieren. Ich denke, für diesen Job wäre die beste Lösung parallele Pipelines, wie in diesem Beispiel im Ponit "3. Parallel Pipeline (Parallel Pipeline -Modus)". Das habe ich gerade die drei Muster -Produzenten , Consumer und resultCollector verwendet, die Sie im Beispiel finden können. Ich habe gerade kleine Änderungen vorgenommen:
Produzent
Consumer_1
Ergebniskollektor
#coding:utf-8
import zmq
context = zmq.Context()
result_receiver = context.socket(zmq.PULL)
result_receiver.connect("tcp://localhost:5599")
result = result_receiver.recv_json()
collected_data = []
for i in (0, 10000):
collected_data.append(result)
print(collected_data)
< /code>
Die Hauptkommunikation zwischen ihnen funktioniert also.
Aber jetzt habe ich versucht, einen anderen Verbraucher (Arbeiter) hinzuzufügen, der Daten liefert. Es sollte möglich sein, wie Sie in dem Beispiel sehen können, das ich verlinkt habe.import random
import zmq
context = zmq.Context()
consumer_id = 10
#receiving work
consumer_receiver = context.socket(zmq.PULL)
consumer_receiver.connect("tcp://localhost:5566")
#forewarding results
consumer_sender = context.socket(zmq.PUSH)
consumer_sender.bind("tcp://*:5599")
msg = consumer_receiver.recv_json()
for i in (0, 100):
if msg == "start working":
data = "id: " + str(consumer_id) + "; World"
consumer_sender.send_json(data)
< /code>
Es ist so ziemlich das gleiche wie Consumer_1.---------------------------------------------------------------------------
ZMQError Traceback (most recent call last)
in
12 #forewarding results
13 consumer_sender = context.socket(zmq.PUSH)
---> 14 consumer_sender.bind("tcp://*:5599")
15
16 msg = consumer_receiver.recv_json()
~\Anaconda3\lib\site-packages\zmq\sugar\socket.py in bind(self, addr)
171
172 """
--> 173 super().bind(addr)
174 return self._bind_cm(addr)
175
zmq/backend/cython/socket.pyx in zmq.backend.cython.socket.Socket.bind()
~\Anaconda3\lib\site-packages\zmq\backend\cython\checkrc.pxd in zmq.backend.cython.checkrc._check_rc()
ZMQError: Address in use
< /code>
Wo ist meine Schuld? Es muss möglich sein, mit verschiedenen Programmen zu einem einzelnen Port zu schieben (wie im Beispiel Port 5599), oder ich irre mich?>
Ich möchte Daten aus verschiedenen Python -Programmen über Zeromq kombinieren. Ich denke, für diesen Job wäre die beste Lösung parallele Pipelines, wie in diesem Beispiel im Ponit "3. Parallel Pipeline (Parallel Pipeline -Modus)". Das habe ich gerade die drei Muster -Produzenten , Consumer und resultCollector verwendet, die Sie im Beispiel finden können. Ich habe gerade kleine Änderungen vorgenommen:
Produzent
Code: Select all
import zmq
context = zmq.Context()
socket = context.socket(zmq.PUSH)
socket.bind("tcp://*:5566")
work_message = "start working"
socket.send_json(work_message)
Code: Select all
import random
import zmq
context = zmq.Context()
consumer_id = 199
#receiving work
consumer_receiver = context.socket(zmq.PULL)
consumer_receiver.connect("tcp://localhost:5566")
#forewarding results
consumer_sender = context.socket(zmq.PUSH)
consumer_sender.bind("tcp://*:5500")
msg = consumer_receiver.recv_json()
for i in (0, 100):
if msg == "start working":
data = "id: " + str(consumer_id) + "; Hello"
consumer_sender.send_json(data)
#coding:utf-8
import zmq
context = zmq.Context()
result_receiver = context.socket(zmq.PULL)
result_receiver.connect("tcp://localhost:5599")
result = result_receiver.recv_json()
collected_data = []
for i in (0, 10000):
collected_data.append(result)
print(collected_data)
< /code>
Die Hauptkommunikation zwischen ihnen funktioniert also.
Aber jetzt habe ich versucht, einen anderen Verbraucher (Arbeiter) hinzuzufügen, der Daten liefert. Es sollte möglich sein, wie Sie in dem Beispiel sehen können, das ich verlinkt habe.import random
import zmq
context = zmq.Context()
consumer_id = 10
#receiving work
consumer_receiver = context.socket(zmq.PULL)
consumer_receiver.connect("tcp://localhost:5566")
#forewarding results
consumer_sender = context.socket(zmq.PUSH)
consumer_sender.bind("tcp://*:5599")
msg = consumer_receiver.recv_json()
for i in (0, 100):
if msg == "start working":
data = "id: " + str(consumer_id) + "; World"
consumer_sender.send_json(data)
< /code>
Es ist so ziemlich das gleiche wie Consumer_1.---------------------------------------------------------------------------
ZMQError Traceback (most recent call last)
in
12 #forewarding results
13 consumer_sender = context.socket(zmq.PUSH)
---> 14 consumer_sender.bind("tcp://*:5599")
15
16 msg = consumer_receiver.recv_json()
~\Anaconda3\lib\site-packages\zmq\sugar\socket.py in bind(self, addr)
171
172 """
--> 173 super().bind(addr)
174 return self._bind_cm(addr)
175
zmq/backend/cython/socket.pyx in zmq.backend.cython.socket.Socket.bind()
~\Anaconda3\lib\site-packages\zmq\backend\cython\checkrc.pxd in zmq.backend.cython.checkrc._check_rc()
ZMQError: Address in use
< /code>
Wo ist meine Schuld? Es muss möglich sein, mit verschiedenen Programmen zu einem einzelnen Port zu schieben (wie im Beispiel Port 5599), oder ich irre mich?>