Thursday, September 17, 2015

ReactiveX in Java

Dependency:

<dependency>
    <groupId>io.reactivex</groupId>
    <artifactId>rxjava</artifactId>
    <version>1.0.14</version>
</dependency>

Code:

Observable:

ThreadPoolExecutor executor = new ThreadPoolExecutor(3, 5, 3000L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());

Observable<ChannelItemType> myObservable = Observable.create(
        new Observable.OnSubscribe<ChannelItemType>() {
            @Override
            public void call(Subscriber<? super ChannelItemType> sub) {
                sub.onNext(item);
                sub.onCompleted();
            }
        }
).subscribeOn(Schedulers.from(executor));
myObservable.subscribe(new ChannelConsumer(channelRepo));

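In the snippet above, we create an Observable that sends item to the consumer (in this case ChannelConsumer) by calling onNext(), then signals the end of the stream with onCompleted(). The interesting part is subscribeOn(Schedulers.from(executor)), which moves the work onto the executor's threads. The executor has 3 core threads; on each subscribe(), the Observable runs call() on one of those threads, and the item is delivered to the consumer from that thread.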
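A side note on the executor's sizing: it is constructed with 3 core threads and a maximum of 5, but because its LinkedBlockingQueue is unbounded, extra tasks are queued rather than triggering new threads, so the pool should stay at 3. Here is a minimal sketch that checks this using only the JDK (no RxJava). The class name ExecutorSizingSketch and the method poolSizeUnderLoad are made up for illustration and are not part of the original code.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical helper class, for illustration only.
public class ExecutorSizingSketch {

    // Submits `tasks` blocking tasks to an executor configured like the one
    // in this post and returns how many threads the pool has started.
    public static int poolSizeUnderLoad(int tasks) {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                3, 5, 3000L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
        CountDownLatch release = new CountDownLatch(1);
        try {
            for (int i = 0; i < tasks; i++) {
                executor.execute(() -> {
                    try {
                        // Keep the worker busy until the measurement is done.
                        release.await();
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            // Threads beyond the core size are only created when the queue
            // rejects a task, which an unbounded queue never does.
            return executor.getPoolSize();
        } finally {
            release.countDown();   // let the blocked tasks finish
            executor.shutdown();   // queued tasks still run, then threads exit
        }
    }

    public static void main(String[] args) {
        System.out.println("pool size with 10 tasks: " + poolSizeUnderLoad(10));
    }
}
```

If the pool really should grow to 5 threads under load, one option is a bounded queue (for example new LinkedBlockingQueue<Runnable>(100)), at the cost of having to handle rejected tasks once both the queue and the pool are full.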

Consumer:

public class ChannelConsumer extends Subscriber<ChannelItemType> {
    @Override
    public void onCompleted() {
        // Clean up here once the Observable has completed
    }
    @Override
    public void onError(Throwable throwable) {
        // Handle or log the error here; leaving this empty silently swallows failures
    }
    @Override
    public void onNext(ChannelItemType channelItem) {
        // Process each emitted item here
    }
}