线程模型和调度器

Java中,编写异步代码为了充分利用线程资源,我们通常要创建合适的线程池。对于计算密集型任务,我们的线程池大小和CPU核心数相关,而对于IO密集型任务,我们通常要创建数量更大的线程池,但也要设置上限,避免系统资源耗尽。不过在ReactorCore中,框架试图让我们以并发无关的写法编写异步代码,而线程的管理是通过Scheduler调度器声明式指定的。

调度器

ReactorCore中,调度器需要实现Scheduler接口,不过ReactorCore内置提供了一些调度器实现,我们实际开发中一般都是使用这些调度器来调度线程。

调度器 线程模型 适用场景
Schedulers.immediate() 立即在当前线程执行 无需切换线程的简单任务
Schedulers.single() 全局单一线程池 需要串行执行的任务
Schedulers.parallel() 固定大小线程池(CPU核数) 计算密集型任务
Schedulers.boundedElastic() 弹性线程池(默认上限10*CPU核数) IO密集型任务
Schedulers.fromExecutor() 使用自定义ExecutorService 需自行管理线程池的场景(较少使用)

响应式流可以理解为一条声明式的处理管线,这个管线只有在订阅时才会真正的被创建,这些内部流程对开发者是不可见的。

subscribeOn()publishOn()

在构建这个处理管线的过程中,我们可以用两种方式为响应式流设置调度器,subscribeOn()publishOn()

  • subscribeOn():该方法从头开始影响整个响应式流的执行线程,subscribeOn()通常我们只会调用它一次,如果流程中还有publishOn(),后者仍会继续生效
  • publishOn():该方法会改变后续操作符的执行所在线程,直到下一个publishOn()出现,它允许被多次调用,实现动态的线程调度策略切换

subscribeOn()的典型使用场景是将阻塞式的API切换到非阻塞线程。

Flux.fromIterable(fetchDataSync())
        .subscribeOn(Schedulers.boundedElastic())
        .subscribe();

publishOn()的典型使用场景是在对数据进行阶段性处理时,使用该方法切换后续操作的线程。

Flux.fromIterable(data)
        .publishOn(Schedulers.parallel())
        .map(data -> processData(data))
        .publishOn(Schedulers.boundedElastic())
        .subscribe(result -> saveResult(result));
作者:Gacfox
版权声明:本网站为非盈利性质,文章如非特殊说明均为原创,版权遵循知识共享协议CC BY-NC-ND 4.0进行授权,转载必须署名,禁止用于商业目的或演绎修改后转载。
Copyright © 2017-2024 Gacfox All Rights Reserved.
Build with NextJS | Sitemap