往简单的方向深入理解,或许反应式编程更容易入门
反应式编程虽然能提升性能,有诸多好处,却也带来一些弊端,增加代码的复杂度、高度的API侵入(相当于依赖了一个JDK)。笔者个人认为,反应式编程不适用于业务开发,特别是复杂业务系统的开发,这或许就是反应式编程从推出到现在依然不温不火的原因吧。当然,这并不是劝说大家从入门到放弃。反应式编程适合做对性能要求高的中间件,或者脱离业务的底层系统,例如网关、消息推送服务。
- Reactive Streams规范
- Reactor是如何实现Reactive Streams规范的
- 拨开反应式编程中Context实现的神秘面纱
- 委托模式的使用与BaseSubscriber
- 为spring-data-r2dbc实现多数据源动态切换
反应式编程Reactor库完全实现了Reactive Streams规范,Reactive Streams定义了反应式编程的规范,如果你到Github查看它,你将只会看到这四个接口:Publisher、Subscriber、Subscription、Processor。
在了解这几个接口之前,我们需要先了解什么是反应式编程。
本文观点仅站在笔者的个人角度理解,正确性与笔者的水平有关,读者在阅读本篇文章过程中,如果有疑惑的地方欢迎留言探讨!
以往阻塞式编程我们发起远程调用等I/O操作都是阻塞当前线程以等待接口的响应,待接收到响应后再消费响应结果。在等待过程中该线程会一直处于空闲状态,而如果是反应式编程,在发起请求后,当前线程就会转去做别的事情,直到接收到响应结果,再发布响应结果给订阅者继续消费响应结果,当然,发起网络请求不在Reactor库的职责范围内。
反应式编程,由发布者通过发布数据传递给订阅者消费数据。
反应式流指的是一个原始数据经过多重操作或者转化后,最终被订阅者消费。而每一步操作或转化也都是一次数据的发布订阅,这些发布订阅按顺序组合到一起就构成了反应式流。
Reactive Streams规范
- Publisher:发布者;
- Subscriber:订阅者;
- Subscription:订阅,用于连接发布者和订阅者;
- Processor:处理器,即是订阅者也是发布者;
由于这几个接口只是Reactive Streams定义的规范,详细执行过程需要结合Reactive Streams规范的实现库理解,因此我们先简单熟悉一下这几个接口,然后再介绍Reactor如何实现这些接口。
Publisher(发布者)
public interface Publisher {
void subscribe(Subscriber super t> s);
}
复制代码
- subscribe:订阅者订阅发布者;
如果接触过WebFlux或者用过Reactor库应该对subscribe方法不陌生,尽管还不了解工作原理,但至少我们知道,只有调用Mono/Flux的subscribe方法才会触发整个流执行。
即便没有接触过WebFlux或者Reactor,我们也应该都接触过Java8提供的Stream。Java8 Stream只有遇到终止操作才会触发流的执行,而反应式编程Publisher的subscribe方法就相当于Java8 Stream的终止操作。
在没有调用Publisher#subscribe方法之前,一切操作都只是我们定义的执行计划,执行计划制定整个流的执行过程。
Subscriber(订阅者)
public interface Subscriber {
void onSubscribe(Subscription s);
void onNext(T t);
void onError(Throwable t);
void onComplete();
}
复制代码
Reactive Streams规范定义订阅者(Subscriber)可以订阅四种事件:
- onSubscribe:通常由Publisher调用,被此订阅者订阅的发布者执行Publisher#subscribe方法时调用;
- onNext:通常由Subscription调用,Subscription#request请求到数据时调用;
- onError:通常由onNext方法调用,当onNext捕获到数据消费异常时被调用;
- onComplete:通常由Subscription调用,在数据全部被正常消费完成、没有错误导致订阅终止时被调用;
Subscription(订阅)
public interface Subscription {
void request(long n);
void cancel();
}
复制代码
订阅操作(Subscription),相当于是一个场景类。
- request:订阅者调用此方法请求指定数量的数据,在请求到数据时调用订阅者的onNext方法传递数据给订阅者,通常在Subscriber的onSubscribe方法中被调用;
- cancel:通常由订阅者调用此方法来取消订阅,此方法被调用后request不再产生数据、不再触发订阅者的onNext;
Reactor是如何实现Reactive Streams规范的
一个简单的Mono使用例子如下。
public class MonoStu{
public static void main(String[] args){
// (1)
Mono.just(1)
// (2)
.subscribe(System.out::println);
}
}
复制代码
我们将一步步分析此案例的执行流程,以此了解Reactor是如何实现Reactive Streams规范的。
阅读本文不需要读者去翻阅源码,当然,如果能结合源码一起看效果更佳。Mono源码在reactor-core库的reactor.core.publisher包下。
public abstract class Mono
implements Publisher {
}
复制代码
Mono是一个抽象类,它实现了Reactive Streams规范的Publisher接口,并扩展发布者的操作以提供流式编程(反应式流)。
我们先看Mono#just静态方法:
public abstract class Mono implements Publisher {
public static Mono just(T data) {
return onAssembly(new MonoJust<>(data));
}
}
复制代码
初次阅读源码,并且在不了解Reactor库的情况下,笔者不建议大家去纠结onAssembly方法,这也是学习方法,先掌握主干,再去关心细枝末节,所以我们选择忽略onAssembly方法。
忽略onAssembly之后的Mono#just静态方法(后文同):
public abstract class Mono implements Publisher {
public static Mono just(T data) {
return new MonoJust<>(data);
}
}
复制代码
just方法返回一个MonoJust对象,此类继承Mono。类似于我们使用Builder构造者模式,每调用一个方法都会返回this,直到调用build方法。只是Reactor的Mono#just返回的是一个新的Mono对象。
本着阅读源码先掌握主干的宗旨,我们去掉了MonoJust实现的其它接口,只关心它继承Mono实现的方法。经过精简后MonoJust的源码如下。
final class MonoJust extends Mono {
final T value;
MonoJust(T value) {
this.value = value;
}
// 把注意力集中到这
@Override
public void subscribe(CoreSubscriber super t> actual) {
actual.onSubscribe(Operators.scalarSubscription(actual, value));
}
}
复制代码
由于Mono也是一个发布者,它实现了Publisher接口,所以我们重点关注的是MonoJust实现的subscribe方法。
如上面源码所示,MonoJust#subscribe方法的入口是一个CoreSubscriber,说明在父类(Mono)已经实现过Publisher接口的subscribe方法,源码如下。
public abstract class Mono implements Publisher {
@Override
public final void subscribe(Subscriber super t> actual) {
// 原本是onLastAssembly(this) ,
// 我们忽略onLastAssembly,直接使用this
this.subscribe(Operators.toCoreSubscriber(actual));
}
// 子类MonoJust实现
public abstract void subscribe(CoreSubscriber super t> actual);
}
复制代码
现在我们只知道CoreSubscriber应该是Subscriber的子类,目前来说,了解到这一层足够了。
我们先继续看案例中的下一句#.subscribe(System.out::println),然后再回头分析MonoJust#subscribe方法。
Mono提供很多个subscribe方法的重载,无论我们使用哪个重载方法,最后都会调用Mono实现Publisher接口的subscribe方法,也就会调用到Mono子类实现的subscribe方法。
此案例中,我们调用subscribe传入的是一个lambda,对应是实现Consumer接口的accept方法, 该Consumer最终被包装成LambdaMonoSubscriber,代码如下。
public abstract class Mono implements Publisher {
public final Disposable subscribe(Consumer super t> consumer) {
// 创建LambdaMonoSubscriber
return subscribeWith(new LambdaMonoSubscriber<>(consumer, null, null, null));
}
}
复制代码
请把注意力集中在consumer、new LambdaMonoSubscriber()以及subscribeWith方法上,不要走神哦,因为逻辑有点绕,一走神就看不懂了。
subscribeWith方法源码如下:
public abstract class Mono implements Publisher {
public final <E extends Subscriber super t>> E subscribeWith(E subscriber) {
// 调用的是Mono实现Publisher接口的subscribe方法
subscribe(subscriber);
// 刚入门不用考虑返回值Disposable,这是用于取消订阅的
// return subscriber;
}
}
复制代码
subscribeWith调用Mono实现Publisher接口的subscribe方法,因此LambdaMonoSubscriber必然是一个Subscriber。
到此,我们应该关心发布者Mono是如何传递数据给到订阅者(LambdaMonoSubscriber)的,为此我们需要回头分析MonoJust#subscribe方法。
final class MonoJust extends Mono {
final T value;
MonoJust(T value) {
this.value = value;
}
// 参数actual是LambdaMonoSubscriber
@Override
public void subscribe(CoreSubscriber super t> actual) {
// 创建Subscription
Subscription subscription = Operators.scalarSubscription(actual, value);
actual.onSubscribe(subscription);
}
}
复制代码
此处subscribe的参数actual就是订阅者LambdaMonoSubscriber对象,发布者MonoJust直接在subscribe方法中调用订阅者的onSubscribe方法。
Reactive Streams规范中,订阅者Subscriber的onSubscribe方法要求传入的是一个Subscription,所以MonoJust#subscribe方法中需要将真实订阅者LambdaMonoSubscriber和数据value封装成一个Subscription,这个Subscription同时也是一个Subscriber。
LambdaMonoSubscriber会在onSubscribe方法中调用Subscription的request请求数据。
LambdaMonoSubscriber源码如下,我们忽略LambdaMonoSubscriber构造方法传入的空参数,以简化LambdaMonoSubscriber的代码,便于阅读。
final class LambdaMonoSubscriber implements InnerConsumer, Disposable {
// consumer是案例传递的:System.out::println
final Consumer super t> consumer;
// 现在不要去考虑为什么要使用volatile
volatile Subscription subscription;
LambdaMonoSubscriber(@Nullable Consumer super t> consumer) {
this.consumer = consumer;
}
// s = MonoJust中的Operators.scalarSubscription(actual, value)返回值
@Override
public final void onSubscribe(Subscription s) {
this.subscription = s;
// 请求数据
s.request(Long.MAX_VALUE);
}
}
复制代码
由于我们研究的是Mono,所以onSubscribe方法请求获取所有的数据s.request(Long.MAX_VALUE)。
在分析此方法之前,得要知道
Operators.scalarSubscription返回的Subscription是一个ScalarSubscription(同步订阅的实现)。
static final class ScalarSubscription {
// 这里是LambdaMonoSubscriber
final CoreSubscriber super t> actual;
// 数据
final T value;
ScalarSubscription(CoreSubscriber super t> actual, T value) {
this.value = Objects.requireNonNull(value, "value");
this.actual = Objects.requireNonNull(actual, "actual");
}
@Override
public void request(long n) {
Subscriber super t> a = actual;
a.onNext(value);
a.onComplete();
}
}
复制代码
为了简单,笔者又把ScalarSubscription代码去掉了很多,这样看起来较为容易理解。
真实订阅者actual与数据value都是在构造方法中传入的,在案例中,actual就是LambdaMonoSubscriber,value就是调用Mono#just传入的1。
当调用LambdaMonoSubscriber#onSubscribe方法时,ScalarSubscription#request被调用,request方法中直接调用真实订阅者actual (LambdaMonoSubscriber)的onNext方法传递数据value,并且在onNext方法执行结束之后,订阅者actual(LambdaMonoSubscriber)的onComplete方法被调用。
此处订阅者actual是LambdaMonoSubscriber,LambdaMonoSubscriber#onNext方法经过笔者修剪后的源码如下:
final class LambdaMonoSubscriber implements InnerConsumer, Disposable {
@Override
public final void onNext(T x) {
if (consumer != null) {
try {
// 执行main方法中subscribe传入的lambda==>System.out::println
consumer.accept(x);
}catch (Throwable t) {
Operators.onErrorDropped(t, Context.empty());
}
}
}
}
复制代码
onNext中调用的consumer正是我们传递的lambda表达式,也就是打印输出订阅到的数据。
此案例分析并未看到异步的实现,这是因为我们往最简单的不需要异步的场景分析。
总结此案例的执行流程如下:
- 1、调用Mono#just创建一个MonoJust发布者,并且参数传递的value将是该发布者需要发布的数据;
- 2、调用MonoJust#subscribe(Consumber lambda)传递一个消费者消费数据,而该消费者会被包装成一个订阅者LambdaMonoSubscriber;
- 3、MonoJust#subscribe(CoreSubscriber)被调用,在该方法中调用了Operators.scalarSubscription(actual, value)创建ScalarSubscriptin,
并且调用了订阅者LambdaMonoSubscriber的onSubscribe方法;
- 4、LambdaMonoSubscriber#onSubscribe被调用,在该方法中调用了ScalarSubscription#request请求数据;
- 5、ScalarSubscription#request调用真实订阅者LambdaMonoSubscriber的onNext方法并传递数据,在onNext方法执行完成后调用onComplete方法;
- 6、LambdaMonoSubscriber#onNext调用案例中的lambda表达式消费数据。
现在我们再来丰富下案例:
public class MonoStu{
public static void main(String[] args){
// (1)
Mono.just(1)
// (2)
.map(String::valueOf)
// (3)
.subscribe(System.out::println);
}
}
复制代码
我们增加了map(String::valueOf)操作,计划在订阅到MonoJust传递的数据时,将数据转为字符串,再将转化为字符串后的数据传递给真实订阅者订阅。
我们已经知道Mono.just返回的是一个MonoJust,那么map返回的是什么呢?
以下是笔者修剪后的Mono#map源码:
public abstract class Mono implements Publisher {
public final Mono map(Function super t extends r> mapper) {
return new MonoMap<>(this, mapper);
}
}
复制代码
可见, map返回的是一个MonoMap:
final class MonoMap extends MonoOperator {
final Function super t extends r> mapper;
MonoMap(Mono extends t> source, Function super t extends r> mapper) {
super(source);
this.mapper = mapper;
}
@Override
public void subscribe(CoreSubscriber super r> actual) {
source.subscribe(new FluxMap.MapSubscriber<>(actual, mapper));
}
}
复制代码
这里出现了一个新的抽象类MonoOperator,MonoOperator我们以后会用得很多,例如:R2DBC动态数据源切换、Sentinel对WebFlux的支持都会用到。
MonoOperator是将一个Mono(source)转为一个新的Mono(此处是MonoMap),当MonoMap的subscribe方法被调用时再调用source的subscribe方法,这样就能将两个发布者Mono串连起来了。
由于逻辑比较绕,我们先整理下逻辑:
- 1、顺序操作
首先案例中,我们是先创建MonoJust,然后才创建MonoMap,所以最后调用的是MonoMap的subscribe方法;
- 2、倒序订阅
在(1)之后,先是MonoMap的subscribe方法被执行,然后由MonoMap调用MonoJust的subscribe;
// source是MonoJust
source.subscribe(new FluxMap.MapSubscriber<>(actual, mapper));
复制代码
- 3、顺序消费数据
在(2)之后,就先是MonoJust的onSubscribe方法被调用,所以MonoJust的onNext方法先被执行,然后再到MonoMap的onSubscribe方法被调用,所以MonoMap的onNext方法后执行;
所以,在此案例中,MonoMap的subscribe方法传递的参数才是真实的订阅者System.out::println,而source是MonoJust。在MonoMap#subscribe方法被调用时,先调用MonoJust的subscribe方法,并将真实订阅者封装成FluxMap.MapSubscriber传递给MonoJust,让MonoJust认为FluxMap.MapSubscriber是真实订阅者,当FluxMap.MapSubscriber的onSubscribe方法被调用时, 再由它调用真实订阅者的onSubscribe方法。其实是用了委托设计模式。
到此,复杂一点的例子我们也分析完成了,实际上,很多的操作都是通过MonoOperator实现,这也是能实现上下文Context传递的原因,所以接下来我们将分析Context的实现。
拨开反应式编程中Context实现的神秘面纱
根据上一节总结的多个操作(发布-订阅)组合成一个流的执行流程为:顺序操作、倒序订阅、顺序消费数据,试想如何让一个Context在流中传递呢?
- 倒序订阅:假设在流的中间某个Mono创建Context,可通过subscribe方法层层往上传递Context;
- 顺序消费:在创建Context的Mono之前的Mono都可以使用到这个Context,而在这个Mono之后的Mono就获取不到该Context;
我们画个图来理解:
一个使用Context的简单案例:
public class ContextUseMain{
private static void testMono() {
// MonoMap
Mono mono = Mono.just(1)
.subscriberContext(context -> {
// 可以获取到xxxx
System.out.println(context.get("xxxx").toString());
return context;
})
.map(x -> x * x);
// MonoSubscriberContext
mono = mono.subscriberContext(context -> context.put("xxxx", System.currentTimeMillis()));
// MonoMap
mono = mono.map(x -> (x + 1) * 2)
.subscriberContext(context -> {
// 这里会报空指针
System.out.println(context.get("xxxx").toString());
return context;
});
// 开始订阅
mono.subscribe(System.out::println);
}
}
复制代码
mono.subscriberContext()为什么能够获取到Context,首先subscriberContext返回的是一个MonoSubscriberContext:
public abstract class Mono implements Publisher {
public final Mono subscriberContext(Function doOnContext) {
return new MonoSubscriberContext<>(this, doOnContext);
}
}
复制代码
MonoSubscriberContext类继承MonoOperator,源码如下:
final class MonoSubscriberContext extends MonoOperator implements Fuseable {
final Function doOnContext;
MonoSubscriberContext(Mono extends t> source,
Function doOnContext) {
super(source);
this.doOnContext = Objects.requireNonNull(doOnContext, "doOnContext");
}
@Override
public void subscribe(CoreSubscriber super t> actual) {
// (1) 获取前一个订阅者的Context
Context c = actual.currentContext();
try {
// (2) 回调方法,可往Context put key-value,返回新的Context
c = doOnContext.apply(c);
}
catch (Throwable t) {
Operators.error(actual, Operators.onOperatorError(t, actual.currentContext()));
return;
}
// (3) ContextStartSubscriber即是Subscription,也是CoreSubscriber
Subscription subscription = new FluxContextStart.ContextStartSubscriber<>(actual, c);
// 调用前一个Mono的subscribe方法
source.subscribe(subscription);
}
}
复制代码
- (1):调用订阅者的currentContext获取订阅者的Context,如果不存在则会返回Context.empty()(一个空的Context);
CoreSubscriber提供获取Context的API:
public interface CoreSubscriber extends Subscriber {
default Context currentContext(){
return Context.empty();
}
}
复制代码
- (2):调用Function获取新的Context,如果往currentContext获取的Context put一个key-value,那么就会创建新的Context;
- (3):FluxContextStart.ContextStartSubscriber即是Subscription也是Subscriber。在构造方法中传递了Context,所以,如果在FluxContextStart.ContextStartSubscriber中重写currentContext方法,就能获取到Context;
FluxContextStart.ContextStartSubscriber源码如下(有修剪):
static final class ContextStartSubscriber implements ConditionalSubscriber, InnerOperator, QueueSubscription {
final CoreSubscriber super t> actual;
final Context context;
ContextStartSubscriber(CoreSubscriber super t> actual, Context context) {
this.actual = actual;
this.context = context;
}
@Override
public Context currentContext() {
return this.context;
}
}
复制代码
Context.empty()创建的是Context0:
public interface Context {
static Context empty() {
return Context0.INSTANCE;
}
}
复制代码
再来看Context0的put方法:
final class Context0 implements Context {
@Override
public Context put(Object key, Object value) {
return new Context1(key, value);
}
}
复制代码
可见,调用Context0#put创建的是Context1。
Context1源码如下:
class Context1 implements Context, Map.Entry
如果继续调用Context1#put创建的是Context2。
Context2源码如下:
final class Context2 implements Context {
final Object key1;
final Object value1;
final Object key2;
final Object value2;
Context2(Object key1, Object value1, Object key2, Object value2) {
this.key1 = key1;
this.value1 = value1;
this.key2 = key2;
this.value2 = value2;
}
}
复制代码
再继续调用Context2#put创建的是Context3,往下亦如此。
每次put返回一个新的Context目的是避免多线程加锁,同时也能实现数据隔离:后续操作(Mono)不能获取之前操作(Mono)put的数据。
委托模式的使用与BaseSubscriber
在介绍BaseSubscriber之前,我们先学习一个新的API:transform。Mono/Flux的transform方法允许将一个Mono/Flux转为一个MonoOperator/FluxOperator,将订阅委托给该MonoOperator/FluxOperator;
看个简单的案例:
public class BaseSubscriberUseMain{
public static void main(String[] args){
Mono> mono = createMono();
// transform 将原mono转为新的mono
mono = mono.transform((Function<Mono>, Publisher>>) m -> new MonoOperator(m) {
@Override
public void subscribe(CoreSubscriber actual) {
source.subscribe(actual);
}
});
mono.subscribe();
}
}
复制代码
这样我们就可以将MonoOperator的subscribe方法参数传递的订阅者替换为我们自己实现的订阅者,修改后的案例如下:
public class BaseSubscriberUseMain{
public static void main(String[] args){
Mono> mono = createMono();
// transform 将原mono转为新的mono
mono = mono.transform((Function<Mono>, Publisher>>) m -> new MonoOperator(m) {
@Override
public void subscribe(CoreSubscriber actual) {
source.subscribe(new ActualSubscriberDelegater(actual));
}
});
mono.subscribe();
}
}
复制代码
ActualSubscriberDelegater继承BaseSubscriber,实现hook方法:
public class ActualSubscriberDelegater extends BaseSubscriber {
/**
* 真实的订阅者
*/
private CoreSubscriber super t> actual;
public ActualSubscriberDelegater(CoreSubscriber super t> actual) {
super();
this.actual = actual;
}
@Override
protected void hookOnSubscribe(Subscription subscription) {
actual.onSubscribe(subscription);
}
@Override
protected void hookOnNext(T value) {
// 获取订阅者的Context
Long time = (Long) actual.currentContext().get("xxxx");
actual.onNext(value);
}
@Override
protected void hookOnComplete() {
actual.onComplete();
}
@Override
protected void hookOnError(Throwable throwable) {
actual.onError(throwable);
}
}
复制代码
看起来ActualSubscriberDelegater代理了真实订阅者的所有行为,但这不是代理模式,而是委托模式。
- hookOnSubscribe:该方法在父类的onSubscribe方法被调用时调用;
- hookOnNext:该方法在父类的onNext方法被调用时调用;
- hookOnComplete:该方法在父类的onComplete方法被调用时调用;
- hookOnError:该方法在父类的onError方法被调用时调用;
这样一来,ActualSubscriberDelegater就可用来实现调试、打印日记等操作。
Alibaba Sentinel也是通过transform API与实现BaseSubscriber组合使用适配反应式Reactor库的,我们也可以使用此统计一个接口的执行耗时。
为spring-data-r2dbc实现多数据源动态切换
hotkit-r2dbc是笔者个人的开源项目,封装spring-data-r2dbc多数据源动态切换的实现。
- Github链接:https://github.com/wujiuye/hotkit-r2dbc
在了解Reactor的Context之后,再来看笔者是如何为hotkit-r2dbc实现多数据源动态切换就会觉得很简单,看完之后,你不需要使用hotkit-r2dbc,也能自己为spring-data-r2dbc实现动态数据源切换。
spring-data-r2dbc提供连接工厂路由类:
AbstractRoutingConnectionFactory,我们只需要继承
AbstractRoutingConnectionFactory并实现它的determineCurrentLookupKey方法,在该方法被调用时返回正确的数据源key即可。
/**
* ConnectionFactory路由
*
* @author wujiuye 2020/11/03
*/
public class HotkitR2dbcRoutingConnectionFactory extends AbstractRoutingConnectionFactory {
private final static String DB_KEY = "HOTKIT-R2DBC-DB";
public HotkitR2dbcRoutingConnectionFactory(Map connectionFactoryMap) {
// ....
setTargetConnectionFactories(connectionFactoryMap);
//....
}
@Override
protected Mono<Object> determineCurrentLookupKey() {
return Mono.subscriberContext().handle((context, sink) -> {
if (context.hasKey(DB_KEY)) {
sink.next(context.get(DB_KEY));
}
});
}
}
复制代码
当然,这样还不行,还需要提供一个方法,让切面可以获取到Context并写入数据源,所以完整的连接工厂路由器应该是这样的:
/**
* ConnectionFactory路由
*
* @author wujiuye 2020/11/03
*/
public class HotkitR2dbcRoutingConnectionFactory extends AbstractRoutingConnectionFactory {
private final static String DB_KEY = "HOTKIT-R2DBC-DB";
public HotkitR2dbcRoutingConnectionFactory(Map connectionFactoryMap) {
// ....
setTargetConnectionFactories(connectionFactoryMap);
//....
}
// 写入数据源
public static Mono putDataSource(Mono mono, String dataSource) {
return mono.subscriberContext(context -> context.put(DB_KEY, dataSource));
}
// 写入数据源
public static Flux putDataSource(Flux flux, String dataSource) {
return flux.subscriberContext(context -> context.put(DB_KEY, dataSource));
}
@Override
protected Mono<Object> determineCurrentLookupKey() {
return Mono.subscriberContext().handle((context, sink) -> {
if (context.hasKey(DB_KEY)) {
sink.next(context.get(DB_KEY));
}
});
}
}
复制代码
切面类的实现如下:
@Component
@Aspect
@Order(Ordered.HIGHEST_PRECEDENCE)
public class DynamicDataSourceAop {
@Pointcut(value = "@annotation(com.wujiuye.hotkit.r2dbc.annotation.R2dbcDataBase)")
public void point() {
}
@Around(value = "point()")
public Object aroudAop(ProceedingJoinPoint pjp) throws Throwable {
MethodSignature signature = (MethodSignature) pjp.getSignature();
Method method = signature.getMethod();
R2dbcDataBase dataSource = method.getAnnotation(R2dbcDataBase.class);
// 方法返回值类型为Mono
if (method.getReturnType() == Mono.class) {
return HotkitR2dbcRoutingConnectionFactory.putDataSource((Mono>) pjp.proceed(), dataSource.value());
}
// 方法返回值类型为Flux
else {
return HotkitR2dbcRoutingConnectionFactory.putDataSource((Flux>) pjp.proceed(), dataSource.value());
}
}
}
复制代码
切面在目标方法执行完成返回Mono或者Flux之后,才调用返回值Mono或Flux的subscriberContext方法将返回值Mono转为一个MonoSubscriberContext,或者将返回值Flux转为一个FluxContextStart。
以返回值类型为Mono为例,在MonoSubscriberContext#subscribe方法的回调Function中为Context写入数据源,在Mono最终被订阅时,由MonoSubscriberContext将Context传递给订阅者。
注意数据源切面与事务切面的顺序问题,避免事务不生效。
用一个测试用例说明数据源切换的时机:
public class RoutingTest extends SupporSpringBootTest {
@Resource
private DatabaseClient client;
@Test
public void test() throws InterruptedException {
Mono operation = client.execute("INSERT INTO person (id, name, age) VALUES(:id, :name, :age)")
.bind("id", "joe")
.bind("name", "Joe")
.bind("age", 34)
.fetch()
.rowsUpdated()
.then();
// 切换数据源
Mono dbOperation = HotkitR2dbcRoutingConnectionFactory.putDataSource(operation,MasterSlaveMode.Slave);
dbOperation.subscribe();
TimeUnit.SECONDS.sleep(Integer.MAX_VALUE);
}
}
复制代码
其中发布者operation在真正执行Sql之前肯定是先要从连接工厂路由器获取连接工厂的发布者,也就是连接工厂路由器的determineCurrentLookupKey被调用。
因此,在operation转为dbOperation之后,订阅dbOperation时为dbOperation的订阅者写入Context并put数据源,Context在流中传递,最终operation的订阅者就能调用determineCurrentLookupKey方法从订阅者的Context获取到数据源,这就是多数据源切换的实现原理。
End
本篇只介绍Mono的同步订阅流程,这是因为同步订阅更易于理解,Mono相比Flux也更容易介绍。
笔者建议:在了解本篇介绍的知识点之后,可阅读Flux的源码,或者实现异步订阅的源码,加深对反应式编程库Reactor库的理解。
作者:Java艺术
链接:
https://juejin.cn/post/6897052619488329741
来源:掘金
著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。