我的编程空间,编程开发者的网络收藏夹
学习永远不晚

Spring方法中调用异步方法进行事务控制详解

短信预约 -IT技能 免费直播动态提醒
省份

北京

  • 北京
  • 上海
  • 天津
  • 重庆
  • 河北
  • 山东
  • 辽宁
  • 黑龙江
  • 吉林
  • 甘肃
  • 青海
  • 河南
  • 江苏
  • 湖北
  • 湖南
  • 江西
  • 浙江
  • 广东
  • 云南
  • 福建
  • 海南
  • 山西
  • 四川
  • 陕西
  • 贵州
  • 安徽
  • 广西
  • 内蒙
  • 西藏
  • 新疆
  • 宁夏
  • 兵团
手机号立即预约

请填写图片验证码后获取短信验证码

看不清楚,换张图片

免费获取短信验证码

Spring方法中调用异步方法进行事务控制详解

Spring 异步事务控制

前言:这里的异步方法事务控制指的是:A 方法中异步调用 B 方法,使 A 方法和 B 方法的提交 / 回滚能保持一致;而不是对 @Async 单独的方法进行事务控制。

Spring 事务源码逻辑

在进行 Spring 异步事务控制编码前,先了解下 Spring 是如何进行事务控制的。

一、事务拦截器拦截

定义了@Transactional的方法会被代理,由代理对象执行方法。会进入TransactionInterceptor#invoke()方法

public Object invoke(MethodInvocation invocation) throws Throwable {    Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);    // Adapt to TransactionAspectSupport's invokeWithinTransaction...    return invokeWithinTransaction(invocation.getMethod(), targetClass, new CoroutinesInvocationCallback() {// ...        }    });}

二、进行事务控制

进入invokeWithinTransaction()方法

protected Object invokeWithinTransaction(Method method, @Nullable Class<?> targetClass,             final InvocationCallback invocation) throws Throwable {    // If the transaction attribute is null, the method is non-transactional.    TransactionAttributeSource tas = getTransactionAttributeSource();    final TransactionAttribute txAttr = (tas != null ? tas.getTransactionAttribute(method, targetClass) : null);    // 获取事务管理器    final TransactionManager tm = determineTransactionManager(txAttr);    // ...    PlatformTransactionManager ptm = asPlatformTransactionManager(tm);    final String joinpointIdentification = methodIdentification(method, targetClass, txAttr);    if (txAttr == null || !(ptm instanceof CallbackPreferringPlatformTransactionManager)) {        // 定义事务相关信息,如隔离级别、传播机制等        TransactionInfo txInfo = createTransactionIfNecessary(ptm, txAttr, joinpointIdentification);        Object retVal;        try {            // 调用被代理方法            retVal = invocation.proceedWithInvocation();        }        catch (Throwable ex) {            // target invocation exception            completeTransactionAfterThrowing(txInfo, ex);            throw ex;        }        finally {            // 设置ThraedLocal数据为null            cleanupTransactionInfo(txInfo);        }       // ...                // 提交/回滚事务        commitTransactionAfterReturning(txInfo);        return retVal;    }    // ...}

进入 createTransactionIfNecessary()方法

protected TransactionInfo createTransactionIfNecessary(@Nullable PlatformTransactionManager tm,                           @Nullable TransactionAttribute txAttr, final String joinpointIdentification) {    // ...    TransactionStatus status = null;    if (txAttr != null) {        if (tm != null) {            // 构建事务状态,如:启动事务、设置数据库连接信息等            status = tm.getTransaction(txAttr);        }        // ...    }    // 返回事务信息    return prepareTransactionInfo(tm, txAttr, joinpointIdentification, status);}

getTransaction() 方法

public final TransactionStatus getTransaction(@Nullable TransactionDefinition definition)    throws TransactionException {    // 使用默认的事务定义    TransactionDefinition def = (definition != null ? definition : TransactionDefinition.withDefaults());// 获取事务对象,进入这个方法可以看到事务信息都是存在 ThreadLocal 中的    Object transaction = doGetTransaction();    boolean debugEnabled = logger.isDebugEnabled();    // ...    else if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED ||             def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW ||             def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {        SuspendedResourcesHolder suspendedResources = suspend(null);                try {            // 开启事务            return startTransaction(def, transaction, debugEnabled, suspendedResources);        }        // ...    }}

进入 DataSourceTransactionManager # doGetTransaction()方法

protected Object doGetTransaction() {    // DataSourceTransactionObject 对象是 DataSourceTransactionManager 的私有静态内部类    DataSourceTransactionObject txObject = new DataSourceTransactionObject();    txObject.setSavepointAllowed(isNestedTransactionAllowed());    ConnectionHolder conHolder =        (ConnectionHolder) TransactionSynchronizationManager.getResource(obtainDataSource());    txObject.setConnectionHolder(conHolder, false);    return txObject;}// 进入 TransactionSynchronizationManager # getResource()方法private static Object doGetResource(Object actualKey) {    Map<Object, Object> map = resources.get();    if (map == null) {        return null;    }    Object value = map.get(actualKey);    // Transparently remove ResourceHolder that was marked as void...    if (value instanceof ResourceHolder && ((ResourceHolder) value).isVoid()) {        map.remove(actualKey);        // Remove entire ThreadLocal if empty...        if (map.isEmpty()) {            resources.remove();        }        value = null;    }    return value;}// 观察 TransactionSynchronizationManager 对象public abstract class TransactionSynchronizationManager {private static final ThreadLocal<Map<Object, Object>> resources =new NamedThreadLocal<>("Transactional resources");private static final ThreadLocal<Set<TransactionSynchronization>> synchronizations =new NamedThreadLocal<>("Transaction synchronizations");private static final ThreadLocal<String> currentTransactionName =new NamedThreadLocal<>("Current transaction name");private static final ThreadLocal<Boolean> currentTransactionReadOnly =new NamedThreadLocal<>("Current transaction read-only status");private static final ThreadLocal<Integer> currentTransactionIsolationLevel =new NamedThreadLocal<>("Current transaction isolation level");private static final ThreadLocal<Boolean> actualTransactionActive =new NamedThreadLocal<>("Actual transaction active");}

三、事务开启 / 回滚 /提交操作

事务的控制操作都是由 DataSourceTransactionManager 来进行的,如:

  • doBegin:开启事务
  • doCommit:提交事务
  • doRollback:回滚事务

其中主要是从 ThreadLocal 中取出 Connection 对象进行事务控制,以 doCommit 为例:

protected void doCommit(DefaultTransactionStatus status) {    // 获取连接对象    DataSourceTransactionObject txObject = (DataSourceTransactionObject) status.getTransaction();    Connection con = txObject.getConnectionHolder().getConnection();    if (status.isDebug()) {        logger.debug("Committing JDBC transaction on Connection [" + con + "]");    }    try {        // 提交        con.commit();    }    catch (SQLException ex) {        throw translateException("JDBC commit", ex);    }}

四、事务完成,清除事务信息

在第二步结尾,进入commitTransactionAfterReturning()

protected void commitTransactionAfterReturning(@Nullable TransactionInfo txInfo) {    if (txInfo != null && txInfo.getTransactionStatus() != null) {        if (logger.isTraceEnabled()) {            logger.trace("Completing transaction for [" + txInfo.getJoinpointIdentification() + "]");        }        // 提交/回滚事务        txInfo.getTransactionManager().commit(txInfo.getTransactionStatus());    }}public final void commit(TransactionStatus status) throws TransactionException {    if (status.isCompleted()) {        throw new IllegalTransactionStateException(            "Transaction is already completed - do not call commit or rollback more than once per transaction");    }    DefaultTransactionStatus defStatus = (DefaultTransactionStatus) status;    if (defStatus.isLocalRollbackOnly()) {        if (defStatus.isDebug()) {            logger.debug("Transactional code has requested rollback");        }        // 回滚        processRollback(defStatus, false);        return;    }    if (!shouldCommitOnGlobalRollbackOnly() && defStatus.isGlobalRollbackOnly()) {        if (defStatus.isDebug()) {            logger.debug("Global transaction is marked as rollback-only but transactional code requested commit");        }        // 回滚        processRollback(defStatus, true);        return;    }// 提交    processCommit(defStatus);}

以提交为例,进入processCommit()

private void processCommit(DefaultTransactionStatus status) throws TransactionException {    try {        boolean beforeCompletionInvoked = false;        try {            boolean unexpectedRollback = false;            // 各种异常、判断等前置处理        }        finally {            // 清除事务信息            cleanupAfterCompletion(status);        }    }}private void cleanupAfterCompletion(DefaultTransactionStatus status) {    status.setCompleted();    if (status.isNewSynchronization()) {        TransactionSynchronizationManager.clear();    }    if (status.isNewTransaction()) {        doCleanupAfterCompletion(status.getTransaction());    }    if (status.getSuspendedResources() != null) {        if (status.isDebug()) {            logger.debug("Resuming suspended transaction after completion of inner transaction");        }        Object transaction = (status.hasTransaction() ? status.getTransaction() : null);        resume(transaction, (SuspendedResourcesHolder) status.getSuspendedResources());    }}

简单总结

由上面的源码可以知道,Spring 的事务控制主要是一下步骤:

  1. 对标注了事务注解的方法进行动态代理
  2. 代理方法的前置处理是获取数据库连接,定义事务信息等,存储在 ThreadLocal 中
  3. 开启事务
  4. 执行方法逻辑
  5. 提交 / 回滚事务
  6. 清除事务信息

异步方法事务控制

这里的异步方法事务控制指的是:A 方法中异步调用 B 方法,使 A 方法和 B 方法的提交 / 回滚能保持一致;而不是对 @Async 单独的方法进行事务控制。

方案一:自身编码控制数据库连接

这个方法是自己结合对 Spring 事务控制理解想到的,许多细节并没有考虑到,只提供一种思路。

说到底 Spring 的事务控制还是基于数据库连接的,只不过它帮助我们简化了操作。所以如果我们自己去维护这个数据库连接,然后再对它进行手动的事务控制即可。

PS:要注意的是,使用 Mybatis 结合 Spring 时,Mybatis 获取的数据库连接,是通过 Spring 获得的,所以如果自己在执行 Mybatis 方法前创建数据库连接,再手动控制是没有用的,因为自己创建的数据库连接和 Mybatis 使用的不是同一个。

由于如上情况,如果完全的靠自己获取事务连接进行事务控制,那么就还需要改写 Mybatis 执行的逻辑,这样就很麻烦。所以这里使用的方案是依旧利用 Spring 获取数据库连接对象,只不过要将这个对象拿出来,自己维护。

编码

通过之前的源码知道,Spring 获取数据库连接获取后,将其存储到 DataSourceTransactionObject 类中的

protected void doBegin(Object transaction, TransactionDefinition definition) {    DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;    Connection con = null;    try {        if (!txObject.hasConnectionHolder() ||            txObject.getConnectionHolder().isSynchronizedWithTransaction()) {            Connection newCon = obtainDataSource().getConnection();            if (logger.isDebugEnabled()) {                logger.debug("Acquired Connection [" + newCon + "] for JDBC transaction");            }            // 将获取的 Connection 对象设置到 DataSourceTransactionObject            txObject.setConnectionHolder(new ConnectionHolder(newCon), true);        }        // ...    }}

DataSourceTransactionObject 对象是 DataSourceTransactionManager 的私有静态内部类,所以没法在外部使用它,所以这里利用了反射来使用这个对象。

方法 A

需要注意下面的代码获取到数据库连接对象后只进行了简单处理,将其设置到了该类的一个属性中,实际使用需要考虑如何维护问题

public void asyncTrans(Boolean flag) {    DefaultTransactionStatus status = null;    try {        DefaultTransactionDefinition def = new DefaultTransactionDefinition();        // 设置事务信息(事务名、传播方式)        def.setName("SomeTxName");        def.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRED);        // 获取事务状态        status = (DefaultTransactionStatus) transactionManager.getTransaction(def);        // 获取事务信息(实际类型 DataSourceTransactionObject,是DataSourceTransactionManager的私有静态内部类)        Object transaction = status.getTransaction();        // 由于是私有静态内部类,所以根据反射调用,获取数据库连接        Method[] methods = ReflectionUtils.getAllDeclaredMethods(transaction.getClass());        for (Method method : methods) {            // 获取数据库连接            if ("getConnectionHolder".equals(method.getName())) {                ConnectionHolder connectionHolder = (ConnectionHolder) ReflectionUtils.invokeMethod(method, transaction);                Connection connection = connectionHolder.getConnection();                // 将该连接自己维护(这里简单处理,只是设置到了该类的一个属性中,实际使用需要考虑如何维护问题)                this.connection = connection;            }        }        // 执行方法逻辑        AreaZoneRel zoneRel = new AreaZoneRel();        zoneRel.setZoneName("华北");        zoneRel.setAreaName("北京");        zoneRelMapper.insert(zoneRel);        // 执行异步方法        asyncMethod(flag);    } catch (Exception ex) {        // 回滚        transactionManager.rollback(status);        throw new RuntimeException(ex);    } finally {        status.setCompleted();        if (status.isNewSynchronization()) {            TransactionSynchronizationManager.clear();        }    }}

异步方法 B

PS:下面针对事务 A 的回滚 / 提交异常只是简单处理,直接将连接关闭(事务会自动回滚),实际使用需要考量。

此外下面的实现有许多细节没有注意,如:A 提交失败,B 的回滚处理问题。

private void asyncMethod(Boolean flag) {    // 执行异步方法    Callable callable = () -> {        try {            // 子线程本身的事务控制使用Spring的事务控制            transactionTemplate.execute(status -> {                AreaGraphRel graphRel = new AreaGraphRel();                graphRel.setAreaName("上海");                graphRel.setGraphName("上海.svg");                graphRelMapper.insert(graphRel);                try {                    Thread.sleep(5000);                } catch (InterruptedException e) {                    throw new RuntimeException(e);                }                // 异常模拟                if (flag) {                    int a = 1 / 0;                }                return null;            });        } catch (Exception e) {            // 如果异步方法抛出异常,则回滚外部线程的事务            try {                connection.rollback();            } catch (Exception exception) {                // 粗暴处理,异常直接关闭连接                connection.close();                throw new RuntimeException(exception);            }            throw new RuntimeException(e);        }        try {            // 提交            connection.commit();        } catch (Exception exception) {            // 粗暴处理,异常直接关闭连接            connection.close();            throw new RuntimeException(exception);        }        return 200;    };    FutureTask task = new FutureTask(callable);    new Thread(task).start();}

方案二:基于快照实现

这个方案是根据 Seata AT 模式的思路实现的

这种方式就是对需要进行控制的方法生成数据快照,如:将 A 方法提交事务前数据信息记录下来,作为前一个版本的数据快照;这时执行 B 方法时,当 B 方法执行失败,其事务回滚,那么就根据快照将 A 方法的数据更新为上一个版本。

PS:这里有如下几点需要注意:

  1. 当利用快照回滚时,需要考虑是否要保证当前数据信息和快照修改后的数据一致问题。
    因为如果在基于快照进行更新时,该数据被其他调用进行数据修改,此时再基于快照更新数据则是将期间其他的正常操作结果都覆盖了,那么要考虑这样是否会对系统数据造成影响。
  2. 当利用快照回滚时,调用失败的异常处理。
    如果是上面 1 比较不一致导致回滚异常,那么只能通过告警,人为处理;如果是超时等情况可以利用 MQ 等消息中间件进行不断的失败重试,最后再结合异常报警通知,来通知人手动处理。

来源地址:https://blog.csdn.net/AhangA/article/details/129978356

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

Spring方法中调用异步方法进行事务控制详解

下载Word文档到电脑,方便收藏和打印~

下载Word文档

猜你喜欢

Spring方法中调用异步方法进行事务控制详解

Spring 异步事务控制 文章目录 Spring 异步事务控制Spring 事务源码逻辑一、事务拦截器拦截二、进行事务控制三、事务开启 / 回滚 /提交操作四、事务完成,清除事务信息简单总结 异步方法事务控制方案一:自身编码
2023-08-20

Spring事务传播中嵌套调用实现方法详细介绍

Spring事务的本质就是对数据库事务的支持,没有数据库事务,Spring是无法提供事务功能的。Spring只提供统一的事务管理接口,具体实现都是由数据库自己实现的,Spring会在事务开始时,根据当前设置的隔离级别,调整数据库的隔离级别,由此保持一致
2022-11-13

详解Android的OkHttp包编写异步HTTP请求调用的方法

OkHttp 除了支持常用的同步 HTTP 请求之外,还支持异步 HTTP 请求调用。在使用同步调用时,当前线程会被阻塞,直到 HTTP 请求完成。当同时发出多个 HTTP 请求时,同步调用的性能会比较差。这个时候通过异步调用可以提高整体的
2022-06-06

调用阿里云服务器的方法详细步骤与注意事项

本文主要讲解了如何通过调用阿里云服务器的方法,包括详细步骤和需要注意的事项。调用阿里云服务器可以实现数据存储、应用程序运行、网络服务等功能,是企业进行信息化建设的重要工具。正文:调用阿里云服务器的方法有很多种,但是最常用的是通过命令行工具。以下详细步骤和注意事项供参考:步骤一:登录阿里云控制台首先,需要在电脑上安
调用阿里云服务器的方法详细步骤与注意事项
2023-12-16

C++ 成员函数详解:对象方法在异步编程中的作用

成员函数在异步编程中起着至关重要的作用:允许对耗时的任务进行封装,将计算与调用代码分离开来。使应用程序可以在后台执行任务的同时继续运行,提高响应性。创建响应迅速且能利用多核架构的现代 c++++ 应用程序。C++ 成员函数详解:对象方法在异
C++ 成员函数详解:对象方法在异步编程中的作用
2024-04-30

详解Android应用中使用TabHost组件进行布局的基本方法

TabHost布局文件 我们先来了解一下布局文件的基本内容: 1. 根标签及id 设置Android自带id : XML布局文件中, 可以使用 标签设置, 其中的id 需要引用 android的自带id :android:id=@andro
2022-06-06

C#中如何使用异步编程模型和并发编程处理任务分发及解决方法

C#中如何使用异步编程模型和并发编程处理任务分发及解决方法引言:在现代的软件开发中,我们经常面临处理大量任务的情况,而这些任务可能是独立的,互不干扰的。为了提高程序的性能和效率,我们希望能够并发地处理这些任务,并且在每个任务完成时能够得到相
2023-10-22

编程热搜

目录