Code: Select all
allListeners.forEach(listener -> {
// skip registering if already present
if(activeListenerIds.contains(listener.getId())) {
return;
}
// register new listener
SimpleJmsListenerEndpoint endpoint = new SimpleJmsListenerEndpoint();
endpoint.setId(listener.getId().toString());
endpoint.setDestination(listener.getQueue());
endpoint.setMessageListener(message -> {
try {
restTemplate.postForObject(listener.getCallbackUrl(), message.getBody(String.class), Void.class);
message.acknowledge();
logger.debug("message {} acknowledged", message.getJMSMessageID());
} catch (JMSException e) {
logger.error("Could not retrieve message contents or failed in acknowledgement", e);
} catch (RestClientResponseException e) {
logger.error("Callback to {} failed with statusCode {}", listener.getCallbackUrl(), e.getRawStatusCode());
}
});
registrar.registerEndpoint(endpoint);
activeListenerIds.add(listener.getId());
});
Code: Select all
public ConnectionFactory jmsConnectionFactory() throws JMSException {
TransportConfiguration[] jmsTransportConfigurations = Optional.of(applicationProperties.getBrokerUrl())
.orElse(Collections.emptyList())
.stream()
.map(brokerUrl -> {
String[] splitHostAndPort = brokerUrl.split(":");
return new TransportConfiguration(NettyConnectorFactory.class.getName(), Map.of(
TransportConstants.HOST_PROP_NAME, splitHostAndPort[0],
TransportConstants.PORT_PROP_NAME, splitHostAndPort[1],
TransportConstants.HTTP_ENABLED_PROP_NAME, applicationProperties.isHttpEnabled(),
TransportConstants.SSL_ENABLED_PROP_NAME, applicationProperties.isSslEnabled()
));
}).toArray(TransportConfiguration[]::new);
return ActiveMQJMSClient.createConnectionFactoryWithHA(JMSFactoryType.CF, jmsTransportConfigurations);
}
@Bean
public CachingConnectionFactory cachingConnectionFactory() throws JMSException {
CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory(jmsConnectionFactory());
cachingConnectionFactory.setSessionCacheSize(10);
return cachingConnectionFactory;
}
@Bean
public DefaultJmsListenerContainerFactory defaultJmsListenerContainerFactory(CachingConnectionFactory cachingConnectionFactory) {
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
factory.setConnectionFactory(cachingConnectionFactory);
factory.setConcurrency("5");
factory.setSessionAcknowledgeMode(Session.CLIENT_ACKNOWLEDGE);
factory.setErrorHandler(error -> logger.error("Error in listener: {}", error.getMessage(), error));
factory.setSessionTransacted(true);
return factory;
}
Code: Select all
Configuration embeddedActiveMqServerConfig = new ConfigurationImpl();
embeddedActiveMqServerConfig//.addAcceptorConfiguration("in-vm", "vm://0")
.addAcceptorConfiguration("tcp", "tcp://0.0.0.0:61616?httpEnabled=true&reconnectAttempts=5&retryInterval=2000&maximumConnections=200&connectionTTL=60000")
.addQueueConfiguration(new QueueConfiguration("test.queue.0"))
.addQueueConfiguration(new QueueConfiguration("test.queue.1"))
.setSecurityEnabled(false);
embeddedActiveMqServer = new EmbeddedActiveMQ();
embeddedActiveMqServer.setConfiguration(embeddedActiveMqServerConfig);
embeddedActiveMqServer.start();
Code: Select all
15:47:42.056 [1-1] INFO o.s.j.c.CachingConnectionFactory - Encountered a JMSException - resetting the underlying JMS Connection
javax.jms.IllegalStateException: AMQ219017: Consumer is closed
at org.apache.activemq.artemis.core.client.impl.ClientConsumerImpl.checkClosed(ClientConsumerImpl.java:950)
at org.apache.activemq.artemis.core.client.impl.ClientConsumerImpl.receive(ClientConsumerImpl.java:197)
at org.apache.activemq.artemis.core.client.impl.ClientConsumerImpl.receive(ClientConsumerImpl.java:381)
at org.apache.activemq.artemis.jms.client.ActiveMQMessageConsumer.getMessage(ActiveMQMessageConsumer.java:209)
at org.apache.activemq.artemis.jms.client.ActiveMQMessageConsumer.receive(ActiveMQMessageConsumer.java:134)
at org.springframework.jms.connection.CachedMessageConsumer.receive(CachedMessageConsumer.java:86)
at org.springframework.jms.support.destination.JmsDestinationAccessor.receiveFromConsumer(JmsDestinationAccessor.java:132)
at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.receiveMessage(AbstractPollingMessageListenerContainer.java:431)
at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.doReceiveAndExecute(AbstractPollingMessageListenerContainer.java:316)
at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.receiveAndExecute(AbstractPollingMessageListenerContainer.java:270)
at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.invokeListener(DefaultMessageListenerContainer.java:1237)
at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.executeOngoingLoop(DefaultMessageListenerContainer.java:1227)
at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.run(DefaultMessageListenerContainer.java:1120)
at java.base/java.lang.Thread.run(Thread.java:1589)
Caused by: org.apache.activemq.artemis.api.core.ActiveMQObjectClosedException: AMQ219017: Consumer is closed
... 14 common frames omitted
15:47:42.056 [1-1] WARN o.s.j.l.DefaultMessageListenerContainer - Setup of JMS message listener invoker failed for destination 'test.queue.0' - trying to recover. Cause: AMQ219017: Consumer is closed
Jeder Vorschlag wäre eine große Hilfe