Python Zeromq Parallele Pipeline Mehrere Verbraucher (Arbeitnehmer)Python

Python-Programme
Guest
 Python Zeromq Parallele Pipeline Mehrere Verbraucher (Arbeitnehmer)

Post by Guest »

Hallo zusammen!
Ich möchte Daten aus verschiedenen Python -Programmen über Zeromq kombinieren. Ich denke, für diesen Job wäre die beste Lösung parallele Pipelines, wie in diesem Beispiel im Ponit "3. Parallel Pipeline (Parallel Pipeline -Modus)". Das habe ich gerade die drei Muster -Produzenten , Consumer und resultCollector verwendet, die Sie im Beispiel finden können. Ich habe gerade kleine Änderungen vorgenommen:
Produzent

Code: Select all

import zmq

context = zmq.Context()
socket = context.socket(zmq.PUSH)
socket.bind("tcp://*:5566")

work_message = "start working"
socket.send_json(work_message)
Consumer_1

Code: Select all

import random
import zmq

context = zmq.Context()
consumer_id = 199

#receiving work
consumer_receiver = context.socket(zmq.PULL)
consumer_receiver.connect("tcp://localhost:5566")

#forewarding results
consumer_sender = context.socket(zmq.PUSH)
consumer_sender.bind("tcp://*:5500")

msg = consumer_receiver.recv_json()

for i in (0, 100):
if msg == "start working":
data = "id: " + str(consumer_id) + "; Hello"
consumer_sender.send_json(data)
Ergebniskollektor
#coding:utf-8

import zmq

context = zmq.Context()
result_receiver = context.socket(zmq.PULL)
result_receiver.connect("tcp://localhost:5599")

result = result_receiver.recv_json()

collected_data = []

for i in (0, 10000):
collected_data.append(result)

print(collected_data)
< /code>
Die Hauptkommunikation zwischen ihnen funktioniert also.
Aber jetzt habe ich versucht, einen anderen Verbraucher (Arbeiter) hinzuzufügen, der Daten liefert. Es sollte möglich sein, wie Sie in dem Beispiel sehen können, das ich verlinkt habe.import random
import zmq

context = zmq.Context()
consumer_id = 10

#receiving work
consumer_receiver = context.socket(zmq.PULL)
consumer_receiver.connect("tcp://localhost:5566")

#forewarding results
consumer_sender = context.socket(zmq.PUSH)
consumer_sender.bind("tcp://*:5599")

msg = consumer_receiver.recv_json()

for i in (0, 100):
if msg == "start working":
data = "id: " + str(consumer_id) + "; World"
consumer_sender.send_json(data)
< /code>
Es ist so ziemlich das gleiche wie Consumer_1.---------------------------------------------------------------------------
ZMQError Traceback (most recent call last)
in
12 #forewarding results
13 consumer_sender = context.socket(zmq.PUSH)
---> 14 consumer_sender.bind("tcp://*:5599")
15
16 msg = consumer_receiver.recv_json()

~\Anaconda3\lib\site-packages\zmq\sugar\socket.py in bind(self, addr)
171
172 """
--> 173 super().bind(addr)
174 return self._bind_cm(addr)
175

zmq/backend/cython/socket.pyx in zmq.backend.cython.socket.Socket.bind()

~\Anaconda3\lib\site-packages\zmq\backend\cython\checkrc.pxd in zmq.backend.cython.checkrc._check_rc()

ZMQError: Address in use
< /code>
Wo ist meine Schuld? Es muss möglich sein, mit verschiedenen Programmen zu einem einzelnen Port zu schieben (wie im Beispiel Port 5599), oder ich irre mich?>

Quick Reply

Change Text Case: 
   
  • Similar Topics
    Replies
    Views
    Last post