Fanout-Muster für MQTT-Nachrichten mit RabbitMQ implementieren.
IoT-Daten -> MQTT-Austausch -> RabbitMq-Stream -> mehrere Verbraucher

RabbitMQ läuft lokal so (5552 ist für Stream, 1883 für MQTT):
Code: Select all
docker run -it --rm --name rabbitmq -p 5552:5552 -p 15672:15672 -p 5672:5672 -p 1883:1883 \
-e RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS='-rabbitmq_stream advertised_host localhost' \
rabbitmq:4-management
docker exec rabbitmq rabbitmq-plugins enable rabbitmq_stream rabbitmq_stream_management
docker exec rabbitmq rabbitmq-plugins enable rabbitmq_mqtt
Code: Select all
# publish
❯ mosquitto_pub -h localhost -p 1883 -t test_mqtt -m "Hello, MQTT" -u guest -P guest
# subscribe
❯ mosquitto_sub -h localhost -p 1883 -t test_mqtt -u guest -P guest
Hello, MQTT
Stream mit 3 Nachrichten, die über den obigen mosquitto_pub-Befehl darin veröffentlicht wurden:

Und so abonniere ich den Stream mit dem Rstream-Client aus dem offiziellen Rabbitmq-Tutorial:
Code: Select all
import asyncio
from rstream import (
AMQPMessage,
Consumer,
ConsumerOffsetSpecification,
MessageContext,
OffsetType,
)
STREAM_NAME = "mqtt_stream"
# 5GB
STREAM_RETENTION = 5000000000
async def receive():
async with Consumer(host="localhost", username="guest", password="guest") as consumer:
async def on_message(msg: AMQPMessage, message_context: MessageContext):
print("Got message: {} from stream {}".format(msg, message_context.stream))
print("Press control + C to close")
await consumer.start()
await consumer.subscribe(
stream=STREAM_NAME,
callback=on_message,
offset_specification=ConsumerOffsetSpecification(OffsetType.LAST, None),
)
try:
await consumer.run()
except (KeyboardInterrupt, asyncio.CancelledError):
print("Closing Consumer...")
return
with asyncio.Runner() as runner:
runner.run(receive())
Code: Select all
❯ python receive.py
Press control + C to close
Got message: b'\x00Sp\xc0\x06\x05B@@AC\x00Sr\xc12\x04\xa3\nx-exchange\xa1\tamq.topic\xa3\rx-routing-key\xa1\ttest_mqtt\x00Su\xa0\x0bHello, MQTT' from stream mqtt_stream
Gibt es eine Möglichkeit, bei der ich keinen benutzerdefinierten Code schreiben muss, um die reine Zeichenfolge (letztendlich wird es JSON sein) aus der Nachricht zu erhalten?
Wenn ich mit rstream im Stream veröffentliche, gibt derselbe Code eine saubere Zeichenfolgennachricht aus...
Mobile version