ReactorContext

前面章节笔记中我们曾介绍过ReactorCore中的线程和调度模型,其中我们可以看出响应式流的执行可以在不同的线程间自由切换,然而这种特性也带来了一个问题,在传统的同步阻塞式代码中,我们常用线程局部变量ThreadLocal在同一线程的不同调用栈之间传递上下文数据(例如用户身份信息、链路追踪ID等),但响应式流执行时可能跨越多个线程,线程局部变量在这种场景下就失效了。Reactor Context正是为了解决这一问题而设计的专用上下文机制,它不依赖线程局部变量,而是基于响应式流中的订阅关系实现。这篇笔记我们简单介绍Reactor Context的用法。

基本概念

Reactor Context是一个与订阅关系绑定的不可变键值对存储结构,它随着响应式流在整个管线中传播。与ThreadLocal不同的是,Reactor Context并不依赖于特定的线程,而是依附于订阅关系本身,因此无论响应式流在哪个线程上执行,我们都可以安全地读取到Reactor Context中的数据。Reactor Context有以下几个重要特性需要我们了解:

不可变性:Reactor Context是不可变的,每次调用写入操作都会返回一个新的Context实例而不是在原有实例上修改。

从下游向上游传播:Reactor Context在响应式管线中是从下游向上游传播的,因此写入Context的操作符必须在读取Context的操作符的下游才能生效,这一点与数据元素的流向恰好相反,理解这一点非常重要。

订阅级别隔离:每次订阅都有独立的Reactor Context,不同订阅之间的Reactor Context互相隔离。

Reactor Context使用

写入Reactor Context

下面例子代码中,我们通过contextWrite()操作符向上下文中写入数据。

package com.gacfox.demo;

import reactor.core.publisher.Mono;

public class Main {
    public static void main(String[] args) {
        Mono.just("Hello")
                .flatMap(value -> Mono.deferContextual(ctx -> {
                    String userId = ctx.get("userId");
                    System.out.println("Operate userId: " + userId);
                    return Mono.just(value);
                }))
                .contextWrite(ctx -> ctx.put("userId", "tom"))
                .subscribe(System.out::println);
    }
}

代码中,contextWrite()接收一个参数和返回值都是Context的函数式接口,我们在其中编写添加上下文信息的逻辑,Mono.deferContextual()用于在数据流中读取上下文中的数据,它的参数是一个接收ContextView(Context的只读视图)并返回Publisher的函数。

读取Reactor Context

读取Reactor Context主要有deferContextual()transformDeferredContextual()两种方式,它们适用于两种不同场景。

deferContextual()

deferContextual()适合在需要根据Context动态创建Publisher的场景使用,Mono和Flux都有一个deferContextual静态方法,它的作用是延迟创建整个流,订阅时执行Lambda拿到上下文,进行一些处理后返回全新的Mono或Flux,它只能出现在链的最开头。下面例子代码演示了这一用法。

package com.gacfox.demo;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class Main {
    public static void main(String[] args) {
        Flux.just("Apple", "Banana", "Cherry")
                .flatMap(value -> Mono.deferContextual(ctx -> {
                    String prefix = ctx.get("prefix");
                    return Mono.just(prefix + value);
                }))
                .contextWrite(ctx -> ctx.put("prefix", "[Fruit] "))
                .subscribe(System.out::println);
    }
}

transformDeferredContextual()

transformDeferredContextual()是Flux和Mono的实例方法,它的作用是改造已有流,订阅时执行Lambda拿到当前流和上下文,返回改造后的新流,它只能出现在链的中间。下面例子代码在流的中间插入了修改流元素的逻辑,运行后结果与上面相同。

package com.gacfox.demo;

import reactor.core.publisher.Flux;

public class Main {
    public static void main(String[] args) {
        Flux.just("Apple", "Banana", "Cherry")
                .transformDeferredContextual((flux, ctx) -> {
                    String prefix = ctx.get("prefix");
                    return flux.map(value -> prefix + value);
                })
                .contextWrite(ctx -> ctx.put("prefix", "[Fruit] "))
                .subscribe(System.out::println);
    }
}

多次写入与覆盖

我们可以多次调用contextWrite()向Reactor Context写入数据,多个contextWrite()操作的结果会被合并,如果存在相同的键,链式调用中更靠后(下游)的contextWrite()写入的值会被链式调用中更靠前(上游)的覆盖。这同样是因为Reactor Context从下游向上游传播的缘故。

package com.gacfox.demo;

import reactor.core.publisher.Mono;

public class Main {
    public static void main(String[] args) {
        Mono.deferContextual(ctx -> Mono.just(
                        "key1=" + ctx.getOrDefault("key1", "not found") + ", " +
                                "key2=" + ctx.getOrDefault("key2", "not found")
                ))
                .contextWrite(ctx -> ctx.put("key1", "value1 from writer1"))
                .contextWrite(ctx -> {
                    ctx = ctx.put("key1", "value1 from writer2");
                    ctx = ctx.put("key2", "value2 from writer2");
                    return ctx;
                })
                .subscribe(System.out::println);
    }
}

运行上面代码输出结果为key1=value1 from writer1, key2=value2 from writer2

作者:Gacfox
版权声明:本网站为非盈利性质,文章如非特殊说明均为原创,版权遵循知识共享协议CC BY-NC-ND 4.0进行授权,转载必须署名,禁止用于商业目的或演绎修改后转载。