Skip to content
DavidMGross edited this page Jul 16, 2013 · 53 revisions

RxJava is an implementation of Reactive Extensions – a library for composing asynchronous and event-based programs that use observable sequences – for the Java VM.

It extends the observer pattern to support sequences of data/events and adds operators that compose sequences together declaratively while abstracting away low-level threading, synchronization, thread-safety, concurrent data structures, non-blocking IO, and other such concerns.

It supports Java 5 or higher and JVM based languages such as Groovy, Clojure, Scala and JRuby.

Why?

Futures are Expensive to Compose

Java Futures are straightforward to use for a single level of asynchronous execution but they start to add non-trivial complexity when they’re nested.

It is difficult to use Futures to optimally compose conditional asynchronous execution flows (or impossible, as latencies of each request vary at runtime). It can be done of course, but it quickly becomes complicated (and thus error prone) or prematurely blocks on “Future.get()” - eliminating the benefit of asynchronous execution.

Futures are Less-Flexible in terms of the Data They Work With

RxJava’s Observables support not just the emission of single scalar values (as Futures do), but also of sequences of values or even infinite streams. Observable is a single abstraction that can be used for any of these use cases. An Observable has all of the flexibility and elegance associated with its mirror-image cousin the Iterable.

RxJava is More Lightweight, Less Restrictive than Akka Futures

The RxJava implementation is not biased toward some particular source of concurrency or asynchronocity. It also tries to be very lightweight (a single JAR focused on just the Observable abstraction and related higher-order functions).

A composable Future could be implemented just as generically, but Akka Futures for example come tied in with an Actor library and a lot of other stuff. RxJava tries not to restrict you in this way. You can choose to implement your Observables using actors, thread-pools, event loops, non-blocking I/O, or whatever implementation suits your needs, your style, or your expertise.

Callbacks Have Their Own Problems

Callbacks offer a solution to the tendency to block on Future.get() by not allowing anything to block. They are naturally efficient because they execute when the response is ready.

But as with Futures, while callbacks are easy to use with a single level of asynchronous execution, they become unwieldy with nested composition.

RxJava is a Polyglot Implementation

RxJava is meant for a more polyglot environment than just Java/Scala, and it is being designed to respect the idioms of each language. (This is something we’re still working on.)

Functional Reactive Programming (FRP)

RxJava offers efficient execution and composition by providing a collection of operators with which you can filter, select, transform, combine, and compose Observables.

The Observable class can be thought of as a “push” equivalent to Iterable, which is “pull.” With an Iterable, the consumer pulls values from the producer and the thread blocks until those values arrive. By contrast, with an Observable the producer pushes values to the consumer whenever values are available. This approach is more flexible, because values can arrive synchronously or asynchronously.

The Observable type adds two missing semantics to the Gang of Four’s Observer pattern, to match those that are available in the Iterable type:

  1. the ability for the producer to signal to the consumer that there is no more data available
  2. the ability for the producer to signal to the consumer that an error has occurred

With these additions, RxJava unifies the Iterable and Observable types. The only difference between them is the direction in which the data flows. This is very important because now any operation you can perform on an Iterable, you can also perform on an Observable. Here is an example:

/**
 * Asynchronously calls 'customObservableNonBlocking' and defines 
 * a chain of operators to apply to the callback sequence.
 */
def simpleComposition() {
  // fetch an asynchronous Observable<String> 
  // that emits 75 Strings of 'anotherValue_#'
  customObservableNonBlocking()
    // skip the first 10
    .skip(10)
    // take the next 5
    .take(5)
    // transform each String with the provided function
    .map({ stringValue -> return stringValue + "_transformed" })
    // subscribe to the sequence and print each transformed String
    .subscribe({ println "onNext => " + it })
}
 
// output
onNext => anotherValue_10_transformed
onNext => anotherValue_11_transformed
onNext => anotherValue_12_transformed
onNext => anotherValue_13_transformed
onNext => anotherValue_14_transformed

More Information

RxJava Libraries

The following external libraries can work with RxJava:

sidebar

Clone this wiki locally