使用责任链和SPI构建弹性的业务架构

前言

如何构造更加灵活的,具有一定弹性扩展能力的架构呢。在本文中我们会结合具体的业务场景来聊一聊这个话题。

正文

我们先来说一下场景:我手头负责的是支付服务。支付服务需要为外部调用者提供各种支付能力接口,比如:生成支付参数的接口、通知支付结果的接口、处理退款的接口,等等。同时,支付服务需要在内部封装对于各类支付平台以及其中各类的支付能力接口。简单点说,支付服务就是不同支付平台支付能力对于业务端服务的统一门面。

那么我们怎么样才能实现这种:对外是统一的门面,对内是不同的实现,这样的效果呢。我们先不直接回答这个问题,而是先回归到具体的业务需求的场景上来。

请看需求:

需求一. 提供一个“支付参数构造接口”,这个接口的作用是:业务调用端给一个请求,请求中里面包含了价格、IP、和一个“支付账号”,我们的接口接到请求后,先要记录一条“支付流水”,然后再根据请求参数中的支付账号路由到对应的支付平台对接逻辑,生成前端需要的,用于生成最终用户进行支付UI的参数(比如用于唤起支付宝支付页面的参数或生成微信支付二维码页面的参数)然后将这些参数封装为响应返给业务调用端。同时,这个需求中实际上还包含了隐含的需求:接口后续还会有若干其他的附加功能逻辑被添加到这个处理过程上,比如针对单个用户的支付参数生成频率的限制等,这些附加处理流程无法确定有多少。

需求二. 提供一个“支付结果通知接口”,这个接口的作用是:当用户在前端完成了付款后,支付平台会给我们发送支付已成功的通知,我们接到这个通知后,需要进行签名验证,然后更新“支付流水状态”以及通知业务方支付已成功。同时,这个需求里其实也是包含了一个隐含的需求:不确定会支持多少个支付平台,每新接入一个支付平台都应该能够支持这个业务流程。

我们可以看到,这两个需求中对于隐含需求的描述:“这些附加处理流程无法确定有多少” 以及 “不确定会支持多少个支付平台”,这其实是提示我们,需要实现一种 “弹性” 的架构方式。

那么,怎么理解 “弹性” 呢?回到需求上来看,对于第一个需求,我们可以先来想象一下,如果我们一开始就把逻辑写死,也就是整个业务逻辑流程从开始到结束是一旦写好不能轻易修改的,那么,当后期需要增加新的业务逻辑流程进来时,我们不得不重新设计一遍业务流程,这想必是很麻烦的一件事。但是,如果我们在设计业务逻辑时,预先为将来有可能加入的流程预留一个接入点,或者说是扩展点,那么将来我们每加一个新逻辑只需要实现这个扩展点就好了,实现完了这个流程自动的会被加入到原有流程当中去,不用再把原有流程重写一遍,这样就可以理解为是一种 “弹性” (实际上可以类比于插件机制,可以想想平常咱们用的IDE的插件体系,开发一个新功能只要实现一个插件就好了,而不是把整个IDE重写一遍);对于第二个需求,我们也可以想象一下,如果我们一开始把逻辑写死了只能通过内置类似于switch case这样的判断逻辑来决定分发到哪一种支付平台的逻辑上,那么我们怎么直到未来会接什么样的支付平台呢,根本无法知道,因此根本没法预先确定分支逻辑。那么我们就需要某一种架构方式,试其能够在不确定有多少分支的前提下也能做到逻辑分发,这种架构也可称之为是一种 “弹性” 的架构。

那么,如何实现 “弹性” ?,对于第一个需求我们可以用设计模式中的 “责任链模式” 来解决。而对于第二个需求,我们可以用一种近似于 “SPI” 的模式来实现。具体的,我们回归到代码来看吧。

先针对第一个需求,我们来看代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
// 这是面向调用者的唯一接口
@ServiceContract
public interface PaymentParamBuildService {

/**
* 支付参数构建
*
* @param paymentParamBuildRequest 支付参数构建请求
* @return 支付参数构建响应
*/
@OperationContract
PaymentParamBuildResponse execute(PaymentParamBuildRequest paymentParamBuildRequest) throws Throwable;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public class PaymentParamBuildServiceImpl implements PaymentParamBuildService, ApplicationContextAware {

...

@Override
public PaymentParamBuildResponse execute(PaymentParamBuildRequest paymentParamBuildRequest) throws Throwable {
LOGGER.info(MARKER, "支付参数生成流程开始");

// 这里就是我们的整体业务逻辑的流程,可以看到,业务逻辑是被分发给 HandlerPipeline 这个组件去处理了
PaymentParamBuildResponse paymentParamBuildResponse = new PaymentParamBuildResponse();
HandlerPipeline handlerPipeline = applicationContext.getBean(DefaultHandlerPipeline.class, paymentParamBuildRequest, paymentParamBuildResponse);
handlerPipeline.fireExecute(); // 业务逻辑是被分发给 HandlerPipeline

LOGGER.info(MARKER, "支付参数生成流程结束");
return paymentParamBuildResponse;
}

...

}

从上边这段代码我们可以看到,PaymentParamBuildService#execute 的实现逻辑是:构造好一个空的响应对象,然后连同通过参数传递进来的请求对象一起,作为参数来构造了一个 HandlerPipeline 并且执行了其 fireExecute,那么,这个 HandlerPipeline 是个什么呢?实际上在这个 HandlerPipeline 背后是一系列关联的执行体系,这些执行体系构造成了 责任链。我们先把这个执行体系里的关键组件的代码放出来,然后我们再来仔细说明。

1
2
3
4
5
6
7
// 这个是上边提到的 HandlerPipeline
public interface HandlerPipeline<RQ, RS> {

void fireExecute() throws Throwable;

void addLast(Handler<RQ, RS> handler);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
// 这个是 HandlerPipeline 的默认实现
@Scope(SCOPE_PROTOTYPE)
@Component
public class DefaultHandlerPipeline<RQ, RS> implements HandlerPipeline<RQ, RS>, ApplicationContextAware {

private ApplicationContext applicationContext;

// 头 HandlerContext
private HandlerContext<RQ, RS> first;

// 尾 HandlerContext
private HandlerContext<RQ, RS> last;

private RQ request;

private RS response;

public HandlerContext<RQ, RS> getFirst() {
return first;
}

public HandlerContext<RQ, RS> getLast() {
return last;
}

// 在构造时传入请求对象和响应对象
public DefaultHandlerPipeline(RQ request, RS response) {
this.request = request;
this.response = response;
}

// 初始化头 HandlerContext 和尾 HandlerContext 并将他们两个链在一起
@PostConstruct
public void init() { // ①
first = new FirstContext<>();
last = new LastContext<>();
first.setHandler(new DefaultHandler<>()); // 头 HandlerContext 中的 Handler 使用缺省 Handler
last.setHandler(new DefaultHandler<>()); // 尾 HandlerContext 中的 Handler 使用缺省 Handler
first.setNext(last);
last.setPrev(first);
}

// 执行头 HandlerContext 的 fireExecute 传入请求对象和响应对象作为参数
@Override
public void fireExecute() throws Throwable {
first.fireExecute(request, response); // ④
}

// 传入一个 Handler 将其构造为一个新的 HandlerContext 并把这个新的 HandlerContext 挂到当前 HandlerContext 链的倒数第二的位置上,也就是 last 之前
@SuppressWarnings("unchecked")
@Override
public void addLast(Handler<RQ, RS> handler) {
HandlerContext handlerContextBean = (HandlerContext) applicationContext.getBean("defaultHandlerContext");
handlerContextBean.setHandler(handler); // ③

last.getPrev().setNext(handlerContextBean);
handlerContextBean.setPrev(last.getPrev());
handlerContextBean.setNext(last);
last.setPrev(handlerContextBean);
}

@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
this.applicationContext = applicationContext;
}

// 头 HandlerContext 是一个特殊的类型,之所以要特化是考虑有可能会在头部需要特殊操作
private static class FirstContext<RQ, RS> extends DefaultHandlerContext<RQ, RS> {

}

// 尾 HandlerContext 是一个特殊的类型,之所以要特化是考虑有可能会在尾部需要特殊操作
private static class LastContext<RQ, RS> extends DefaultHandlerContext<RQ, RS> {

}

// 缺省 Handler
private static class DefaultHandler<RQ, RS> extends HandlerAdapter<RQ, RS> {

private static final Logger LOGGER = LoggerFactory.getLogger(DefaultHandler.class);

@Override
protected boolean support(RQ request, RS response) {
return true;
}

@Override
protected void doExecute(RQ request, RS response) {
LOGGER.info("request {} response {}", JSON.toJSONString(request), JSON.toJSONString(response));
}

@Override
public int getOrder() {
return HIGHEST_PRECEDENCE;
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
@Component
public class HandlerBeanProcessor implements BeanPostProcessor, ApplicationContextAware {

private ApplicationContext applicationContext;

@SuppressWarnings("unchecked")
@Override
public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException { // ②
if (bean instanceof HandlerPipeline) {
applicationContext
.getBeansOfType(Handler.class) // 将所有 Handler 类型的 Bean 找出来
.entrySet()
.stream()
.sorted(Comparator.comparingInt(item -> item.getValue().getOrder())) // 按照 Handler 的排序标识进行排序
.forEach(entry -> ((HandlerPipeline) bean).addLast(entry.getValue())); // 依次挂载到链的倒数第二个位置
}
return bean;
}

@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
this.applicationContext = applicationContext;
}
}
1
2
3
4
5
6
7
8
9
// Handler 就是我们需要实现的 “扩展点”
public interface Handler<RQ, RS> extends Ordered {

// 接收请求对象和响应对象并作业务逻辑,业务逻辑可以看作是拿到请求,经过某些处理后填充响应对象
void execute(HandlerContext<RQ, RS> context, RQ request, RS response) throws Throwable;

// 处理业务逻辑中的异常
void exceptionCaught(HandlerContext<RQ, RS> context, Throwable throwable);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
// 业务中的 “扩展点” 的实现的一个例子,验证单个用户请求是否超频率
@Scope(SCOPE_PROTOTYPE)
@Component
public class PaymentParamBuildRequestCountLimitCheckHandler implements Handler<PaymentParamBuildRequest, PaymentParamBuildResponse> {

...

@Override
public int getOrder() {
return -1;
}

@Override
public void execute(HandlerContext<PaymentParamBuildRequest, PaymentParamBuildResponse> context, PaymentParamBuildRequest request, PaymentParamBuildResponse response) throws Throwable { // ⑥
// 校验频度是否超出阈值的逻辑

...

if(未超阈值) {
// 未超阈值则让下一个“链节”去执行,也就是继续后续业务逻辑
context.fireExecute(request, response); // ⑦
} else {
// 超过阈值则不让下一个“链节”执行,也就是之后的业务逻辑就不会再执行了,整个业务逻辑终结于此
}
}

@Override
public void exceptionCaught(HandlerContext<PaymentParamBuildRequest, PaymentParamBuildResponse> context, Throwable throwable) {

// 处理在频度校验中出现的异常
...

}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
// 这个是责任“链”上的一个“链节”,每个链节与其前驱和后继共同作用构成了“链”
public interface HandlerContext<RQ, RS> {

// 前驱
HandlerContext<RQ, RS> getPrev();

// 后继
HandlerContext<RQ, RS> getNext();

void setPrev(HandlerContext<RQ, RS> prev);

void setNext(HandlerContext<RQ, RS> next);

Handler<RQ, RS> getHandler();

// 设置这一链节上挂载的 Handler(扩展点)
void setHandler(Handler<RQ, RS> handler);

void fireExecute(RQ request, RS response) throws Throwable;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
// “链节”的默认实现
@Scope(SCOPE_PROTOTYPE)
@Component
public class DefaultHandlerContext<RQ, RS> implements HandlerContext<RQ, RS> {

private HandlerContext<RQ, RS> prev;

private HandlerContext<RQ, RS> next;

private Handler<RQ, RS> handler;

@Override
public HandlerContext<RQ, RS> getPrev() {
return prev;
}

@Override
public HandlerContext<RQ, RS> getNext() {
return next;
}

@Override
public void setPrev(HandlerContext<RQ, RS> prev) {
this.prev = prev;
}

@Override
public void setNext(HandlerContext<RQ, RS> next) {
this.next = next;
}

@Override
public Handler<RQ, RS> getHandler() {
return handler;
}

@Override
public void setHandler(Handler<RQ, RS> handler) {
this.handler = handler;
}

// 如果有后继,拿到后继上挂载的 Handler 并执行
@Override
public void fireExecute(RQ request, RS response) {
HandlerContext<RQ, RS> _next = getNext();
if (_next != null) {
try {
_next.getHandler().execute(_next, request, response); // ⑤
} catch (Throwable throwable) {
_next.getHandler().exceptionCaught(_next, throwable);
}
}
}
}

以上是 责任链 的关键组件。那我们怎么理解这些组件呢?他们都是做什么用的?首先,我们先来说一下什么是责任链。我不想给你讲严格的定义,而是用一个例子来类比一下。可以在头脑中想想这样一个场景:一个车间中有若干工作站、一名车间主任和与车间数量相同的若干工人。车间主任按事先确定好的生产工序给若干工作站排序,具体做法是每个工作站会有个小本子记录下一个工作站是哪个,同时会给每个工作站分派一名工人。这个车间会加工某种工件,工件会先给到第一个工作站,第一个工作站中的工人拿到工件后进行加工,加工好后看看小本子,找到下一个(第二个)工作站,然后把工件传给第二个工作站的工人,第二个工作站工人拿到工件,加工好后再看看小本找到第三个工作站,把工件传给第三个工作站的工人,依次类推,直至最后一个工作站的工人处理好工件,整个工序结束。另外,生产规定中规定了某个工作站的工人有权决定不把工件传给下一个工作站的工人,这样,工序就就在当前工作站立刻结束。这个场景中每个工人加工工件可以算是一种 责任,而每个工作站有个小本记录谁是下一个工作站,这实际上构成了 ,这样就组成了 责任链(当然,这个描述不严谨,更多严谨地描述请参考维基百科等资料来源中的介绍)。

大致明白了什么是 责任链 之后,我们再回过头来看这几个组件:

  • HandlerPipeline 相当于车间主任
  • HandlerContext 相当于工作站
  • Handler 相当于被分派到工作站中的一个工人
  • RQ request, RS response 相当于待加工的工件

代码 ① 和 ② 处:HandlerPipeline(车间主任) 将 Handler(工人) 附加到 HandlerContext(分派到某一车间,对应代码 ③ 处),并按照 HandlergetOrder() 排序(按确定好的工序),串到一起形成链表(形成责任链

代码 ④ 处:first.fireExecute(request, response); 执行第一个 HandlerContextfireExecute(车间拿到工件先会到头一个工作站加工),代码 ⑤ 处获得当前 HandlerContext 的后继(查看小本本看看下一个工作站是哪个),并拿到后继中挂载的 Handler(然后把工件传给下一个工作站的工人) 执行其 execute(工人加工工件,也就是执行业务逻辑,对应代码 ⑥ 处)

代码 ⑦ 处:根据某一条件决定还调不调用下一个 HandlerContextfireExecute,也就是工人有权决定是否把工件传递给下个工作站的工人,如果选择传递,则业务逻辑继续,若不传递,业务逻辑就此终止。在我们的需求一中,此处的逻辑就是如果单用户访问未超阈值则继续后续业务逻辑,若超了,则阻断后续业务逻辑。(当然这个场景完全也可用 AOP 来做,但这不是本文的关注点,在此不讨论了)

1
2
3
4
5
6
7
8
...
if(未超阈值) {
// 未超阈值则让下一个“链节”去执行,也就是继续后续业务逻辑
context.fireExecute(request, response); // ⑦
} else {
// 超过阈值则不让下一个“链节”执行,也就是之后的业务逻辑就不会再执行了,整个业务逻辑终结于此
}
...

到此,第一个弹性架构的实现解法:“责任链” 就解析完成了。我们来总结一下,实际上就是将业务流程的起始点和终止点确定好,中间用链串起扩展点,在扩展点中挂载业务逻辑,需要新增业务逻辑时,不用整个推翻重写,而是再多实现一个扩展点就好了,这样就实现了弹性。

我们再来看第二个需求对应的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
// 面向调用者的接口
@ServiceContract
public interface PaymentNotificationService {

/**
* 执行支付后通知处理
*
* @param notificationRequest 支付后通知请求
* @return 支付后通知响应
*/
@OperationContract
PaymentNotificationResponse execute(PaymentNotificationRequest notificationRequest) throws Throwable;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
// 接口的实现
@Service
public class PaymentNotificationServiceImpl implements PaymentNotificationService {

private List<Handler> handlers;

@Autowired
public void setHandlers(List<Handler> handlers) { // ④
this.handlers = handlers;
}

@Override
public PaymentNotificationResponse execute(PaymentNotificationRequest notificationRequest) {
LOGGER.info(MARKER, "支付结果通知流程开始");

PaymentNotificationResponse notificationResponse = new PaymentNotificationResponse();
handlers
.stream()
.filter(handler -> handler.support(notificationRequest)) // ⑤
.forEach(handler -> handler.execute(notificationRequest, notificationResponse)); // ⑥

LOGGER.info(MARKER, "支付结果通知流程结束");
return notificationResponse;
}

public interface Handler { // ①

boolean support(PaymentNotificationRequest notificationRequest); // ②

void execute(PaymentNotificationRequest notificationRequest, PaymentNotificationResponse notificationResponse); // ③
}

@Component
public static class WeChatPayHandler implements Handler {

...

@Override
public boolean support(PaymentNotificationRequest notificationRequest) {
// 依据 notification 检查是否为微信支付结果通知流程
...
}

@Override
public void execute(PaymentNotificationRequest notificationRequest, PaymentNotificationResponse notificationResponse) {
// 执行微信支付结果通知流程
...
}

...
}

@Component
public static class AlipayHandler implements Handler {

...

@Override
public boolean support(PaymentNotificationRequest notificationRequest) {
// 依据 notification 检查是否为支付宝结果通知流程
...
}

@Override
public void execute(PaymentNotificationRequest notificationRequest, PaymentNotificationResponse notificationResponse) {
// 执行支付宝结果通知流程
...
}

...

}
}

以上是第二个业务逻辑的代码,在这里,我们使用了类似于SPI的模式,再说的具体一点,是一种类似于 spring-mvc 中的 HandlerAdapterSPI(关于这一点可参看我得另一篇文章 spring实现根据controller中接收请求参数不同走不同service),那么SPI模式怎么做到弹性的呢?我们再来想象一个场景:一个车间有一条传送带,传送带旁依次站立若干名工人,每一名工人只能加工一种工件。各种工件从传送带上依次经过每个工人,每个工人拿起工件后,先看一看这个是不是自己能处理的工件类型,如果是,则进行加工,加工好后得到成品然后拿出车间,如果不是,则原样放回传送带,让下一个工人重复同样的步骤。

回到代码上来:

  • Handler 相当于工人
  • PaymentNotificationRequest 相当于某种工件

代码 ① 处 Handler 是对工人的抽象,代码 ② 处 boolean support(PaymentNotificationRequest notificationRequest) 是对应于“判断工件是不是自己能加工的类型”,代码 ③ 处void execute(PaymentNotificationRequest notificationRequest, PaymentNotificationResponse notificationResponse); 对应于“加工工件”

代码 ④ 处这个 List 方式的注入,实际上是构造了一条“传送带”,代码 ⑤ 处 .filter(handler -> handler.support(notificationRequest)) 对应于工人从传送带上拿下来工件并判断是不是自己能加工的类型

代码 ⑥ 处 handler -> handler.execute(notificationRequest, notificationResponse) 对应了工人加工工件

可以再来看一下这段代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
@Component
public static class WeChatPayHandler implements Handler {

...

@Override
public boolean support(PaymentNotificationRequest notificationRequest) {
// 依据 notification 检查是否为微信支付结果通知流程
...
}

@Override
public void execute(PaymentNotificationRequest notificationRequest, PaymentNotificationResponse notificationResponse) {
// 执行微信支付结果通知流程
...
}

...
}

@Component
public static class AlipayHandler implements Handler {

...

@Override
public boolean support(PaymentNotificationRequest notificationRequest) {
// 依据 notification 检查是否为支付宝结果通知流程
...
}

@Override
public void execute(PaymentNotificationRequest notificationRequest, PaymentNotificationResponse notificationResponse) {
// 执行支付宝结果通知流程
...
}

...

}

这里创建了两个 Handler, 一个用于支持微信的通知逻辑,另一个用于支持支付宝的通知逻辑,他们都是依据 notification 检查是否是自己能处理的,能的话才实际进行处理。所有 Handler 都会被放到 List 中依次触发 support 因此,将来再多一个阿猫阿狗支付平台的逻辑,再实现一个 阿猫啊狗Handler 就好了,新实现的 Handler 同样也会进入 List 被访问到 support,流程依旧。这样,我们不需要提前知道会有什么样的逻辑要加进来,只要来一个,实现一个就好了。这样就实现了架构的弹性。

到这里,第二种实现架构弹性的方式也介绍完了,再来总结一下,实际上就是将业务流程的起始点和终止点确定好,中间用 List 串起扩展点,在扩展点中挂载业务逻辑,需要新增业务逻辑时,不用整个推翻重写,而是再多实现一个扩展点就好了。

到此,两种实现架构弹性的方式就都介绍完了,再来多说一些,这些设计思路基本上可以说是从中间件的设计思路中来的,比如第一种的责任链的实现来源于netty,而第二种的SPI的方式借鉴自spring-mvc,这些优秀的中间件的设计思想是值得学习的。

参考资料

Spring中如何使用责任链模式