So konsumieren Sie Nachrichten, die vom Rabbitmq-MQTT-Plugin über Rabbitmq-Streams weitergeleitet werdenPython

Python-Programme
Anonymous
 So konsumieren Sie Nachrichten, die vom Rabbitmq-MQTT-Plugin über Rabbitmq-Streams weitergeleitet werden

Post by Anonymous »

Ziel:
Fanout-Muster für MQTT-Nachrichten mit RabbitMQ implementieren.
IoT-Daten -> MQTT-Austausch -> RabbitMq-Stream -> mehrere Verbraucher
Image

RabbitMQ läuft lokal so (5552 ist für Stream, 1883 für MQTT):

Code: Select all

docker run -it --rm --name rabbitmq -p 5552:5552 -p 15672:15672 -p 5672:5672 -p 1883:1883  \
-e RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS='-rabbitmq_stream advertised_host localhost' \
rabbitmq:4-management

docker exec rabbitmq rabbitmq-plugins enable rabbitmq_stream rabbitmq_stream_management
docker exec rabbitmq rabbitmq-plugins enable rabbitmq_mqtt
Ich veröffentliche und verifiziere, dass ich MQTT-Daten wie folgt nutzen kann:

Code: Select all

# publish
❯ mosquitto_pub -h localhost -p 1883 -t test_mqtt -m "Hello, MQTT" -u guest -P guest
# subscribe
❯ mosquitto_sub  -h localhost -p 1883 -t test_mqtt -u guest -P guest
Hello, MQTT
Allerdings möchte ich einen RMQ-Stream abonnieren, nicht MQTT, um die Daten in einem Fanout-Muster zu empfangen.
Stream mit 3 Nachrichten, die über den obigen mosquitto_pub-Befehl darin veröffentlicht wurden:
Image

Und so abonniere ich den Stream mit dem Rstream-Client aus dem offiziellen Rabbitmq-Tutorial:

Code: Select all

import asyncio

from rstream import (
AMQPMessage,
Consumer,
ConsumerOffsetSpecification,
MessageContext,
OffsetType,
)

STREAM_NAME = "mqtt_stream"
# 5GB
STREAM_RETENTION = 5000000000

async def receive():
async with Consumer(host="localhost", username="guest", password="guest") as consumer:

async def on_message(msg: AMQPMessage, message_context: MessageContext):
print("Got message: {} from stream {}".format(msg, message_context.stream))

print("Press control + C to close")
await consumer.start()
await consumer.subscribe(
stream=STREAM_NAME,
callback=on_message,
offset_specification=ConsumerOffsetSpecification(OffsetType.LAST, None),
)
try:
await consumer.run()
except (KeyboardInterrupt, asyncio.CancelledError):
print("Closing Consumer...")
return

with asyncio.Runner() as runner:
runner.run(receive())
Was ich schließlich in der Konsole sehe, ist meine Nachricht, aber begleitet von einem binären Blob:

Code: Select all

❯ python receive.py
Press control + C to close
Got message: b'\x00Sp\xc0\x06\x05B@@AC\x00Sr\xc12\x04\xa3\nx-exchange\xa1\tamq.topic\xa3\rx-routing-key\xa1\ttest_mqtt\x00Su\xa0\x0bHello, MQTT' from stream mqtt_stream
Frage:
Gibt es eine Möglichkeit, bei der ich keinen benutzerdefinierten Code schreiben muss, um die reine Zeichenfolge (letztendlich wird es JSON sein) aus der Nachricht zu erhalten?
Wenn ich mit rstream im Stream veröffentliche, gibt derselbe Code eine saubere Zeichenfolgennachricht aus...

Quick Reply

Change Text Case: 
   
  • Similar Topics
    Replies
    Views
    Last post