Java中,编写异步代码为了充分利用线程资源,我们通常要创建合适的线程池。对于计算密集型任务,我们的线程池大小和CPU核心数相关,而对于IO密集型任务,我们通常要创建数量更大的线程池,但也要设置上限,避免系统资源耗尽。不过在ReactorCore中,框架试图让我们以并发无关的写法编写异步代码,而线程的管理是通过Scheduler调度器声明式指定的。
ReactorCore中,调度器需要实现Scheduler
接口,不过ReactorCore内置提供了一些调度器实现,我们实际开发中一般都是使用这些调度器来调度线程。
调度器 | 线程模型 | 适用场景 |
---|---|---|
Schedulers.immediate() | 立即在当前线程执行 | 无需切换线程的简单任务 |
Schedulers.single() | 全局单一线程池 | 需要串行执行的任务 |
Schedulers.parallel() | 固定大小线程池(CPU核数) | 计算密集型任务 |
Schedulers.boundedElastic() | 弹性线程池(默认上限10*CPU核数) | IO密集型任务 |
Schedulers.fromExecutor() | 使用自定义ExecutorService | 需自行管理线程池的场景(较少使用) |
响应式流可以理解为一条声明式的处理管线,这个管线只有在订阅时才会真正的被创建,这些内部流程对开发者是不可见的。
subscribeOn()
和publishOn()
在构建这个处理管线的过程中,我们可以用两种方式为响应式流设置调度器,subscribeOn()
和publishOn()
:
subscribeOn()
:该方法从头开始影响整个响应式流的执行线程,subscribeOn()
通常我们只会调用它一次,如果流程中还有publishOn()
,后者仍会继续生效publishOn()
:该方法会改变后续操作符的执行所在线程,直到下一个publishOn()
出现,它允许被多次调用,实现动态的线程调度策略切换subscribeOn()
的典型使用场景是将阻塞式的API切换到非阻塞线程。
Flux.fromIterable(fetchDataSync())
.subscribeOn(Schedulers.boundedElastic())
.subscribe();
publishOn()
的典型使用场景是在对数据进行阶段性处理时,使用该方法切换后续操作的线程。
Flux.fromIterable(data)
.publishOn(Schedulers.parallel())
.map(data -> processData(data))
.publishOn(Schedulers.boundedElastic())
.subscribe(result -> saveResult(result));