我的编程空间,编程开发者的网络收藏夹
学习永远不晚

MybatisPlus批量保存原理及失效原因排查全过程

短信预约 -IT技能 免费直播动态提醒
省份

北京

  • 北京
  • 上海
  • 天津
  • 重庆
  • 河北
  • 山东
  • 辽宁
  • 黑龙江
  • 吉林
  • 甘肃
  • 青海
  • 河南
  • 江苏
  • 湖北
  • 湖南
  • 江西
  • 浙江
  • 广东
  • 云南
  • 福建
  • 海南
  • 山西
  • 四川
  • 陕西
  • 贵州
  • 安徽
  • 广西
  • 内蒙
  • 西藏
  • 新疆
  • 宁夏
  • 兵团
手机号立即预约

请填写图片验证码后获取短信验证码

看不清楚,换张图片

免费获取短信验证码

MybatisPlus批量保存原理及失效原因排查全过程

问题描述

一般情况下,在MybatisPlus中使用saveBatch方法进行批量保存只需要:在数据库连接串中添加&rewriteBatchedStatements=true,并将MySQL驱动保证在5.0.18以上即可。

但是在这里实际使用中批量保存并没有生效,列表数据被分组成几批数据保存,而不是一批数据保存,通过调试、查看数据库日志等方式可以验证。

所以现在是配置正确,驱动正确,批量保存的数据正确,但是批量保存没有生效。

批量保存原理

框架是不会出问题的,这里来看下MybatisPlus实现批量保存的实现方式,调用saveBatch方法后发生了什么。

ServiceImpl.saveBatch()

@Transactional(rollbackFor = Exception.class)
@Override
public boolean saveBatch(Collection<T> entityList, int batchSize) {
  // ${className}.insert
	String sqlStatement = getSqlStatement(SqlMethod.INSERT_ONE);
  // 函数式编程 BiConsumer<T, U>
	return executeBatch(entityList, batchSize, (sqlSession, entity) -> sqlSession.insert(sqlStatement, entity));
}

采用函数式编程实现BiConsumer接口,其用法相当于

@Override
public boolean saveBatch(Collection entityList, int batchSize) {

	BiConsumer<SqlSession, Object> consumer = new EntityConsumer();

	return executeBatch(entityList, batchSize, consumer);
}

class EntityConsumer implements BiConsumer<SqlSession, Object>{

	@Override
	public void accept(SqlSession sqlSession, Object object) {
		sqlSession.insert("${className}.insert", object);
	}
}

SqlHelper.executeBatch()

定义批量保存的模板方法

public static <E> boolean executeBatch(Class<?> entityClass, Log log, Collection<E> list, int batchSize, BiConsumer<SqlSession, E> consumer) {
	Assert.isFalse(batchSize < 1, "batchSize must not be less than one");
	return !CollectionUtils.isEmpty(list) && executeBatch(entityClass, log, sqlSession -> {
		int size = list.size();
		int i = 1;
		for (E element : list) {
      // 调用sqlSession.insert("${className}.insert", object);
      // 数据最终保存到StatementImpl.batchedArgs中,用于后面做批量保存
			consumer.accept(sqlSession, element);
			if ((i % batchSize == 0) || i == size) {
        // 批量保存StatementImpl.batchedArgs中的数据
				sqlSession.flushStatements();
			}
			i++;
		}
	});
}

MybatisBatchExecutor.doUpdate()

将待执行对象添加到对应的Statement中,可以理解为将批量数据分组,分组的依据包含两个:

 if (sql.equals(currentSql) && ms.equals(currentStatement))

​ ● 数据的SQL语句必须完全一致,包括表名和列

​ ● 使用的MappedStatement一致,即Mapper一致

@Override
public int doUpdate(MappedStatement ms, Object parameterObject) throws SQLException {
	final Configuration configuration = ms.getConfiguration();
	final StatementHandler handler = configuration.newStatementHandler(this, ms, parameterObject, RowBounds.DEFAULT, null, null);
	final BoundSql boundSql = handler.getBoundSql();
	final String sql = boundSql.getSql();
	final Statement stmt;
  // **
	if (sql.equals(currentSql) && ms.equals(currentStatement)) {
		int last = statementList.size() - 1;
		stmt = statementList.get(last);
		applyTransactionTimeout(stmt);
		handler.parameterize(stmt);//fix Issues 322
		BatchResult batchResult = batchResultList.get(last);
		batchResult.addParameterObject(parameterObject);
	} else {
		Connection connection = getConnection(ms.getStatementLog());
		stmt = handler.prepare(connection, transaction.getTimeout());
		if (stmt == null) {
			return 0;
		}
		handler.parameterize(stmt);    //fix Issues 322
		currentSql = sql;
		currentStatement = ms;
		statementList.add(stmt);
		batchResultList.add(new BatchResult(ms, sql, parameterObject));
	}
	handler.batch(stmt);
	return BATCH_UPDATE_RETURN_VALUE;
}

PreparedStatement.addBatch()

将数据添加到StatementImpl.batchedArgs中,至此第一阶段完成

public void addBatch() throws SQLException {
	synchronized (checkClosed().getConnectionMutex()) {
		if (this.batchedArgs == null) {
			this.batchedArgs = new ArrayList<Object>();
		}

		for (int i = 0; i < this.parameterValues.length; i++) {
			checkAllParametersSet(this.parameterValues[i], this.parameterStreams[i], i);
		}

		this.batchedArgs.add(new BatchParams(this.parameterValues, this.parameterStreams, this.isStream, this.streamLengths, this.isNull));
	}
}

MybatisBatchExecutor.doFlushStatements()

遍历Statemen,并执行executeBatch()方法

@Override
public List<BatchResult> doFlushStatements(boolean isRollback) throws SQLException {
	try {
		List<BatchResult> results = new ArrayList<>();
		if (isRollback) {
			return Collections.emptyList();
		}
		for (int i = 0, n = statementList.size(); i < n; i++) {
			Statement stmt = statementList.get(i);
			applyTransactionTimeout(stmt);
			BatchResult batchResult = batchResultList.get(i);
			try {
				batchResult.setUpdateCounts(stmt.executeBatch());
				MappedStatement ms = batchResult.getMappedStatement();
				List<Object> parameterObjects = batchResult.getParameterObjects();
				KeyGenerator keyGenerator = ms.getKeyGenerator();
				if (Jdbc3KeyGenerator.class.equals(keyGenerator.getClass())) {
					Jdbc3KeyGenerator jdbc3KeyGenerator = (Jdbc3KeyGenerator) keyGenerator;
					jdbc3KeyGenerator.processBatch(ms, stmt, parameterObjects);
				} else if (!NoKeyGenerator.class.equals(keyGenerator.getClass())) { //issue #141
					for (Object parameter : parameterObjects) {
						keyGenerator.processAfter(this, ms, stmt, parameter);
					}
				}
				// Close statement to close cursor #1109
				closeStatement(stmt);
			} catch (BatchUpdateException e) {
				StringBuilder message = new StringBuilder();
				message.append(batchResult.getMappedStatement().getId())
					.append(" (batch index #")
					.append(i + 1)
					.append(")")
					.append(" failed.");
				if (i > 0) {
					message.append(" ")
						.append(i)
						.append(" prior sub executor(s) completed successfully, but will be rolled back.");
				}
				throw new BatchExecutorException(message.toString(), e, results, batchResult);
			}
			results.add(batchResult);
		}
		return results;
	} finally {
		for (Statement stmt : statementList) {
			closeStatement(stmt);
		}
		currentSql = null;
		statementList.clear();
		batchResultList.clear();
	}
}

PreparedStatement.executeBatchInternal()

最终执行批量操作的逻辑,这里会判断rewriteBatchedStatements参数

@Override
protected long[] executeBatchInternal() throws SQLException {
	synchronized (checkClosed().getConnectionMutex()) {

		if (this.connection.isReadOnly()) {
			throw new SQLException(Messages.getString("PreparedStatement.25") + Messages.getString("PreparedStatement.26"),
				SQLError.SQL_STATE_ILLEGAL_ARGUMENT);
		}

		if (this.batchedArgs == null || this.batchedArgs.size() == 0) {
			return new long[0];
		}

		// we timeout the entire batch, not individual statements
		int batchTimeout = this.timeoutInMillis;
		this.timeoutInMillis = 0;

		resetCancelledState();

		try {
			statementBegins();

			clearWarnings();

      // 判断rewriteBatchedStatements参数
			if (!this.batchHasPlainStatements && this.connection.getRewriteBatchedStatements()) {

				if (canRewriteAsMultiValueInsertAtSqlLevel()) {
					return executeBatchedInserts(batchTimeout);
				}

				if (this.connection.versionMeetsMinimum(4, 1, 0) && !this.batchHasPlainStatements && this.batchedArgs != null
					&& this.batchedArgs.size() > 3 ) {
					return executePreparedBatchAsMultiStatement(batchTimeout);
				}
			}

			return executeBatchSerially(batchTimeout);
		} finally {
			this.statementExecuting.set(false);

			clearBatch();
		}
	}
}

问题排查

​ 在流程中几个关键节点为:

​ ● MybatisBatchExecutor:1、执行数据分组逻辑。2、遍历Statement执行批量保存逻辑。

​ ● StatementImpl:保存batchedArgs

​ ● PreparedStatement:执行最终的批量保存逻辑

​ 可以看出JDK层只执行最终的保存操作,如果这里的数据batchedArgs没有拿到批量的,那一定是MybatisPlus的分组逻辑出现问题。通过调试发现问题出现在​

if (sql.equals(currentSql) && ms.equals(currentStatement))

​ 由于有些数据有些列没有默认值导致SQL的列不同,数据被添加到不同的Statement执行,导致最终的批量操作失效。

总结

整个批量过程可以分为两个阶段:

  • 1、将批量数据添加到statementImpl.batchedArgs中保存。
  • 2、调用statement.executeBatch方法完成批量。

来看下这两步最基本的操作之上,MybatisPlus做了哪些事情:

  • ​ 1、定义批量操作的模板。
  • ​ 2、验证集合中的数据,将完全一致的SQL添加到同一个Statement中。
  • ​ 3、Jdbc3KeyGenerator?

值得注意的是,批量模板中的单条新增调用的是sqlSession.insert(),这个方法是没有执行execute的,只是将数据放到statementImpl.batchedArgs中。而常规的单条新增调用的是baseMapper.insert()方法,其是基于动态代理的方法来实现。

可以看出以上流程经历了已知的四层结构:MybatisPlus–>Mybatis–>MySQL connector–>JDK。其最底层还是通过preparedStatement来实现批量操作,和我们通过原始JDBC来实现批量操作的原理相同,上层都是框架实现的封装。

以上为个人经验,希望能给大家一个参考,也希望大家多多支持编程网。

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

MybatisPlus批量保存原理及失效原因排查全过程

下载Word文档到电脑,方便收藏和打印~

下载Word文档

猜你喜欢

MybatisPlus批量保存原理及失效原因排查全过程

这篇文章主要介绍了MybatisPlus批量保存原理及失效原因排查全过程,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
2023-01-11

编程热搜

  • Python 学习之路 - Python
    一、安装Python34Windows在Python官网(https://www.python.org/downloads/)下载安装包并安装。Python的默认安装路径是:C:\Python34配置环境变量:【右键计算机】--》【属性】-
    Python 学习之路 - Python
  • chatgpt的中文全称是什么
    chatgpt的中文全称是生成型预训练变换模型。ChatGPT是什么ChatGPT是美国人工智能研究实验室OpenAI开发的一种全新聊天机器人模型,它能够通过学习和理解人类的语言来进行对话,还能根据聊天的上下文进行互动,并协助人类完成一系列
    chatgpt的中文全称是什么
  • C/C++中extern函数使用详解
  • C/C++可变参数的使用
    可变参数的使用方法远远不止以下几种,不过在C,C++中使用可变参数时要小心,在使用printf()等函数时传入的参数个数一定不能比前面的格式化字符串中的’%’符号个数少,否则会产生访问越界,运气不好的话还会导致程序崩溃
    C/C++可变参数的使用
  • css样式文件该放在哪里
  • php中数组下标必须是连续的吗
  • Python 3 教程
    Python 3 教程 Python 的 3.0 版本,常被称为 Python 3000,或简称 Py3k。相对于 Python 的早期版本,这是一个较大的升级。为了不带入过多的累赘,Python 3.0 在设计的时候没有考虑向下兼容。 Python
    Python 3 教程
  • Python pip包管理
    一、前言    在Python中, 安装第三方模块是通过 setuptools 这个工具完成的。 Python有两个封装了 setuptools的包管理工具: easy_install  和  pip , 目前官方推荐使用 pip。    
    Python pip包管理
  • ubuntu如何重新编译内核
  • 改善Java代码之慎用java动态编译

目录