分布式 | DBLE LOAD DATA 功能实现解析
1.概述
本篇文章主要介绍 DBLE LOAD DATA 大规模数据导入功能的实现,包括方案设计、源码解读。
下面就让我们一起来探秘 DBLE 是如何实现该功能的吧!
2.方案设计
LOAD DATA 为 MySQL 提供的从文本文件导入数据到表的语法,作为数据库中间件,当然也需要实现对应的功能,来满足用户的导入数据需求。
DBLE 对该功能的实现其实就是直接模拟了 MySQL 对 LOAD DATA 命令相应的处理协议。当然作为数据库中间件,还需要处理相应数据的存储、数据路由情况以及与后端 MySQL 的交互等方面的逻辑。
下图即为 DBLE 对 LOAD DATA 处理的整体流程:
3.源码解读
DBLE 与 LOAD DATA 功能实现相关的类其实主要有两个,一个是 ServerLoadDataInfileHandler
类,一个是 LoadDataUtil
类,ServerLoadDataInfileHandler
类主要处理的是与客户端交互的逻辑,而 LoadDataUtil
类主要处理的是与后端 MySQL 交互的逻辑。
下面我们就从客户端发送命令到 DBLE 处理,最后到 DBLE 与后端 MySQL 交互的过程,来详细看下相应的代码。
当客户端发来 LOAD DATA 导入数据到表命令的时候,DBLE 作为服务端会接收到相应的命令并进行处理,对应的代码在 ServerQueryHandler#query
方法中,这里会判断 SQL 的类型为 LOAD DATA,然后进一步处理:
public void query(String sql) {
ServerConnection c = this.source;
if (LOGGER.isDebugEnabled()) {
LOGGER.debug(String.valueOf(c) + sql);
}
……
int rs = ServerParse.parse(sql);
boolean isWithHint = ServerParse.startWithHint(sql);
int sqlType = rs & 0xff;
……
switch (sqlType) {
……
case ServerParse.LOAD_DATA_INFILE_SQL:
//对LOAD DATA的处理,调用FrontendConnection#loadDataInfileStart方法
c.loadDataInfileStart(sql);
break;
……
}
}
继续看一下 FrontendConnection#loadDataInfileStart
方法:
public void loadDataInfileStart(String sql) {
if (loadDataInfileHandler != null) {
try {
//进一步调用了ServerLoadDataInfileHandler#start方法
loadDataInfileHandler.start(sql);
} catch (Exception e) {
LOGGER.info("load data error", e);
writeErrMessage(ErrorCode.ERR_HANDLE_DATA, e.getMessage());
}
} else {
writeErrMessage(ErrorCode.ER_UNKNOWN_COM_ERROR, "load data infile sql is not unsupported!");
}
}
下面便进入到了 ServerLoadDataInfileHandler#start
方法,前面讲过该类主要处理的是 DBLE 与客户端的交互逻辑。
该方法比较长,大家可以去细看,主要功能还是解析了客户端发送过来的 SQL 语句,然后针对 LOAD DATA 语法,如果导入文件是本机文件,则直接进行解析,否则的话会向客户端发送获取文件的命令,让客户端传输文件过来:
public void start(String strSql) {
……
parseLoadDataPram();
//如果文件不在本地,则向客户端发送命令,请求数据文件,这里的local可能会让人疑惑,但MySQL语法确实是这么规定的,load data local用法反而是文件不在本地的用法
if (statement.isLocal()) {
isStartLoadData = true;
//request file from client
ByteBuffer buffer = serverConnection.allocate();
RequestFilePacket filePacket = new RequestFilePacket();
filePacket.setFileName(fileName.getBytes());
filePacket.setPacketId(1);
filePacket.write(buffer, serverConnection, true);
} else {
//如果文件在本地的话,先判断文件是否存在,不存在则报错,存在的话需要对文件进行读取,计算每一行的路由结果,然后对不同节点的数据分别进行存储
if (!new File(fileName).exists()) {
String msg = fileName + " is not found!";
clear();
serverConnection.writeErrMessage(ErrorCode.ER_FILE_NOT_FOUND, msg);
} else {
if (parseFileByLine(fileName, loadData.getCharset(), loadData.getLineTerminatedBy())) {
RouteResultset rrs = buildResultSet(routeResultMap);
if (rrs != null) {
flushDataToFile();
isStartLoadData = false;
serverConnection.getSession2().execute(rrs);
}
}
}
}
}
DBLE 发送命令给客户端后,客户端便会源源不断地把数据文件发送过来,对发送过来文件的处理逻辑在 ServerLoadDataInfileHandler#handle
方法中,该方法其实就是对传输过来的文件进行转储,默认数据小于 200Mb 则存在内存中,否则的话存储到本地文件:
public void handle(byte[] data) {
try {
if (sql == null) {
clear();
serverConnection.writeErrMessage(ErrorCode.ER_UNKNOWN_COM_ERROR, "Unknown command");
return;
}
BinaryPacket packet = new BinaryPacket();
ByteArrayInputStream inputStream = new ByteArrayInputStream(data, 0, data.length);
packet.read(inputStream);
//这里就是对发送过来的文件进行转储
saveByteOrToFile(packet.getData(), false);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
文件发送完成,客户端还会发送一个空包过来,告诉 DBLE 数据发送完了,然后 DBLE 会进行下一步处理(其实这里就是 MySQL 协议中的规定),下一步处理的逻辑在 ServerLoadDataInfileHandler#end
方法中。
该方法也比较长,主要处理逻辑是将接受过来的文件进一步计算路由,根据计算结果将文件根据不同节点分别存储,最后构建路由结果集,通过 DBLE 下发 LOAD DATA 命令到后端不同的 MySQL 节点:
public void end(byte packId) {
isStartLoadData = false;
this.packID = packId;
//empty packet for end
saveByteOrToFile(null, true);
if (isHasStoreToFile) {
//这里便是计算路由,并根据路由结果存储不同节点的数据文件
parseFileByLine(tempFile, loadData.getCharset(), loadData.getLineTerminatedBy());
}
……
//构建路由结果集,下发后端MySQL,执行LOAD DATA命令
RouteResultset rrs = buildResultSet(routeResultMap);
if (rrs != null) {
flushDataToFile();
serverConnection.getSession2().execute(rrs);
}
}
DBLE 与后端 MySQL 的交互逻辑跟客户端与 DBLE 的交互逻辑基本一样,因为都是基于 MySQL 协议嘛,DBLE 这边还需要做的就是将不同节点的数据文件发送给后端的 MySQL,具体的逻辑在 LoadDataUtil#requestFileDataResponse
方法中,该方法就是将 DBLE 处理过的数据文件,发送到后端的 MySQL 了,由 MySQL 来进行真正的数据存储:
public static void requestFileDataResponse(byte[] data, BackendConnection conn) {
byte packId = data[3];
MySQLConnection c = (MySQLConnection) conn;
RouteResultsetNode rrn = (RouteResultsetNode) conn.getAttachment();
LoadData loadData = rrn.getLoadData();
List loadDataData = loadData.getData();
BufferedInputStream in = null;
try {
//如果数据较小,都在内存中,则直接发送
if (loadDataData != null && loadDataData.size() > 0) {
ByteArrayOutputStream bos = new ByteArrayOutputStream();
for (String loadDataDataLine : loadDataData) {
String s = loadDataDataLine + loadData.getLineTerminatedBy();
byte[] bytes = s.getBytes(CharsetUtil.getJavaCharset(loadData.getCharset()));
bos.write(bytes);
}
packId = writeToBackConnection(packId, new ByteArrayInputStream(bos.toByteArray()), c);
} else {
//否则的话,先读取文件,然后再发送数据
in = new BufferedInputStream(new FileInputStream(loadData.getFileName()));
packId = writeToBackConnection(packId, in, c);
}
}
……
}
到这里,整个 DBLE 对 LOAD DATA 的处理流程就讲完啦。
4.总结
本篇文章主要分析讲解了 DBLE 对 LOAD DATA 功能的实现,包括方案设计以及源码解读,希望大家看完后能对整个 LOAD DATA 功能有更进一步的了解。
免责声明:
① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。
② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341