Inspired by Grokking RxJava Tutorial check the code snippet below, in this post or check it here http://pastebin.com/9MR4k5E5
package org.rx.java.tutorial;
import java.util.ArrayList;
import java.util.List;
import rx.Observable;
import rx.Subscriber;
import rx.functions.Func1;
public class Main {
// Check this tutorial:
// http://blog.danlew.net/2014/09/15/grokking-rxjava-part-1/
// Dependency for gradle project:
// compile 'io.reactivex:rxjava:1.1.1'
// ====================================================================
// Key idea #1: Observable and Subscriber can do anything.
// Key idea #2: The Observable and Subscriber are independent of the
// transformational steps in between them.
// Key idea #3: Operators let you do anything to the stream of data. The
// only limit is yourself.
// ====================================================================
// Notes:
// 1. onError() is called if an Exception is thrown at any time.
// 2. The operators don't have to handle the Exception.
// 3. You know when the Subscriber has finished receiving items.
// 4. When you call Observable.subscribe(), it returns a Subscription.
// 5. RxJava has awesome docs / marble diagrams inside code
// 6. using unsubscribe will terminate wherever it is currently executing
// code
// 7. List of operators:
// https://github.com/ReactiveX/RxJava/wiki/Alphabetical-List-of-Observable-Operators
public static void main(String[] args) {
// 1. verbose example
Observable<String> myObservable = Observable.create(new Observable.OnSubscribe<String>() {
public void call(Subscriber<? super String> sub) {
sub.onNext(extracted());
sub.onCompleted();
}
private String extracted() {
return "1. Hello, world!";
}
});
Subscriber<String> mySubscriber = new Subscriber<String>() {
public void onCompleted() {
System.out.println("1. the end");
}
public void onError(Throwable e) {
System.out.println("error");
}
public void onNext(String s) {
System.out.println(s);
}
};
myObservable.subscribe(mySubscriber);
// 2. almost same thing as above
Observable.just("2. Hello, world!").subscribe(s -> System.out.println(s + "-Krzysztof"));
// 3. map operator - transforming
Observable.just("3. Hello, world!").map(s -> s + "-Krzysztof")
.subscribe(s -> System.out.println(s));
// http://reactivex.io/documentation/operators/map.html
// 4. subscriber receive Integer
Observable.just("4. Hello, world!").map(s -> s.hashCode())
.subscribe(i -> System.out.println("4. " + Integer.toString(i)));
// 5. subscriber shouldn't have much logic so...
Observable.just("5. Hello, world!").map(s -> s.hashCode()).map(i -> Integer.toString(i))
.subscribe(s -> System.out.println("5. " + s));
// 6. some transformations
Observable.just("6. Hello, world!").map(s -> s + " -Krzysztof").map(s -> s.hashCode())
.map(i -> Integer.toString(i)).subscribe(s -> System.out.println("6. " + s));
List<String> arrayList = new ArrayList<String>();
arrayList.add("Krzysztof");
arrayList.add("Karol");
arrayList.add("Irek");
// 7. from operator emits one element each time
Observable.just(arrayList).subscribe(names -> {
Observable.from(names).subscribe(name -> System.out.println("7. Name: " + name));
});
// http://reactivex.io/documentation/operators/from.html
// 8. list observable transformed into observable which emits single
// result with after change name, filtering and taking entry at the end
Observable.just(arrayList).flatMap(names -> Observable.from(names))
.flatMap(name -> Observable.just(name + " is awesome"))
.filter(nameChange -> !"Karol is awesome".equals(nameChange)).take(1)
.subscribe(result -> System.out.println("8. " + result + "- this is the result"));
// http://reactivex.io/documentation/operators/flatmap.html
// 9. doOnNext() allows us to add extra behavior each time an item is
// emitted, in this case singing 'lalala'
Observable.just(arrayList).flatMap(names -> Observable.from(names))
.flatMap(name -> Observable.just(name + " is awesome"))
.filter(nameChange -> !"Irek is awesome".equals(nameChange)).take(1)
.doOnNext(nameChange -> System.out.println("9. lalalla"))
.subscribe(result -> System.out.println("9. " + result + "- this is the result"));
// http://reactivex.io/documentation/operators/filter.html
// http://reactivex.io/RxJava/javadoc/rx/Observable.html#doOnNext(rx.functions.Action1)
// 10. error example
Observable.just(arrayList).flatMap(names -> Observable.from(names))
.flatMap(name -> Observable.just(name + " is awesome"))
.filter(nameChange -> getError(nameChange).equals("anything")).take(1)
.doOnNext(nameChange -> System.out.println("10. lalalla"))
.onErrorReturn(new Func1<Throwable, String>() {
@Override
public String call(Throwable t) {
return "error";
}
})
.subscribe(result -> System.out.println("10. " + result + " - this is the result"));
// 11. Schedulers: easy threading
// myObservableServices.retrieveImage(url)
// .subscribeOn(Schedulers.io())
// .observeOn(AndroidSchedulers.mainThread())
// .subscribe(bitmap -> myImageView.setImageBitmap(bitmap));
// subscribeOn(Scheduler scheduler): Asynchronously subscribes Observers
// to this Observable on the specified Scheduler.
// observeOn(Scheduler scheduler): Modifies an Observable to perform its
// emissions and notifications on a specified Scheduler, asynchronously
// with a bounded buffer.
// check: http://reactivex.io/documentation/operators/subscribeon.html
}
private static String getError(String name) {
throw new RuntimeException();
}
// OUTPUT:
// 1. Hello, world!
// 1. the end
// 2. Hello, world!-Krzysztof
// 3. Hello, world!-Krzysztof
// 4. -1028118321
// 5. -1538652498
// 6. -1846029788
// 7. Name: Krzysztof
// 7. Name: Karol
// 7. Name: Irek
// 8. Krzysztof is awesome- this is the result
// 9. lalalla
// 9. Krzysztof is awesome- this is the result
// 10. error - this is the result
}