我的编程空间,编程开发者的网络收藏夹
学习永远不晚

JPA多数据源分布式事务的示例分析

短信预约 -IT技能 免费直播动态提醒
省份

北京

  • 北京
  • 上海
  • 天津
  • 重庆
  • 河北
  • 山东
  • 辽宁
  • 黑龙江
  • 吉林
  • 甘肃
  • 青海
  • 河南
  • 江苏
  • 湖北
  • 湖南
  • 江西
  • 浙江
  • 广东
  • 云南
  • 福建
  • 海南
  • 山西
  • 四川
  • 陕西
  • 贵州
  • 安徽
  • 广西
  • 内蒙
  • 西藏
  • 新疆
  • 宁夏
  • 兵团
手机号立即预约

请填写图片验证码后获取短信验证码

看不清楚,换张图片

免费获取短信验证码

JPA多数据源分布式事务的示例分析

这篇文章主要介绍了JPA多数据源分布式事务的示例分析,具有一定借鉴价值,感兴趣的朋友可以参考下,希望大家阅读完这篇文章之后大有收获,下面让小编带着大家一起了解一下。

    问题背景

    在解决mysql字段脱敏处理时,结合sharding-jdbc的脱敏组件功能,为了sql兼容和最小化应用改造,博主给出了一个多数据源融合的字段脱敏解决方案(只把包含脱敏字段表的操作走sharding-jdbc脱敏代理数据源)。这个方案解决了问题的同时,带来了一个新的问题,数据源的事务是独立的,正如我文中所述《JPA项目多数据源模式整合sharding-jdbc实现数据脱敏》,在spring上下文中,每个数据源对应一个独立的事务管理器,默认的事务管理器的数据源就用业务本身的数据源,所以需要加密的业务使用时,需要指定@Transactional注解里的事务管理器名称为脱敏对应的事务管理器名称。简单的业务场景这样用也就没有问题了,但是一般的业务场景总有一个事务覆盖两个数据源的操作,这个时候单指定哪个事务管理器都不行,so,这里需要一种多数据源的事务管理器。

    XA事务方案

    XA协议采用2PC(两阶段提交)的方式来管理分布式事务。XA接口提供资源管理器与事务管理器之间进行通信的标准接口。在JDBC的XA事务相关api抽象里,相关接口定义如下

    XADataSource,XA协议数据源

    public interface XADataSource extends CommonDataSource {    XAConnection getXAConnection() throws SQLException;   //省略getLogWriter等非关键方法 }

    XAConnection

    public interface XAConnection extends PooledConnection {        javax.transaction.xa.XAResource getXAResource() throws SQLException;}

    XAResource

    public interface XAResource {        void commit(Xid xid, boolean onePhase) throws XAException;        void end(Xid xid, int flags) throws XAException;        void forget(Xid xid) throws XAException;        boolean isSameRM(XAResource xares) throws XAException;        int prepare(Xid xid) throws XAException;        Xid[] recover(int flag) throws XAException;        void rollback(Xid xid) throws XAException;        void start(Xid xid, int flags) throws XAException;    //省略非关键方法}

    相比较普通的事务管理,JDBC的XA协议管理多了一个XAResource资源管理器,XA事务相关的行为(开启、准备、提交、回滚、结束)都由这个资源管理器来控制,这些都是框架内部的行为,体现在开发层面提供的数据源也变成了XADataSource。而JTA的抽象里,定义了UserTransaction、TransactionManager。想要使用JTA事务,必须先实现这两个接口。所以,如果我们要使用JTA+XA控制多数据源的事务,在sprign boot里以Atomikos为例,

    引入Atomikos依赖

    <dependency>            <groupId>org.springframework.boot</groupId>            <artifactId>spring-boot-starter-jta-atomikos</artifactId></dependency>

    spring boot已经帮我们把XA事务管理器自动装载类定义好了,如:

    创建JTA事务管理器

    @Configuration(proxyBeanMethods = false)@EnableConfigurationProperties({ AtomikosProperties.class, JtaProperties.class })@ConditionalOnClass({ JtaTransactionManager.class, UserTransactionManager.class })@ConditionalOnMissingBean(PlatformTransactionManager.class)class AtomikosJtaConfiguration {@Bean(initMethod = "init", destroyMethod = "shutdownWait")@ConditionalOnMissingBean(UserTransactionService.class)UserTransactionServiceImp userTransactionService(AtomikosProperties atomikosProperties,JtaProperties jtaProperties) {Properties properties = new Properties();if (StringUtils.hasText(jtaProperties.getTransactionManagerId())) {properties.setProperty("com.atomikos.icatch.tm_unique_name", jtaProperties.getTransactionManagerId());}properties.setProperty("com.atomikos.icatch.log_base_dir", getLogBaseDir(jtaProperties));properties.putAll(atomikosProperties.asProperties());return new UserTransactionServiceImp(properties);}@Bean(initMethod = "init", destroyMethod = "close")@ConditionalOnMissingBean(TransactionManager.class)UserTransactionManager atomikosTransactionManager(UserTransactionService userTransactionService) throws Exception {UserTransactionManager manager = new UserTransactionManager();manager.setStartupTransactionService(false);manager.setForceShutdown(true);return manager;}@Bean@ConditionalOnMissingBean(XADataSourceWrapper.class)AtomikosXADataSourceWrapper xaDataSourceWrapper() {return new AtomikosXADataSourceWrapper();}@BeanJtaTransactionManager transactionManager(UserTransaction userTransaction, TransactionManager transactionManager,ObjectProvidertransactionManagerCustomizers) {JtaTransactionManager jtaTransactionManager = new JtaTransactionManager(userTransaction, transactionManager);transactionManagerCustomizers.ifAvailable((customizers) -> customizers.customize(jtaTransactionManager));return jtaTransactionManager;}}

    显然,想要使用XA事务,除了需要提供UserTransaction、TransactionManager的实现。还必须要有一个XADataSource,而sharding-jdbc代理的数据源是DataSource的,我们需要将XADataSource包装成普通的DataSource,spring已经提供了一个AtomikosXADataSourceWrapper的XA数据源包装器,而且在AtomikosJtaConfiguration里已经注册到Spring上下文中,所以我们在自定义数据源时可以直接注入包装器实例,然后,因为是JPA环境,所以在创建EntityManagerFactory实例时,需要指定JPA的事务管理类型为JTA,综上,普通的业务默认数据源配置如下:

    @Configuration@EnableConfigurationProperties({JpaProperties.class, DataSourceProperties.class})public class DataSourceConfiguration{    @Primary    @Bean    public DataSource dataSource(AtomikosXADataSourceWrapper wrapper, DataSourceProperties dataSourceProperties) throws Exception {        MysqlXADataSource dataSource = dataSourceProperties.initializeDataSourceBuilder().type(MysqlXADataSource.class).build();        return wrapper.wrapDataSource(dataSource);    }    @Primary    @Bean(initMethod = "afterPropertiesSet")    public LocalContainerEntityManagerFactoryBean entityManagerFactory(JpaProperties jpaProperties, DataSource dataSource, EntityManagerFactoryBuilder factoryBuilder) {        return factoryBuilder.dataSource(dataSource)                .packages(Constants.BASE_PACKAGES)                .properties(jpaProperties.getProperties())                .persistenceUnit("default")                .jta(true)                .build();    }    @Bean    @Primary    public EntityManager entityManager(EntityManagerFactory entityManagerFactory){        //必须使用SharedEntityManagerCreator创建SharedEntityManager实例,否则SimpleJpaRepository中的事务不生效        return SharedEntityManagerCreator.createSharedEntityManager(entityManagerFactory);    }}

    sharding-jdbc加密数据源和普通业务数据源其实是同一个数据源,只是走加解密逻辑的数据源需要被sharding-jdbc的加密组件代理一层,加上了加解密的处理逻辑。所以配置如下:

    @Configuration@EnableConfigurationProperties({JpaProperties.class,SpringBootEncryptRuleConfigurationProperties.class, SpringBootPropertiesConfigurationProperties.class})public class EncryptDataSourceConfiguration {    @Bean    public DataSource encryptDataSource(DataSource dataSource,SpringBootPropertiesConfigurationProperties props,SpringBootEncryptRuleConfigurationProperties encryptRule) throws SQLException {        return EncryptDataSourceFactory.createDataSource(dataSource, new EncryptRuleConfigurationYamlSwapper().swap(encryptRule), props.getProps());    }    @Bean(initMethod = "afterPropertiesSet")    public LocalContainerEntityManagerFactoryBean encryptEntityManagerFactory(@Qualifier("encryptDataSource") DataSource dataSource,JpaProperties jpaProperties, EntityManagerFactoryBuilder factoryBuilder) throws SQLException {        return factoryBuilder.dataSource(dataSource)                .packages(Constants.BASE_PACKAGES)                .properties(jpaProperties.getProperties())                .persistenceUnit("encryptPersistenceUnit")                .jta(true)                .build();    }    @Bean    public EntityManager encryptEntityManager(@Qualifier("encryptEntityManagerFactory") EntityManagerFactory entityManagerFactory){        //必须使用SharedEntityManagerCreator创建SharedEntityManager实例,否则SimpleJpaRepository中的事务不生效        return SharedEntityManagerCreator.createSharedEntityManager(entityManagerFactory);    }}

    遇到问题1、

    Connection pool exhausted - try increasing 'maxPoolSize' and/or 'borrowConnectionTimeout' on the DataSourceBean.

    解决问题:默认AtomikosXADataSourceWrapper包装器初始化的数据源连接池最大为1,所以需要添加配置参数如:

    spring.jta.atomikos.datasource.max-pool-size=20

    遇到问题2、

    XAER_INVAL: Invalid arguments (or unsupported command)

    解决问题:这个是mysql实现XA的bug,仅当您在同一事务中多次访问同一MySQL数据库时,才会发生此问题,在mysql连接url加上如下参数即可,如:

    spring.datasource.url = jdbc:mysql://127.0.0.1:3306/xxx?pinGlobalTxToPhysicalConnection=true

    Mysql XA事务行为

    在这个场景中,虽然是多数据源,但是底层链接的是同一个mysql数据库,所以XA事务行为为,从第一个执行的sql开始(并不是JTA事务begin阶段),生成xid并XA START事务,然后XA END。第二个数据源的sql执行时会判断是否同一个mysql资源,如果是同一个则用刚生成的xid重新XA START RESUME,然后XA END,最终虽然在应用层是两个DataSource,其实最后只会调用XA COMMIT一次。mysql驱动实现的XAResource的start如下:

    public void start(Xid xid, int flags) throws XAException {        StringBuilder commandBuf = new StringBuilder(MAX_COMMAND_LENGTH);        commandBuf.append("XA START ");        appendXid(commandBuf, xid);        switch (flags) {            case TMJOIN:                commandBuf.append(" JOIN");                break;            case TMRESUME:                commandBuf.append(" RESUME");                break;            case TMNOFLAGS:                // no-op                break;            default:                throw new XAException(XAException.XAER_INVAL);        }        dispatchCommand(commandBuf.toString());        this.underlyingConnection.setInGlobalTx(true);    }

    第一次sql执行时,flags=0,走的TMNOFLAGS逻辑,第二次sql执行时,flags=134217728,走的TMRESUME,重新开启事务的逻辑。以上是Mysql XA的真实事务逻辑,但是博主研究下来发现,msyql xa并不支持XA START RESUME这种语句,而且有很多限制《Mysql XA交易限制》,所以在mysql数据库使用XA事务时,最好了解下mysql xa的缺陷

    链式事务方案

    链式事务不是我首创的叫法,在spring-data-common项目的Transaction包下,已经有一个默认实现ChainedTransactionManager,前文中《深入理解spring的@Transactional工作原理》已经分析了Spring的事务抽象,由PlatformTransactionManager(事务管理器)、TransactionStatus(事务状态)、TransactionDefinition(事务定义)等形态组成,ChainedTransactionManager也是实现了PlatformTransactionManager和TransactionStatus。实现原理也很简单,在ChainedTransactionManager内部维护了事务管理器的集合,通过代理编排真实的事务管理器,在事务开启、提交、回滚时,都分别操作集合里的事务。以达到对多个事务的统一管理。这个方案比较简陋,而且有缺陷,在提交阶段,如果异常不是发生在第一个数据源,那么会存在之前的提交不会回滚,所以在使用ChainedTransactionManager时,尽量把出问题可能性比较大的事务管理器放链的后面(开启事务、提交事务顺序相反)。这里只是抛出了一种新的多数据源事务管理的思路,能用XA尽量用XA管理。

    普通的业务默认数据源配置如下:

    @Configuration@EnableConfigurationProperties({JpaProperties.class, DataSourceProperties.class})public class DataSourceConfiguration{    @Primary    @Bean    public DataSource dataSource(DataSourceProperties dataSourceProperties){       return dataSourceProperties.initializeDataSourceBuilder().type(HikariDataSource.class).build();    }    @Primary    @Bean(initMethod = "afterPropertiesSet")    public LocalContainerEntityManagerFactoryBean entityManagerFactory(JpaProperties jpaProperties, DataSource dataSource, EntityManagerFactoryBuilder factoryBuilder) {        return factoryBuilder.dataSource(dataSource)                .packages(Constants.BASE_PACKAGES)                .properties(jpaProperties.getProperties())                .persistenceUnit("default")                .build();    }    @Bean    @Primary    public EntityManager entityManager(EntityManagerFactory entityManagerFactory){        //必须使用SharedEntityManagerCreator创建SharedEntityManager实例,否则SimpleJpaRepository中的事务不生效        return SharedEntityManagerCreator.createSharedEntityManager(entityManagerFactory);    }    @Primary    @Bean    public PlatformTransactionManager transactionManager(EntityManagerFactory entityManagerFactory){        JpaTransactionManager txManager = new JpaTransactionManager();        txManager.setEntityManagerFactory(entityManagerFactory);        return txManager;    }}

    sharding-jdbc加密数据源配置如下:

    @Configuration@EnableConfigurationProperties({JpaProperties.class,SpringBootEncryptRuleConfigurationProperties.class, SpringBootPropertiesConfigurationProperties.class})public class EncryptDataSourceConfiguration {    @Bean    public DataSource encryptDataSource(DataSource dataSource,SpringBootPropertiesConfigurationProperties props,SpringBootEncryptRuleConfigurationProperties encryptRule) throws SQLException {        return EncryptDataSourceFactory.createDataSource(dataSource, new EncryptRuleConfigurationYamlSwapper().swap(encryptRule), props.getProps());    }    @Bean(initMethod = "afterPropertiesSet")    public LocalContainerEntityManagerFactoryBean encryptEntityManagerFactory(@Qualifier("encryptDataSource") DataSource dataSource,JpaProperties jpaProperties, EntityManagerFactoryBuilder factoryBuilder) throws SQLException {        return factoryBuilder.dataSource(dataSource)                .packages(Constants.BASE_PACKAGES)                .properties(jpaProperties.getProperties())                .persistenceUnit("encryptPersistenceUnit")                .build();    }    @Bean    public EntityManager encryptEntityManager(@Qualifier("encryptEntityManagerFactory") EntityManagerFactory entityManagerFactory){        //必须使用SharedEntityManagerCreator创建SharedEntityManager实例,否则SimpleJpaRepository中的事务不生效        return SharedEntityManagerCreator.createSharedEntityManager(entityManagerFactory);    }    @Bean    public PlatformTransactionManager chainedTransactionManager(PlatformTransactionManager transactionManager) throws SQLException {        JpaTransactionManager encryptTransactionManager = new JpaTransactionManager();        encryptTransactionManager.setEntityManagerFactory(encryptEntityManagerFactory());        //使用链式事务管理器包装真正的transactionManager、txManager事务        ChainedTransactionManager chainedTransactionManager = new ChainedTransactionManager(encryptTransactionManager,transactionManager);        return chainedTransactionManager;    }}

    使用这种方案,在涉及到多数据源的业务时,需要指定使用哪个事务管理器,如:

    @PersistenceContext(unitName = "encryptPersistenceUnit")    private EntityManager entityManager;    @PersistenceContext    private EntityManager manager;    @Transactional(transactionManager = "chainedTransactionManager")    public AccountModel  save(AccountDTO dto){        AccountModel accountModel = AccountMapper.INSTANCE.dtoTo(dto);        entityManager.persist(accountModel);        entityManager.flush();        AccountModel accountMode2 = AccountMapper.INSTANCE.dtoTo(dto);        manager.persist(accountMode2);        manager.flush();        return accountModel;    }

    感谢你能够认真阅读完这篇文章,希望小编分享的“JPA多数据源分布式事务的示例分析”这篇文章对大家有帮助,同时也希望大家多多支持编程网,关注编程网行业资讯频道,更多相关知识等着你来学习!

    免责声明:

    ① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

    ② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

    JPA多数据源分布式事务的示例分析

    下载Word文档到电脑,方便收藏和打印~

    下载Word文档

    猜你喜欢

    JPA多数据源分布式事务的示例分析

    这篇文章主要介绍了JPA多数据源分布式事务的示例分析,具有一定借鉴价值,感兴趣的朋友可以参考下,希望大家阅读完这篇文章之后大有收获,下面让小编带着大家一起了解一下。问题背景在解决mysql字段脱敏处理时,结合sharding-jdbc的脱敏
    2023-06-29

    Redis数据库分布式的示例分析

    这篇文章给大家分享的是有关Redis数据库分布式的示例分析的内容。小编觉得挺实用的,因此分享给大家做个参考,一起跟随小编过来看看吧。问题:1-2亿数据需要缓存,如何设计?1 哈希取余分区2亿条记录就是2亿个k,v,假设有3台机器构成一个集群
    2023-06-28

    Netty分布式监听读事件的示例分析

    小编给大家分享一下Netty分布式监听读事件的示例分析,相信大部分人都还不怎么了解,因此分享这篇文章给大家参考一下,希望大家阅读完这篇文章后大有收获,下面让我们一起去了解一下吧!我们回到AbstractUnsafe的register0()方
    2023-06-29

    Spring Boot多数据源处理事务实例分析

    这篇“Spring Boot多数据源处理事务实例分析”文章的知识点大部分人都不太理解,所以小编给大家总结了以下内容,内容详细,步骤清晰,具有一定的借鉴价值,希望大家阅读完这篇文章能有所收获,下面我们一起来看看这篇“Spring Boot多数
    2023-06-30

    LINQ数据源的示例分析

    这篇文章给大家分享的是有关LINQ数据源的示例分析的内容。小编觉得挺实用的,因此分享给大家做个参考,一起跟随小编过来看看吧。LINQ数据源在上一个示例中,由于数据源是数组,因此它隐式支持泛型 IEnumerable<(Of <(t>)>)
    2023-06-17

    springboot+mybatisplus+druid如何实现多数据源+分布式事务

    这篇文章主要介绍springboot+mybatisplus+druid如何实现多数据源+分布式事务,文中介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们一定要看完!  jdk环境:1.8  springboot:2.1.3.RELEA
    2023-06-02

    Spring源码解析之编程式事务的示例分析

    这篇文章主要为大家展示了“Spring源码解析之编程式事务的示例分析”,内容简而易懂,条理清晰,希望能够帮助大家解决疑惑,下面让小编带领大家一起研究并学习一下“Spring源码解析之编程式事务的示例分析”这篇文章吧。一、前言在Spring中
    2023-06-15

    Netty分布式pipeline管道传播outBound事件的示例分析

    这篇文章将为大家详细讲解有关Netty分布式pipeline管道传播outBound事件的示例分析,小编觉得挺实用的,因此分享给大家做个参考,希望大家阅读完这篇文章后可以有所收获。outbound事件传输流程在我们业务代码中, 有可能使用w
    2023-06-29

    Netty分布式pipeline管道异常传播事件的示例分析

    这篇文章主要介绍了Netty分布式pipeline管道异常传播事件的示例分析,具有一定借鉴价值,感兴趣的朋友可以参考下,希望大家阅读完这篇文章之后大有收获,下面让小编带着大家一起了解一下。传播异常事件简单的异常处理的场景@Overridep
    2023-06-29

    Netty分布式编码器及写数据事件处理使用场景的示例分析

    这篇文章主要介绍Netty分布式编码器及写数据事件处理使用场景的示例分析,文中介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们一定要看完!编码器第一节: writeAndFlush的事件传播我们之前在学习pipeline的时候, 讲解了
    2023-06-29

    编程热搜

    • Python 学习之路 - Python
      一、安装Python34Windows在Python官网(https://www.python.org/downloads/)下载安装包并安装。Python的默认安装路径是:C:\Python34配置环境变量:【右键计算机】--》【属性】-
      Python 学习之路 - Python
    • chatgpt的中文全称是什么
      chatgpt的中文全称是生成型预训练变换模型。ChatGPT是什么ChatGPT是美国人工智能研究实验室OpenAI开发的一种全新聊天机器人模型,它能够通过学习和理解人类的语言来进行对话,还能根据聊天的上下文进行互动,并协助人类完成一系列
      chatgpt的中文全称是什么
    • C/C++中extern函数使用详解
    • C/C++可变参数的使用
      可变参数的使用方法远远不止以下几种,不过在C,C++中使用可变参数时要小心,在使用printf()等函数时传入的参数个数一定不能比前面的格式化字符串中的’%’符号个数少,否则会产生访问越界,运气不好的话还会导致程序崩溃
      C/C++可变参数的使用
    • css样式文件该放在哪里
    • php中数组下标必须是连续的吗
    • Python 3 教程
      Python 3 教程 Python 的 3.0 版本,常被称为 Python 3000,或简称 Py3k。相对于 Python 的早期版本,这是一个较大的升级。为了不带入过多的累赘,Python 3.0 在设计的时候没有考虑向下兼容。 Python
      Python 3 教程
    • Python pip包管理
      一、前言    在Python中, 安装第三方模块是通过 setuptools 这个工具完成的。 Python有两个封装了 setuptools的包管理工具: easy_install  和  pip , 目前官方推荐使用 pip。    
      Python pip包管理
    • ubuntu如何重新编译内核
    • 改善Java代码之慎用java动态编译

    目录