Scheduler即调度器,用于控制回调函数的线程,RxJava是一个异步操作的简化库,异步操作显然应该运行在子线程中,让使用者手动创建子线程就太不方便了,所以RxJava提供了调度器,供使用者进行线程控制。
Schedulers.immediate()
:直接在当前线程运行,是默认的调度方式。Schedulers.newThread()
:启用新线程,在新线程中运行。Schedulers.io()
:IO操作使用的调度器,可以看做另一种Schedulers.newThread(),其内部实现对IO操作的线程调度进行了一些优化。Schedulers.computation()
:计算操作使用的调度器,其内部实现对计算操作的线程调度进行了一些优化。IO调度器和计算调度器区别:不要将两者混用,实际上,IO调度器内部可能是一个线程池,计算调度器则固定线程数是CPU执行线程数。混用性能会下降。
observable.subscribeOn()
:事件产生的线程observable.observeOn()
:事件消费的线程打印线程ID例子:
System.out.println("主线程" + Thread.currentThread().getId());
//简化的代码
Observable
.create((subscriber) ->
{
System.out.println("发送事件" + Thread.currentThread().getId());
subscriber.onNext("hello1");
subscriber.onNext("hello2");
subscriber.onNext("hello3");
subscriber.onCompleted();
})
.subscribeOn(Schedulers.immediate())
.observeOn(Schedulers.newThread())
.subscribe((s) ->
{
System.out.println("处理事件" + Thread.currentThread().getId());
System.out.println(s);
}, (throwable) ->
{
}, () ->
{
System.out.println("事件完成" + Thread.currentThread().getId());
System.out.println("completed");
});
运行结果:
主线程1
发送事件1
处理事件13
hello1
处理事件13
hello2
处理事件13
hello3
事件完成13
completed
Process finished with exit code 0
注意:三个onNext是在子线程中顺序执行的,不是创建多个子线程并行执行的。