Dependency:
<dependency>
    <groupId>io.reactivex</groupId>
    <artifactId>rxjava</artifactId>
    <version>1.0.14</version>
</dependency>
Code:
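Observable:
// Core pool of 3 threads; the queue is unbounded, so the pool never grows toward the maximum of 5.
ThreadPoolExecutor executor = new ThreadPoolExecutor(3, 5, 3000L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
Observable<ChannelItemType> myObservable = Observable.create(
    new Observable.OnSubscribe<ChannelItemType>() {
        @Override
        public void call(Subscriber<? super ChannelItemType> sub) {
            // item is assumed to be a ChannelItemType available in the enclosing scope
            if (!sub.isUnsubscribed()) {
                sub.onNext(item);
                sub.onCompleted();
            }
        }
    }
).subscribeOn(Schedulers.from(executor)); // run call() on a thread taken from the executor
myObservable.subscribe(new ChannelConsumer(channelRepo));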
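In the snippet above, we create an Observable that sends item to the consumer (in this case ChannelConsumer) through onNext(). One interesting thing here is that the work is moved off the calling thread by using an executor. The executor has a core pool of 3 threads; its maximum of 5 is never reached, because the LinkedBlockingQueue is unbounded and extra tasks are queued instead of starting new threads. subscribeOn(Schedulers.from(executor)) makes the Observable run call() on one of those threads, and since there is no observeOn(), the consumer's callbacks run on that same thread.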
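The number of threads the executor really uses can be checked with the JDK alone. The sketch below is not RxJava code and is only an illustration: PoolSizeDemo and largestPoolSize are made-up names, and the 20 ms sleep is an arbitrary stand-in for real work. It builds an executor with the same settings as above, submits more tasks than there are threads, and reports the largest size the pool reached.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the original code.
class PoolSizeDemo {

    // Submits `tasks` short jobs to an executor configured like the one above
    // and returns the largest number of threads the pool ever held.
    static int largestPoolSize(int tasks) {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                3, 5, 3000L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>());
        for (int i = 0; i < tasks; i++) {
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        Thread.sleep(20); // simulate a little work
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                }
            });
        }
        executor.shutdown(); // stop accepting new work; queued tasks still run
        try {
            executor.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return executor.getLargestPoolSize();
    }

    public static void main(String[] args) {
        System.out.println("largest pool size: " + largestPoolSize(20));
    }
}
```

With 20 tasks this prints "largest pool size: 3": the first three tasks each start a core thread and the rest wait in the queue. Note the shutdown() call as well. The core threads are non-daemon and do not time out by default, so an executor that is never shut down can keep the JVM alive after the Observable has completed.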
Consumer:
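public class ChannelConsumer extends Subscriber<ChannelItemType> {
    // The constructor that accepts channelRepo (used in the subscribe() call above) is omitted here.

    @Override
    public void onCompleted() {
        // Clean up once the Observable has finished emitting
    }

    @Override
    public void onError(Throwable throwable) {
        // Handle or log the failure here; an empty body silently swallows errors
    }

    @Override
    public void onNext(ChannelItemType channelItem) {
        // Process each emitted item here
    }
}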