Code: Select all
/**
 * Spring Integration configuration for the outbound TCP client.
 *
 * <p>Declares the client connection factory bean and the outbound gateway
 * that services messages sent to {@link #TCP_DEFAULT_CHANNEL}.
 */
@Slf4j
@Configuration
@EnableIntegration
public class TcpClientConfig {
// Injected from the "tcp.server.ip" property.
@Value("${tcp.server.ip}")
private String ip;
// Injected from the "tcp.server.port" property.
@Value("${tcp.server.port}")
private int port;
// Injected from the "tcp.active.socket" property; passed below as the second
// constructor argument of CachingClientConnectionFactory
// (presumably the pool size -- confirm against the Spring Integration API).
@Value("${tcp.active.socket}")
private int activeSocket;
// Name of the channel the outbound gateway below listens on.
public static final String TCP_DEFAULT_CHANNEL = "tcp-client-channel";
// Used to re-issue a CONNECT request when a connection-close event arrives.
private final TcpClientService tcpClientService;
/**
 * @param tcpClientService service whose {@code connect} method is invoked
 *                         from the connection-close event handler
 */
public TcpClientConfig(TcpClientService tcpClientService) {
this.tcpClientService = tcpClientService;
}
/**
 * Declares the client connection factory bean.
 *
 * <p>NOTE(review): this method is damaged in this copy of the file. The loop
 * header below is truncated: everything between {@code i} and the body of the
 * event-listener lambda is missing, including the declaration of
 * {@code factory} and the call that registers the listener. In the visible
 * code the {@code factories} list is never filled, and a single
 * {@code cachingFactory} is returned although the declared return type is
 * {@code List}. Restore the method from the original source before relying
 * on it.
 */
@Bean
public List cachingClientConnectionFactories() {
List factories = new ArrayList();
for (int i=0; i {
// Open event: only log the id of the connection that was opened.
if (event instanceof TcpConnectionOpenEvent) {
TcpConnection connection = (TcpConnection) ((TcpConnectionOpenEvent) event).getSource();
log.info("TCP Connection Opened: {}", connection.getConnectionId());
} else if (event instanceof TcpConnectionCloseEvent) {
// Close event: log it, then try to obtain a connection again and re-send CONNECT.
TcpConnection closeConnection = (TcpConnection) ((TcpConnectionCloseEvent) event).getSource();
log.warn("TCP Connection Closed: {}", closeConnection.getConnectionId());
try {
// Skip asking the factory for a connection when this thread is already interrupted.
if (!Thread.currentThread().isInterrupted()) {
log.info(factory.getConnection().getConnectionId());
}
} catch (InterruptedException e) {
// Restore the interrupt flag before surfacing the failure as unchecked.
Thread.currentThread().interrupt();
throw new RuntimeException("Connection retrieval interrupted", e);
}
tcpClientService.connect("N");
}
});
// Wrap the underlying factory in a caching factory sized by activeSocket.
CachingClientConnectionFactory cachingFactory = new CachingClientConnectionFactory(factory, activeSocket);
cachingFactory.setLeaveOpen(true);
return cachingFactory;
}
/**
 * Declares the outbound gateway that handles messages arriving on
 * {@link #TCP_DEFAULT_CHANNEL}.
 *
 * <p>The injected factories are wrapped in a single
 * {@code FailoverClientConnectionFactory}, and the gateway is configured to
 * require a reply.
 *
 * @param cachingClientConnectionFactories connection factories handed to the
 *                                         failover factory
 * @return the configured outbound gateway
 */
@Bean
@ServiceActivator(inputChannel = TCP_DEFAULT_CHANNEL)
public TcpOutboundGateway tcpOutboundGateway(List cachingClientConnectionFactories) {
TcpOutboundGateway gateway = new TcpOutboundGateway();
FailoverClientConnectionFactory failoverFactory = new FailoverClientConnectionFactory(cachingClientConnectionFactories);
gateway.setConnectionFactory(failoverFactory);
gateway.setRequiresReply(true);
return gateway;
}
}
definiert@Component
@MessagingGateway(defaultRequestChannel = TcpClientConfig.TCP_DEFAULT_CHANNEL)
public interface TcpClientGateway {
Map send(Map request);
}
TcpClientService
Ich habe eine geplante Aufgabe implementiert, um Ping-Nachrichten zu senden:
/**
 * Issues CONNECT requests through {@link #connect(String)} until
 * {@code activeSocket} of them have succeeded.
 *
 * <p>Each attempt runs on {@code executorService} and is given three seconds
 * to complete; an attempt that times out is cancelled so that stalled tasks
 * do not pile up on the executor. The method pauses one second between
 * attempts. {@code isTaskRunning} is {@code true} for the whole duration and
 * is always reset on exit, including on failure.
 *
 * <p>If the calling thread is interrupted, the retry loop stops and the
 * thread's interrupt flag is restored, so the caller can observe the
 * cancellation.
 */
public void init() {
    isTaskRunning.set(true);
    log.info("TCP Connection init");
    int successCount = 0;
    try {
        while (successCount < activeSocket) {
            int socketNumber = successCount + 1;
            log.info("Attempting to connect socket #{}", socketNumber);
            Future<Boolean> future = executorService.submit(() -> connect("N"));
            try {
                if (future.get(3, TimeUnit.SECONDS)) {
                    log.info("Successfully connected socket #{}", socketNumber);
                    successCount++;
                } else {
                    log.error("Failed to connect socket #{}", socketNumber);
                }
            } catch (TimeoutException e) {
                // TimeoutException carries no message; say what happened and
                // stop the stalled attempt before starting the next one.
                future.cancel(true);
                log.error("Error while connecting socket #{}: timed out after 3 seconds", socketNumber);
            } catch (ExecutionException e) {
                log.error("Error while connecting socket #{}: {}", socketNumber, e.getMessage(), e);
            } catch (InterruptedException e) {
                // Abandon the in-flight attempt; the outer handler ends the loop.
                future.cancel(true);
                throw e;
            }
            // Pause between attempts, whether the last one succeeded or not.
            Thread.sleep(1000);
        }
    } catch (InterruptedException e) {
        // Restore the flag so callers see the interruption, and stop retrying.
        Thread.currentThread().interrupt();
        log.warn("TCP Connection init interrupted after {} of {} sockets", successCount, activeSocket);
    } catch (RuntimeException e) {
        // e.g. the executor rejected the task; log it without touching the interrupt flag.
        log.error("Error during initialization: {}", e.getMessage(), e);
    } finally {
        isTaskRunning.set(false);
    }
}
/**
 * Sends a CONNECT request through the gateway and reports whether it was accepted.
 *
 * @param isReportLine not read by this method
 * @return {@code true} when the reply's {@code CODE} entry equals {@code "100"};
 *         {@code false} for any other reply or when sending throws
 */
public boolean connect(String isReportLine) {
    log.info("TCP Connection connect");
    try {
        Map request = new LinkedHashMap();
        request.put("BEGIN", "CONNECT");
        Map reply = clientGateway.send(request);
        boolean accepted = "100".equals(reply.get("CODE"));
        if (accepted) {
            return true;
        }
        log.error("Gateway Connect Fail");
        return false;
    } catch (Exception e) {
        log.error("[CONNECT] message='{}'", e.getMessage());
        return false;
    }
}
/**
 * Scheduled every 30 seconds: calls {@link #sendPing()} once for each entry
 * currently held in {@code activeConnections}. The entries themselves are
 * not read; only their number determines how many pings are sent.
 */
@Scheduled(fixedRate = 30 * 1000)
public void sendPings() {
    log.info("### Send Ping ###");
    activeConnections.entrySet().forEach(ignored -> sendPing());
}
/**
 * Sends one PING request through the gateway, then pauses for one second.
 *
 * <p>Does nothing while {@code isTaskRunning} is set. A missing reply, or a
 * reply whose {@code CODE} entry is not {@code "100"}, is logged as an error
 * and the pause is skipped. Failures are logged and never propagated. If the
 * thread is interrupted during the pause, its interrupt flag is restored.
 */
public void sendPing() {
    try {
        if (isTaskRunning.get()) {
            return;
        }
        Map ping = new LinkedHashMap();
        ping.put("BEGIN", "PING");
        Map result = clientGateway.send(ping);
        // Treat a missing reply like a rejected one instead of failing with an NPE.
        if (result == null || !"100".equals(result.get("CODE"))) {
            log.error("[PING] error");
            return;
        }
        Thread.sleep(1000);
    } catch (InterruptedException e) {
        // Keep the interruption visible to whoever owns this thread.
        Thread.currentThread().interrupt();
        log.error("[PING] err: {}", e.getMessage());
    } catch (Exception e) {
        log.error("[PING] err: {}", e.getMessage(), e);
    }
}
/**
 * Sends one SEND request through the gateway, then pauses for one second.
 *
 * <p>Does nothing while {@code isTaskRunning} is set. A missing reply, or a
 * reply whose {@code CODE} entry is not {@code "100"}, is logged as an error
 * and the pause is skipped. Failures are logged and never propagated. If the
 * thread is interrupted during the pause, its interrupt flag is restored.
 */
public void sendMessage() {
    try {
        if (isTaskRunning.get()) {
            return;
        }
        Map send = new LinkedHashMap();
        send.put("BEGIN", "SEND");
        Map result = clientGateway.send(send);
        // Treat a missing reply like a rejected one instead of failing with an NPE.
        if (result == null || !"100".equals(result.get("CODE"))) {
            // Tagged [SEND]: the previous [PING] tag made these failures
            // indistinguishable from ping failures in the log.
            log.error("[SEND] error");
            return;
        }
        Thread.sleep(1000);
    } catch (InterruptedException e) {
        // Keep the interruption visible to whoever owns this thread.
        Thread.currentThread().interrupt();
        log.error("[SEND] err: {}", e.getMessage());
    } catch (Exception e) {
        log.error("[SEND] err: {}", e.getMessage(), e);
    }
}
...
Das Problem
Meine Ping-Implementierung funktioniert und erhält Antworten, aber ich glaube, sie verwendet nicht alle Socket-Verbindungen, die ich erstellt habe. Wenn ich Pings über die Methode clientGateway.send() sende, scheint die FailoverClientConnectionFactory immer dieselbe Verbindung auszuwählen, anstatt alle verfügbaren Verbindungen zu durchlaufen. Jede Verbindung soll alle 30 Sekunden eine Ping-Nachricht empfangen, um sie am Leben zu erhalten. Für den normalen Nachrichtenverkehr möchte ich weiterhin die Failover-Funktionalität verwenden, um automatisch eine verfügbare Verbindung auszuwählen: Es soll eine der in Schritt 1 erstellten Verbindungen ausgewählt werden, um Nachrichten zu senden und Antworten zu empfangen.