The Raspberry Pi is a credit-card sized System-on-a-Chip computer, featuring the Broadcom BCM2835 ARM chip. It is not just very affordable (CHF 50/brack.ch), but also capable and playful enough to have sparked new and newfound interest for soft- and hardware development amongst students, enthusiasts and professionals. By February of 2014, 2 years after its initial market introduction, 2.5 million units had already been sold. The widespread recognition of devices like the Pi or the Arduino and the consequential innovative capacity are important drivers for helping the fuzzy term “Internet of Things” manifest itself.
I count myself amongst these enthusiasts, as it feels refreshingly different for a line-of-business application developer and strangely empowering in general to have your code move or monitor real-life objects instead of just changing the colors of pixels on the screen. And with this device, this was not only easy to do, but it hardly cost anything, wouldn’t take up much space and consumed little power.
Unfortunately, I didn’t consider how much of my non-existent spare time it was going to devour when I started this hobby project after Christmas 2013. Luckily, my efforts paid off, as the Raspberry Pi later became relevant in the context of a professional focus group at my workplace, serving as hands-on example for study and demonstration of machine-to-machine communication from a web architectural standpoint.
The initial motivation was to build a mobile robot for different usages throughout the home. The first thing needed was a simple machine-to-machine communication system, with the Raspberry Pi as a network-enabled hardware controller servicing lots of local or remote clients over a Wi-Fi connection (receive device manipulation commands and send device data updates).
Just having joined the business unit “Microsoft” shortly before I got one of these beauties, I decided to implement it in C# to grind my skills. As a consequence, and because of the nature of such applications, I encountered the .NET Reactive Extensions API. It caught my attention and is definitely worth taking a closer look.
But let’s go back to the beginning and see how the need for such a library came about. Part 1 is for the readers out there with basic C#/Linux knowledge wanting to “program along”, Part 2 contains architectural considerations, and Part 3 goes into details and usages of the Rx library:
Part 1 – Getting started
Runtime Environment
Windows 7+ will not run on the ARM11 processor. The recommended OS for the Raspberry is a Linux derivate (Debian based) called Raspbian. An SD memory card serves as a boot device onto which the image is burnt. Plugging our Raspberry into the network and a recommended 5V@2 amperes of micro-USB power, we can connect to it using SSH after about 45 seconds:
$ ssh pi@192.168.0.113 pi@192.168.0.113's password: raspberry (this is the default)
(I’ll give you some time to change the password to a shorter one or much better, to copy your public ssh key to the pi)
pi@raspberrypi ~ $ uname -a Linux raspberrypi 3.10.25+ #622 PREEMPT Fri Jan 3 18:41:00 GMT 2014 armv6l GNU/Linux
At the time of writing, Mono (an open source implementation of Microsoft’s .NET Framework) was available in version 3.2.8 in the maintained official repositories. This allows us to use Mono 4.5 which is roughly equivalent to .Net 4.5. Since the rise of Xamarin, a modern C# cross development platform, which is a sponsor of the Mono project, it has already gotten and can be expected to keep getting a lot of attention.
pi@raspberrypi ~ $ sudo apt-get install mono-csharp-shell pi@raspberrypi ~ $ mono --version Mono JIT compiler version 3.2.8 (Debian 3.2.8+dfsg-4+rpi1) pi@raspberrypi ~ $ csharp
Development Environment
Now, the Raspberry Pi is not a very fast machine. To enjoy the experience, I suggest developing locally on your favorite machine using the same Mono compiler version as the target machine. I mainly used MonoDevelop on Linux, which corresponds to Xamarin Studio on Windows. They both are good IDEs that provide many of the functions you would expect.
$ sudo apt-get install monodevelop mono-complete $ monodevelop
The most essential – and so far, only – add-in needed, is the NuGet Package Manager. The latest version of Xamarin Studio finally includes this package manager. (Installation instructions for MonoDevelop)
Anytime you want to deploy, copy the changed assemblies to the target machine (or execute it as a post-build step):
$ rsync -avz --exclude '*.mdb' Server/bin/Debug/ pi@192.168.0.113:/home/pi/RxPi
Controlling GPIO Pins – Wiring the Pi
To control the general purpose input/output (GPIO) hardware pins on the controller, we just use an existing library like the one called WiringPi that is written in C and directly manipulates the correct memory registers of the BCM2835 for us. WiringPi has an active author, and the library can easily be used from C# with a native P/Invoke wrapper class. This is the code for a blinking light, the “Hello World” of hardware control:
Init.WiringPiSetup(); GPIO.pinMode(pin: 0, mode: (int) GPIO.GPIOpinmode.Output ); while(true){ GPIO.digitalWrite(pin: 0, value: 1); Thread.Sleep(500); GPIO.digitalWrite(pin: 0, value: 0); Thread.Sleep(500); }
The WiringPi library requires the existence of the BCM2835 chip. By introducing an interface, we can create mock implementations to inject when running locally or when testing:
Since one of the goals is to remote control hardware (or write programs that run on other machines that do so), these interfaces and (server-side) implementations for some possible components will come in handy:
This code looks more readable and runs locally:
IPiController ctrl = PiControl.Instance (); var led = new Led (ctrl, pin: 0); led.SetOn (); Thread.Sleep (1000); led.SetOff ();
Part 2 – Design Considerations
Since this was a private project, it had two important constraints: it had to progress quickly and it was supposed to be joyful. Many decisions were based on these principles, if an API or framework did not want to play along with those rules, it was swiftly ditched.
The system is made up of two components, the part facing the hardware and the one facing the network.
The machine
The most basic of embedded software architectures is called a Simple Control Loop. The program just loops endlessly and gives each device a chance to change some pins, do simple calculations, or raise an event. A push-button is an example for a device that raises an event. A running stepper motor on the other hand might change its pins to the next set of values in the stepper-sequence.
var cts = new CancellationTokenSource(); Task.Factory.StartNew(() => { while (!cts.IsCancellationRequested) foreach (var device in controller.Devices) device.Step(); }, cts.Token); Console.ReadLine(); cts.Cancel();
This worked and was a good enough abstraction of having a running device with integrated circuitry. This code obviously has room for improvement and we will see, how we can apply Reactive Extensions to this. It does run on a single thread and can blink one LED at a frequency of about 190 kHz. Thanks to using WiringPi, even though being miles away from pure C performance, it is not all that bad in comparison.
I definitely did not want to introduce multi-threading at this point, where each “active” device could run on its own thread, because multi-threading is not something I considered to be simple or joyful (this was about to change).
The communication
But introducing other threads is exactly what I had to do, in order to interact with this hardware loop. For servicing clients, there are two basic models one can choose from:
Threaded vs Evented Services
Threaded services use multiple threads to fulfill client requests, whereas evented services use a single event loop thread to execute all client requests. This excellent article contains important background information and an in-depth comparison of the two, as it looks at scalability aspects of the two models for various application needs.
Our system surely could profit from an evented service, as it deals with lots of little events, some CPU-bound, some I/O bound:
- Incoming connections
- Requests to control the hardware (Turn LED on, stop motor)
- Subscriptions for hardware changes (Please notify me, when motor stops)
- Hardware changes (A button was pressed, a temperature changed)
All of these events occur at an unpredictable time and rate, and have little to no payload attached to them.
Rolling my own
My first approach was to service requests via HTTP web service. Later, I added web sockets to send real-time data to subscribed clients. These are the frameworks I tried or considered:
- WebApi WebApi 2.1 was not supported under Mono 2 (it is now), so I tried the next Microsoft API I could think of, which was
- NET MVC this was possible using Mono’s xsp hosting server, but my deployment cycle became agonizingly slow and it all felt outdated anyway.
- NancyFx was a good find and I ended up using it shortly, as it was the most lightweight, easy-to-use, offered self-hosting and even asynchronous routing in Mono 2.
- SuperWebSockets At one point, I hooked up an infrared receiver to one of the pins. I wanted to see, how fast I could draw the signal steps on a canvas in a browser. This library was easy to use and it gave me the insight that betting on web sockets plus publish-subscribe and some buffering is clearly a justified approach.
- XSockets.NET I have not tried this one, but it looks promising and supposed to be tailored to M2M.
- ServiceStack Definitely worth looking at in-depth, it has raving reviews, this “One framework to power them all”
To reiterate, web sockets are a viable solutions and some implementations are fit enough for production use. I completely agree with my colleague about the need of lightweight messaging protocols and they are noticeably arising. For MQTT, there is GnatMQ as a .NET broker implementation, and several clients for C# waiting to be given a try.
At this point though, I was tempted to write my own protocol, based on TCP, for the following reasons:
- Using the frameworks slowed down my deployment: more dependencies to manage, services to restart.
- I had to know the details of the frameworks threading model and it wasn’t always just a given to make sure only one instance of the controller existed.
- The need for asynchronous routing arose, as I couldn’t afford blocking calls anywhere or the system would get sluggish.
- It wasn’t simple and lightweight and HTTP with its many long headers (compared to the payload) seemed like such an overhead
- I was wondering, how hard it would be to write my own implementation.
Writing your own network listener is something I would recommend doing only for educational purposes, as you might just end up having to do your own thread and memory management, provide security and a lot of additional features and tools you didn’t think you we’re going to need.
Listen
Nevertheless, I started writing a TCP listener. It had to be evented, not threaded, so instead of spawning a new thread per connection, I had to program asynchronously with callbacks or continuations. So I went to read up on Microsoft’s Asynchronous Programming Patterns and even though I was somewhat familiar with the Task-based async pattern, all of them felt either outdated, clumsy to use or weren’t fully available on Mono 2 (async/await). The code became unwieldy and I doubted its correctness.
I had to do more research, and that’s when I learnt about the .NET Reactive Extensions API (Rx).
Part 3 – .NET Reactive Extensions library
The Reactive Extensions (Rx) is a library for composing asynchronous and event-based programs using observable sequences and LINQ-style query operators. Using Rx, developers represent asynchronous data streams with Observables, query asynchronous data streams using LINQ operators, and parameterize the concurrency in the asynchronous data streams using Schedulers. Simply put, Rx = Observables + LINQ + Schedulers.
That sounded exactly like what I needed! After inept initial attempts and misleading detours to stackoverflow.com posts from the year 2010, I almost abandoned it for being too complex. It wasn’t until I found the Extensions to Reactive Extensions, or short Rxx, that had already done the tidious task of encapsulating the APM into Observables. Thus, I could write the listener as sweetly as I had envisioned:
var listener = new TcpListener(new IPEndPoint(IPAddress.Any, ListenPort)); var messageStream = listener.StartSocketObservable(MaxConcurrent) .Finally(listener.Stop) .SelectMany(sock => CreateServerConnection(sock).IncomingMessageStream()); messageStream.Subscribe(x => Console.WriteLine(“Got msg {0} from client {1}”, x.Text, x.Connection.Id));
Now it had my full attention and I wanted to know more. Here are two good resources to get started: “Intro to Rx” and “101 Rx Samples”. The following contains excerpts from the first one.
Observables, Observers and Subjects
The framework provides tools to deal with observing streaming sequences. It does this using the well-known observer pattern. Its two core interfaces are:
IObserver<T>
void OnNext(T value) void OnError(Exception ex) void OnCompleted()
IObservable<T>
IDisposable Subscribe(IObserver<T> observer)
These are quite self-explanatory, and would be easy to implement. But implementing the interfaces should hardly ever be necessary and is also not recommended practice. Instead, the library provides a rich set of factory methods to create Observables.
The library also contains the class Subject<T> which implements both interfaces above. It serves as good sample to demonstrate basic utilization. The following program outputs “a”, “b” and “c” to the console:
static void Main(string[] args) { var subject = new Subject<string>(); subject.Subscribe(s => Console.WriteLine(s)); //or Action subject.OnNext("a"); subject.OnNext("b"); subject.OnNext("c"); subject.OnCompleted(); Console.ReadKey(); }
Sequences can be completed by calling OnCompleted() or OnError(Exception). After that, no further operations should be executed on that sequence.
Factory Methods
As mentioned, the framework-provided factory methods should be used to create instances of observable sequences. The simple ones include Observable.Return, Observable.Empty, Observable.Never and Observable.Throw, which pretty much do what they say.
This demonstrates Observable.Throw and the most common form of exception handling:
Observable.Throw(new InvalidOperationException()). Subscribe(x => Console.WriteLine("Never reached"), ex => Console.WriteLine("Caught exception: {0}", ex.Message));
Observable.Create provides a way to instantiate more complex sequences:
private IObservable NonBlocking(){ return Observable.Create( (IObserver observer) => { observer.OnNext("a"); observer.OnNext("b"); observer.OnCompleted(); Thread.Sleep(1000); return Disposable.Create(()=>Console.WriteLine("Observer has unsubscribed")); //or can return an Action like: return () => Console.WriteLine() }); }
Observable.Generate uncovers more of the functional nature of reactive programming. It’s a convenient method to create (possibly) infinite observable sequences (unfolding). The simple version takes the following parameters:
- an initial state
- a predicate that defines when the sequence should terminate
- a function to apply to the current state to produce the next state
- a function to transform the state to the desired output
public static IObservable Generate<TState, TResult>( TState initialState, Func<TState, bool> condition, Func<TState, TState> iterate, Func<TState, TResult> resultSelector)
This is useful for generating ranges of e.g. numbers, more complex than what Observable.Range(int,int) provides:
Observable.Range(0,9).Subscribe(Console.WriteLine);
Here is a quick attempt at the Fibonacci-Sequence:
public static IObservable Fibonacci(){ return Observable.Generate( new long [] {0,1}, x => x[0] + x[1] > 0, x => x[0] == 0 ? new long []{1,1} : new long[]{x[1], x[0]+x[1]}, x => x[1] ); } Fibonacci().Take (93).Subscribe (Console.WriteLine);
Unfortunately, this sequence does not go any higher than Fibonacci(91) before it terminates (overflows).
Observable.Generate can take a fourth parameter, a Func<TState,TimeSpan> to specify the due time for the next event. Which brings us to the topic of time-based sequences:
Instead of using Timers within an Observable to create time based sequences, the library suggests Observable.Interval(TimeSpan) and Observable.Timer(TimeSpan), which serve as common interfaces to timers, letting Rx choose the correct implementation (e.g. Threading.Timer, DispatcherTimer) for the current scheduler in use. We’ll talk a little more about schedulers towards the end.
Here this insight is applied to the Control Loop, as it can be rewritten to the following:
var t = Observable.Interval(TimeSpan.FromTicks(10000)); using (t.Subscribe(x => controller.StepDevices())) { Console.WriteLine("Press any key to unsubscribe"); Console.ReadKey(); }
LINQ
Rx provides a slew of LINQ-methods to operate on observable sequences. They can be filtered, aggregated, inspected, transformed, joined, buffered and time-shifted, to name a few.
Many of the LINQ-methods known from working with the IEnumerable interface are present in this library, but since IObservable-sequences are possibly infinite, some of these calls are blocking or result in new Observables that produce a value once the source sequence completes. The “Intro to RX” website contains a lot of good examples and explanation for the similar, yet quite different behaviors.
As an example of filtering, let us write out only odd items of the Fibonacci sequence:
Fibonacci().Where(x => x%2==1).Subscribe(x => Console.WriteLine("Odd Fib: {0}",x));
Aggregation is needed when the data is in raw form and we need to condense it or make some qualitative statements about it because of the data’s high rate.
This very simple example confirms the upper limit of our skimpy sequence:
Fibonacci().Count().Subscribe(Console.WriteLine); 91
Count is an example for a sequence that does not produce a value before its source sequence completes. The Aggregate function itself, which is the base for most other aggregates like Min, Max, Count, etc, does not provide a value until the source sequence completes. The Scan extension method however can be used to accumulate values, e.g. running totals:
fib.Scan ((long)0, (a, c) => a + c).Subscribe(x => Console.WriteLine("tot: {0}",x));
Partitioning can be useful to distribute data to different consumers, one example is the method GroupBy that is, again, similar to the one for IEnumerable, but not the same, as it returns an IObservable<IGroupedObservable<TKey,T>> instead of an IEnumerable<IGrouping<<TKey,T>> :
Fibonacci().GroupBy( x => x%2 ) .SelectMany( grp => grp.Count().Select( count => new {grp.Key,count} )) .Subscribe( _ => Console.WriteLine("{0} items in group {1}", _.count, _.Key)); 61 items in group 1 30 items in group 0
So about twice as many odd numbers in the Fibonacci sequence to 91.
Sequences can be sequentially concatenated with Concat. Or they can be repeated infinitely or a certain number of times, or retried on exception, which is a handy feature for things like network connection attempts.
Concurrent sequences can be combined using several strategies, like Merge, Zip or Switch. They can be time-shifted for delays, throttles or timeouts, gathered in windows of time spans or memory buffers, and they can be configured to run on different schedulers.
There is so much more one can do with observable sequences, it is hard to know, where to stop for this introduction. I highly recommend reading introtorx.com for all the details.
Schedulers
Again, introtorx.com contains the needed basics, but this quote captures the essence:
So, just as a recap: if you do not introduce any scheduling, your callbacks will be invoked on the same thread that the OnNext/OnError/OnCompleted-methods are invoked from.
Both, subscribing and receiving notifications can be configured to use its own Scheduler. The following Scheduler types are interesting (some Schedulers have become obsolete, use the corresponding SynchronizationContext classes):
Immediate: action is not scheduled, but executed immediately.
CurrentThread: action is performed on the thread that made the original call. This is different from Immediate, as CurrentThread will queue the action to be performed.
NewThread: schedules work on a new thread.
ThreadPool: schedules all actions to take place on the Thread Pool.
Schedulers are a large topic, as there can be pitfalls like lock-ups and challenges like supporting cancellation. Having Rx at hand, concurrency issues becomes much more manageable.
I do not intend on reiterating the entire Rx-documentation here, as I want to get back to the example at hand, our M2M system, so I’ll close this short introduction to schedulers with a practical example, where scheduling solved something otherwise much more involved in terms of lines of code. Filling an ObservableCollection in a ViewModel vm bound to a Wpf Grid (using a remote data context):
_context.Educations .Where(x => x.EmployeeId.Equals(user.Id)) .ToObservable() .SubscribeOn(NewThreadScheduler.Default) .ObserveOn(SynchronizationContext.Current) .Subscribe(vm.Educations.Add);
With lines Nr. 4 and 5 added, the otherwise locking GUI stays responsive while filling multiple grids.
Back to the machine
If reactive extensions are the hammer, every problem looks like a nail. They are indeed very handy.
The listener bundles all messages from all connections into a single message stream:
var listener = new TcpListener (new IPEndPoint (_serverIp, _listenPort)); var messageStream = lstnr.StartSocketObservable (MaxConcurrent) .Finally(listener.Stop) .SelectMany(socket => CreateServerConnection(socket) .IncomingMessageStream());
To which it subscribes:
_messagesSubscription = messageStream.Subscribe(msg => { var device = controller.GetDeviceById(msg.DeviceId); if (msg.Command.Equals("SUBSCRIBE")) { Console.WriteLine("Got subscription for {0}", msg.DeviceId); msg.Connection.AddSubscription(device.Events()); } else { Console.WriteLine("Got control message for {0}", msg.DeviceId); device.Execute(msg.Command); } });
For the Events() of a device, I just exposed a Subject as an Observable. This might not be the recommended approach, but it works and is easy to use:
private Subject<IDeviceEvent> _subject = new Subject<IDeviceEvent>(); public IObservable<IDeviceEvent> Events (){ return _subject.AsObservable (); }
This allows the individual devices to send events to subscribers:
protected void Event (string eventName, string data = null) { _subject.OnNext (new DeviceEvent (Id, eventName, data)); }
This is how subscriptions are added server-side after having been requested by clients as shown above:
public void AddSubscription (IObservable<IDeviceEvent> events) { var sub = events.Select (x => _tcpClient.SendLine (x.Text)) .Subscribe (_ => Console.WriteLine ("Sent event to client {0}", this.Id)); _subscriptions.Add (sub); }
(Note: Subscriptions need to be tracked and disposed of).
Running “active” devices can also be done with Observables. We can get rid of the original Step interface-method in IServerDevice and have every “active” device run with a time-based observable sequence that can be configured to match the devices needs. As an example, here is the code for the stepper motor:
private static readonly int[,] Seq = { { 2, 0 }, { 0, 1 }, { 3, 0 }, { 1, 1 }, { 0, 0 }, { 2, 1 }, { 1, 0 }, { 3, 1 } }; public void Start(){ _running = Observable.Generate( 0, x => true, x => x + (_reverse ? 1 : -1), x => Math.Abs(x) % Seq.GetLength(0), x => TimeSpan.FromTicks(10000)) //or variable motor speed .Subscribe(cur => Controller.DigitalWrite(Pins[Seq[cur, 0]], Seq[cur, 1]) ); Event("STARTED"); }
On the client, the Retry feature can be used to attempt a connection several times before failing:
public IObservable<Unit> Connect (){ return Observable.Defer (() => { _tcpClient = new TcpClient (); _connectionAttempts = _tcpClient.ConnectObservable (_hostIp, _hostPort); return _connectionAttempts; }).Retry (RetryAttempts, (ex, i) => { _tcpClient.Close (); var next = System.Math.Pow (i, 2); Console.WriteLine ("Failed to connect: {0}, retrying in {1} secs", ex.Message, next); return TimeSpan.FromSeconds (next); }).TakeLeft (); }
(Note: Defer will not start the sequence until it has subscribers. Retry splits the stream into two channels, the data channel and the exception channel. TakeLeft references the data-channel that we are interested in.)
Once connected, we convert the incoming byte stream to UTF-8 lines and turn everything into a message object stream to which we later subscribe:
_incomingMessages = _connectionAttempts.SelectMany (_ => Observable.Defer (() => _tcpClient.GetStream () .ReadToEndObservable () .BytesToLines ()) .Select(str => new ClientMessage(this,str)));
Finally, here’s a way to capture user input from the console and to consume it using Observables:
private static IEnumerable<string> GetConsoleInput(){ while (true){ Console.Write("{0}> ", DateTime.Now); yield return Console.ReadLine(); } } Console.WriteLine(“Enter ‘exit’ to quit”); var cons = GetConsoleInput().ToObservable().Select(client.SendLine); var sub = cons.Subscribe(); var quit = cons.Where(x => x.Equals(“exit”)) .Subscribe(x => sub.Dispose()); event.WaitOne();
Wrapping it up
Demo
Some of the server code has been shown above. The following are snippets from a GTK# client and its used methods to drive the caterpillar seen in the video below:
public MainWindow () : base (Gtk.WindowType.Toplevel){ Build (); var ipAddress = IPAddress.Parse ("192.168.0.114"); var client = new PiClient (ipAddress); client.Connect ().Subscribe (_ => { Console.WriteLine ("Connected!"); _motorLeft = new MotorClientDevice (client, "LEFT"); _motorRight = new MotorClientDevice (client, "RIGHT"); var wm = new Wiimote (); wm.WiimoteChanged += (object sender, WiimoteChangedEventArgs e) => { var speeds = SpeedsFromAccelState (e.WiimoteState.AccelState); Gtk.Application.Invoke (delegate { _motorLeft.SetSpeed ( speeds.Item1 ); _motorRight.SetSpeed ( speeds.Item2 ); }); }; wm.Connect (); wm.SetReportType (InputReport.IRAccel, true); }, ex => Console.WriteLine ("Connection failed: {0}", ex.Message)); } public class MotorClientDevice : ClientDevice, IMotor{ public void SetSpeed (int speed){ client.SendLine (string.Format ("{0}:SPEED:{1}", Id, speed )); } } public class TcpClientEx : TcpClient{ public IObservable<Unit> SendLine (string line){ var data = string.Format ("${0}\r\n", line); var bytes = Encoding.UTF8.GetBytes (data); return GetStream ().WriteObservable (bytes, 0, bytes.Length); } }
And here is what it looks like. The Wiimote is paired with my Windows laptop, which sends the commands to the Raspberry over WiFi (USB dongle). The Raspberry Pi is powered by a USB Powerbank (2.5Amps output) and uses an Arduino Motor Shield with an external 9V power supply to power the motors (not well visible from above).
Next Steps
The next steps would include mounting a camera and distance sensors, using more appropriate protocols both for web and embedded communication, write the mobile app for it and then get my cats accustomed to their new roommate. But for now I am just glad to have finally finished this blog post and will quit roboting for a while and instead invest more time exploring the pros and cons of using Rx in future projects.
Conclusion
The “Internet of Things” is all about handling data at different speeds, aggregating it to draw useful information out and providing efficient control over the interconnected systems. The .NET Reactive Extensions provide a powerful set of tools to elegantly define the processing logic of finite and infinite sequences or high bandwidth streams while simplifying the complicated, but essential concurrency details.
The functional programming style of Rx may seem odd at first. As pieces fall into place, it starts to feel very elegant. The resultant code is compact and legible.
The “Observer Pattern” or the “Publish-Subscribe Mechanism” have always been two important assets in a programmer’s tool belt. With the rise of the robot species, containing every imaginable network-connected hardware device, we can expect to soon deal with a majority of situations, where synchronous blocking calls or polling for data just won’t cut it anymore. We’ll face times of such ridiculously high event rates per second and from such diverse sources, that it is helpful to have performant tools to inspect, filter and partition these mountains of data to quickly get to the knowledge we envision.