gRPC Tutorial Teil 2: Streaming mit gRPC
Dieser Beitrag demonstriert, wie man mit gRPC Streaming grosse Dateien zum Server hoch lädt oder vom Server herunterlädt. gRPC ist entwickelt durch Google, verwendet Protocol Buffers als Schnittstellenbeschreibungssprache und läuft ausschliesslich über HTTP/2. Eine detaillierte Einleitung ist im ersten Teil von diesem Tutorial beschrieben.
gRPC unterstützt ‘Server Streaming’, ‘Client Streaming’ und ‘Bidirectional Streaming’.
Bei Streaming denkt man sofort an Echtzeit-Übertragungen. Das ist aber nicht der einzige Anwendungsfall von Streaming. Man kann Streams auch verwenden um grosse Dateien zu übermitteln, diese zu zerhacken und die Stücke einzeln zu übermitteln (‘Chunking’). Ein anderer Anwendungsfall ist, den Stream offen zu lassen und zur Benachrichtigung des Empfängers zu verwenden. So kann die Duplex-Schnittstelle von WCF mit einem Serverstream nachgebildet werden. Diese Lösung wird im nächsten Teil von dieser Serie erklärt.
Mit bidirektionalem Streaming kann der Server auch Broadcasts machen. Dazu merkt sich der Server den Download-Stream von jedem Client in einer Collection und iteriert bei jedem Broadcast durch die Collection. Im Netzt gibt es ein paar Chat-Beispiele, die diese Technik mit bidirektionalem Streaming verwenden.
Grosse Dateien transferieren
Daten bis 1 kByte kann man als Parameter vom Typ Byte-Array in einem Aufruf übertragen. Grössere Dateien sollten zerhackt und mittels einem Stream übermittelt werden (‘chunking’). gRPC sorgt dafür, dass Reihenfolge und Datenkonsistenz gewährleistet bleiben.
Die optimale Chunk-Grösse liegt zwischen 1kB und 64kB.
Im nächsten Beispiel werden zwei Methoden implementiert: eine, um ein Bild von einer Person zum Server hoch zu laden, und eine, um ein Bild einer Person vom Server herunter zu laden.
Der Quellcode vom Beispiel kann hier heruntergeladen werden.
Die proto-Datei sieht so aus:
syntax = "proto3"; option csharp_namespace = "RpcStreaming.Services"; enum ImageType { IMAGETYPE_UNDEFINED = 0; IMAGETYPE_JPG = 1; IMAGETYPE_PNG = 2; } enum TransferStatus{ TRANSFERSTATUS_UNDEFINED = 0; TRANSFERSTATUS_SUCCESS = 1; TRANSFERSTATUS_FAILURE = 2; TRANSFERSTATUS_INVALID = 3; TRANSFERSTATUS_CANCELLED = 4; } service RpcStreamingService { rpc UploadPersonImage (stream PersonImageMessage) returns (TransferStatusMessage); rpc DownloadPersonImage (PersonMessage) returns (stream PersonImageMessage); } message TransferStatusMessage { string message = 1; TransferStatus status = 2; } message PersonImageMessage { int32 personId = 1; TransferStatusMessage transferStatusMessage = 2; ImageType imageType = 3; bytes imageChunk = 4; } message PersonMessage { int32 personId = 1; }
Die Chunks werden im Feld ‘imageChunk’ übermittelt. Wenn bei der Servicedefinition vor der Message ‘stream’ steht, erfolgt die Übertragung als Stream.
Bilder herunterladen
Beim Herunterladen ist der Rückgabewert ein Stream. Der Client kann dem Server eine Message mitgeben z.B. mit Filterangaben. Im Beispiel wird die ID der Person übermittelt, von welcher der Client das Bild herunterladen möchte.
Hier ist der Client Code für die Methode DownloadPersonImage, welche den Stream rekonstruiert und wenn das Bild komplett ist, das Bild als Datei auf der Festplatte speichert:
private async Task DownloadPersonImage(int personId, string fileName, CancellationToken cancellationToken) { bool success = true; try { PersonMessage personMessage = new PersonMessage {PersonId = personId}; using var streamingCall = Client.DownloadPersonImage(personMessage); await using (Stream fs = File.OpenWrite(fileName)) { await foreach (PersonImageMessage personImageMsg in streamingCall.ResponseStream.ReadAllAsync(cancellationToken).ConfigureAwait(false)) { fs.Write(personImageMsg.ImageChunk.ToByteArray()); } } } // Is thrown on cancellation -> ignore... catch (OperationCanceledException) { success = false; } catch (Exception e) { _logger.LogError(e, "Exception thrown"); success = false; } if (!success) { File.Delete(fileName); } }
Man kann auch die Methode MoveNext() vom ResponseStream aufrufen. Diese gibt ‘true’ zurück, wenn Informationen empfangen wurden, welche dann aus ResponseStream.Current gelesen werden können:
await using (Stream fs = File.OpenWrite(ImageFileName)) { while(await requestStream.MoveNext(cancellationToken)) { fs.Write(requestStream.Current.ImageChunk.ToByteArray()); } }
Weil es nichts ausmacht, auf welchem Thread der Code nach den await Aufrufen läuft, wird immer ConfigureAwait(false) angehängt. Damit wird der Overhead zum Context speichern und wiederherstellen gespart. Nur der äusserste Aufruf soll ConfigureAwait(true) haben, um nach dem await auf dem UI Thread zu bleiben. Auch darauf kann man verzichten, wenn der Code in einer Console Anwendung läuft, weil diese keinen Synchronisationskontext hat.
Siehe dazu die Blogserie ‚C# Concurrency’, die hier beginnt.
Im Server sieht der Code wie folgt aus:
public override async Task DownloadPersonImage(PersonMessage request, IServerStreamWriter<PersonImageMessage> responseStream, ServerCallContext context) { // Example of exception if (File.Exists(ImageFileName) == false) { _logger.LogError($"File '{ImageFileName}' not found."); Metadata metadata = new Metadata() {{"Filename", ImageFileName}}; throw new RpcException(new Status(StatusCode.NotFound, "Image file not found."), metadata, "More details for the exception..."); } PersonImageMessage personImageMessage = new PersonImageMessage(); personImageMessage.TransferStatusMessage = new TransferStatusMessage(); personImageMessage.TransferStatusMessage.Status = TransferStatus.Success; personImageMessage.PersonId = request.PersonId; personImageMessage.ImageType = ImageType.Jpg; byte[] image; try { image = File.ReadAllBytes(ImageFileName); } catch (Exception) { _logger.LogError($"Exception while reading image file '{ImageFileName}'."); throw new RpcException(Status.DefaultCancelled, "Exception while reading image file."); } int imageOffset = 0; byte[] imageChunk = new byte[ImageChunkSize]; while (imageOffset < image.Length) { int length = Math.Min(ImageChunkSize, image.Length - imageOffset); Buffer.BlockCopy(image, imageOffset, imageChunk, 0, length); imageOffset += length; ByteString byteString = ByteString.CopyFrom(imageChunk); personImageMessage.ImageChunk = byteString; await responseStream.WriteAsync(personImageMessage).ConfigureAwait(false); } }
Als erstes wird die Antwort ‘PersonImageMessage’ vorbereitet. Das Feld ‘ImageType’ ist nur für die Show da. Als nächstes wird das Bild von der Festplatte als Bytearray geladen und in ‘ImageChunkSize’ Stücke aufgeteilt. Der Bytearray wird mit einer Google Funktion in einen ByteString verwandelt und in den Stream geschrieben.
Es fällt auf, dass im ResponseStream kein CompleteAsync () vorhanden ist, um anzugeben, dass der Download abgeschlossen ist. Der RequestStream hat ein CompleteAsync().
Bilder hochladen
Beim Hochladen sind die Rollen umgekehrt.
Hier der Client-Code:
private async Task UploadPersonImage(int personId, string fileName, CancellationToken cancellationToken) { var stream = Client.UploadPersonImage(); PersonImageMessage personImageMessage = new PersonImageMessage(); personImageMessage.PersonId = personId; personImageMessage.ImageType = ImageType.Jpg; byte[] image = File.ReadAllBytes(fileName); int imageOffset = 0; byte[] imageChunk = new byte[imageChunkSize]; while (imageOffset < image.Length && !cancellationToken.IsCancellationRequested) { int length = Math.Min(imageChunkSize, image.Length - imageOffset); Buffer.BlockCopy(image, imageOffset, imageChunk, 0, length); imageOffset += length; ByteString byteString = ByteString.CopyFrom(imageChunk); personImageMessage.ImageChunk = byteString; await stream.RequestStream.WriteAsync(personImageMessage).ConfigureAwait(false); } await stream.RequestStream.CompleteAsync().ConfigureAwait(false); if (!cancellationToken.IsCancellationRequested) { var uploadPersonImageResult = await stream.ResponseAsync.ConfigureAwait(false); // Process answer... } }
Nach dem Hochladen von allen Chunks wird explizit CompletAsync() aufgerufen, um dem Server mitzuteilen, dass das Streaming beendet ist. Mit ResponseAsync kann die Server-Antwort abgeholt werden.
Server-Code:
public override async Task<TransferStatusMessage> UploadPersonImage( IAsyncStreamReader<PersonImageMessage> requestStream, ServerCallContext context) { TransferStatusMessage transferStatusMessage = new TransferStatusMessage(); transferStatusMessage.Status = TransferStatus.Success; try { await Task.Run( async () => { CancellationToken cancellationToken = context.CancellationToken; await using (Stream fs = File.OpenWrite(ImageFileName)) { await foreach (PersonImageMessage personImageMessage in requestStream.ReadAllAsync(cancellationToken).ConfigureAwait(false)) { fs.Write(personImageMessage.ImageChunk.ToByteArray()); } } }).ConfigureAwait(false); } // Is thrown on cancellation -> ignore... catch (OperationCanceledException) { transferStatusMessage.Status = TransferStatus.Cancelled; } catch (RpcException rpcEx) { if (rpcEx.StatusCode == StatusCode.Cancelled) { transferStatusMessage.Status = TransferStatus.Cancelled; } else { _logger.LogError($"Exception while processing image file '{ImageFileName}'. Exception: '{requestStream}'"); transferStatusMessage.Status = TransferStatus.Failure; } } // Delete incomplete file if (transferStatusMessage.Status != TransferStatus.Success) { File.Delete(ImageFileName); } return transferStatusMessage; }
Das Sammeln aller versendeten Chunks wird parallel auf einem separaten Task gemacht. Der aufrufende Thread wartet ‘await’, bis alle Daten gesammelt und verarbeitet sind. Dann wird die Rückgabemessage zusammengestellt und versendet.
Was wenn der client den Upload abbrechen möchte? Im Beispiel wird die ‚while‘ Schleife beendet und ‚CompleteAsync()‘ gesendet. Der Server sieht so aber nicht den Unterschied zwischen ‚Fertig‘ oder ‚Abbruch‘. Als Work-around könnte man das Feld TransferMessageStatus in PersonImageMessage verwenden um den Server mitzuteilen, dass entweder den Transfer abgebrochen wurde oder das Bild komplett übermittelt wurde. Besser wäre aber den eingebauten Mechanismus zum Abbrechen zu verwenden. Die Frage liegt im Moment bei StackOverflow…
Fazit
Dieser Blog zeigt, wie Streams zum Up- und Download grosser Dateien verwendet werden, in dem sie zerhackt und in einzelnen Paketen versendet werden. Im nächsten Blog wird die Duplex Funktionalität von WCF mit einem Server-Stream nachgebildet, in dem der Server asynchron den Client benachrichtigen kann. Das Beispiel ist gedacht für einfache und robuste Peer-to-Peer Anwendungen in der Praxis. Wenn eine industrielle Anlage eingeschaltet ist, kann es gut sein, dass der Client bereit ist, bevor der Server hochfahren konnte. Also greift der Client beim Verbinden ins Leere. Auch wird das Problem Verbindungsunterbrechung gelöst. Der offene Serverstream bricht dann ab und muss bei einer neuen Verbindung wieder neu geöffnet werden.