Wie implementieren Sie die NetMQ (Zeromq) -Lösung ordnungsgemäß?C#

Ein Treffpunkt für C#-Programmierer
Anonymous
 Wie implementieren Sie die NetMQ (Zeromq) -Lösung ordnungsgemäß?

Post by Anonymous »

Ich verwende Zeromq (NetMQ, neueste Version) in einem Client (WinForms) / Service (WCF) -Setup. Die aktuelle Implementierung verfügt über mehrere Thread -Schleifen für die Meldung und einen Herzschlagmechanismus, ist jedoch unordentlich und verliert manchmal die Verbindung, wahrscheinlich aufgrund einer Gewindesperrung (in Tests nicht reproduzierbar). Dies scheint die Stabilität zu verbessern, und die Reinigung der Ressourcen ist besser mit Anweisungen und netmqconfig.cleanUp (Falsch) und verhindert, dass Ports über Time_wait (2–4 min) hinausgeschlossen werden.public Task EstablishConnection(string endpoint, CancellationToken token)
{
var connectionTask = new TaskCompletionSource();
_logger.Log(LogState.Info, "Initializing request socket.");

Task.Run(() =>
{
try
{
_isShuttingDown = false;
_shutdownSignal = new TaskCompletionSource();
Thread.CurrentThread.Name = this.GetType().Name;

using (_messageQueue = new NetMQQueue())
using (var socket = new RequestSocket(endpoint))
using (_eventPoller = new NetMQPoller { _messageQueue })
using (var eventMonitor = new NetMQMonitor(socket, $"inproc://monitor.client.{Guid.NewGuid()}", SocketEvents.Disconnected | SocketEvents.BindFailed | SocketEvents.AcceptFailed))
{
_messageQueue.ReceiveReady += (s, e) =>
{
ProcessClientRequests(socket);
};

eventMonitor.EventReceived += (s, e) =>
{
string errorMessage = $"RequestSocket encountered an issue: {e.SocketEvent.ToString()}";
_logger.LogException(errorMessage, new Exception(errorMessage));
_eventPoller.Stop();
};

connectionTask.TrySetResult(true);
_isRunning = true;

using (var cancelRegistration = token.Register(() => _eventPoller.Stop()))
{
_eventPoller.Run();
}

lock (_shutdownLock)
{
_isShuttingDown = true;
}
}
}
catch (Exception ex)
{
_logger?.LogException("Client request processing failed", ex, fatal: true);
}
finally
{
_isRunning = false;
_shutdownSignal.TrySetResult(true);

if (!connectionTask.Task.IsCompleted)
connectionTask.SetResult(false);

_logger.Log(LogState.Info, "Request socket shutting down.");
}
}, token);
NetMQConfig.Cleanup(false)
return connectionTask.Task;
}

public void ProcessClientRequests(RequestSocket socket)
{
ProcessQueuedMessages(socket);
LoopCompleted?.Invoke();
}

private void ProcessQueuedMessages(RequestSocket clientSocket)
{
try
{
while (_messageQueue.TryDequeue(out IRequestMessage message, TimeSpan.Zero))
{
try
{
_logger?.Log(LogState.Info, $"Processing queued request: {message.Request.GetType()}, Session ID: {message.SessionId}, Timeout: {message.Timeout}");

var response = clientSocket.SendRequest(message.Request, message.SessionId, message.Timeout);
message.Complete(response);

LastSentMessageTimestamp = DateTime.Now;
Thread.Sleep(10); // Prevents excessive CPU usage
}
catch (Exception ex)
{
_logger?.LogException("Error while sending request.", ex, fatal: true);
message.Fail(ex);
}
}
}
catch (Exception ex)
{
_logger?.LogException("Queued message processing failure", ex, fatal: true);
}
}
< /code>
Hier ist ein Beispiel für das Client -Abonnenten, das das NetMQtimer implementiert, um Abonnenten zu überprüfen: < /p>
public Task EstablishConnection(string endpoint, Guid serverIdentifier, Guid sessionIdentifier, CancellationToken cancellationToken, TimeSpan connectionTimeout, ClientSessionInfo previousSession)
{
_endpoint = endpoint;
_activeServerId = serverIdentifier;
_shutdownSignal = new TaskCompletionSource();
var connectionTask = new TaskCompletionSource();

Task.Run(() =>
{
try
{
_isRunning = true;
using (_subscriberSocket = new SubscriberSocket(endpoint))
{
_subscriberSocket.Subscribe(TopicConstants.Heartbeat);
_subscriberSocket.Subscribe(sessionIdentifier.ToString());
var receivedMessage = _subscriberSocket.RecievePubMessage(connectionTimeout);
if (receivedMessage == null)
{
connectionTask.SetResult(false);
return;
}
LastReceivedMessageTimestamp = DateTime.Now;
connectionTask.SetResult(true);
Task.Run(() => HandleCallbacks(cancellationToken), cancellationToken);
RestorePreviousSession(previousSession);

using (_eventPoller = new NetMQPoller { _subscriberSocket })
using (var monitor = new NetMQMonitor(_subscriberSocket, $"inproc://monitor.client.{Guid.NewGuid()}", SocketEvents.Disconnected | SocketEvents.BindFailed | SocketEvents.AcceptFailed))
{
_subscriberSocket.ReceiveReady += (s, e) =>
{
var message = _subscriberSocket.RecievePubMessage(TimeSpan.Zero);
ProcessClientMessage(message);
};

monitor.EventReceived += (s, e) =>
{
string errorMessage = $"SubscriberSocket encountered an issue: {e.SocketEvent.ToString()}";
_logger.LogException(errorMessage, new Exception(errorMessage));
_eventPoller.Stop();
};

monitor.AttachToPoller(_eventPoller);

var timer = new NetMQTimer(100);
timer.Elapsed += (s, e) =>
{
try
{
if (_subscriberSocket != null)
ManageSubscriptionRequests(_subscriberSocket);
}
catch (Exception ex)
{
_logger.LogException("Subscription request handling failed", ex, fatal: false);
}
};
_eventPoller.Add(timer);

using (var registration = cancellationToken.Register(() =>
{
if (_eventPoller != null && !_eventPoller.IsDisposed)
{
_eventPoller.Stop();
}
}))
{
_eventPoller.Run();
}
}
}
}
catch (Exception ex)
{
_logger.LogException("SubscriberSocketService: Connection failure", ex);
}
finally
{
_isRunning = false;
_shutdownSignal.SetResult(true);
connectionTask.TrySetResult(false);
}
}, cancellationToken);

return connectionTask.Task;
}
< /code>
Das letzte Beispiel Zeigen Sie die Publishersocket des Servers mit einer Heartbeat -Implementierung an. Möglicherweise kann dies durch den NetMQMonitor ersetzt werden? < /p>
public override void Execute(NetMQSocket socket)
{
_lastHeartbeatSent = DateTime.Now;

try
{
_logger?.Log(LogState.Info, "Initializing event poller.");
using (_eventPoller = new NetMQPoller())
{
_logger?.Log(LogState.Info, "Setting up message queue.");
using (_messageQueue = new NetMQQueue())
{
_messageQueue.ReceiveReady += (s, e) => HandleMessageQueue(socket);
_eventPoller.Add(_messageQueue);

var heartbeatTimer = new NetMQTimer(_config.Options.Publisher.HeartbeatInterval);
heartbeatTimer.Elapsed += (s, e) =>
{
if (DateTime.Now - _lastHeartbeatSent >= _config.Options.Publisher.HeartbeatInterval)
{
SendHeartbeat();
}
};
_eventPoller.Add(heartbeatTimer);

try
{
_logger?.Log(LogState.Info, "Starting event poller.");
_eventPoller.Run();
}
finally
{
_shutdownSignal.SetResult(true);
}
}
}
}
catch(Exception ex)
{
_logger?.LogException("Execution error in event loop", ex);
throw;
}
}
< /code>
Ist dies der richtige Ansatz? Kann NetMqMonitor verlorene Verbindungen zuverlässig erkennen oder ist ein manueller Herzschlag für Stabilität noch benötigt? Sicherstellen, dass eine automatische Wiederverbindung von entscheidender Bedeutung ist, wenn eine Trennung auftritt. Ich weiß>

Quick Reply

Change Text Case: 
   
  • Similar Topics
    Replies
    Views
    Last post