AMQ219006: Fehler bei getrennter Kanalverbindung für Spring Boot JMS-Listener
Posted: 18 Jan 2025, 22:59
Ich folge der Spring-JMS-Dokumentation, um Nachrichten-Listener dynamisch anhand von Warteschlangennamen aus der Datenbank zu erstellen.
Hier sind meine Konfigurations-Beans
Ich verwende für meine Tests einen eingebetteten ActiveMQ-Broker; er startet erfolgreich mit dem folgenden Snippet im @BeforeAll-Abschnitt.
Der Produzentencode funktioniert einwandfrei. Doch der Consumer, der die neu erzeugte Nachricht lesen und an eine Drittanbieter-App senden soll, scheitert mit folgendem Fehler in der Konsole
Ich habe alle möglichen Kombinationen ausprobiert und kann nicht herausfinden, was ich falsch mache
Jeder Vorschlag wäre eine große Hilfe
Code: Select all
// Register a JMS endpoint for every listener definition that is not active yet.
allListeners.forEach(listener -> {
    // Listeners whose id is already tracked keep their existing endpoint.
    if (!activeListenerIds.contains(listener.getId())) {
        SimpleJmsListenerEndpoint listenerEndpoint = new SimpleJmsListenerEndpoint();
        listenerEndpoint.setId(listener.getId().toString());
        listenerEndpoint.setDestination(listener.getQueue());
        listenerEndpoint.setMessageListener(message -> {
            try {
                // Post the message body (read as a String) to the callback URL, then acknowledge.
                restTemplate.postForObject(listener.getCallbackUrl(), message.getBody(String.class), Void.class);
                message.acknowledge();
                logger.debug("message {} acknowledged", message.getJMSMessageID());
            } catch (JMSException jmsFailure) {
                logger.error("Could not retrieve message contents or failed in acknowledgement", jmsFailure);
            } catch (RestClientResponseException callbackFailure) {
                logger.error("Callback to {} failed with statusCode {}", listener.getCallbackUrl(), callbackFailure.getRawStatusCode());
            }
        });
        registrar.registerEndpoint(listenerEndpoint);
        // Remember the id so this listener is not registered again.
        activeListenerIds.add(listener.getId());
    }
});
Code: Select all
/**
 * Builds the JMS connection factory from the broker URLs in the application properties.
 *
 * <p>Every configured entry is split at {@code ':'} into host and port and turned into one
 * Netty {@code TransportConfiguration}; the HTTP and SSL flags are taken from the application
 * properties. A missing ({@code null}) broker list is treated as an empty list.
 *
 * @return the connection factory created by {@code ActiveMQJMSClient.createConnectionFactoryWithHA}
 * @throws IllegalArgumentException if a broker URL does not contain a {@code host:port} pair
 * @throws JMSException declared in the method signature for callers
 */
public ConnectionFactory jmsConnectionFactory() throws JMSException {
    // ofNullable (not of): Optional.of would throw on null and make the orElse fallback unreachable.
    TransportConfiguration[] jmsTransportConfigurations = Optional.ofNullable(applicationProperties.getBrokerUrl())
            .orElse(Collections.emptyList())
            .stream()
            .map(brokerUrl -> {
                String[] splitHostAndPort = brokerUrl.split(":");
                if (splitHostAndPort.length < 2) {
                    // Fail with the offending value instead of a bare ArrayIndexOutOfBoundsException.
                    throw new IllegalArgumentException("Broker URL must have the form host:port but was: " + brokerUrl);
                }
                return new TransportConfiguration(NettyConnectorFactory.class.getName(), Map.of(
                        TransportConstants.HOST_PROP_NAME, splitHostAndPort[0],
                        TransportConstants.PORT_PROP_NAME, splitHostAndPort[1],
                        TransportConstants.HTTP_ENABLED_PROP_NAME, applicationProperties.isHttpEnabled(),
                        TransportConstants.SSL_ENABLED_PROP_NAME, applicationProperties.isSslEnabled()
                ));
            }).toArray(TransportConfiguration[]::new);
    return ActiveMQJMSClient.createConnectionFactoryWithHA(JMSFactoryType.CF, jmsTransportConfigurations);
}
/**
 * Exposes a {@code CachingConnectionFactory} that wraps the factory returned by
 * {@code jmsConnectionFactory()} and uses a session cache size of 10.
 *
 * @return the caching wrapper around the broker connection factory
 * @throws JMSException if creating the underlying connection factory fails
 */
@Bean
public CachingConnectionFactory cachingConnectionFactory() throws JMSException {
    CachingConnectionFactory cachingFactory = new CachingConnectionFactory(jmsConnectionFactory());
    cachingFactory.setSessionCacheSize(10);
    return cachingFactory;
}
/**
 * Creates the listener container factory used for the registered JMS endpoints.
 *
 * <p>Containers built by this factory use the given caching connection factory, transacted
 * sessions, a concurrency setting of {@code "5"} and an error handler that logs the failure.
 *
 * @param cachingConnectionFactory connection factory handed to every created container
 * @return the configured container factory
 */
@Bean
public DefaultJmsListenerContainerFactory defaultJmsListenerContainerFactory(CachingConnectionFactory cachingConnectionFactory) {
    DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
    factory.setConnectionFactory(cachingConnectionFactory);
    // NOTE(review): a plain "5" is documented by Spring as an upper limit (lower limit 1), i.e. the
    // container scales consumers dynamically. Spring's DefaultMessageListenerContainer docs advise
    // against combining dynamic scaling with CachingConnectionFactory - confirm this is intended.
    factory.setConcurrency("5");
    // Transacted sessions: the container commits after the listener returns and rolls back when it
    // throws. No acknowledge mode is configured because JMS ignores it for transacted sessions, so
    // a CLIENT_ACKNOWLEDGE setting here would have no effect and only mislead readers.
    factory.setSessionTransacted(true);
    factory.setErrorHandler(error -> logger.error("Error in listener: {}", error.getMessage(), error));
    return factory;
}
Code: Select all
// Configuration for the embedded test broker: one TCP acceptor on port 61616, two test queues,
// security switched off. An in-VM acceptor ("in-vm", "vm://0") is currently not registered.
Configuration embeddedActiveMqServerConfig = new ConfigurationImpl()
        .addAcceptorConfiguration("tcp", "tcp://0.0.0.0:61616?httpEnabled=true&reconnectAttempts=5&retryInterval=2000&maximumConnections=200&connectionTTL=60000")
        .addQueueConfiguration(new QueueConfiguration("test.queue.0"))
        .addQueueConfiguration(new QueueConfiguration("test.queue.1"))
        .setSecurityEnabled(false);

// Create the embedded server, hand it the configuration and start it.
embeddedActiveMqServer = new EmbeddedActiveMQ();
embeddedActiveMqServer.setConfiguration(embeddedActiveMqServerConfig);
embeddedActiveMqServer.start();
Code: Select all
15:47:42.056 [1-1] INFO o.s.j.c.CachingConnectionFactory - Encountered a JMSException - resetting the underlying JMS Connection
javax.jms.IllegalStateException: AMQ219017: Consumer is closed
at org.apache.activemq.artemis.core.client.impl.ClientConsumerImpl.checkClosed(ClientConsumerImpl.java:950)
at org.apache.activemq.artemis.core.client.impl.ClientConsumerImpl.receive(ClientConsumerImpl.java:197)
at org.apache.activemq.artemis.core.client.impl.ClientConsumerImpl.receive(ClientConsumerImpl.java:381)
at org.apache.activemq.artemis.jms.client.ActiveMQMessageConsumer.getMessage(ActiveMQMessageConsumer.java:209)
at org.apache.activemq.artemis.jms.client.ActiveMQMessageConsumer.receive(ActiveMQMessageConsumer.java:134)
at org.springframework.jms.connection.CachedMessageConsumer.receive(CachedMessageConsumer.java:86)
at org.springframework.jms.support.destination.JmsDestinationAccessor.receiveFromConsumer(JmsDestinationAccessor.java:132)
at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.receiveMessage(AbstractPollingMessageListenerContainer.java:431)
at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.doReceiveAndExecute(AbstractPollingMessageListenerContainer.java:316)
at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.receiveAndExecute(AbstractPollingMessageListenerContainer.java:270)
at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.invokeListener(DefaultMessageListenerContainer.java:1237)
at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.executeOngoingLoop(DefaultMessageListenerContainer.java:1227)
at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.run(DefaultMessageListenerContainer.java:1120)
at java.base/java.lang.Thread.run(Thread.java:1589)
Caused by: org.apache.activemq.artemis.api.core.ActiveMQObjectClosedException: AMQ219017: Consumer is closed
... 14 common frames omitted
15:47:42.056 [1-1] WARN o.s.j.l.DefaultMessageListenerContainer - Setup of JMS message listener invoker failed for destination 'test.queue.0' - trying to recover. Cause: AMQ219017: Consumer is closed
Jeder Vorschlag wäre eine große Hilfe