Spring方法中调用异步方法进行事务控制详解
Spring 异步事务控制
文章目录
前言:这里的异步方法事务控制指的是:A 方法中异步调用 B 方法,使 A 方法和 B 方法的提交 / 回滚能保持一致;而不是对 @Async
单独的方法进行事务控制。
Spring 事务源码逻辑
在进行 Spring 异步事务控制编码前,先了解下 Spring 是如何进行事务控制的。
一、事务拦截器拦截
定义了@Transactional
的方法会被代理,由代理对象执行方法。会进入TransactionInterceptor#invoke()
方法
public Object invoke(MethodInvocation invocation) throws Throwable { Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null); // Adapt to TransactionAspectSupport's invokeWithinTransaction... return invokeWithinTransaction(invocation.getMethod(), targetClass, new CoroutinesInvocationCallback() {// ... } });}
二、进行事务控制
进入invokeWithinTransaction()
方法
protected Object invokeWithinTransaction(Method method, @Nullable Class<?> targetClass, final InvocationCallback invocation) throws Throwable { // If the transaction attribute is null, the method is non-transactional. TransactionAttributeSource tas = getTransactionAttributeSource(); final TransactionAttribute txAttr = (tas != null ? tas.getTransactionAttribute(method, targetClass) : null); // 获取事务管理器 final TransactionManager tm = determineTransactionManager(txAttr); // ... PlatformTransactionManager ptm = asPlatformTransactionManager(tm); final String joinpointIdentification = methodIdentification(method, targetClass, txAttr); if (txAttr == null || !(ptm instanceof CallbackPreferringPlatformTransactionManager)) { // 定义事务相关信息,如隔离级别、传播机制等 TransactionInfo txInfo = createTransactionIfNecessary(ptm, txAttr, joinpointIdentification); Object retVal; try { // 调用被代理方法 retVal = invocation.proceedWithInvocation(); } catch (Throwable ex) { // target invocation exception completeTransactionAfterThrowing(txInfo, ex); throw ex; } finally { // 设置ThraedLocal数据为null cleanupTransactionInfo(txInfo); } // ... // 提交/回滚事务 commitTransactionAfterReturning(txInfo); return retVal; } // ...}
进入 createTransactionIfNecessary()
方法
protected TransactionInfo createTransactionIfNecessary(@Nullable PlatformTransactionManager tm, @Nullable TransactionAttribute txAttr, final String joinpointIdentification) { // ... TransactionStatus status = null; if (txAttr != null) { if (tm != null) { // 构建事务状态,如:启动事务、设置数据库连接信息等 status = tm.getTransaction(txAttr); } // ... } // 返回事务信息 return prepareTransactionInfo(tm, txAttr, joinpointIdentification, status);}
getTransaction()
方法
public final TransactionStatus getTransaction(@Nullable TransactionDefinition definition) throws TransactionException { // 使用默认的事务定义 TransactionDefinition def = (definition != null ? definition : TransactionDefinition.withDefaults());// 获取事务对象,进入这个方法可以看到事务信息都是存在 ThreadLocal 中的 Object transaction = doGetTransaction(); boolean debugEnabled = logger.isDebugEnabled(); // ... else if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED || def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW || def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) { SuspendedResourcesHolder suspendedResources = suspend(null); try { // 开启事务 return startTransaction(def, transaction, debugEnabled, suspendedResources); } // ... }}
进入 DataSourceTransactionManager # doGetTransaction()
方法
protected Object doGetTransaction() { // DataSourceTransactionObject 对象是 DataSourceTransactionManager 的私有静态内部类 DataSourceTransactionObject txObject = new DataSourceTransactionObject(); txObject.setSavepointAllowed(isNestedTransactionAllowed()); ConnectionHolder conHolder = (ConnectionHolder) TransactionSynchronizationManager.getResource(obtainDataSource()); txObject.setConnectionHolder(conHolder, false); return txObject;}// 进入 TransactionSynchronizationManager # getResource()方法private static Object doGetResource(Object actualKey) { Map<Object, Object> map = resources.get(); if (map == null) { return null; } Object value = map.get(actualKey); // Transparently remove ResourceHolder that was marked as void... if (value instanceof ResourceHolder && ((ResourceHolder) value).isVoid()) { map.remove(actualKey); // Remove entire ThreadLocal if empty... if (map.isEmpty()) { resources.remove(); } value = null; } return value;}// 观察 TransactionSynchronizationManager 对象public abstract class TransactionSynchronizationManager {private static final ThreadLocal<Map<Object, Object>> resources =new NamedThreadLocal<>("Transactional resources");private static final ThreadLocal<Set<TransactionSynchronization>> synchronizations =new NamedThreadLocal<>("Transaction synchronizations");private static final ThreadLocal<String> currentTransactionName =new NamedThreadLocal<>("Current transaction name");private static final ThreadLocal<Boolean> currentTransactionReadOnly =new NamedThreadLocal<>("Current transaction read-only status");private static final ThreadLocal<Integer> currentTransactionIsolationLevel =new NamedThreadLocal<>("Current transaction isolation level");private static final ThreadLocal<Boolean> actualTransactionActive =new NamedThreadLocal<>("Actual transaction active");}
三、事务开启 / 回滚 /提交操作
事务的控制操作都是由 DataSourceTransactionManager
来进行的,如:
- doBegin:开启事务
- doCommit:提交事务
- doRollback:回滚事务
其中主要是从 ThreadLocal
中取出 Connection
对象进行事务控制,以 doCommit
为例:
protected void doCommit(DefaultTransactionStatus status) { // 获取连接对象 DataSourceTransactionObject txObject = (DataSourceTransactionObject) status.getTransaction(); Connection con = txObject.getConnectionHolder().getConnection(); if (status.isDebug()) { logger.debug("Committing JDBC transaction on Connection [" + con + "]"); } try { // 提交 con.commit(); } catch (SQLException ex) { throw translateException("JDBC commit", ex); }}
四、事务完成,清除事务信息
在第二步结尾,进入commitTransactionAfterReturning()
protected void commitTransactionAfterReturning(@Nullable TransactionInfo txInfo) { if (txInfo != null && txInfo.getTransactionStatus() != null) { if (logger.isTraceEnabled()) { logger.trace("Completing transaction for [" + txInfo.getJoinpointIdentification() + "]"); } // 提交/回滚事务 txInfo.getTransactionManager().commit(txInfo.getTransactionStatus()); }}public final void commit(TransactionStatus status) throws TransactionException { if (status.isCompleted()) { throw new IllegalTransactionStateException( "Transaction is already completed - do not call commit or rollback more than once per transaction"); } DefaultTransactionStatus defStatus = (DefaultTransactionStatus) status; if (defStatus.isLocalRollbackOnly()) { if (defStatus.isDebug()) { logger.debug("Transactional code has requested rollback"); } // 回滚 processRollback(defStatus, false); return; } if (!shouldCommitOnGlobalRollbackOnly() && defStatus.isGlobalRollbackOnly()) { if (defStatus.isDebug()) { logger.debug("Global transaction is marked as rollback-only but transactional code requested commit"); } // 回滚 processRollback(defStatus, true); return; }// 提交 processCommit(defStatus);}
以提交为例,进入processCommit()
private void processCommit(DefaultTransactionStatus status) throws TransactionException { try { boolean beforeCompletionInvoked = false; try { boolean unexpectedRollback = false; // 各种异常、判断等前置处理 } finally { // 清除事务信息 cleanupAfterCompletion(status); } }}private void cleanupAfterCompletion(DefaultTransactionStatus status) { status.setCompleted(); if (status.isNewSynchronization()) { TransactionSynchronizationManager.clear(); } if (status.isNewTransaction()) { doCleanupAfterCompletion(status.getTransaction()); } if (status.getSuspendedResources() != null) { if (status.isDebug()) { logger.debug("Resuming suspended transaction after completion of inner transaction"); } Object transaction = (status.hasTransaction() ? status.getTransaction() : null); resume(transaction, (SuspendedResourcesHolder) status.getSuspendedResources()); }}
简单总结
由上面的源码可以知道,Spring 的事务控制主要是一下步骤:
- 对标注了事务注解的方法进行动态代理
- 代理方法的前置处理是获取数据库连接,定义事务信息等,存储在 ThreadLocal 中
- 开启事务
- 执行方法逻辑
- 提交 / 回滚事务
- 清除事务信息
异步方法事务控制
这里的异步方法事务控制指的是:A 方法中异步调用 B 方法,使 A 方法和 B 方法的提交 / 回滚能保持一致;而不是对 @Async
单独的方法进行事务控制。
方案一:自身编码控制数据库连接
这个方法是自己结合对 Spring 事务控制理解想到的,许多细节并没有考虑到,只提供一种思路。
说到底 Spring 的事务控制还是基于数据库连接的,只不过它帮助我们简化了操作。所以如果我们自己去维护这个数据库连接,然后再对它进行手动的事务控制即可。
PS:要注意的是,使用 Mybatis 结合 Spring 时,Mybatis 获取的数据库连接,是通过 Spring 获得的,所以如果自己在执行 Mybatis 方法前创建数据库连接,再手动控制是没有用的,因为自己创建的数据库连接和 Mybatis 使用的不是同一个。
由于如上情况,如果完全的靠自己获取事务连接进行事务控制,那么就还需要改写 Mybatis 执行的逻辑,这样就很麻烦。所以这里使用的方案是依旧利用 Spring 获取数据库连接对象,只不过要将这个对象拿出来,自己维护。
编码
通过之前的源码知道,Spring 获取数据库连接获取后,将其存储到 DataSourceTransactionObject
类中的
protected void doBegin(Object transaction, TransactionDefinition definition) { DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction; Connection con = null; try { if (!txObject.hasConnectionHolder() || txObject.getConnectionHolder().isSynchronizedWithTransaction()) { Connection newCon = obtainDataSource().getConnection(); if (logger.isDebugEnabled()) { logger.debug("Acquired Connection [" + newCon + "] for JDBC transaction"); } // 将获取的 Connection 对象设置到 DataSourceTransactionObject txObject.setConnectionHolder(new ConnectionHolder(newCon), true); } // ... }}
而DataSourceTransactionObject
对象是 DataSourceTransactionManager
的私有静态内部类,所以没法在外部使用它,所以这里利用了反射来使用这个对象。
方法 A
需要注意下面的代码获取到数据库连接对象后只进行了简单处理,将其设置到了该类的一个属性中,实际使用需要考虑如何维护问题
public void asyncTrans(Boolean flag) { DefaultTransactionStatus status = null; try { DefaultTransactionDefinition def = new DefaultTransactionDefinition(); // 设置事务信息(事务名、传播方式) def.setName("SomeTxName"); def.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRED); // 获取事务状态 status = (DefaultTransactionStatus) transactionManager.getTransaction(def); // 获取事务信息(实际类型 DataSourceTransactionObject,是DataSourceTransactionManager的私有静态内部类) Object transaction = status.getTransaction(); // 由于是私有静态内部类,所以根据反射调用,获取数据库连接 Method[] methods = ReflectionUtils.getAllDeclaredMethods(transaction.getClass()); for (Method method : methods) { // 获取数据库连接 if ("getConnectionHolder".equals(method.getName())) { ConnectionHolder connectionHolder = (ConnectionHolder) ReflectionUtils.invokeMethod(method, transaction); Connection connection = connectionHolder.getConnection(); // 将该连接自己维护(这里简单处理,只是设置到了该类的一个属性中,实际使用需要考虑如何维护问题) this.connection = connection; } } // 执行方法逻辑 AreaZoneRel zoneRel = new AreaZoneRel(); zoneRel.setZoneName("华北"); zoneRel.setAreaName("北京"); zoneRelMapper.insert(zoneRel); // 执行异步方法 asyncMethod(flag); } catch (Exception ex) { // 回滚 transactionManager.rollback(status); throw new RuntimeException(ex); } finally { status.setCompleted(); if (status.isNewSynchronization()) { TransactionSynchronizationManager.clear(); } }}
异步方法 B
PS:下面针对事务 A 的回滚 / 提交异常只是简单处理,直接将连接关闭(事务会自动回滚),实际使用需要考量。
此外下面的实现有许多细节没有注意,如:A 提交失败,B 的回滚处理问题。
private void asyncMethod(Boolean flag) { // 执行异步方法 Callable callable = () -> { try { // 子线程本身的事务控制使用Spring的事务控制 transactionTemplate.execute(status -> { AreaGraphRel graphRel = new AreaGraphRel(); graphRel.setAreaName("上海"); graphRel.setGraphName("上海.svg"); graphRelMapper.insert(graphRel); try { Thread.sleep(5000); } catch (InterruptedException e) { throw new RuntimeException(e); } // 异常模拟 if (flag) { int a = 1 / 0; } return null; }); } catch (Exception e) { // 如果异步方法抛出异常,则回滚外部线程的事务 try { connection.rollback(); } catch (Exception exception) { // 粗暴处理,异常直接关闭连接 connection.close(); throw new RuntimeException(exception); } throw new RuntimeException(e); } try { // 提交 connection.commit(); } catch (Exception exception) { // 粗暴处理,异常直接关闭连接 connection.close(); throw new RuntimeException(exception); } return 200; }; FutureTask task = new FutureTask(callable); new Thread(task).start();}
方案二:基于快照实现
这个方案是根据 Seata AT 模式的思路实现的
这种方式就是对需要进行控制的方法生成数据快照,如:将 A 方法提交事务前数据信息记录下来,作为前一个版本的数据快照;这时执行 B 方法时,当 B 方法执行失败,其事务回滚,那么就根据快照将 A 方法的数据更新为上一个版本。
PS:这里有如下几点需要注意:
- 当利用快照回滚时,需要考虑是否要保证当前数据信息和快照修改后的数据一致问题。
因为如果在基于快照进行更新时,该数据被其他调用进行数据修改,此时再基于快照更新数据则是将期间其他的正常操作结果都覆盖了,那么要考虑这样是否会对系统数据造成影响。 - 当利用快照回滚时,调用失败的异常处理。
如果是上面 1 比较不一致导致回滚异常,那么只能通过告警,人为处理;如果是超时等情况可以利用 MQ 等消息中间件进行不断的失败重试,最后再结合异常报警通知,来通知人手动处理。
免责声明:
① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。
② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341