Federintegration Mehrere TCP -Verbindungen Ping, sendenJava

Java-Forum
Anonymous
 Federintegration Mehrere TCP -Verbindungen Ping, senden

Post by Anonymous »

Ich implementiere einen TCP -Client in der Spring -Integration mit mehreren Socket -Verbindungen und muss ein Problem mit der Aufrechterhaltung aller Verbindungen am Leben erhalten.

Code: Select all

@Slf4j
@Configuration
@EnableIntegration
public class TcpClientConfig {

@Value("${tcp.server.ip}")
private String ip;

@Value("${tcp.server.port}")
private int port;

@Value("${tcp.active.socket}")
private int activeSocket;

public static final String TCP_DEFAULT_CHANNEL = "tcp-client-channel";

private final TcpClientService tcpClientService;

public TcpClientConfig(TcpClientService tcpClientService) {
this.tcpClientService = tcpClientService;
}

@Bean
public List cachingClientConnectionFactories() {
List factories = new ArrayList();

for (int i=0; i {
if (event instanceof TcpConnectionOpenEvent) {
TcpConnection connection = (TcpConnection) ((TcpConnectionOpenEvent) event).getSource();
log.info("TCP Connection Opened: {}", connection.getConnectionId());
} else if (event instanceof TcpConnectionCloseEvent) {
TcpConnection closeConnection = (TcpConnection) ((TcpConnectionCloseEvent) event).getSource();
log.warn("TCP Connection Closed: {}", closeConnection.getConnectionId());

try {
if (!Thread.currentThread().isInterrupted()) {
log.info(factory.getConnection().getConnectionId());
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException("Connection retrieval interrupted", e);
}

tcpClientService.connect("N");
}
});

CachingClientConnectionFactory cachingFactory = new CachingClientConnectionFactory(factory, activeSocket);
cachingFactory.setLeaveOpen(true);
return cachingFactory;
}

@Bean
@ServiceActivator(inputChannel = TCP_DEFAULT_CHANNEL)
public TcpOutboundGateway tcpOutboundGateway(List cachingClientConnectionFactories) {
TcpOutboundGateway gateway = new TcpOutboundGateway();

FailoverClientConnectionFactory failoverFactory = new FailoverClientConnectionFactory(cachingClientConnectionFactories);
gateway.setConnectionFactory(failoverFactory);
gateway.setRequiresReply(true);

return gateway;
}

}
Für die Kommunikation habe ich ein Messaging Gateway :
definiert@Component
@MessagingGateway(defaultRequestChannel = TcpClientConfig.TCP_DEFAULT_CHANNEL)
public interface TcpClientGateway {
Map send(Map request);
}
< /code>
TCPClientService < /strong>
Ich habe eine geplante Aufgabe implementiert, um Ping -Nachrichten zu senden: < /p>
public void init() {
isTaskRunning.set(true);

log.info("TCP Connection init");
int successCount = 0;

try {
while(successCount < activeSocket) {
try {
int socketNumber = successCount + 1;
log.info("Attempting to connect socket #{}", socketNumber);

Future future = executorService.submit(() -> connect("N"));

boolean connected = future.get(3, TimeUnit.SECONDS);

if (connected) {
log.info("Successfully connected socket #{}", socketNumber);
successCount++;
} else {
log.error("Failed to connect socket #{}", socketNumber);
}
} catch (InterruptedException | ExecutionException | TimeoutException e) {
log.error("Error while connecting socket #{}: {}", successCount + 1, e.getMessage());
} finally {
try {
Thread.sleep(1000);
} catch(InterruptedException e) {
log.error("Error while connecting socket #{}: {}", successCount + 1, e.getMessage());
}
}
}

} catch (Exception e) {
log.error("Error during initialization: {}", e.getMessage());
Thread.currentThread().interrupt();
}

isTaskRunning.set(false);
}

public boolean connect(String isReportLine) {
log.info("TCP Connection connect");

try {
Map connect = new LinkedHashMap();

connect.put("BEGIN", "CONNECT");

Map result = clientGateway.send(connect);

if (!"100".equals(result.get("CODE"))) {
log.error("Gateway Connect Fail");
return false;
}

return true;
} catch (Exception e) {
log.error("[CONNECT] message='{}'", e.getMessage());
return false;
}
}

@Scheduled(fixedRate = 30 * 1000)
public void sendPings() {
log.info("### Send Ping ###");
for (Map.Entry entry : activeConnections.entrySet()) {

this.sendPing();
}
}

public void sendPing() {
try {
if (isTaskRunning.get()) {
return;
}

Map ping = new LinkedHashMap();
ping.put("BEGIN", "PING");

Map result = clientGateway.send(ping);

if (!"100".equals(result.get("CODE"))) {
log.error("[PING] error");
return;
}

Thread.sleep(1000);
} catch (Exception e) {
log.error("[PING] err: {}", e.getMessage());
}
}

public void sendMessage() {
try {
if (isTaskRunning.get()) {
return;
}

Map send = new LinkedHashMap();
send.put("BEGIN", "SEND");

Map result = clientGateway.send(send);

if (!"100".equals(result.get("CODE"))) {
log.error("[PING] error");
return;
}

Thread.sleep(1000);
} catch (Exception e) {
log.error("[PING] err: {}", e.getMessage());
}
}
...

< /code>
Das Problem < /strong>
Meine Ping -Implementierung funktioniert und erhält Antworten, aber ich glaube, es verwendet nicht alle Socket -Verbindungen, die ich erstellt habe. Wenn ich Pings über die Clientgateway.send () -Methode sende, scheint es, dass der FailroverClientConnectionFactory immer dieselbe Verbindung auswählt, anstatt alle verfügbaren Verbindungen durchzusetzen. Die Verbindung empfängt alle 30 Sekunden eine Ping -Nachricht, um sie am Leben zu erhalten. Für den normalen Nachrichtenverkehr möchte ich immer noch die Failover -Funktionalität verwenden, um eine verfügbare Verbindung automatisch auszuwählen. Wählen Sie eine der in Schritt 1 erstellten Verbindungen aus, um Nachrichten zu senden und Antworten zu erhalten.>

Quick Reply

Change Text Case: 
   
  • Similar Topics
    Replies
    Views
    Last post