ZeroMQ / NetMQ mit Protocobuf
Im Blog «Tutorial: Protocol Buffers in ASP.NET Core» wird die Anwendung von Protocol Buffers in .NET und Visual Studio erklärt. Protocol Buffers (Protobuf) ist eine Methode zur Serialisierung strukturierter Daten. Sie ist die Schnittstellebeschreibungssprache für gRPC. gRPC ist Google’s Alternative für Microsoft’s WCF und wurde in einer siebenteiligen Blog-Serie auf dieser Plattform ausführlich erklärt.
Wie gRPC ist auch ZeroMQ eine gute Alternative für WCF. ZeroMQ (auch ØMQ, 0MQ oder ZMQ genannt) ist eine leichtgewichtige asynchrone Messaging-Bibliothek, die für den Einsatz in verteilten oder concurrent Anwendungen gedacht ist. Wie die meisten Frameworks arbeitet auch ZeroMQ mit Message Queues (ZeroMQ), verzichtet aber auf einen Message Broker, deshalb der Namen ‘Zero’ (ZeroMQ).
Im ersten Teil dieses Blogs wird kurz ZeroMQ erklärt und die .NET Variante NetMQ. Im zweiten Teil wird eine Client-/Server-Applikation gebaut, wobei Protobuf für die (de-)Serialisierung verwendet wird. Das Beispiel demonstriert, wie der Client Werte im Server schreiben oder lesen kann und wie der Server seine Clients über das Publisch-subscribe Muster informieren kann.
Der Beispielcode kann hier als ZIP-Datei heruntergeladen werden.
ZeroMQ
ZeroMQ wurde entwickelt zwischen 2007 und 2010 und gibt es mittlerweile für fast jede Sprache als Bibliothek. ZeroMQ implementiert verschiedene Message Patterns wovon im Beispiel dieses Blogs nur zwei verwendet werden: Request-reply um zu lesen und zu schreiben und Publish-subscribe damit der Server seine Clients informieren kann, wenn ein Ereignis auftritt.
Der Transport von Daten wird mit einem Connection String definiert. Es gibt folgende Möglichkeiten:
- Inproc – thread zu thread im gleichen Prozess
- Ipc – in-Memory, Interprozess Kommunikation (Linux only)
- Tcp
- Epgm, pgm – multicast Protokoll (out of scope)
Im Gegensatz zu WCF ist es mit ZeroMQ unter Windows nicht möglich «named pipes» zu verwenden.
Für Interprozess Kommunikation muss ausgewichen werden auf TCP mit Localhost.
NetMQ
Die meisten Sprachimplementationen von ZeroMQ kapseln die Basisbibliothek Libzmq. Da Libzmq in C++ geschrieben ist, kann man die C# Implementation von ZeroMQ nicht in .NET Core oder .NET 5/6+ verwenden. Zum Glück gibt es die pure C# Implementation NetMQ welche auch in .NET Core und .NET 5/6+ verwendet werden kann.
Beispiel
Im Beispiel generiert der Server nach einer einstellbaren Intervallzeit (NotificationIntervalMs) einen Event mit der Serverzeit als Payload. Die Intervallzeit wird mit SetNotificationIntervalMessage gesetzt und mit GetNotificationIntervalMessage gelesen.
Protobuf Verträge
Etwas speziell an dieser Lösung ist die Kapselung der Protobuf Berichte. NetMqRequestMessage kapselt alle Request-Berichte und NetMqResponseMessage fügt zu dem Response-Bericht noch einen Status und optionalen Fehlertext hinzu. Der Server informiert seine Clients mit Events, die durch die NetMqEventMessage übermittelt werden.
syntax = "proto3"; option csharp_namespace = "NetMqProtobufContracts.Protos"; import "google/protobuf/any.proto"; import "google/protobuf/timestamp.proto"; enum NetMqResult { NET_MQ_RESULT_UNDEFINED = 0; NET_MQ_RESULT_OK = 1; NET_MQ_RESULT_ERROR = 2; } message NotificationIntervalMessage { int32 interval_ms = 1; } message GetNotificationIntervalMessage { } message SetNotificationIntervalMessage { int32 interval_ms = 1; } message NotificationEventMessage { google.protobuf.Timestamp server_time = 1; } message NetMqEventMessage { google.protobuf.Any data = 1; } message NetMqRequestMessage { google.protobuf.Any data = 1; } message NetMqResponseMessage { NetMqResult result = 1; string error_message = 2; google.protobuf.Any data = 3; }
Der NetMQ Server
Im Beispiel kommunizieren Client und Server über eine TCP-Verbindung. Auf einem Thread wird zyklisch geprüft, ob neue Daten im Response-Socket vorhanden sind und ob es Zeit ist, eine Servernotification zu senden.
Der Server benutzt zwei Sockets:
- Response-Socket – zum Empfangen von Requests und Senden von Responses
- Publisher-Socket – um die Client Events zu senden.
Mit NetMQ wird ein Socket wie folgt kreiert und geöffnet:
private const string ResponseSocketAddress = "tcp://*:5555"; … try { _responseSocket = new ResponseSocket(); _responseSocket.Bind(ResponseSocketAddress); } catch (Exception e) { Log($"Exception while binding response socket. Exception {e}"); return; }
Um das Socket aufzuräumen muss Unbind und Dispose aufgerufen werden:
_responseSocket.Unbind(ResponseSocketAddress); _responseSocket.Dispose(); _responseSocket = null;
Das Empfangen und Senden von Berichten mit NetMQ ist einfach. Leider ist die Deserialisierung des Protobuf-Inhalt etwas kompliziert. SetNotificationIntervalMessage setzt die Eigenschaft NotificationIntervalMs und GetNotificationIntervalMessage bittet den Server, einen Bericht zurückzusenden mit dem Wert der NotificationIntervalMs Eigenschaft.
if (_responseSocket.TryReceiveFrameBytes(_receiveTimeout, out byte[] bytes)) { NetMqRequestMessage requestMsg = new NetMqRequestMessage(); requestMsg.MergeFrom(bytes); string typeName = Any.GetTypeName(requestMsg.Data.TypeUrl); NetMqResponseMessage response = new NetMqResponseMessage(); if (typeName == nameof(SetNotificationIntervalMessage)) { SetNotificationIntervalMessage msg = requestMsg.Data.Unpack<SetNotificationIntervalMessage>(); NotificationIntervalMs = msg.IntervalMs; Log($"SetNotificationIntervalMessage request with IntervalMs={NotificationIntervalMs} received"); response.Result = NetMqResult.Ok; } else if (typeName == nameof(GetNotificationIntervalMessage)) { Log("GetNotificationIntervalMessage request received"); NotificationIntervalMessage notificationIntervalMessage = new NotificationIntervalMessage(); notificationIntervalMessage.IntervalMs = NotificationIntervalMs; response.Result = NetMqResult.Ok; response.Data = Any.Pack(notificationIntervalMessage); } else { response.Result = NetMqResult.Error; response.ErrorMessage = $"Unknown message type '{typeName}' received"; } if (_responseSocket.TrySendFrame(_sendTimeout, response.ToByteArray())) { IsClientConnected = true; } }
Das Senden von Events geht gleich wie das Senden von Responses, nur über das Publisher-Socket:
if (DateTime.UtcNow > nextNotificationTime) { nextNotificationTime = DateTime.UtcNow.AddMilliseconds(NotificationIntervalMs); NotificationEventMessage notificationEventMessage = new NotificationEventMessage() { ServerTime = Timestamp.FromDateTime(DateTime.UtcNow) }; NetMqEventMessage eventMessage = new NetMqEventMessage(); eventMessage.Data = Any.Pack(notificationEventMessage); _publisherSocket.TrySendFrame(_sendTimeout, eventMessage.ToByteArray()); }
Der NetMQ Client
Der Client verwendet ein Request-Socket und Subscriber-Socket. Die SendRequest Methode im Beispiel öffnet jedes Mal einen neuen Socket, welcher mit dem Server über TCP verbunden wird. Nach TrySendFrame folgt TryReceiveFrame, welcher die Antwort in einer NetMqResponseMessage Instanz verpackt.
public NetMqResponseMessage SendRequest(IMessage message) { NetMqResponseMessage netMqResponseMessage = new NetMqResponseMessage(); try { if (_requestSocket == null) { _requestSocket = new RequestSocket(); _requestSocket.Connect(RequestSocketAddress); } if (_requestSocket.TrySendFrame(_sendTimeout, message.ToByteArray())) { if (_requestSocket.TryReceiveFrameBytes(_responseTimeout, out byte[] bytes)) { netMqResponseMessage.MergeFrom(bytes); IsConnected = true; return netMqResponseMessage; } Log($"NetMQ server didn't send response after '{_responseTimeout}'."); } } catch (Exception ex) { Log(ex.ToString()); } _requestSocket?.Disconnect(RequestSocketAddress); _requestSocket?.Dispose(); _requestSocket = null; IsConnected = false; return null; }
Die Methode, um die NotificationInterval Zeit auf dem Server zu setzen sieht dann so aus:
public bool SendSetNotificationIntervalMessage(int intervalMs) { SetNotificationIntervalMessage setNotificationIntervalMessage = new SetNotificationIntervalMessage() { IntervalMs = intervalMs }; NetMqRequestMessage msg = new NetMqRequestMessage(); msg.Data = Any.Pack(setNotificationIntervalMessage); NetMqResponseMessage response = SendRequest(msg); return response?.Result == NetMqResult.Ok; }
Um Events vom Server zu empfangen, wird ein Thread gestartet, der ein Subscriber-Socket zyklisch auf Daten prüft. Bevor Events empfangen werden können, muss man dem Server erst noch mitteilen, an welcher Kategorie der Client interessiert ist. Das Beispiel abonniert sich auf alle Kategorien (SubscribeToAnyTopic).
private void DoWork() { try { _subscriberSocket = new SubscriberSocket(); _subscriberSocket.Connect(SubSocketAddress); _subscriberSocket.SubscribeToAnyTopic(); } catch (Exception e) { Log($"Exception while binding publisher socket. Exception {e}"); return; } try { do { _cancellationToken.ThrowIfCancellationRequested(); NetMqEventMessage eventMessage = new NetMqEventMessage(); if(_subscriberSocket.TryReceiveFrameBytes(_sendTimeout, out byte[] bytes)) { IsConnected = true; eventMessage.MergeFrom(bytes); string typeName = Any.GetTypeName(eventMessage.Data.TypeUrl); if (typeName == nameof(NotificationEventMessage)) { NotificationEventMessage v = eventMessage.Data.Unpack<NotificationEventMessage>(); ServerDateTimeReceivedEvent?.Invoke(this, new EventArgs<DateTime>(v.ServerTime.ToDateTime())); } } } while (true); // Setup listener again and wait for client to connect } catch (ThreadInterruptedException) { // Expected exception: triggered by client code } catch (OperationCanceledException) { // Expected exception: triggered by client code } catch (Exception ex) { Log(ex.ToString()); } finally { _subscriberSocket.Unsubscribe(string.Empty); _subscriberSocket.Disconnect(SubSocketAddress); _subscriberSocket?.Dispose(); _subscriberSocket = null; } }
Fazit
NetMQ ist eine gute Alternative zu WCF. Im Vergleich mit gRPC muss man im Bereich der (De-)Serialisierung ein wenig mehr Handarbeit machen, welche bei gRPC vom Codegenerator übernommen wird. Ein weiterer Nachteil ist, dass NetMQ auf Windows keine Interprocess-Communication unterstützt. WCF verwendet dafür named-pipes und gRPC benutzt dafür Unix Domain Sockets, welche in moderneren Versionen von Windows unterstützt werden.