Example : Converting examples from RxJava
RxJava Observables are closest in behavior to SequenceM streams (from cyclops-streams and included in simple-react). Although LazyFutureStream and Observable share many operators in a very rich API, their implementations and behaviors are very different. LazyFutureStream and SimpleReactStream operate at a higher level of abstraction (roughly, in functional terms, Observable is to Monad what LazyFutureStream is to MonadTransformer).
RxJava Observables (and SequenceM) are sequential streams that may be executed asynchronously on a single targeted thread.
LazyFutureStream and SimpleReactStream are Streams of asynchronous tasks executed in parallel.
The article Optimizing simple-react Streams gives some insight into the scenarios where each approach performs best. LazyFutureStream and SimpleReactStream will significantly outperform sequential streams when processing large amounts of I/O data, whereas sequential Streams will outperform them for standard sequential tasks (the overhead of managing a large number of Futures is not worth the effort in that case).
The simple-react Streams are firmly in the realm of standard JDK 8 Streams, while RxJava and ReactiveX are popular enough that Observable is its own standard, with a very vibrant and active community. The simple-react Streams seek to benefit from incremental enhancements in the JDK (e.g. Stream enhancements coming in Java 9) and from community enhancements, such as the continued evolution of jOOλ. Implementing java.util.stream.Stream means simple-react Streams work with other community projects such as Proton Pack, the cyclops-monad-api, etc.
Both RxJava and simple-react implement the reactive-streams API, so it is relatively straightforward for each stream type to interconnect.
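The trade-off can be illustrated with plain JDK classes. The following is a minimal sketch, not simple-react code: `fetch` is a hypothetical stand-in for a blocking I/O call, and the 50 ms latency and the pool size are arbitrary assumptions. Both methods return the same results; only the way the waiting time is spent differs.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class IoBoundComparison {

    // Hypothetical blocking I/O call; the 50 ms sleep simulates network latency.
    static String fetch(int id) {
        try {
            Thread.sleep(50);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return "result-" + id;
    }

    // One task after another on the calling thread.
    static List<String> sequential(int n) {
        return IntStream.range(0, n)
                .mapToObj(IoBoundComparison::fetch)
                .collect(Collectors.toList());
    }

    // One future per task, so the simulated waits overlap.
    static List<String> parallelFutures(int n) {
        ExecutorService pool = Executors.newFixedThreadPool(n);
        try {
            List<CompletableFuture<String>> futures = IntStream.range(0, n)
                    .mapToObj(i -> CompletableFuture.supplyAsync(() -> fetch(i), pool))
                    .collect(Collectors.toList());
            // Joining in list order keeps the results in submission order.
            return futures.stream()
                    .map(CompletableFuture::join)
                    .collect(Collectors.toList());
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        long t0 = System.nanoTime();
        sequential(8);
        long t1 = System.nanoTime();
        parallelFutures(8);
        long t2 = System.nanoTime();
        System.out.println("sequential ms: " + (t1 - t0) / 1_000_000);
        System.out.println("futures ms:    " + (t2 - t1) / 1_000_000);
    }
}
```

For tasks that do no blocking at all, the cost of creating and scheduling the futures would typically dominate, which is the case where the sequential version is the better fit.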
LazyFutureStream has these operators natively.
Very early conversion take. This was written against a very early version of simple-react. simple-react and RxJava take opposite approaches to streaming data. RxJava pushes data through a Stream of Observables to Subscribers, whereas simple-react uses a mixed pull/push model. For simple-react Streams, a pull always starts the chain and the work. Data can be pushed into Streams via async data structures (see PushableStreamBuilder).
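The mixed pull/push idea can be sketched with JDK classes alone. This is an illustration under assumptions, not how simple-react or PushableStreamBuilder is implemented: a producer thread pushes into a `BlockingQueue`, while the consuming `Stream` does no work until its terminal operation starts pulling. The class and method names are hypothetical.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class PushPullSketch {

    // Blocks until the producer has pushed the next element.
    static String pull(BlockingQueue<String> queue) {
        try {
            return queue.take();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    static List<String> run(int n) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();

        // Push side: another thread adds data to the async data structure.
        Thread producer = new Thread(() -> {
            for (int i = 0; i < n; i++) {
                queue.add("event-" + i);
            }
        });
        producer.start();

        // Pull side: the stream is lazy, so nothing is read from the queue
        // until collect() starts the chain.
        List<String> received = Stream.generate(() -> pull(queue))
                .limit(n)
                .collect(Collectors.toList());

        try {
            producer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return received;
    }

    public static void main(String[] args) {
        System.out.println(run(3));
    }
}
```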
Dan Lew has an excellent blog post on Grokking RxJava. I thought it might be instructive to convert some of the examples into SimpleReact, and highlight what the differences are - as SimpleReact and RxJava take a fundamentally different approach to concurrency.
http://blog.danlew.net/2014/09/22/grokking-rxjava-part-2/
Given a query method that asynchronously returns a List of URLs:
RxJava
Observable<List<String>> query(String text);
SimpleReact
CompletableFuture<List<String>> query(String string);
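For readers who want to run the SimpleReact side of the examples, a stub for this signature might look like the sketch below. The URLs and the class name are made up for illustration; a real implementation would call a search service.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class QueryExample {

    // Hypothetical stand-in for a real search call: completes asynchronously
    // on the common ForkJoinPool with a fixed list of made-up URLs.
    static CompletableFuture<List<String>> query(String text) {
        return CompletableFuture.supplyAsync(() -> Arrays.asList(
                "http://example.com/first",
                "http://example.com/second",
                "http://example.com/third"));
    }

    public static void main(String[] args) {
        // join() blocks until the asynchronous result is available.
        List<String> urls = query("Hello, world!").join();
        urls.forEach(System.out::println);
    }
}
```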
The APIs of our starting points, if you follow the links, are very feature rich, but radically different.
Get the title of each URL, filter out any null titles, take the first 5 results, save them to disk and print each title to the console.
RxJava
query("Hello, world!")
    .flatMap(urls -> Observable.from(urls))
    .flatMap(url -> getTitle(url))
    .filter(title -> title != null)
    .take(5)
    .doOnNext(title -> saveTitle(title))
    .subscribe(title -> System.out.println(title));
My understanding of the RxJava code is that it is single threaded but free threaded. The Observable section of the code and the subscriber section of the code are free to target a single (and different) thread to run on.
This choice will result in very different implementations under the hood from SimpleReact. For example, see this comment on the RxJava implementation of take: https://github.com/ReactiveX/RxJava/issues/1334. There is no need to synchronize access to the count in the OperatorTake class, because it is not operating in a multi-threaded environment: https://github.com/benjchristensen/RxBackpressure/blob/master/rx-bp-prototype/src/main/java/rx/operators/OperatorTake.java.
SimpleReact
List<String> titles = new SimpleReact().fromStream(Stream.of(query("Hello, world!")))
    .flatMap(Collection::stream)
    .<String>then(url -> getTitle(url))
    .filter(Objects::nonNull)
    .filter(Predicates.take(5))
    .peek(title -> saveTitle(title))
    .peek(System.out::println)
    .block(); // wait for the flow to finish and collect the results
The functionally equivalent code in SimpleReact behaves very differently in terms of concurrency. First, all of the URLs are processed concurrently once they have been extracted asynchronously from the CompletableFuture.
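The same concurrency shape can be approximated with nothing but `CompletableFuture`. This is a JDK-only sketch of the behavior described, not what SimpleReact does internally: `query` and `getTitle` are hypothetical stubs, and a URL ending in `/broken` is assumed to have no title.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

public class ConcurrentTitles {

    // Hypothetical asynchronous query returning made-up URLs.
    static CompletableFuture<List<String>> query(String text) {
        return CompletableFuture.supplyAsync(() -> Arrays.asList(
                "http://example.com/a",
                "http://example.com/broken",
                "http://example.com/b"));
    }

    // Hypothetical title lookup; returns null when no title is available.
    static String getTitle(String url) {
        return url.endsWith("/broken") ? null : "Title of " + url;
    }

    // Starts one asynchronous task per URL, then gathers the non-null titles.
    static CompletableFuture<List<String>> fetchTitles(List<String> urls) {
        List<CompletableFuture<String>> tasks = urls.stream()
                .map(url -> CompletableFuture.supplyAsync(() -> getTitle(url)))
                .collect(Collectors.toList());
        return CompletableFuture.allOf(tasks.toArray(new CompletableFuture[0]))
                .thenApply(done -> tasks.stream()
                        .map(CompletableFuture::join) // already complete, does not block
                        .filter(Objects::nonNull)
                        .collect(Collectors.toList()));
    }

    static List<String> titles(String text) {
        // The per-URL tasks are only created once the URL list has arrived.
        return query(text)
                .thenCompose(ConcurrentTitles::fetchTitles)
                .join();
    }

    public static void main(String[] args) {
        titles("Hello, world!").forEach(System.out::println);
    }
}
```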
The core SimpleReact API doesn't provide a built-in 'take' implementation (although it does via Stream and Seq). Take is a special case of filter, and so can be implemented as a java.util.function.Predicate. Examples of filtering Predicates, including this one, are available in the Predicates class in SimpleReact. Because the predicate in the example above may be called from several threads at once, it has to increment and check the count of extracted titles as a single atomic operation, for example inside a synchronized block.
As an aside, you can target a different taskExecutor for each stage in the flow in SimpleReact.
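A lock-free way to get the same atomic increment-and-check is an `AtomicInteger`. The sketch below is an alternative written for illustration; the class `TakePredicates` is hypothetical and is not the `Predicates` class shipped with SimpleReact.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;

public class TakePredicates {

    // Returns a predicate that accepts at most `limit` elements in total,
    // even when it is tested from several threads at once.
    static <T> Predicate<T> take(int limit) {
        AtomicInteger seen = new AtomicInteger();
        // getAndIncrement() is the atomic step that decides who gets through.
        // The cheap get() check in front stops the counter from growing
        // without bound once the limit has been reached.
        return item -> seen.get() < limit && seen.getAndIncrement() < limit;
    }

    public static void main(String[] args) {
        Predicate<String> firstTwo = take(2);
        System.out.println(firstTwo.test("a")); // true
        System.out.println(firstTwo.test("b")); // true
        System.out.println(firstTwo.test("c")); // false
    }
}
```

Each call to `take` creates its own counter, so a predicate instance should be used for one flow only.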
If we define a Take class (which should be included in the next SimpleReact minor release) :
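class Take<T> implements Predicate<T> {

    private final int limit;
    private int count = 0;

    public Take(int limit) {
        this.limit = limit;
    }

    // synchronized makes the check and the increment a single atomic step
    @Override
    public synchronized boolean test(T o) {
        if (count >= limit) {
            return false; // stop counting here so count can never overflow
        }
        count++;
        return true;
    }
}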
We can reuse that with SimpleReact's filter method to implement a concurrent take equivalent.
List<String> titles = new SimpleReact().reactToCollection(query("Hello, world!").get())
    .<String>then(url -> getTitle(url))
    .filter(Objects::nonNull)
    .filter(new Take<>(5))
    .peek(title -> saveTitle(title))
    .peek(System.out::println)
    .block(); // wait for the flow to finish and collect the results
Our concurrent take predicate can now be used with any JDK 8 library that uses Predicates.
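As an illustration, here is the same kind of predicate plugged into a parallel `java.util.stream.Stream`. This is a sketch with a local copy of `Take` so that it compiles on its own; the helper `takeFrom` and the range of 1000 integers are arbitrary choices for the example. Note that with a parallel source exactly `limit` elements get through, but which ones is not deterministic, and the JDK documentation generally discourages stateful predicates in stream pipelines for that reason.

```java
import java.util.List;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class TakeWithJdkStreams {

    // Local copy of a synchronized take predicate, as described in the text.
    static class Take<T> implements Predicate<T> {
        private final int limit;
        private int count = 0;

        Take(int limit) {
            this.limit = limit;
        }

        @Override
        public synchronized boolean test(T o) {
            return count++ < limit;
        }
    }

    // Lets at most `limit` of the numbers 0..999 through a parallel stream.
    static List<Integer> takeFrom(int limit) {
        return IntStream.range(0, 1000)
                .boxed()
                .parallel()
                .filter(new Take<Integer>(limit))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Always five elements, but not necessarily 0 to 4.
        System.out.println(takeFrom(5));
    }
}
```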