Lightweight Reactive Programming
Motivation for reactive programming
Some applications (not only in embedded computing) deal with the visualization of the states of an underlying process or hardware. Other applications have many user interface (UI) components that depend on each other.
In both cases the UI content is updated (directly or indirectly) based on asynchronous external events (e.g. user input or hardware states). Showing a few live-updated UI states may seem trivial at first, but it becomes much more complex when there are many dependencies:
- The user can switch between different views at any time.
- UI buttons are enabled/disabled depending on other states.
- A different set of UI elements is shown depending on a mode.
- There may be multiple or alternative UIs (e.g. remote access) able to change the states and any changes made should be reflected on all UIs.
The application easily turns into a monolithic piece of software that is difficult to maintain. A given piece of code probably knows too much about which UI components are affected by a state change, and it has to update them actively. Decoupling measures are therefore required.
One such measure is the well-known observer pattern. It helps to transform a monolithic system into one composed of loosely coupled components. But the pattern is clumsy to use repeatedly and all over the place (think of adding/removing observers, notifying, updating).
So we have to raise the level of abstraction even further and use the reactive programming style instead: The application is regarded as a collection of objects with varying states and automatic propagation of state changes. Ideally the core logic (the relationships between the object states) is expressed only in terms of rules that are valid over the whole program lifetime (so-called “invariants”) instead of dealing with low-level issues such as observer implementations, processing individual events and the initialization of object states. For more flexibility, rules can be defined with arbitrary lambda expressions instead of predefined functions.
Limitations in Android / Using libraries
The motivation for this blog article comes from problems we faced while implementing the reactive parts of an Android application. At that time (API level 17, Android 4.2) there was almost no support for reactive programming in Android (besides the non-generic, inheritance-based java.util.Observable/Observer and all those legacy, lambda-incompatible “listener” APIs).
Reactive programming in the context of UIs is often also known as “data binding“. There are some existing libraries for Android in this area:
But for several reasons they didn’t match our needs: The library should facilitate reactive programming in an existing code base without requiring the integration of a complete data binding tool set (e.g. many XML declarations and new data classes). The logic should be highly customizable, e.g. by using lambda expressions instead of predefined functions. And it shouldn’t be an unmaintained “private” project (possibly a higher risk of rework).
So we wrote our own custom library that provides a minimalistic skeleton for wrapping data sources with a generic interface, expressing rules and binding them to any UI components. Customizing is done via lambda expressions (single-method Java interfaces). Although the Android tool chain still doesn’t support lambda expressions directly, a code folding feature in Android Studio can show the affected code parts in a compact, lambda-like manner.
NOTE: If you start a new project you may also want to consider alternative libraries for reactive programming. See the corresponding section below.
Code example
The following example application demonstrates what reactive programming with the library could look like. It has two input fields for the name and the date of birth, and a text field showing a message that depends on both inputs.
Here’s the most important part of the application source code:
public class MainActivity extends AppCompatActivity {

    private Binding binding;

    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_main);

        // ------------------------------------------------------------------------------
        // get views
        // ------------------------------------------------------------------------------
        EditText nameEditText = (EditText)findViewById(R.id.nameEditText);
        DatePicker birthdayDatePicker = (DatePicker)findViewById(R.id.birthdayDatePicker);
        final TextView messageTextView = (TextView)findViewById(R.id.messageTextView);

        // ------------------------------------------------------------------------------
        // (1) basic observables
        // ------------------------------------------------------------------------------
        ObservableData<CharSequence> nameObservable = asObservable(nameEditText);
        ObservableData<Calendar> birthdayObservable = asObservable(birthdayDatePicker, 2000, 1, 1);
        ObservableData<Calendar> currentDateObservable = createCurrentDateObservable();

        // ------------------------------------------------------------------------------
        // (2) derived observables
        // ------------------------------------------------------------------------------
        ObservableData<Integer> ageObservable = DataFactory.map2(birthdayObservable, currentDateObservable,
                new Mapping2<Calendar, Calendar, Integer>() {
                    @Override
                    public Integer map(Calendar birthday, Calendar currentDate) {
                        return calculateAgeInYears(birthday, currentDate);
                    }
                });
        ObservableData<String> messageObservable = DataFactory.map2(nameObservable, ageObservable,
                new Mapping2<CharSequence, Integer, String>() {
                    @Override
                    public String map(CharSequence name, Integer age) {
                        return name.length() > 0
                                ? "Hello " + name + ". You are " + age + " year(s) old."
                                : "";
                    }
                });

        // ------------------------------------------------------------------------------
        // (3) bind result to view
        // ------------------------------------------------------------------------------
        binding = DataBinder.bind(messageObservable, new Updater<String>() {
            @Override
            public void update(String value) {
                messageTextView.setText(value);
            }
        });
    }

    @Override
    protected void onDestroy() {
        binding.close();
        super.onDestroy();
    }

    // ...
}
Hint: With the code folding feature in Android Studio (“Closures” option) the code is displayed even more compactly: Anonymous single-method interface implementations are collapsed to a lambda-like form.
The full source code can be found here: https://gitlab.com/noengblog/reactive-example-app
Layering
As you can see, the code above is divided into calls on three abstraction layers:
| Layer | Purpose |
|---|---|
| (3) Data binding | Binding of data sources to any target objects (data sinks). |
| (2) Derived observables | Deriving new data sources from other data sources: New data sources are derived by transforming the current values of one or more other data sources. The value of a derived data source is updated automatically whenever any data source it depends on is updated. |
| (1) Basic observables | Representation of data sources: Existing data sources are wrapped with a generic interface. |
Java interfaces
The following code snippet shows the most important Java interfaces in the library:
public interface ObservableData<T> {
    public T getValue();
    public Binding observe(Observer observer);
}

public interface Observer {
    public void update();
}

public interface Binding {
    public void close();
}
Adding/removing observers
The mechanism for adding and removing observers is provided by the single method observe. It returns a helper object of type Binding that contains the context needed to remove the observer later.
This approach simplifies the management of observer subscriptions: You don’t need to keep references to both the data source AND the observer, and you don’t have to match them correctly in the clean-up code. Furthermore, all subscriptions can be kept in a list that collects the Binding objects.
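One way to keep all subscriptions of a screen in a single place is a small composite Binding. The library code shown in the implementation section uses a class named BindingList for this purpose, but its implementation isn’t part of the article, so the following is only a minimal sketch under that assumption: a BindingList that is itself a Binding and closes all collected bindings with one call. The nested Binding interface is a stand-in with the same shape as the library’s, and the lambdas require Java 8 or later.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch of a BindingList; the library's real class is not shown in the article. */
class BindingListSketch {

    /** Stand-in with the same shape as the library's Binding interface. */
    interface Binding {
        void close();
    }

    /** Collects several bindings so that they can be closed with a single call. */
    static class BindingList implements Binding {
        private final List<Binding> bindings = new ArrayList<>();

        synchronized void add(Binding binding) {
            bindings.add(binding);
        }

        synchronized int size() {
            return bindings.size();
        }

        /** Closes all collected bindings in reverse order and forgets them, so a second call does nothing. */
        @Override
        public synchronized void close() {
            for (int i = bindings.size() - 1; i >= 0; i--) {
                bindings.get(i).close();
            }
            bindings.clear();
        }
    }

    public static void main(String[] args) {
        List<String> closed = new ArrayList<>();
        BindingList subscriptions = new BindingList();
        subscriptions.add(() -> closed.add("name observer"));
        subscriptions.add(() -> closed.add("birthday observer"));
        subscriptions.close(); // e.g. called once from onDestroy()
        System.out.println(closed); // [birthday observer, name observer]
    }
}
```

Closing in reverse order mirrors the usual “last acquired, first released” convention; forward order would work just as well for independent subscriptions.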
Why isn’t the data provided when notifying the observer?
The observers are only notified of a (possible) change, but the actual value isn’t provided in the callback. Why?
At first it may seem easier if the observer callback also received the value as an argument. But there are some reasons not to do it this way:
- Possible duplication of code in the implementation of getValue and in the notification of the observers.
- Possible race conditions if asynchronous notifications of the observers and synchronous calls of getValue (e.g. during initialization or later out-of-band calls) happen at the same time and update the same target. The getValue result and the asynchronous value update could arrive in the wrong order, so that a newer value is overwritten by an older value at the target. This is especially true in multithreaded environments where delegation to other threads (queuing of runnables) is involved.
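The race condition in the second point can be reproduced deterministically without real threads. The following self-contained simulation (an illustration, not library code) uses a queue of Runnables in place of a UI thread’s message queue and two hypothetical writers that change the same state, with their notifications happening to be queued in the opposite order. With push-style callbacks the older value is applied last; with pull-style callbacks every delivered notification reads the current state.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.function.IntFunction;

/** Deterministic simulation of the race described above (an illustration, not library code). */
class PushVsPullDemo {

    /** Returns the value the target shows after all queued notifications have run. */
    static int run(boolean pushValueInCallback) {
        int[] source = {0};                           // current state of the data source
        int[] target = {0};                           // e.g. the text shown by a view
        Queue<Runnable> uiQueue = new ArrayDeque<>(); // stands in for runnables posted to the UI thread

        // Push style: the callback carries the value captured at notification time.
        // Pull style: the callback ignores it and reads the state ("getValue()") when it runs.
        IntFunction<Runnable> notification = captured -> {
            if (pushValueInCallback) {
                return () -> target[0] = captured;
            }
            return () -> target[0] = source[0];
        };

        source[0] = 1;                                // writer A sets 1 and captures it ...
        int capturedByA = source[0];
        source[0] = 2;                                // ... before writer B sets 2
        int capturedByB = source[0];
        uiQueue.add(notification.apply(capturedByB)); // B happens to post its notification first
        uiQueue.add(notification.apply(capturedByA)); // A's older notification arrives last

        while (!uiQueue.isEmpty()) {
            uiQueue.poll().run();
        }
        return target[0];
    }

    public static void main(String[] args) {
        System.out.println("push: " + run(true));  // push: 1 (an older value overwrote a newer one)
        System.out.println("pull: " + run(false)); // pull: 2 (the latest state)
    }
}
```

The pull variant applies the same value twice, which is harmless as long as updates are treated as state rather than as events.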
Implementation of a basic data source
The following code snippet shows how an EditText view is wrapped so that it serves as a data source:
private ObservableData<CharSequence> asObservable(final EditText editText) {
    final AbstractObservableData<CharSequence> observableData = new AbstractObservableData<CharSequence>() {
        @Override
        public CharSequence getValue() {
            // Return an immutable snapshot instead of the mutable Editable itself.
            return editText.getText().toString();
        }
    };
    editText.addTextChangedListener(new TextWatcher() {
        @Override
        public void beforeTextChanged(CharSequence s, int start, int count, int after) {
        }

        @Override
        public void onTextChanged(CharSequence s, int start, int before, int count) {
        }

        @Override
        public void afterTextChanged(Editable s) {
            observableData.notifyObservers();
        }
    });
    return observableData;
}
The implementation uses an anonymous class derived from AbstractObservableData<T> (we will see its implementation later) that provides a concrete implementation of getValue. Then a listener is added to the EditText that calls the helper method AbstractObservableData<T>.notifyObservers after each text change.
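The same wrapping pattern works for any listener-based API, not only for Android views. The sketch below applies it to a hypothetical Thermometer class with a legacy listener interface. ObservableData, Observer, Binding and AbstractObservableData are minimal stand-ins mirroring the library types described in this article (the library’s AbstractObservableData delegates to ObservableImpl instead), so that the example runs on a plain JVM with Java 8 or later.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

class WrapListenerSourceDemo {

    // --- Minimal stand-ins for the library types described in this article ---
    interface Observer { void update(); }
    interface Binding { void close(); }
    interface ObservableData<T> {
        T getValue();
        Binding observe(Observer observer);
    }

    static abstract class AbstractObservableData<T> implements ObservableData<T> {
        private final List<Observer> observers = new CopyOnWriteArrayList<>();

        @Override
        public Binding observe(Observer observer) {
            observers.add(observer);
            return () -> observers.remove(observer);
        }

        public void notifyObservers() {
            for (Observer observer : observers) {
                observer.update();
            }
        }
    }

    // --- Hypothetical legacy data source with a listener API (not part of the library) ---
    static class Thermometer {
        interface Listener { void onTemperatureChanged(); }

        private final List<Listener> listeners = new ArrayList<>();
        private double celsius;

        double getCelsius() { return celsius; }
        void addListener(Listener listener) { listeners.add(listener); }

        void setCelsius(double value) { // e.g. called by a device driver
            celsius = value;
            for (Listener listener : listeners) {
                listener.onTemperatureChanged();
            }
        }
    }

    /** Same pattern as asObservable(EditText): getValue reads the source, the listener notifies. */
    static ObservableData<Double> asObservable(final Thermometer thermometer) {
        final AbstractObservableData<Double> data = new AbstractObservableData<Double>() {
            @Override
            public Double getValue() {
                return thermometer.getCelsius();
            }
        };
        thermometer.addListener(data::notifyObservers);
        return data;
    }

    public static void main(String[] args) {
        Thermometer thermometer = new Thermometer();
        ObservableData<Double> temperature = asObservable(thermometer);
        temperature.observe(() -> System.out.println("now " + temperature.getValue()));
        thermometer.setCelsius(21.5); // prints "now 21.5"
    }
}
```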
Data binding
The data binding is built on top of the observables. It connects a data source with a particular target and therefore has the following responsibilities:
- Read the value from the source
- Initialize the target
- Update the target on changes
The DataBinder.bind method provides a basic data binding for general purposes (not only for Views). It requires an Updater object that actually implements the access to the target.
Surprisingly, the method doesn’t require a reference to the target. It turned out to be easier to keep the reference to the target in the provided Updater instead (e.g. via the surrounding context of an anonymous inner class). Data binding methods generally return a Binding object that is used to remove the binding.
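The three responsibilities, and the target that lives only inside the Updater, can be illustrated on a plain JVM. In the following sketch, bind follows the same logic as the DataBinder.bind implementation in the library section of this article (observe first, then initialize), while the interfaces are minimal stand-ins and SimpleValue and Label are hypothetical classes standing in for a data source and a view.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

class DataBinderDemo {
    // Minimal stand-ins for the library interfaces.
    interface Observer { void update(); }
    interface Binding { void close(); }
    interface ObservableData<T> { T getValue(); Binding observe(Observer observer); }
    interface Updater<T> { void update(T value); }

    /** Hypothetical writable source used only for this demo. */
    static class SimpleValue<T> implements ObservableData<T> {
        private final List<Observer> observers = new CopyOnWriteArrayList<>();
        private volatile T value;

        SimpleValue(T initial) { value = initial; }

        void setValue(T newValue) {
            value = newValue;
            for (Observer observer : observers) {
                observer.update();
            }
        }

        @Override public T getValue() { return value; }

        @Override public Binding observe(Observer observer) {
            observers.add(observer);
            return () -> observers.remove(observer);
        }
    }

    /** Same logic as DataBinder.bind: subscribe first, then initialize the target once. */
    static <T> Binding bind(ObservableData<? extends T> data, Updater<? super T> updater) {
        Binding binding = data.observe(() -> updater.update(data.getValue()));
        updater.update(data.getValue());
        return binding;
    }

    /** Hypothetical target; note that bind() itself never sees it. */
    static class Label {
        String text = "";
    }

    public static void main(String[] args) {
        SimpleValue<String> message = new SimpleValue<>("Hello");
        Label label = new Label();
        Binding binding = bind(message, value -> label.text = value); // target captured by the lambda
        System.out.println(label.text); // Hello (initialization)
        message.setValue("Hello again");
        System.out.println(label.text); // Hello again (update)
        binding.close();
        message.setValue("ignored");
        System.out.println(label.text); // Hello again (binding removed)
    }
}
```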
Library implementation
Here are some code snippets showing the most important implementation parts of the library:
/**
 * Basic read-only implementation of ObservableData already including the observer management (but without getValue). It provides
 * a method to notify observers explicitly allowing bridging to other asynchronous mechanisms.
 *
 * @param <T>
 */
public abstract class AbstractObservableData<T> implements ObservableData<T> {
    private ObservableImpl observable = new ObservableImpl();

    @Override
    public Binding observe(Observer observer) {
        return observable.observe(observer);
    }

    public void notifyObservers() {
        observable.notifyObservers();
    }
}
/**
 * Helper class providing the observer management to implement the observable part of the <code>ObservableData<T></code>
 * interface. It is thread safe.
 */
public class ObservableImpl {
    // We use CopyOnWriteArrayList instead of ArrayList to allow adding/removing observers (by the same thread) while iterating
    // through the observer list.
    private CopyOnWriteArrayList<Observer> observers = new CopyOnWriteArrayList<Observer>();

    public Binding observe(final Observer observer) {
        synchronized (observers) {
            observers.add(observer);
        }
        return new Binding() {
            @Override
            public void close() {
                synchronized (observers) {
                    observers.remove(observer);
                }
            }
        };
    }

    public void notifyObservers() {
        synchronized (observers) {
            for (Observer observer : observers) {
                observer.update();
            }
        }
    }
}
public class DataFactory {

    public static interface Mapping<T, TResult> {
        public TResult map(T value);
    }

    public static interface Mapping2<T1, T2, TResult> {
        public TResult map(T1 value1, T2 value2);
    }

    public static <T, TResult> ObservableData<TResult> map(final ObservableData<? extends T> data,
            final Mapping<? super T, ? extends TResult> mapping) {
        return new ObservableData<TResult>() {
            @Override
            public TResult getValue() {
                return mapping.map(data.getValue());
            }

            @Override
            public Binding observe(Observer observer) {
                return data.observe(observer);
            }
        };
    }

    public static <T1, T2, TResult> ObservableData<TResult> map2(final ObservableData<? extends T1> data1,
            final ObservableData<? extends T2> data2, final Mapping2<? super T1, ? super T2, TResult> mapping) {
        return new ObservableData<TResult>() {
            @Override
            public TResult getValue() {
                return mapping.map(data1.getValue(), data2.getValue());
            }

            @Override
            public Binding observe(Observer observer) {
                return observeMany(new ObservableData<?>[] { data1, data2 }, observer);
            }
        };
    }

    private static Binding observeMany(ObservableData<?>[] observables, final Observer observer) {
        Observer combinedObserver = new Observer() {
            @Override
            public void update() {
                observer.update();
            }
        };
        BindingList bindings = new BindingList();
        for (ObservableData<?> observable : observables) {
            bindings.add(observable.observe(combinedObserver));
        }
        return bindings;
    }
}
public class DataBinder {

    /**
     * Interface for updating a value on an object (the reference to the object has to be provided by the implementor of the
     * interface). It is guaranteed to be called at the beginning for initialization and then on every update. Updates with the same
     * value as before aren't filtered.
     *
     * @param <T>
     */
    public static interface Updater<T> {
        public void update(T value);
    }

    public static <T> Binding bind(final ObservableData<? extends T> data, final Updater<? super T> updater) {
        Binding binding = data.observe(new Observer() {
            @Override
            public void update() {
                updater.update(data.getValue());
            }
        });
        // NOTE: the Updater is called at the beginning to do initialization.
        //
        // We do this after adding the observer to ensure we don't miss any update. On the other hand there's a small chance that
        // the Updater is called twice with the same value because the initialization happens after an update has already happened.
        updater.update(data.getValue());
        return binding;
    }
}
The complete library also contains other useful helper methods:
- DataFactory.map: Other variations of the method that accept more than two input data sources.
- DataFactory.cached: Wraps a data source so that the return value of getValue is cached for subsequent calls (until the original value is updated the next time).
- DataFactory.notifiedOnMainThread: Wraps a data source so that observer calls are delegated to the main thread.
- DataBinder.bind: Other variations of the method that accept more than one data source (for convenience only: they reduce the need for another derived data source in simple cases).
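The article doesn’t show how DataFactory.cached is implemented. One possible approach, sketched here under the assumption of the ObservableData/Observer/Binding interfaces shown earlier, invalidates the cached value through an internal subscription on the source. CachedData and CountingSource are hypothetical names used only for this illustration; a version counter prevents a value that was computed concurrently with a change from being stored as the new cache.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

class CachedDemo {
    // Minimal stand-ins for the library interfaces.
    interface Observer { void update(); }
    interface Binding { void close(); }
    interface ObservableData<T> { T getValue(); Binding observe(Observer observer); }

    /** Hypothetical cached() wrapper; not the library's actual implementation. */
    static final class CachedData<T> implements ObservableData<T> {
        private final ObservableData<T> source;
        private long version;   // incremented whenever the source reports a change
        private boolean valid;
        private T cache;

        CachedData(ObservableData<T> source) {
            this.source = source;
            // Permanent subscription (never closed in this sketch) so that the cache is
            // invalidated even while nobody observes the wrapper.
            source.observe(this::invalidate);
        }

        private synchronized void invalidate() {
            version++;
            valid = false;
        }

        @Override
        public T getValue() {
            long startVersion;
            synchronized (this) {
                if (valid) {
                    return cache;
                }
                startVersion = version;
            }
            T value = source.getValue(); // evaluated outside the lock
            synchronized (this) {
                // Don't store a result that may already be outdated by a concurrent change.
                if (version == startVersion) {
                    cache = value;
                    valid = true;
                }
            }
            return value;
        }

        @Override
        public Binding observe(Observer observer) {
            // Invalidate before forwarding, independent of the source's notification order.
            return source.observe(() -> {
                invalidate();
                observer.update();
            });
        }
    }

    /** Hypothetical source that counts how often its value is evaluated. */
    static final class CountingSource implements ObservableData<Integer> {
        private final List<Observer> observers = new CopyOnWriteArrayList<>();
        int value;
        int evaluations;

        @Override public Integer getValue() { evaluations++; return value; }
        @Override public Binding observe(Observer o) { observers.add(o); return () -> observers.remove(o); }

        void set(int newValue) {
            value = newValue;
            for (Observer o : observers) {
                o.update();
            }
        }
    }

    public static void main(String[] args) {
        CountingSource source = new CountingSource();
        ObservableData<Integer> cached = new CachedData<>(source);
        cached.getValue();
        cached.getValue();
        System.out.println(source.evaluations); // 1
        source.set(5);
        System.out.println(cached.getValue() + " " + source.evaluations); // 5 2
    }
}
```

Because the internal subscription is never closed here, the wrapper lives as long as its source; a real implementation might instead cache only while the wrapper itself is observed.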
Optimizations
The classes are designed to be lightweight. They only contain minimal code for thread safety or performance optimizations. Such measures have to be added manually where required.
In particular the following measures are NOT implemented:
- Thread safety for callers and observers
- Reentrancy countermeasures (posting to handlers)
- Caching of values
- Filtering of value updates without value changes
- Throttling if too many updates happen in a short time (e.g. by skipping intermediate updates)
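As an example of such a manually added measure, filtering of updates without value changes can be implemented as a wrapper around an Updater. The following helper is hypothetical (skipUnchanged is not part of the library). It relies on a correct equals implementation of the data type, treats two null values as equal, and keeps no lock, so it assumes that all updates are delivered on one thread (e.g. the main thread).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

class DistinctUpdaterDemo {
    // Stand-in with the same shape as DataBinder.Updater.
    interface Updater<T> { void update(T value); }

    /**
     * Hypothetical helper: forwards a value only if it differs (according to equals) from the
     * previously forwarded one. Not thread-safe: all updates are expected on a single thread.
     */
    static <T> Updater<T> skipUnchanged(final Updater<? super T> target) {
        return new Updater<T>() {
            private boolean hasValue; // distinguishes "nothing forwarded yet" from a forwarded null
            private T last;

            @Override
            public void update(T value) {
                if (hasValue && Objects.equals(last, value)) {
                    return; // same value as before: skip the redundant update
                }
                hasValue = true;
                last = value;
                target.update(value);
            }
        };
    }

    public static void main(String[] args) {
        List<String> shown = new ArrayList<>();
        Updater<String> updater = skipUnchanged(shown::add);
        for (String s : new String[] {"a", "a", "b", null, null, "b"}) {
            updater.update(s);
        }
        System.out.println(shown); // [a, b, null, b]
    }
}
```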
Background information
Semantics: Data as state (and not as event)
The data is regarded as a state. So we are focusing on the latest value. The order of multiple value updates in a short time is irrelevant as long as we are up-to-date at the end.
Asynchronous updates aren’t regarded as ordered “(event) messages”. They are just a means to propagate state changes.
This assumption allows us to make some tradeoffs:
- Multiple asynchronous update callbacks with the same value may happen (e.g. due to a special behavior of the original data source or due to accepted race conditions). They should not trigger any side effects.
- On the other hand the library may later decide to suppress intermediate value changes (if many updates happen in a short time) or suppress updates with the same value for optimization or throttling reasons.
- One should not rely on any “data refreshes” (setting a data source to the same value again) to trigger any desired side effects.
- One should not rely on any particular sequence of value changes to trigger side effects.
As a consequence we can simplify the design:
- During the initialization of the data binding no sophisticated locking scheme is applied. If an asynchronous update happens at around the same time as the value is read for initialization, there is a small chance of a race condition: The target could be updated twice with the same value.
Data type to be used: Nullable and following value semantics
Due to the use of generics (ObservableData<T>, Updater<T>, etc.) the data types have to be reference types. Primitive types have to be boxed (e.g. Integer instead of int).
As a consequence all data types are nullable. It is legal to use a value of null and give it a special meaning (e.g. no alert). Note that we are talking about a null value inside a data source and not about setting the data source reference to null.
In general the library doesn’t require that data types implement equals correctly. But this may become important later if optimizations for skipping multiple updates with the same value are added.
Caution: Don’t use mutable data types; otherwise unwanted side effects may occur if a data source is accessed from multiple places.
Where is the setValue method?
Data binding is often used in the context of data/form editing. Consequently it is usually based on a data-centric model (with objects being read and written on the same abstraction level and with only limited transformation of data to/from the UI).
But this library has been designed for visualizing the states of running processes and hardware and for complex dependencies between UI elements. Manipulations of the model only happen sporadically at particular places, via predefined commands or user interactions. So the data sources don’t have to be writable in general. Specific ways of manipulation can be provided by the concrete object implementations (e.g. a clearText method as a higher-level interface instead of a generic setValue("") call).
Nevertheless the library provides some interfaces for writable data (e.g. ObservableWritableData), but they aren’t supported by the rest of the library. In any case, data sources should only be writable where it is really needed and not by default. If deriving writable data sources is implemented later, it should be done with care:
- A derived data source should not cache the argument of setValue for a later getValue call. It should simply delegate the calls to the original data source.
- A derived data source shall make no assumption that the setValue call on the original data source actually sets the given value, or even that an update happens at all.
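To illustrate both rules, here is a sketch of a derived writable data source. The article doesn’t show ObservableWritableData, so its shape (ObservableData plus a setValue method) is an assumption, and mapWritable and ClampedSetting are hypothetical. The derived source converts values in both directions but keeps no state of its own, so a rejected or modified write becomes visible through getValue.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Function;

class WritableDerivedDemo {
    // Minimal stand-ins for the library interfaces.
    interface Observer { void update(); }
    interface Binding { void close(); }
    interface ObservableData<T> { T getValue(); Binding observe(Observer observer); }
    /** Assumed shape of ObservableWritableData; the article does not show it. */
    interface ObservableWritableData<T> extends ObservableData<T> { void setValue(T value); }

    /** Hypothetical device setting that clamps every requested value to 0..100. */
    static final class ClampedSetting implements ObservableWritableData<Integer> {
        private final List<Observer> observers = new CopyOnWriteArrayList<>();
        private volatile int value;

        @Override public Integer getValue() { return value; }

        @Override public void setValue(Integer requested) {
            value = Math.max(0, Math.min(100, requested)); // the request is not necessarily applied as-is
            for (Observer observer : observers) {
                observer.update();
            }
        }

        @Override public Binding observe(Observer o) { observers.add(o); return () -> observers.remove(o); }
    }

    /**
     * Hypothetical derived writable source: both directions are pure delegation. setValue()
     * does not remember its argument, so getValue() always reports the original source's state.
     */
    static <A, B> ObservableWritableData<B> mapWritable(final ObservableWritableData<A> source,
            final Function<A, B> read, final Function<B, A> write) {
        return new ObservableWritableData<B>() {
            @Override public B getValue() { return read.apply(source.getValue()); }
            @Override public void setValue(B value) { source.setValue(write.apply(value)); }
            @Override public Binding observe(Observer observer) { return source.observe(observer); }
        };
    }

    public static void main(String[] args) {
        ClampedSetting percent = new ClampedSetting();
        // View the 0..100 setting as a fraction 0.0..1.0.
        ObservableWritableData<Double> fraction =
                mapWritable(percent, p -> p / 100.0, f -> (int) Math.round(f * 100));
        fraction.setValue(2.5);                  // requests 250 %, the setting clamps it to 100
        System.out.println(fraction.getValue()); // 1.0, not the requested 2.5
    }
}
```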
Rationale:
The view should show the “actual state” and not an assumed state: Imagine that the setValue call is rejected by the underlying data source or not applied correctly due to bugs. One should be confident that getValue really returns the truth. This approach even makes any delays during the update transparently visible.
Thread safety and reentrancy
The library is designed to be lightweight. There’s no thread safety out of the box (at least not for the user of the library). So the thread safety depends on the concrete implementation of the data sources and observers.
If you have to break a circular chain of synchronous callbacks or if reentrancy is an issue (during a call from class A to class B there’s a callback to class A, in the same thread) then you have to implement countermeasures yourself (e.g. posting to a handler).
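A common countermeasure is to post observer calls to a queue instead of running them inside the notifying call stack; the DataFactory.notifiedOnMainThread helper mentioned above does this for the main thread. The following sketch generalizes the idea to a java.util.concurrent.Executor. notifiedOn and Source are hypothetical names, a manually drained queue stands in for the message loop, and on Android the executor could, for example, post to a Handler of the main Looper (an assumption, not shown here).

```java
import java.util.ArrayDeque;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;

class PostedNotificationDemo {
    // Minimal stand-ins for the library interfaces.
    interface Observer { void update(); }
    interface Binding { void close(); }
    interface ObservableData<T> { T getValue(); Binding observe(Observer observer); }

    /** Hypothetical wrapper: observer calls are posted to the executor instead of running synchronously. */
    static <T> ObservableData<T> notifiedOn(final ObservableData<T> source, final Executor executor) {
        return new ObservableData<T>() {
            @Override
            public T getValue() {
                return source.getValue();
            }

            @Override
            public Binding observe(final Observer observer) {
                final AtomicBoolean closed = new AtomicBoolean(false);
                final Binding inner = source.observe(() -> executor.execute(() -> {
                    if (!closed.get()) {
                        observer.update(); // skip updates that were posted before close()
                    }
                }));
                return () -> {
                    closed.set(true);
                    inner.close();
                };
            }
        };
    }

    /** Hypothetical source that records whether it is currently notifying its observers. */
    static final class Source implements ObservableData<Integer> {
        private final List<Observer> observers = new CopyOnWriteArrayList<>();
        boolean notifying;
        int value;

        public Integer getValue() { return value; }
        public Binding observe(Observer o) { observers.add(o); return () -> observers.remove(o); }

        void set(int newValue) {
            value = newValue;
            notifying = true;
            for (Observer o : observers) {
                o.update();
            }
            notifying = false;
        }
    }

    public static void main(String[] args) {
        Queue<Runnable> mainQueue = new ArrayDeque<>(); // stands in for the main thread's message queue
        Source source = new Source();
        ObservableData<Integer> posted = notifiedOn(source, mainQueue::add);
        posted.observe(() -> System.out.println(
                "value " + posted.getValue() + ", reentrant: " + source.notifying));
        source.set(1); // nothing is printed yet, the update is only queued
        while (!mainQueue.isEmpty()) {
            mainQueue.poll().run(); // prints "value 1, reentrant: false"
        }
    }
}
```

The closed flag handles a side effect of posting: without it, notifications already queued when close() is called would still reach the observer.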
Be careful:
- The observers on a data source (ObservableData implementation) may be notified from another thread (depending on the specification of the data source). This is also true for the Updater callback in the data binding.
- If other observers are removed within the Observer.update callback, they may still be called until the current update cycle (iterating through the observer list) is finished.
On the other hand the library gives you some weak guarantees:
- It is safe to call getValue from the context of the Observer/Updater callback.
- You can add/remove an observer or binding at any time (without caring about possible asynchronous updates from other threads happening at the same time): The internal list of observers is thread-safe.
- You can add/remove an observer even from the context of an Observer.update callback on the same data source: It is allowed to modify the observer list during iteration (e.g. because of the internal use of CopyOnWriteArrayList).
- During an update cycle (iterating through the observer list) the observers are notified sequentially (no parallelization over multiple threads).
- The helper functions for wrapping/deriving data sources (e.g. map) provided by the DataFactory are thread-safe in themselves. This means they inherit the thread safety characteristics of the underlying data and don’t degrade them.
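The guarantee about modifying the observer list from within a callback, and the earlier warning that removed observers may still be called in the current cycle, both follow from the snapshot iteration of CopyOnWriteArrayList. The sketch below uses a condensed copy of the ObservableImpl class from the library section together with minimal stand-in interfaces; the observer names are arbitrary. Observer A closes B’s binding during the first cycle, yet B is still called in that cycle, and only the second cycle runs without it.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

class ObserverRemovalDemo {
    // Minimal stand-ins for the library interfaces.
    interface Observer { void update(); }
    interface Binding { void close(); }

    /** Condensed copy of the library's ObservableImpl. */
    static class ObservableImpl {
        private final CopyOnWriteArrayList<Observer> observers = new CopyOnWriteArrayList<>();

        public Binding observe(final Observer observer) {
            synchronized (observers) {
                observers.add(observer);
            }
            // Java monitors are reentrant, so close() may run inside notifyObservers() on the same thread.
            return () -> {
                synchronized (observers) {
                    observers.remove(observer);
                }
            };
        }

        public void notifyObservers() {
            synchronized (observers) {
                for (Observer observer : observers) { // iterates over a snapshot of the list
                    observer.update();
                }
            }
        }
    }

    static List<String> run() {
        List<String> log = new ArrayList<>();
        ObservableImpl observable = new ObservableImpl();
        final Binding[] bindingOfB = new Binding[1];
        observable.observe(() -> {
            log.add("A");
            bindingOfB[0].close();     // remove B while the list is being iterated
        });
        bindingOfB[0] = observable.observe(() -> log.add("B"));
        observable.notifyObservers(); // A, B: B still runs in the current cycle
        observable.notifyObservers(); // A only
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run()); // [A, B, A]
    }
}
```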
Summary:
The library is rather a skeleton and contains almost no extra checks or measures. You still have to know what you are doing.
Alternative libraries
For a new project you may also consider existing libraries before deciding to write your own custom library.
Probably you should head for libraries that are suitable for general reactive programming (and not only specialized UI data binding tool sets). Here are some examples:
| Library | Description |
|---|---|
| Sodium | The library concentrates on the basic elements of functional reactive programming and is available for multiple languages (e.g. Java, C#, C++, Scala). Also see the related book “Functional Reactive Programming”. |
| ReactiveX | The library started with the name “Reactive Extensions” for .NET at Microsoft. It has since gained momentum and is available for many languages and platforms. It focuses on event streams and has plenty of functionality to build and manipulate them. But the support for state semantics (see the section “Data as state” above) is limited: e.g. class |
Conclusion
You don’t need a large library to start with reactive programming in a project. The core logic can be implemented within a few pages of code. But you still have to take a lot of things into consideration (e.g. thread safety) and understand the concepts.