SpringBoot实现分库分表
方案:可以使用拦截器拦截mybatis框架,在执行SQL前对SQL语句根据路由字段进行分库分表操作,下例只做分表功能
@Intercepts:申明需要拦截的方法
拦截StatementHandler对象
一、statementHandler对象的定义
首先我们先来看看statementHandler接口的定义:
首先约定文中将的四大对象是指:executor, statementHandler,parameterHandler,resultHandler对象。
SimpleStatementHandler
:对应我们JDBC中常用的Statement接口,用于简单SQL的处理;PreparedStatementHandler
:对应JDBC中的PreparedStatement,预编译SQL的接口;CallableStatementHandler
:对应JDBC中CallableStatement,用于执行存储过程相关的接口;RoutingStatementHandler
:这个接口是以上三个接口的路由,没有实际操作,只是负责上面三个StatementHandler的创建及调用。
讲到statementHandler,毫无疑问它是我们四大对象最重要的一个,它的任务就是和数据库对话。在它这里会使用parameterHandler和ResultHandler对象为我们绑定SQL参数和组装最后的结果返回。
public interface StatementHandler {
Statement prepare(Connection connection)
throws SQLException;
void parameterize(Statement statement)
throws SQLException;
void batch(Statement statement)
throws SQLException;
int update(Statement statement)
throws SQLException;
<E> List<E> query(Statement statement, ResultHandler resultHandler)
throws SQLException;
BoundSql getBoundSql();
ParameterHandler getParameterHandler();
}
二、prepare方法
1、首先prepare方法是用来编译SQL
让我们看看它的源码实现。这里我们看到了BaseStatementHandler对prepare方法的实现
@Override
public Statement prepare(Connection connection) throws SQLException {
ErrorContext.instance().sql(boundSql.getSql());
Statement statement = null;
try {
statement = instantiateStatement(connection);
setStatementTimeout(statement);
setFetchSize(statement);
return statement;
} catch (SQLException e) {
closeStatement(statement);
throw e;
} catch (Exception e) {
closeStatement(statement);
throw new ExecutorException("Error preparing statement. Cause: " + e, e);
}
}
protected abstract Statement instantiateStatement(Connection connection) throws SQLException;
显然我们通过源码更加关注抽象方法instantiateStatement是做了什么事情。它依旧是一个抽象方法,那么它就有其实现类。
2、那就是之前说的那几个具体的StatementHandler对象
让我们看看PreparedStatementHandler:
@Override
protected Statement instantiateStatement(Connection connection) throws SQLException {
String sql = boundSql.getSql();
if (mappedStatement.getKeyGenerator() instanceof Jdbc3KeyGenerator) {
String[] keyColumnNames = mappedStatement.getKeyColumns();
if (keyColumnNames == null) {
return connection.prepareStatement(sql, PreparedStatement.RETURN_GENERATED_KEYS);
} else {
return connection.prepareStatement(sql, keyColumnNames);
}
} else if (mappedStatement.getResultSetType() != null) {
return connection.prepareStatement(sql, mappedStatement.getResultSetType().getValue(), ResultSet.CONCUR_READ_ONLY);
} else {
return connection.prepareStatement(sql);
}
}
好这个方法非常简单,我们可以看到它主要是根据上下文来预编译SQL,这是我们还没有设置参数。设置参数的任务是交由,statement接口的parameterize方法来实现的。
3、parameterize方法
上面我们在prepare方法里面预编译了SQL。那么我们这个时候希望设置参数。在Statement中我们是使用parameterize方法进行设置参数的。
让我们看看PreparedStatementHandler中的parameterize方法:
@Override
public void parameterize(Statement statement) throws SQLException {
parameterHandler.setParameters((PreparedStatement) statement);
}
很显然这里很简单是通过parameterHandler来实现的,我们这篇文章只是停留在statementhandler的程度,等我们讲解parameterHandler的时候再来看它如何实现吧,期待一下吧。
4、query/update方法
我们用了prepare方法预编译了SQL,用了parameterize方法设置参数,那么我们接下来肯定是想执行SQL,而SQL无非是两种:
一种是进行查询——query,另外就是更新——update。
这些方法都很简单,让我们看看PreparedStatementHandler的实现:
@Override
public int update(Statement statement) throws SQLException {
PreparedStatement ps = (PreparedStatement) statement;
ps.execute();
int rows = ps.getUpdateCount();
Object parameterObject = boundSql.getParameterObject();
KeyGenerator keyGenerator = mappedStatement.getKeyGenerator();
keyGenerator.processAfter(executor, mappedStatement, ps, parameterObject);
return rows;
}
@Override
public <E> List<E> query(Statement statement, ResultHandler resultHandler) throws SQLException {
PreparedStatement ps = (PreparedStatement) statement;
ps.execute();
return resultSetHandler.<E> handleResultSets(ps);
}
例:动态替换SQL中@TableID标识符
package com.study.demo.interceptor;
import com.study.demo.exception.BaseException;
import org.apache.ibatis.executor.statement.StatementHandler;
import org.apache.ibatis.mapping.BoundSql;
import org.apache.ibatis.plugin.Interceptor;
import org.apache.ibatis.plugin.Intercepts;
import org.apache.ibatis.plugin.Invocation;
import org.apache.ibatis.plugin.Plugin;
import org.apache.ibatis.plugin.Signature;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import java.lang.reflect.Field;
import java.sql.Connection;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
@Component
@Intercepts({
@Signature(type = StatementHandler.class, method = "prepare", args = {Connection.class, Integer.class})})
public class DynamicSQLInterceptor implements Interceptor {
private static final Logger LOGGER = LoggerFactory.getLogger(DynamicSQLInterceptor.class);
private static final String SHARD_TABLE_ID = "SHARD_TABLE_ID";
private static final String DEFAULT_TABLE_ID = "000";
@Override
@SuppressWarnings("unchecked")
public Object intercept(Invocation invocation) throws Throwable {
LOGGER.info("DynamicSQLInterceptor.intercept() exec.");
StatementHandler statementHandler = (StatementHandler) invocation.getTarget();
Object parameter = statementHandler.getParameterHandler().getParameterObject();
Map<String, Object> params = (Map)parameter;
if(CollectionUtils.isEmpty(params)){
throw new BaseException("SQL: 路由字段不能为空!");
}
String tableId = DEFAULT_TABLE_ID;
Set<String> keySet = params.keySet();
for (String key : keySet) {
if (SHARD_TABLE_ID.equals(key)) {
tableId = String.valueOf(params.get(key));
}
}
BoundSql boundSql = statementHandler.getBoundSql();
//获取到原始sql语句
String sql = boundSql.getSql();
String newSql = sql.replaceAll("@TableID", tableId);
LOGGER.debug("[DynamicSQLInterceptor] Sql:{}", newSql);
//通过反射修改sql语句
Field field = boundSql.getClass().getDeclaredField("sql");
field.setAccessible(true);
field.set(boundSql, newSql);
return invocation.proceed();
}
@Override
public Object plugin(Object target) {
//只拦截Executor对象,减少目标被代理的次数
if (target instanceof StatementHandler) {
return Plugin.wrap(target, this);
} else {
return target;
}
}
@Override
public void setProperties(Properties properties) {
LOGGER.debug("[DynamicSQLInterceptor] SetProperties");
}
}
示例SQL:
SELECT * FROM ST_CLASS_@TableID WHERE ID = #{id}
service层示例:
@Override
public Objcet queryByPrimaryKey(String id) {
Map<String, Object> params = DbShardUtils.shardDBParamMap(id);
params.put("id", id);
return testDao.queryByPrimaryKey(params);
}
dao层示例:
@Repository
public interface TestDao {
Object queryByPrimaryKey(Map<String, Object> params);
}
package com.study.demo.utils;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.HashMap;
import java.util.Map;
public class DbShardUtils {
private static final Logger LOGGER = LoggerFactory.getLogger(DbShardUtils.class);
private static final String SHARD_TABLE_ID = "SHARD_TABLE_ID";
private DbShardUtils() {
}
public static Map<String, Object> shardDBParamMap(String id){
if (StringUtils.isBlank(id)) {
LOGGER.error("sharding id is null");
}
Map<String, Object> paramMap = new HashMap<>();
paramMap.put(SHARD_TABLE_ID, rout(id));
return paramMap;
}
private static String rout(String id) {
// 测试
return "000";
}
}
以上为个人经验,希望能给大家一个参考,也希望大家多多支持编程网。
免责声明:
① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。
② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341