Code: Select all
import asyncio
import zmq
import zmq.asyncio
async def serve(address: str):
context = zmq.asyncio.Context()
socket = context.socket(zmq.PUB)
socket.setsockopt(zmq.CONFLATE, 1)
socket.bind(address)
count = 0
while True:
try:
print("sending message %d" % count)
await socket.send(b"message %d" % count)
await asyncio.sleep(1)
count += 1
except asyncio.CancelledError:
break
socket.close()
context.term()
if __name__ == "__main__":
asyncio.run(serve("tcp://*:5555"))
< /code>
und eine asynchrische Sub -Socket wie folgt: < /p>
import asyncio
import zmq
import zmq.asyncio
DELAY = 5
async def consume(address: str):
context = zmq.asyncio.Context()
socket = context.socket(zmq.SUB)
socket.setsockopt(zmq.CONFLATE, 1)
socket.setsockopt(zmq.SUBSCRIBE, b"")
socket.connect(address)
while True:
try:
message = await socket.recv()
print(f"received: {message.decode('utf-8')}")
await asyncio.sleep(DELAY) # simulating message processing
except asyncio.CancelledError:
break
socket.close()
context.term()
if __name__ == "__main__":
asyncio.run(consume("tcp://localhost:5555"))
< /code>
Beide mit zmq_conflate Socket -Option auf 1, um unnötige Nachrichten zu verhindern, da ich nur die neuesten von ihnen benötige. < /p>
Wenn diese beiden Skripte sind Ausführen, was ich erwarte, ist der nächste Druck des Sub -Skripts als der neueste Druck des Pub -Skripts. Tatsächlich ist das Sub -Skript die Verspätung
Wenn Sie beispielsweise Dely = 5 berücksichtigen, wenn das Sub -Skript den Socket erreicht hat , aber stattdessen wird empfangen: Nachricht 18 .import time
import zmq
import zmq.asyncio
DELAY = 5
def consume(address: str):
context = zmq.Context()
socket = context.socket(zmq.SUB)
socket.setsockopt(zmq.CONFLATE, 1)
socket.setsockopt(zmq.SUBSCRIBE, b"")
socket.connect(address)
while True:
try:
message = socket.recv()
print(f"received: {message.decode('utf-8')}")
time.sleep(DELAY)
except KeyboardInterrupt:
break
socket.close()
context.term()
if __name__ == "__main__":
consume("tcp://localhost:5555")
< /code>
Es tut tatsächlich das, was ich erwarte. Um es in eine andere Asyncio-Anwendung zu integrieren und die Verwendung von Threadings zu verhindern. < /p>
Hinweis: < /strong>
Die Tatsache, dass der Pub-Socket asynchron ist Skript und es ändert nichts.