Spring 框架事件收发功能使用(二)事务事件监听 @TransactionalEventListener

it2022-05-05  184

Spring 框架事件收发功能使用(二)事务事件监听

1. 概述

系列博客(一)中,讲述了如何使用 Spring 框架来实现事件收发机制,包含了实现接口和注解两种实现方式,以及包括了同步和异步两种运行方式。 在接下来的内容里,我们会介绍如何将事件的监听与事务进行关联。

2. 基于事务的事件收发的使用

首先,我们还是依照惯例, 从 Spring 的官方文档来看看怎么使用它。

在 Spring 4.2 开始,事件的监听器可以被绑定到事务的阶段上。 典型的示例就是它可以处理当事务成功完成后的事件。 我们可以使事件更加灵活的使用在关心事务结果的监听器上。

我们可以使用 @EventListener 注解注册一个常规的事件监听器。 当你需要将它绑定到事务上,则需要使用 @TransactionalEventListener。 这个监听器默认会被绑定到事务的提交阶段。

接下的示例会展示这个概念。假设一个组件 publish 了一个 订单创建 事件,并且我们想定义一个监听器,它只处理事务提交成功后发出的事件。下面便是如何实现这个事件监听器的代码例子:

@Component public class MyComponent { @TransactionalEventListener public void handleOrderCreatedEvent(CreationEvent<Order> creationEvent) { //do something } }

@TransactionalEventListener 注解 暴露了一个事务阶段的属性,用来指定监控器绑定的事务阶段。可以使用的阶段有这4个 BEFORE_COMMIT, AFTER_COMMIT (default), AFTER_ROLLBACK, AFTER_COMPLETION ,包含了事务的完成和回滚。 如果没有事务在运行,那这个监听器不会被触发,直到我们提供了事务。不过我们也可以通过改变 fallbackExecution 的设置来使监听器在没有事务的情况下仍然能够被触发。

3. @TransactionEventListener 的实现原理

Talking is cheap, show the Spring framework source code.

//EventListenerMethodProcessor.processBean(148); /* * 由于@TransactionalEventListener 继承了 @EventListener , * 所以,通过 findMergedAnnotation 同样能够获取其所注解的方法 */ annotatedMethods = MethodIntrospector.selectMethods(targetType, (MethodIntrospector.MetadataLookup<EventListener>) method -> AnnotatedElementUtils.findMergedAnnotation(method, EventListener.class)); //EventListenerMethodProcessor.processBean(176); /* * @TranscationalEventListener 由 ApplicationListenerMethodTransactionalAdapter 所负责实现功能 * 通过解读他的 Adapter 便能够了解其功能是如何实现的 */ if (applicationListener instanceof ApplicationListenerMethodAdapter) { ((ApplicationListenerMethodAdapter) applicationListener).init(context, this.evaluator); } class ApplicationListenerMethodTransactionalAdapter extends ApplicationListenerMethodAdapter { private final TransactionalEventListener annotation; /* * 根据对应的 beanName Class 以及方法名,初始化 Adapter */ public ApplicationListenerMethodTransactionalAdapter(String beanName, Class<?> targetClass, Method method) { super(beanName, targetClass, method); TransactionalEventListener ann = AnnotatedElementUtils.findMergedAnnotation(method, TransactionalEventListener.class); if (ann == null) { throw new IllegalStateException("No TransactionalEventListener annotation found on method: " + method); } this.annotation = ann; } @Override public void onApplicationEvent(ApplicationEvent event) { if (TransactionSynchronizationManager.isSynchronizationActive()) {//判断是否开启了事务 TransactionSynchronization transactionSynchronization = createTransactionSynchronization(event);//创建一个新的事务同步器 TransactionSynchronizationManager.registerSynchronization(transactionSynchronization);//将刚刚创建的事务同步器,注册到这个线程的事务同步器列表中 } else if (this.annotation.fallbackExecution()) {//如果当前线程内没有开启事务,并且注解的 fallbackExecution 属性设置为 true if (this.annotation.phase() == TransactionPhase.AFTER_ROLLBACK && logger.isWarnEnabled()) { logger.warn("Processing " + event + " as a fallback execution on AFTER_ROLLBACK phase");//在符合 if 条件后输出警告 } processEvent(event);//处理事件 } else { // No transactional event execution at all if (logger.isDebugEnabled()) { logger.debug("No transaction is active - skipping " + event);//本线程没有开启事务,fallbackExecution 又设置为 false } } } private TransactionSynchronization createTransactionSynchronization(ApplicationEvent event) { return new TransactionSynchronizationEventAdapter(this, event, this.annotation.phase()); } /* * 这个内部类实现了事件监听器能根据 4 个不同的事务阶段进行处理事件的功能 */ private static class TransactionSynchronizationEventAdapter extends TransactionSynchronizationAdapter { private final ApplicationListenerMethodAdapter listener; private final ApplicationEvent event; private final TransactionPhase phase; public TransactionSynchronizationEventAdapter(ApplicationListenerMethodAdapter listener, ApplicationEvent event, TransactionPhase phase) { this.listener = listener; this.event = event; this.phase = phase; } @Override public int getOrder() { return this.listener.getOrder(); } // 如果是 beforeCommit 的这个阶段,需要单独判断 @Override public void beforeCommit(boolean readOnly) { if (this.phase == TransactionPhase.BEFORE_COMMIT) { processEvent(); } } // 其余 3 个事务阶段在事务提交后进行判断和处理 @Override public void afterCompletion(int status) { if (this.phase == TransactionPhase.AFTER_COMMIT && status == STATUS_COMMITTED) { processEvent(); } else if (this.phase == TransactionPhase.AFTER_ROLLBACK && status == STATUS_ROLLED_BACK) { processEvent(); } else if (this.phase == TransactionPhase.AFTER_COMPLETION) { processEvent(); } } protected void processEvent() { this.listener.processEvent(this.event); } } }

4. @TransactionEventListener的异步陷阱

目前为止,我们看到的都是强大的 Spring 框架所提供的事务事件监听器的美好,不过任何美好都需要我们小心的维护。 接下来,我们就看看 @TransactionalEventListener 的美好背后隐藏着什么需要注意的。

如果,看过第一部分内容的读者应该还记得,我们将 @EventListner 变为异步会有两种方式。 一种是使用 @Async 注解,并且 EnableAsync;另一种是通过给我们的 Adatper 配置需要的线程池。 如果使用第一种方式来让我们的 @TransactionalEventListener 不会有问题。 但是,使用第二种的方式会导致我们的监听器找不到事务而失败。 那我们依然还是遵循从功能中来到源码里去的思想,到源码中找找问题出在哪里。

//我们再回到所有 ApplicationEvent 类型时间分发的地方--> SimpleApplicationEventMulticaster类。 //我们会发现他是通过新建线程的方式来进行触发监听器消费事件。 executor.execute(() -> invokeListener(listener, event)); // 在 Adapter 中,我们通过这个方法的调用来判断是否开启了事务 TransactionSynchronizationManager.isSynchronizationActive(); // 接下来我们进入 Manager看看这个方法的注释。注释表明,这个方法会返回当前线程活跃的事务同步器。 // Bingo,是不是觉得有什么不对了。我们刚刚在 invokeListener 的时候是通过新建线程的方式进行异步的。 // 如果,我们再去获取当前线程的事务,结果非常明显,一定是 False。 // 所以,如果我们通过给 Multicaster 配置线程池的方式来实现异步看样子是不行的。 /** * Return if transaction synchronization is active for the current thread. * Can be called before register to avoid unnecessary instance creation. * @see #registerSynchronization */ public static boolean isSynchronizationActive() { return (synchronizations.get() != null); }

5.小结

本小节中,主要围绕 @TransactionalEventListener 来讲述下面 3 个内容:

如何使用 Spring 框架实现事务事件监听@TransactionalEventListener 是怎么实现绑定到事务上的事件监听的在异步场景下,@TransactionalEventListener 的陷阱。

本系列的第三部分给出如何能让 @TransactionalEventListener 异步执行,又不在每个地方都加上 @Async 注解的办法。


最新回复(0)