Short introduction to Reactive Extensions

The Reactive Extensions (Rx) is a library for programming with asynchronous data streams. It was originally developed at Microsoft for the .NET Framework but is now available as a separate library. It has since been ported to many other languages and platforms, e.g. Java, Scala, C++, Clojure, JavaScript, Python, Groovy, JRuby, and others.

Rx can be used in desktop as well as web-based applications, on either the server or the client side. A few common situations or problems where you might benefit from using Rx are:

  • Unresponsive UI
  • Long running computations that could potentially block other threads
  • Asynchronous programming where multiple sources of data and events have to be combined
  • Enabling and managing concurrency on the backend independently of the consumer's implementation
  • Handling errors and cancellation of asynchronous tasks

In this post I will introduce some of the general ideas behind Rx and show how it can help in some of the situations listed above. If you are new to Rx, it will give you a basic understanding of what Rx is and where it is useful to apply it.

All examples are presented in C#, but the concepts apply equally to the other languages Rx has been ported to.

Definition

A definition of Rx could be:

Rx is a library for programming with asynchronous data streams. It combines the observer and iterator patterns, as described by the Gang of Four, with ideas and concepts from functional programming.

Querying Data

To get an idea of when Rx is useful, let us look at different ways of retrieving data. A query result can be either a single object or a collection of objects, and the data can be queried either synchronously or asynchronously. This produces the following matrix:

        Single              Multiple
Sync
Async

Sync Single

A function that returns a single object and is executed synchronously is probably one of the most common scenarios that we encounter. In the example the function is called GetData() and the return value is of the generic type T as shown in the following table.

        Single              Multiple
Sync    T GetData()
Async

Calling the method and processing the result might look something like this:

string s = GetData(); // blocks until the value is available

if (s.Equals(x))
{
    // do something
}
else
{
    // do something else
}

If this code runs on the UI thread, the UI will be blocked for however long it takes to retrieve the value, because the call is synchronous.

Sync Multiple

When we examine the synchronous query of multiple values we will see the same behaviour except that the return value is a collection of objects of type T.

        Single              Multiple
Sync    T GetData()         IEnumerable<T> GetData()
Async

The LINQ library provides a rich toolbox of query operators to create, transform, combine and filter collections. In this example, however, we simply want to iterate over all the values and process each of them.

IEnumerable<string> values = GetData(); // values are retrieved synchronously on the calling thread

foreach (string value in values)
{
    if (value.Equals(x))
    {
        // do something
    }
    else
    {
        // do something else
    }
}

Again this is done synchronously and the UI will be blocked.

Async Single

To make the UI more responsive, values can be queried asynchronously. For a single value, the function might return a Task<T>. A Task in C# can be seen as the equivalent of a Future in Java: it represents the result of an asynchronous computation that may not have completed yet.

        Single              Multiple
Sync    T GetData()         IEnumerable<T> GetData()
Async   Task<T> GetData()

Processing the result of a Task synchronously, by reading its Result property, will again block until the value is available.

Task<string> task = GetData();

// Result blocks the calling thread until the task has completed
if (task.Result.Equals(x))
{
    // do something
}
else
{
    // do something else
}

So that is not really an improvement. A better workflow when creating a responsive UI would be:

  • Respond to some user action
  • Do work on a background thread
  • Pass the result back to the UI thread
  • Update the UI

To achieve this, we could use the async features introduced in .NET 4.5 (async and await) or apply a callback. A callback is a function that is passed as an argument and called at a later time, e.g. with the result of a long-running computation. Here we use the ContinueWith method of Task, which takes a delegate that is executed asynchronously once the task has completed.

task.ContinueWith(t =>
{
    // runs once the task has completed, so t.Result no longer blocks here
    if (t.Result.Equals(x))
    {
        // do something
    }
    else
    {
        // do something else
    }
});
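With async and await, the same workflow reads almost like the synchronous version while the calling thread stays free. The following is a minimal sketch, not the post's own code: GetDataAsync and its simulated delay are hypothetical stand-ins for a slow data source, and a console app replaces the UI.

```csharp
using System;
using System.Threading.Tasks;

public static class AsyncSingleDemo
{
    // Hypothetical slow data source: Task.Delay stands in for I/O or a long computation.
    public static async Task<string> GetDataAsync()
    {
        await Task.Delay(100);
        return "expected";
    }

    // Awaiting frees the calling thread instead of blocking it on task.Result.
    // In a WinForms/WPF app, the code after 'await' resumes on the UI thread by default.
    public static async Task<string> ProcessAsync(string x)
    {
        string s = await GetDataAsync();
        return s.Equals(x) ? "do something" : "do something else";
    }

    public static async Task Main()
    {
        Console.WriteLine(await ProcessAsync("expected")); // prints "do something"
    }
}
```

Note that ContinueWith runs its delegate on TaskScheduler.Current (usually the thread pool) unless a scheduler such as TaskScheduler.FromCurrentSynchronizationContext() is passed, so updating the UI from inside such a callback needs extra care.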

Async Multiple

Asynchronous computations that yield collections of values can also be handled with callbacks, and in practice they often are. But as the application grows more complex, we can quite easily run into several problems.

For example, if the callback only receives the complete collection, there is no way to start iterating over the result and processing values before the whole computation completes. If loading the whole collection takes a long time, it would be very convenient to start working with the first part of the list while the rest is still loading. With a single callback for the whole result, this is not possible.

And what if we want to process each element asynchronously again, then each result of that yet again, and so on? The consequence would be a deeply nested composition of callbacks that quickly becomes unwieldy and hard to maintain.

With nested composition, callbacks become unwieldy and lead to the need for synchronization.

This is where Rx comes into play, with its main type IObservable<T>, the counterpart of IEnumerable<T>.

        Single              Multiple
Sync    T GetData()         IEnumerable<T> GetData()
Async   Task<T> GetData()   IObservable<T> GetData()

IObservable<T> represents an asynchronous data stream of objects of type T. Consuming the result is quite simple:

IObservable<string> o = GetData();

// the lambda is invoked for every value the observable emits
o.Subscribe(t =>
{
    if (t.Equals(x))
    {
        // do something
    }
    else
    {
        // do something else
    }
});

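The Subscribe method has several overloads. In the example above, it takes a single Action<T> that is invoked for each value emitted by the observable. Note that IObservable<T> itself only defines Subscribe(IObserver<T>); the overloads taking delegates are extension methods provided by the Rx library.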
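Besides the single-action overload, Rx's Subscribe extension methods also accept callbacks for errors and for completion, and every subscription returns an IDisposable that ends it. The sketch below assumes the System.Reactive NuGet package; Observable.Range and Observable.Throw are stand-ins for GetData(), and the Record helper is hypothetical.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq; // from the System.Reactive NuGet package

public static class SubscribeDemo
{
    // Subscribes with separate callbacks for values, errors and completion
    // and records each notification in the order it arrives.
    public static List<string> Record(IObservable<int> source)
    {
        var log = new List<string>();
        IDisposable subscription = source.Subscribe(
            value => log.Add("next " + value),
            error => log.Add("error " + error.Message),
            () => log.Add("completed"));

        // Disposing unsubscribes; for a long-running source this stops delivery.
        // The sources used below finish synchronously, so the log is already complete.
        subscription.Dispose();
        return log;
    }

    public static void Main()
    {
        // next 1, next 2, next 3, completed
        Console.WriteLine(string.Join(", ", Record(Observable.Range(1, 3))));
        // error boom
        Console.WriteLine(string.Join(", ",
            Record(Observable.Throw<int>(new InvalidOperationException("boom")))));
    }
}
```

A sequence ends with exactly one of OnCompleted or OnError, which is why the error case never logs "completed".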

Composable functions

Like LINQ, Rx includes a rich library of higher-order functions (operators) that transform, combine, filter etc. observable sequences. These operators are easily composable. This is one of the great benefits of Rx, because it makes querying, combining and transforming asynchronous data streams as simple as writing LINQ queries.

Roughly, the operators can be divided into the following categories:

Transform: Select, SelectMany, Aggregate …

Filter: Skip, Take, TakeWhile, Where …

Combine: Concat, Merge, Zip …

Boolean Operators: Any, All, Contains …

Mathematical Operators: Max, Min, Average, Count, Sum …

Concurrency: ObserveOn, SubscribeOn …

Error Handling: Catch, Retry, OnErrorResumeNext, Finally …

Here is a simple example of how some of these functions can be combined:

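using System;
using System.Reactive.Linq; // Rx operators, from the System.Reactive NuGet package

IObservable<int> observable = GetData();

observable
    .Skip(1)                // ignore the first value
    .Take(3)                // then take the next three
    .Where(x => x < 10)     // keep only values below 10
    .Select(x => x + 1)     // add one to each remaining value
    .Subscribe(Console.WriteLine);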
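To make the chain concrete, the sketch below replaces GetData() with a hypothetical fixed sequence turned into an observable via ToObservable(), and collects the results with ToList().Wait() instead of printing them (both assume the System.Reactive package). It also shows that operator order matters: moving Where before Take filters first and then takes the first three matches.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq; // from the System.Reactive NuGet package

public static class OperatorDemo
{
    // Hypothetical stand-in for GetData(): a fixed array exposed as an observable.
    public static IObservable<int> GetData() =>
        new[] { 5, 20, 30, 1, 2, 3 }.ToObservable();

    // The chain from the text: Take(3) runs before Where, so only 20, 30, 1 are filtered.
    public static IList<int> TakeThenFilter() =>
        GetData().Skip(1).Take(3).Where(x => x < 10).Select(x => x + 1).ToList().Wait();

    // Same operators with Where first, so Take(3) keeps the first three values below 10.
    public static IList<int> FilterThenTake() =>
        GetData().Skip(1).Where(x => x < 10).Take(3).Select(x => x + 1).ToList().Wait();

    public static void Main()
    {
        Console.WriteLine(string.Join(", ", TakeThenFilter())); // 2
        Console.WriteLine(string.Join(", ", FilterThenTake())); // 2, 3, 4
    }
}
```

Wait() blocks until the sequence completes, which is convenient in a demo but should be avoided on a UI thread.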

IEnumerable vs. IObservable

To get an even better understanding of what an IObservable<T> is, we will compare it with IEnumerable<T>.

IEnumerable

The method IEnumerable<T>.GetEnumerator() returns an object of type IEnumerator<T>, which iterates over the collection and has the following members:

  • bool MoveNext() – Advances the enumerator to the next element of the collection. Returns true if the enumerator was successfully advanced and false if it has passed the end of the collection (which is immediately the case for an empty collection).
  • T Current { get; } – Gets the element at the current position of the enumerator.
  • throws Exception – This is not actually a member of the interface. But MoveNext() or Current may throw, so an exception is a potential implicit result of each iteration step. In other words, the result of a step could be described as Either<T, Exception> or a similar type.
  • void Dispose() – Disposes of all resources used by the enumerator.

The general task of an enumerator is to pull data from a collection.

Below we can see the enumerator in action (Note that this is just for demonstration and is not good code. Usually we would use a foreach statement to do the iteration over a collection.):

var enumerator = new List<int> { 1, 2, 3 }.GetEnumerator();

while (enumerator.MoveNext())
{
    Console.WriteLine(enumerator.Current);
}
enumerator.Dispose();
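The implicit exception result is easy to observe with a C# iterator, whose body actually runs inside MoveNext(). In this sketch, the hypothetical Numbers() iterator fails after yielding two values, so the third call to MoveNext() throws instead of returning a value or false.

```csharp
using System;
using System.Collections.Generic;

public static class EnumeratorErrorDemo
{
    // Hypothetical iterator that yields two values and then fails.
    public static IEnumerable<int> Numbers()
    {
        yield return 1;
        yield return 2;
        throw new InvalidOperationException("source failed");
    }

    // Pulls values until MoveNext() returns false or throws, and records what happened.
    public static List<string> Pull(IEnumerable<int> source)
    {
        var log = new List<string>();
        using (IEnumerator<int> enumerator = source.GetEnumerator())
        {
            try
            {
                while (enumerator.MoveNext())
                    log.Add("value " + enumerator.Current);
                log.Add("end");
            }
            catch (InvalidOperationException ex)
            {
                log.Add("exception " + ex.Message);
            }
        }
        return log;
    }

    public static void Main()
    {
        // value 1, value 2, exception source failed
        Console.WriteLine(string.Join(", ", Pull(Numbers())));
    }
}
```

The three outcomes a pulling consumer has to handle (a value, the end of the sequence, an exception) are exactly the ones that reappear as push notifications on the observer side.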

IObservable

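IObservable<T> itself has a single method, Subscribe(IObserver<T>), which returns an IDisposable for ending the subscription; it plays the role of GetEnumerator(). Each member of the IEnumerator<T> interface has its counterpart in the IObserver<T> interface: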

  • OnCompleted() – Notifies the observer that the provider has finished sending data.
  • OnNext(T) – Pushes the next value to the observer.
  • OnError(Exception) – Notifies the observer that an error occurred.

Whereas an enumerator pulls data from a collection, an observable pushes data to its observers by calling these methods.

Here is an example of how the methods described above can be called in order to create an observable sequence:

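using System;
using System.Reactive.Disposables; // Disposable.Empty
using System.Reactive.Linq;        // Observable.Create

public static IObservable<int> GetData()
{
    return Observable.Create<int>(o =>
    {
        // push three values to the subscribed observer, then signal completion
        o.OnNext(1);
        o.OnNext(2);
        o.OnNext(3);
        o.OnCompleted();

        // nothing needs to be cleaned up when the subscription is disposed
        return Disposable.Empty;
    });
}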

For detailed information on how this works, on all the query operators and more, check out Lee Campbell's amazing online resource about Rx.NET.

The members of IEnumerator<T> and IObserver<T> relate to each other in a special way, as shown in the following table. In fact, they are dual to each other: every pull operation on one side corresponds to a push operation on the other.

IEnumerator<T> (pull)    IObserver<T> (push)
bool MoveNext()          void OnCompleted()
T Current { get; }       void OnNext(T value)
throws Exception         void OnError(Exception error)
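The duality can be made tangible by driving an IObserver<T> from an IEnumerator<T>, translating each pull-side outcome into the matching push-side call. This is a simplified sketch using only the BCL interfaces; PushAll and RecordingObserver are hypothetical names, and Rx's own ToObservable() additionally deals with scheduling and cancellation.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical observer that records every notification it receives.
public sealed class RecordingObserver<T> : IObserver<T>
{
    public List<string> Log { get; } = new List<string>();
    public void OnNext(T value) => Log.Add("OnNext(" + value + ")");
    public void OnError(Exception error) => Log.Add("OnError(" + error.Message + ")");
    public void OnCompleted() => Log.Add("OnCompleted()");
}

public static class Duality
{
    // Pulls from the enumerator and pushes each outcome to the observer.
    public static void PushAll<T>(IEnumerable<T> source, IObserver<T> observer)
    {
        using (IEnumerator<T> enumerator = source.GetEnumerator())
        {
            while (true)
            {
                T current;
                try
                {
                    if (!enumerator.MoveNext())
                        break;                  // MoveNext() == false -> OnCompleted()
                    current = enumerator.Current;
                }
                catch (Exception ex)
                {
                    observer.OnError(ex);       // thrown exception -> OnError(ex)
                    return;
                }
                observer.OnNext(current);       // Current -> OnNext(value)
            }
        }
        observer.OnCompleted();
    }

    public static void Main()
    {
        var observer = new RecordingObserver<int>();
        PushAll(new List<int> { 1, 2, 3 }, observer);
        // OnNext(1), OnNext(2), OnNext(3), OnCompleted()
        Console.WriteLine(string.Join(", ", observer.Log));
    }
}
```

OnNext is deliberately called outside the try block, so an exception thrown by the observer itself is not misreported as an error of the source.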

The same, or at least very similar, operators can be applied to an observable sequence and to an enumerable sequence:

// IObservable<string>
// that emits 75 strings
GetData()
    .Skip(10)
    .Take(5)
    .Select(x => x + "_transformed")
    .Subscribe(x => Console.WriteLine("next => " + x));

// IEnumerable<string>
// that contains 75 strings
GetData()
    .Skip(10)
    .Take(5)
    .Select(x => x + "_transformed").ToList()
    .ForEach(x => Console.WriteLine("next => " + x));
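To check that both pipelines really produce the same values, the sketch below builds 75 strings with Enumerable.Range (a hypothetical stand-in for the two GetData() methods) and bridges into the observable world with ToObservable(), which, like ToList() and Wait() on an observable, assumes the System.Reactive package.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq; // from the System.Reactive NuGet package

public static class SameQueryDemo
{
    // Hypothetical stand-in for GetData(): 75 strings "s0" .. "s74".
    public static IEnumerable<string> GetEnumerableData() =>
        Enumerable.Range(0, 75).Select(i => "s" + i);

    // Pull-based query over IEnumerable<string>.
    public static List<string> QueryEnumerable() =>
        GetEnumerableData().Skip(10).Take(5).Select(x => x + "_transformed").ToList();

    // Push-based query over IObservable<string>; Wait() blocks until the list is complete.
    public static IList<string> QueryObservable() =>
        GetEnumerableData().ToObservable()
            .Skip(10).Take(5).Select(x => x + "_transformed")
            .ToList().Wait();

    public static void Main()
    {
        List<string> fromEnumerable = QueryEnumerable();
        IList<string> fromObservable = QueryObservable();
        // s10_transformed, s11_transformed, s12_transformed, s13_transformed, s14_transformed
        Console.WriteLine(string.Join(", ", fromObservable));
        Console.WriteLine(fromEnumerable.SequenceEqual(fromObservable)); // True
    }
}
```

Only the direction of data flow differs: the enumerable query pulls each element on demand, while the observable query has them pushed through the same operator chain.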

Summary

We have seen how Rx can help to make the UI responsive when dealing with computationally expensive tasks.

We also got an idea of how easy and convenient it is to handle asynchronous data from multiple sources using LINQ-style query operators.

Enabling and managing concurrency on the backend, as well as handling exceptions, cancellation etc., are other areas where Rx really shines. These details go beyond the scope of this post; for more information, please check out the resources below.

Things to keep in mind:

  • Rx is a library for programming with asynchronous data streams
  • Rx provides a rich library of composable functions to transform, combine, filter etc.
  • The key types are IObserver<T> and IObservable<T>
  • The pair of types IObservable<T> and IObserver<T> is dual to the pair IEnumerable<T> and IEnumerator<T>
  • Rx provides rich functionality for scenarios in which a producer asynchronously pushes data to a consumer

Resources


The header image “Long exposure head lamps & tail lights” by JC+A is licensed under Creative Commons Attribution 2.0. The picture was modified to fit this article.
