Microsoft empfiehlt gRPC als Alternative für WCF. In .NET Core ist (bis jetzt) nur noch ein WCF-Client vorhanden. Ein wertvolles Feature von WCF ist die Duplex-Schnittstelle, über welche der Server seine Clients asynchron benachrichtigen kann.
Dieser Beitrag zeigt ein robustes gRPC-Client-Server System mit folgenden Merkmalen:
- Einfacher Self-hosted Server: kein Kestrel, kein IIS.
- Duplex-Verbindung: der Server muss den Client asynchron benachrichtigen können.
- Das System soll Verbindungsunterbrüche vertragen können.
- In der Praxis kann es sein, dass der Client schneller hochgefahren ist als der Server. Der Client muss damit umgehen können.
- Das Beispiel beschreibt eine Peer-to-Peer Verbindung mit einen Server und einem Client.
Das Beispiel
Das Beispiel hat eine full-Duplex Peer-to-Peer Verbindung, also einen Client und einen Server mit asynchroner bidirektionaler Kommunikation. Der Client ruft beim Hochfahren ‘CheckServer()’ vom Server auf. Gibt es eine Exception, dann ist der Server nicht erreichbar und es wird in ‘RetryToConnectMs’ wieder versucht zu verbinden. Wenn der Aufruf erfolgreich war, wird ein Server-Stream geöffnet durch den Aufruf von ‘ServerNotification()’. Der Parameter Zykluszeit definiert, in welchem Abstand der Server den Client informieren soll (im Beispiel alle 5 Sekunden).
Wenn die Verbindung wegfällt, generiert der Server-Stream eine Exception. Der Client räumt den Server-Stream auf und versucht erneut zu verbinden wie beim Aufstarten.
Nach einem Verbindungsunterbruch ist der Server-Stream ungültig, weil es sein kann, dass beim Wiederverbinden der Client mit einem anderen Server verbunden wird (Load-Balancing). Das trifft natürlich bei dieser Peer-to-Peer Verbindung nicht zu.
Der Beispielcode kann hier heruntergeladen werden.
Die proto-Datei
Die Solution ist klassisch aufgeteilt auf je ein Server-, Client- und Contracts-Projekt. Die Protobuf-Datei befindet sich im DuplexWithServerStreamingContracts-Projekt und wird in C# Klassen konvertiert und in eine DLL kompiliert. Die meisten Beispiele im Internet inkludieren die Proto-Dateien vom Server-Projekt als Referenzen ins Client-Projekt. Das hat doppelte Kompilierung zur Folge und führt zu inkonsistenten ‘namespaces’ weil in der proto-Klasse mit ‘csharp_namespace’ der Namensraum der generierten C# Klassen festgelegt wird, diese jedoch beim Server oder Client unterschiedlich sind. Das separate Contracts-Projekt löst dieses Problem.
Die Protobuf-Datei hat folgenden Inhalt.
syntax = "proto3"; option csharp_namespace = "DuplexWithServerStreamingContracts.Protos"; import "google/protobuf/empty.proto"; import "google/protobuf/timestamp.proto"; service DuplexService { rpc CheckServer (google.protobuf.Empty) returns (google.protobuf.Empty); rpc ServerNotification (NotificationIntervalMessage) returns (stream ServerNotificationMessage); } message NotificationIntervalMessage { int32 interval_ms = 1; } message ServerNotificationMessage { google.protobuf.Timestamp server_time = 1; }
Damit die Build-Aktion der proto-Datei auf «Protobuf compiler» gesetzt werden kann, muss zuerst das NuGet-Paket ‘Grpc.AspNetCore’ referenziert werden. Zur Laufzeit wird das Paket aber nicht benötigt. Damit die DLLs nicht in der Ausgabe erscheinen, muss man ‘Exclude Assets’ vom NuGet-Paket auf ‘Runtime’ setzen.
Der Server
Das Beispiel benutzt nicht den IIS- oder Kestrel-Server, sondern den schlanken Server, welcher im Grpc.Core NuGet Paket integriert ist. Das Beispiel braucht nur die NuGet-Paketen ‘Grpc.Core’ und ‘Google.Protobuf’ und den Verweis auf DuplexWithServerStreamingContracts.
Der DuplexService implementiert die zwei Methoden wie folgt:
class DuplexServiceRpc : DuplexService.DuplexServiceBase { public override Task<Empty> CheckServer(Empty request, ServerCallContext context) { return Task.FromResult(new Empty()); } public override async Task ServerNotification(NotificationIntervalMessage request, IServerStreamWriter<ServerNotificationMessage> responseStream, ServerCallContext context) { DateTime nextNotificationTime = DateTime.UtcNow.AddMilliseconds(request.IntervalMs); CancellationToken cancellationToken = context.CancellationToken; do { if (DateTime.UtcNow > nextNotificationTime) { nextNotificationTime = DateTime.UtcNow.AddMilliseconds(request.IntervalMs); await responseStream.WriteAsync( new ServerNotificationMessage { ServerTime = Timestamp.FromDateTime(DateTime.UtcNow) }).ConfigureAwait(false); } await Task.Delay(10, cancellationToken).ConfigureAwait(false); } while (cancellationToken.IsCancellationRequested == false); cancellationToken.ThrowIfCancellationRequested(); } }
Die Methode CheckServer() macht nichts, muss aber trotzdem überladen werden, weil es sonst eine ‘NotImplementedException’ zur Folge hat.
ServerNotification() sendet jede IntervalMs Millisekunden eine ServerNotificationMessage mit der aktuellen Zeit durch den Server-Stream zum Client. Diese Methode simuliert das asynchrone Notifizieren vom Server zum Client. Diese Technik erlaubt es auch, einen Broadcast an alle Clients zu senden. Der Server muss sich dazu den IServerStreamWriterStreams von jedem Client merken und beim Broadcast die Clients einzeln aufrufen. Für ein Beispiel siehe den Post von Damien Bowden.
Weil es egal ist auf welchem Thread der Code nach einem ‘await’ ausgeführt wird, ist an jeden ‘await’ ein ConfigureAwait(false) angehängt. Das spart den Overhead vom Speichern und Zurücksetzen des Synchronization Contextes. Siehe Teil 11 der Serie C# Concurrency für Details.
Der Client
Der Client ist eine einfache Konsolen-Applikation und hat die gleichen Dependencies wie der Server: ‘Google.Protobuf’, ‘Grpc.Core’ und das ‘Contracts’-Projekt.
private const string Host = "localhost"; private const int Port = 8080; private DuplexService.DuplexServiceClient _duplexService; private readonly CancellationTokenSource _cancellationTokenSource = new CancellationTokenSource(); private readonly CancellationToken _cancellationToken; private Task _processServerNotificationTask; public void Start() { Channel channel = new Channel(Host + ":" + Port, ChannelCredentials.Insecure); _duplexService = new DuplexService.DuplexServiceClient(channel); // Task that sets up server stream to allow server to send events to client _processServerNotificationTask = Task.Factory.StartNew(ProcessServerNotification, _cancellationToken, TaskCreationOptions.LongRunning, TaskScheduler.Default);}
Die Methode erstellt den gRPC Client und startet einen Endlos-Thread, welcher die Verbindung kontrolliert und die Benachrichtigungen vom Server verarbeitet. Die Option LongRunning gibt dem Task Scheduler den Hinweis, dass der Thread lange läuft. Die heutige Implementation des Task Schedulers erzeugt in diesem Fall einen dedizierten Thread, statt einen Thread aus dem Threadpool zu verwenden. Siehe Blog C# Concurrency Teil 8.
Heruntergefahren wird der Client wie folgt:
public void Stop() { _cancellationTokenSource.Cancel(); try { _processServerNotificationTask.Wait(1000); } // OperationCanceledException is expected and will be catched and filtered out here... catch (AggregateException ex) { List<Exception> exceptions = ex.Flatten().InnerExceptions.Where(e => !(e is OperationCanceledException)).ToList(); foreach (Exception e in exceptions) { Console.WriteLine(e); } } _cancellationTokenSource?.Dispose(); }
Der Task wird mit dem CancellationToken gestoppt. Wait() blockiert, bis der Task beendet ist. Die erwartete OperationCanceledException wird ignoriert und der TokenSource disposed.
Kommen wir nun zum Herz der Applikation: ProcessServerNotification().
private const int RetryToConnectMs = 10000; ... private void ProcessServerNotification() { AsyncServerStreamingCall<ServerNotificationMessage> serverStreamingCall = null; NotificationIntervalMessage msg = new NotificationIntervalMessage { IntervalMs = 5000 // Tell the server to send its time every 5 seconds }; do { if (serverStreamingCall == null) { try { _duplexService.CheckServer(new Empty()); serverStreamingCall = _duplexService.ServerNotification( msg, null, null, _cancellationToken); Console.WriteLine("Server is online."); continue; } catch { Console.WriteLine("Failed to connect to server..."); } Task.Delay(RetryToConnectMs, _cancellationToken).Wait(_cancellationToken); } else { try { // Did the server send a response (MoveNext() returns true)? Task<bool> moveNextTask = serverStreamingCall.ResponseStream.MoveNext(); moveNextTask.Wait(_cancellationToken); if (moveNextTask.Result) { DateTime serverDateTime = serverStreamingCall.ResponseStream.Current.ServerTime.ToDateTime(); Console.WriteLine( $"Received server notification message. Server time is {serverDateTime}."); } } catch (OperationCanceledException) { throw; } catch { serverStreamingCall.Dispose(); serverStreamingCall = null; Console.WriteLine("Server is offline."); } } // Put task to cancelled if cancellation token is set _cancellationToken.ThrowIfCancellationRequested(); } while (true); }
Beim Aufstarten ist ‘serverStreamingCall’ ‘null’ und ‘CheckServer()’ wird aufgerufen, um zu prüfen, ob der Server online ist. Die Methode wirft eine Exception, wenn der Server nicht erreichbar ist. Es wird dann ‘RetryToConnectMs’ Millisekunden gewartet, bevor wieder versucht wird, den Server zu erreichen. Wenn die Verbindung in Ordnung ist, wird ‘ServerNotification()’ einmal aufgerufen, um den Server-Stream aufzusetzen.
Leider wirft ‘ServerNotification()’ keine Exception wenn der Server nicht erreichbar ist, sonst könnte man komplett auf ‘CheckServer()’ verzichten.
ResponseStream.MoveNext() vom zurückgegebenen Stream liefert ‘Task<bool>’ zurück. Mit ‘Wait(_cancellationToken)’ wird blockierend auf das Resultat gewartet, welches sich danach in der Eigenschaft Current befindet. Wenn die Verbindung zum Server unterbrochen wird gibt es eine RpcException ‘stream removed’ worauf der Server-Stream aufgeräumt wird und das Ganze von vorne anfängt.
Es wird im Client bewusst auf async\await verzichtet, weil das Ganze auf einem dedizierten Thread läuft. Mit async\await würde einen extra Thread aus dem Threadpool verschwendet werden. Siehe Teil 11 der Serie C# Concurrency für Details.
Fazit
Das Beispiel zeigt wie man mit 2 Methoden ein robustes Server-Client-System mit Duplex-Schnittstelle erstellen kann. Es wird der einfache Server von gRpc.Core benutzt und nur die Designtime Features von ASP.NET Core: die ASP.NET Core DLLs werden nicht ins Ausgabe-Verzeichnis kopiert.
Im nächsten Teil wird die Kommunikation zwischen Client und Server abgesichert mit selbst-signierten Client-Zertifikaten. Diese Stufe der Absicherung ist zureichend, wenn nicht von aussen auf die Systeme zugegriffen wird.
Pingback: Noser Blog » gRPC Tutorial Teil 4: Client-Zertifikat Authentifizierung
Pingback: Noser Blog » gRPC Tutorial Teil 2: Streaming mit gRPC
Pingback: Noser Blog » gRPC Tutorial Teil 4: HTTP/2 über HTTPS