In diesem Beispiel sende ich eine Nachricht an das Zielroot/msg/1/data
System.out.println("topic:" + topic + " message:" + new String(message1.getPayload(), StandardCharsets.UTF_8) + " id:" + message1.getId());
MqttSubscription[] mqttSubscriptions = {mqttSubscription1, mqttSubscription2};
IMqttMessageListener[] mqttMessageListeners = {iMqttMessageListener, iMqttMessageListener};
mqttClient.subscribe(mqttSubscriptions, mqttMessageListeners);
// Publish message
MqttMessage message = new MqttMessage("TestMessage".getBytes());
message.setQos(2);
mqttClient.publish("root/msg/1/data", message);
}
}
< /code>
Erwartete Ausgabe < /p>
topic:root/msg/1/data message:TestMessage id:1
topic:root/msg/1/data message:TestMessage id:1
< /code>
Tatsächliche Ausgabe < /p>
topic:root/msg/1/data message:TestMessage id:2
topic:root/msg/1/data message:TestMessage id:2
topic:root/msg/1/data message:TestMessage id:1
topic:root/msg/1/data message:TestMessage id:1
< /code>
Ich hatte erwartet, 1 Nachricht für jedes Abonnement zu erhalten. Vorgeschlagen von @brits in unten Antwort Ich habe das gleiche Beispiel mit Abonnementkennung ausprobiert. Aber ich habe immer noch eine zusätzliche Kopie der Nachricht für jedes überlappende Abonnement erhalten. Es scheint, dass es hauptsächlich von der Broker -Implementierung abhängt, wie sie diesen Satz aus der Spezifikation interpretiert haben. Die Meldung, eine für jedes zusätzliche passende Abonnement und die Respektierung der QoS des Abonnements in jedem Fall. >
artemis - Ausgabe < /li>
< /ol>
root/msg/1/# topic:root/msg/1/data message:TestMessage id:1
root/msg/+/# topic:root/msg/1/data message:TestMessage id:1
root/msg/1/# topic:root/msg/1/data message:TestMessage id:2
root/msg/+/# topic:root/msg/1/data message:TestMessage id:2
< /code>
Mosquitto - Ausgabe < /li>
< /ol>
root/msg/1/# topic:root/msg/1/data message:TestMessage id:1
root/msg/+/# topic:root/msg/1/data message:TestMessage id:2
[/code]
Code mit Abonnementkennung
Code: Select all
public class MqttPahoFinalAsync {
public static void main(String[] args) throws MqttException {
MqttConnectionOptionsBuilder builder = new MqttConnectionOptionsBuilder();
MqttConnectionOptions mqttConnectionOptions = builder.automaticReconnect(true)
.username("user")
.password("password".getBytes())
.cleanStart(true)
.requestReponseInfo(true)
.build();
mqttConnectionOptions.setUseSubscriptionIdentifiers(true);
MqttAsyncClient mqttClient = new MqttAsyncClient("tcp://localhost:1883", "Client-01");
mqttClient.connect(mqttConnectionOptions).waitForCompletion();
// 2 subscriptions, both will match the incoming data
// subscription 1
MqttProperties subProperties1 = new MqttProperties();
subProperties1.setSubscriptionIdentifiers(List.of(0)); // paho forces to have it initialised like this
subProperties1.setSubscriptionIdentifier(1);
MqttSubscription mqttSubscription1 = new MqttSubscription("root/msg/1/#", 2);
IMqttMessageListener iMqttMessageListener1 = (topic, message1) ->
System.out.println("root/msg/1/# topic:" + topic + " message:" + new String(message1.getPayload(), StandardCharsets.UTF_8) + " id:" + message1.getId());
mqttClient.subscribe(mqttSubscription1, null, null, iMqttMessageListener1, subProperties1).waitForCompletion();
// subscription 2
MqttProperties subProperties2 = new MqttProperties();
subProperties2.setSubscriptionIdentifiers(List.of(0)); // paho forces to have it initialised like this
subProperties2.setSubscriptionIdentifier(2);
MqttSubscription mqttSubscription2 = new MqttSubscription("root/msg/+/#", 2);
IMqttMessageListener iMqttMessageListener2 = (topic, message1) ->
System.out.println("root/msg/+/# topic:" + topic + " message:" + new String(message1.getPayload(), StandardCharsets.UTF_8) + " id:" + message1.getId());
mqttClient.subscribe(mqttSubscription2, null, null, iMqttMessageListener2, subProperties2).waitForCompletion();
// Publish message
MqttMessage message = new MqttMessage("TestMessage".getBytes());
message.setQos(2);
mqttClient.publish("root/msg/1/data", message);
}
}