Pyzmq Asyncio Sub -Socket erhält die letzte Nachricht nicht, selbst wenn der Zusammenhang auf 1 gesetzt istPython

Python-Programme
Guest
 Pyzmq Asyncio Sub -Socket erhält die letzte Nachricht nicht, selbst wenn der Zusammenhang auf 1 gesetzt ist

Post by Guest »

Ich habe einen asynchrischen Pub -Socket wie folgt: < /p>

Code: Select all

import asyncio

import zmq
import zmq.asyncio

async def serve(address: str):
context = zmq.asyncio.Context()
socket = context.socket(zmq.PUB)
socket.setsockopt(zmq.CONFLATE, 1)
socket.bind(address)
count = 0
while True:
try:
print("sending message %d" % count)
await socket.send(b"message %d" % count)
await asyncio.sleep(1)
count += 1
except asyncio.CancelledError:
break
socket.close()
context.term()

if __name__ == "__main__":
asyncio.run(serve("tcp://*:5555"))
< /code>
und eine asynchrische Sub -Socket wie folgt: < /p>
import asyncio
import zmq
import zmq.asyncio

DELAY = 5

async def consume(address: str):
context = zmq.asyncio.Context()
socket = context.socket(zmq.SUB)
socket.setsockopt(zmq.CONFLATE, 1)
socket.setsockopt(zmq.SUBSCRIBE, b"")
socket.connect(address)
while True:
try:
message = await socket.recv()
print(f"received: {message.decode('utf-8')}")
await asyncio.sleep(DELAY)  # simulating message processing
except asyncio.CancelledError:
break
socket.close()
context.term()

if __name__ == "__main__":
asyncio.run(consume("tcp://localhost:5555"))
< /code>
Beide mit zmq_conflate Socket -Option auf 1, um unnötige Nachrichten zu verhindern, da ich nur die neuesten von ihnen benötige. < /p>
Wenn diese beiden Skripte sind Ausführen, was ich erwarte, ist der nächste Druck des Sub -Skripts als der neueste Druck des Pub -Skripts. Tatsächlich ist das Sub -Skript die Verspätung 
Zeiten unterhalb des Pub -Skripts.
Wenn Sie beispielsweise Dely = 5 berücksichtigen, wenn das Sub -Skript den Socket erreicht hat , aber stattdessen wird empfangen: Nachricht 18 .import time
import zmq
import zmq.asyncio

DELAY = 5

def consume(address: str):
context = zmq.Context()
socket = context.socket(zmq.SUB)
socket.setsockopt(zmq.CONFLATE, 1)
socket.setsockopt(zmq.SUBSCRIBE, b"")
socket.connect(address)
while True:
try:
message = socket.recv()
print(f"received: {message.decode('utf-8')}")
time.sleep(DELAY)
except KeyboardInterrupt:
break
socket.close()
context.term()

if __name__ == "__main__":
consume("tcp://localhost:5555")
< /code>
Es tut tatsächlich das, was ich erwarte. Um es in eine andere Asyncio-Anwendung zu integrieren und die Verwendung von Threadings zu verhindern. < /p>
Hinweis: < /strong>
Die Tatsache, dass der Pub-Socket asynchron ist Skript und es ändert nichts.

Quick Reply

Change Text Case: 
   
  • Similar Topics
    Replies
    Views
    Last post