我的编程空间,编程开发者的网络收藏夹
学习永远不晚

使用DataX实现mysql与hive数据互相导入导出

短信预约 -IT技能 免费直播动态提醒
省份

北京

  • 北京
  • 上海
  • 天津
  • 重庆
  • 河北
  • 山东
  • 辽宁
  • 黑龙江
  • 吉林
  • 甘肃
  • 青海
  • 河南
  • 江苏
  • 湖北
  • 湖南
  • 江西
  • 浙江
  • 广东
  • 云南
  • 福建
  • 海南
  • 山西
  • 四川
  • 陕西
  • 贵州
  • 安徽
  • 广西
  • 内蒙
  • 西藏
  • 新疆
  • 宁夏
  • 兵团
手机号立即预约

请填写图片验证码后获取短信验证码

看不清楚,换张图片

免费获取短信验证码

使用DataX实现mysql与hive数据互相导入导出

一、概论

1.1 什么是DataX

         DataX 是阿里巴巴开源的一个异构数据源离线同步工具,致力于实现包括关系型数据库(MySQL、Oracle 等)、HDFS、Hive、ODPS、HBase、FTP 等各种异构数据源之间稳定高效的数据同步功能。

1.2 DataX 的设计

         为了解决异构数据源同步问题,DataX 将复杂的网状的同步链路变成了星型数据链路,DataX 作为中间传输载体负责连接各种数据源。当需要接入一个新的数据源的时候,只需要将此数据源对接到 DataX,便能跟已有的数据源做到无缝数据同步
在这里插入图片描述

1.3 框架设计

在这里插入图片描述

  • Reader:数据采集模块,负责采集数据源的数据,将数据发给Framework。
  • Wiriter: 数据写入模块,负责不断向Framwork取数据,并将数据写入到目的端。
  • Framework:用于连接read和writer,作为两者的数据传输通道,并处理缓冲,流控,并发,数据转换等核心技术问题。
    运行原理
    在这里插入图片描述
  • Job:单个作业的管理节点,负责数据清理、子任务划分、TaskGroup监控管理。
  • Task:由Job切分而来,是DataX作业的最小单元,每个Task负责一部分数据的同步工作。
  • Schedule:将Task组成TaskGroup,单个TaskGroup的并发数量为5。
  • TaskGroup:负责启动Task。

1.4 Datax所支持的渠道

类型数据源读者作家(写)文件
RDBMS关系型数据库MySQL
           甲骨文        √        √    
SQL服务器
PostgreSQL的
DRDS
通用RDBMS(支持所有关系型数据库)
阿里云数仓数据存储ODPS
美国存托凭证
开源软件
OCS
NoSQL数据存储OTS
Hbase0.94
Hbase1.1
凤凰4.x
凤凰5.x
MongoDB
蜂巢
卡桑德拉
无结构化数据存储文本文件
的FTP
HDFS
弹性搜索
时间序列数据库OpenTSDB
技术开发局

二、快速入门

2.1 环境搭建

下载地址: http://datax-opensource.oss-cn-hangzhou.aliyuncs.com/datax.tar.gz
源码地址: https://github.com/alibaba/DataX

配置要求:

  • Linux
  • JDK(1.8以上 建议1.8) 下载
  • Python(推荐 Python2.6.X)下载
    安装:

1) 将下载好的datax.tar.gz上传到服务器的任意节点,我这里上传到node01上的/exprot/soft
2)解压到/export/servers/

[root@node01 soft]# tar -zxvf datax.tar.gz  -C ../servers/

3)运行自检脚本

出现以下结果说明你得环境没有问题

[/opt/module/datax/plugin/reader/._hbase094xreader/plugin.json]不存在. 请检查您的配置文件.
在这里插入图片描述

2.2搭建环境注意事项

[/opt/module/datax/plugin/reader/._hbase094xreader/plugin.json]不存在. 请检查您的配置文件.

参考:

find ./* -type f -name ".*er"  | xargs rm -rffind: paths must precede expression: |Usage: find [-H] [-L] [-P] [-Olevel] [-D help|tree|search|stat|rates|opt|exec] [path...] [expression]find /datax/plugin/reader/ -type f -name "._*er" | xargs rm -rffind /datax/plugin/writer/ -type f -name "._*er" | xargs rm -rf这里的/datax/plugin/writer/要改为你自己的目录

原文链接:https://blog.csdn.net/dz77dz/article/details/127055299

2.3读取Mysql中的数据写入到HDFS

准备
创建数据库和表并加载测试数据

create database test;use test;create table c_s(   id   varchar(100) null,    c_id int          null,    s_id varchar(20)  null);INSERT INTO test.c_s (id, c_id, s_id) VALUES ('123', 1, '201967');INSERT INTO test.c_s (id, c_id, s_id) VALUES ('123', 2, '201967');INSERT INTO test.c_s (id, c_id, s_id) VALUES ('123', 3, '201967');INSERT INTO test.c_s (id, c_id, s_id) VALUES ('123', 5, '201967');INSERT INTO test.c_s (id, c_id, s_id) VALUES ('123', 6, '201967');

查看官方提供的模板

[root@node01 datax]# bin/datax.py -r mysqlreader -w hdfswriterDataX (DATAX-OPENSOURCE-3.0), From Alibaba !Copyright (C) 2010-2017, Alibaba Group. All Rights Reserved.Please refer to the mysqlreader document:     https://github.com/alibaba/DataX/blob/master/mysqlreader/doc/mysqlreader.mdPlease refer to the hdfswriter document:     https://github.com/alibaba/DataX/blob/master/hdfswriter/doc/hdfswriter.mdPlease save the following configuration as a json file and  use     python {DATAX_HOME}/bin/datax.py {JSON_FILE_NAME}.jsonto run the job.{    "job": {        "content": [            {                "reader": {                    "name": "mysqlreader",                    "parameter": {                        "column": [],                        "connection": [{    "jdbcUrl": [],    "table": []}                        ],                        "password": "",                        "username": "",                        "where": ""                    }                },                "writer": {                    "name": "hdfswriter",                    "parameter": {                        "column": [],                        "compress": "",                        "defaultFS": "",                        "fieldDelimiter": "",                        "fileName": "",                        "fileType": "",                        "path": "",                        "writeMode": ""                    }                }            }        ],        "setting": {            "speed": {                "channel": ""            }        }    }}

根据官网模板进行修改

[root@node01 datax]# vim job/mysqlToHDFS.json{    "job": {        "content": [            {                "reader": {                    "name": "mysqlreader",                    "parameter": {                        "column": ["id","c_id","s_id"                        ],                        "connection": [{    "jdbcUrl": [        "jdbc:mysql://node02:3306/test"    ],    "table": [        "c_s"    ]}                        ],                        "password": "123456",                        "username": "root"                    }                },                "writer": {                    "name": "hdfswriter",                    "parameter": {                        "column": [{    "name": "id",    "type": "string"},{    "name": "c_id",    "type": "int"},{    "name": "s_id",    "type": "string"}                        ],                        "defaultFS": "hdfs://node01:8020",                        "fieldDelimiter": "\t",                        "fileName": "c_s.txt",                        "fileType": "text",                        "path": "/",                        "writeMode": "append"                    }                }            }        ],        "setting": {            "speed": {                "channel": "1"            }        }    }}

HDFS的端口号注意版本,2.7.4 是9000;hdfs://node01:9000

MySQL的参数介绍
在这里插入图片描述
HDFS参数介绍
在这里插入图片描述
运行脚本

[root@node01 datax]# bin/datax.py  job/mysqlToHDFS.json2020-10-02 16:12:16.358 [job-0] INFO  HookInvoker - No hook invoked, because base dir not exists or is a file: /export/servers/datax/hook2020-10-02 16:12:16.359 [job-0] INFO  JobContainer -         [total cpu info] =>                averageCpu                     | maxDeltaCpu                    | minDeltaCpu                -1.00%                         | -1.00%                         | -1.00%         [total gc info] =>                 NAME                 | totalGCCount       | maxDeltaGCCount    | minDeltaGCCount    | totalGCTime        | maxDeltaGCTime     | minDeltaGCTime                 PS MarkSweep         | 1                  | 1                  | 1                  | 0.245s             | 0.245s             | 0.245s                 PS Scavenge          | 1                  | 1                  | 1                  | 0.155s             | 0.155s             | 0.155s2020-10-02 16:12:16.359 [job-0] INFO  JobContainer - PerfTrace not enable!2020-10-02 16:12:16.359 [job-0] INFO  StandAloneJobContainerCommunicator - Total 5 records, 50 bytes | Speed 5B/s, 0 records/s | Error 0 records, 0 bytes |  All Task WaitWriterTime 0.000s |  All Task WaitReaderTime 0.000s | Percentage 100.00%2020-10-02 16:12:16.360 [job-0] INFO  JobContainer -任务启动时刻                    : 2020-10-02 16:12:04任务结束时刻                    : 2020-10-02 16:12:16任务总计耗时                    :                 12s任务平均流量                    :                5B/s记录写入速度                    :              0rec/s读出记录总数                    :                   5读写失败总数                    :                   0

2.4 读取HDFS中的数据写入到Mysql

准备工作

create database test;use test;create table c_s2(   id   varchar(100) null,    c_id int          null,    s_id varchar(20)  null);

查看官方提供的模板

[root@node01 datax]# bin/datax.py -r hdfsreader -w mysqlwriterDataX (DATAX-OPENSOURCE-3.0), From Alibaba !Copyright (C) 2010-2017, Alibaba Group. All Rights Reserved.Please refer to the hdfsreader document:     https://github.com/alibaba/DataX/blob/master/hdfsreader/doc/hdfsreader.mdPlease refer to the mysqlwriter document:     https://github.com/alibaba/DataX/blob/master/mysqlwriter/doc/mysqlwriter.mdPlease save the following configuration as a json file and  use     python {DATAX_HOME}/bin/datax.py {JSON_FILE_NAME}.jsonto run the job.{    "job": {        "content": [            {                "reader": {                    "name": "hdfsreader",                    "parameter": {                        "column": [],                        "defaultFS": "",                        "encoding": "UTF-8",                        "fieldDelimiter": ",",                        "fileType": "orc",                        "path": ""                    }                },                "writer": {                    "name": "mysqlwriter",                    "parameter": {                        "column": [],                        "connection": [{    "jdbcUrl": "",    "table": []}                        ],                        "password": "",                        "preSql": [],                        "session": [],                        "username": "",                        "writeMode": ""                    }                }            }        ],        "setting": {            "speed": {                "channel": ""            }        }    }}

根据官方提供模板进行修改

[root@node01 datax]# vim job/hdfsTomysql.json{    "job": {        "content": [            {                "reader": {                    "name": "hdfsreader",                    "parameter": {                        "column": ["*"                        ],                        "defaultFS": "hdfs://node01:8020",                        "encoding": "UTF-8",                        "fieldDelimiter": "\t",                        "fileType": "text",                        "path": "/c_s.txt"                    }                },                "writer": {                    "name": "mysqlwriter",                    "parameter": {                        "column": ["id","c_id","s_id"                        ],                        "connection": [{    "jdbcUrl": "jdbc:mysql://node02:3306/test",    "table": [        "c_s2"    ]}                        ],                        "password": "123456",                        "username": "root",                        "writeMode": "replace"                    }                }            }        ],        "setting": {            "speed": {                "channel": "1"            }        }    }}

脚本运行

[root@node01 datax]# bin/datax.py job/hdfsTomysql.json         [total cpu info] =>                averageCpu                     | maxDeltaCpu                    | minDeltaCpu                -1.00%                         | -1.00%                         | -1.00%         [total gc info] =>                 NAME                 | totalGCCount       | maxDeltaGCCount    | minDeltaGCCount    | totalGCTime        | maxDeltaGCTime     | minDeltaGCTime                 PS MarkSweep         | 1                  | 1                  | 1                  | 0.026s             | 0.026s             | 0.026s                 PS Scavenge          | 1                  | 1                  | 1                  | 0.015s             | 0.015s             | 0.015s2020-10-02 16:57:13.152 [job-0] INFO  JobContainer - PerfTrace not enable!2020-10-02 16:57:13.152 [job-0] INFO  StandAloneJobContainerCommunicator - Total 5 records, 50 bytes | Speed 5B/s, 0 records/s | Error 0 records, 0 bytes |  All Task WaitWriterTime 0.000s |  All Task WaitReaderTime 0.033s | Percentage 100.00%2020-10-02 16:57:13.153 [job-0] INFO  JobContainer -任务启动时刻                    : 2020-10-02 16:57:02任务结束时刻                    : 2020-10-02 16:57:13任务总计耗时                    :                 11s任务平均流量                    :                5B/s记录写入速度                    :              0rec/s读出记录总数                    :                   5读写失败总数                    :                   0

2.5将Mysql表导入Hive

在hive中建表

-- hive建表CREATE TABLE student2 (classNo string,stuNo string,score int) row format delimited fields terminated by ',';-- 构造点mysql数据create table if not exists student2(    classNo varchar ( 50 ),    stuNo   varchar ( 50 ),    score    int )insert into student2 values('1001','1012ww10087',63);insert into student2 values('1002','1012aa10087',63);insert into student2 values('1003','1012bb10087',63);insert into student2 values('1004','1012cc10087',63);insert into student2 values('1005','1012dd10087',63);insert into student2 values('1006','1012ee10087',63);

编写mysql2hive.json配置文件

{    "job": {        "setting": {            "speed": {                "channel": 1            }        },        "content": [            {                "reader": {                    "name": "mysqlreader",                    "parameter": {                        "username": "root",                        "password": "root",                        "connection": [{    "table": [        "student2"    ],    "jdbcUrl": [        "jdbc:mysql://192.168.43.10:3306/mytestmysql"    ]}                        ],                        "column": ["classNo","stuNo","score"                        ]                    }                },                "writer": {                    "name": "hdfswriter",                    "parameter": {                        "defaultFS": "hdfs://192.168.43.10:9000",                        "path": "/hive/warehouse/home/myhive.db/student2",                        "fileName": "myhive",                        "writeMode": "append",                        "fieldDelimiter": ",",                        "fileType": "text",                        "column": [{    "name": "classNo",    "type": "string"},{    "name": "stuNo",    "type": "string"},{    "name": "score",    "type": "int"}                        ]                    }                }            }        ]    }}

运行脚本

bin/datax.py job/mysql2hive.json 

查看hive表是否有数据

2.6将Hive表数据导入Mysql

要先在mysql建好表

create table if not exists student(    classNo varchar ( 50 ),    stuNo   varchar ( 50 ),    score    int )

hive2mysql.json配置文件

{    "job": {        "setting": {            "speed": {                "channel": 3            }        },        "content": [            {                "reader": {                    "name": "hdfsreader",                    "parameter": {                        "path": "/hive/warehouse/home/myhive.db/student/*",                        "defaultFS": "hdfs://192.168.43.10:9000",                        "column": [   {    "index": 0,    "type": "string"   },   {    "index": 1,    "type": "string"   },   {    "index": 2,    "type": "Long"   }                        ],                        "fileType": "text",                        "encoding": "UTF-8",                        "fieldDelimiter": ","                    }                },                "writer": {                    "name": "mysqlwriter",                    "parameter": {                        "writeMode": "insert",                        "username": "root",                        "password": "root",                        "column": ["classNo","stuNo","score"                        ],                        "preSql": ["delete from student"                        ],                        "connection": [{    "jdbcUrl": "jdbc:mysql://192.168.43.10:3306/mytestmysql?useUnicode=true&characterEncoding=utf8",    "table": [        "student"    ]}                        ]                    }                }            }        ]    }}

注意事项:

在Hive的ODS层建表语句中,以“,”为分隔符;fields terminated by ','在DataX的json文件中,也以“,”为分隔符。"fieldDelimiter": "," 与hive表里面的分隔符保持一致即可

由于DataX不能完全支持所有Hive表的数据类型,应将DataX启动文件中的hdfsreader中的column字段的类型改成DataX支持的类型

来源地址:https://blog.csdn.net/qq_35224503/article/details/132008213

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

使用DataX实现mysql与hive数据互相导入导出

下载Word文档到电脑,方便收藏和打印~

下载Word文档

猜你喜欢

如何使用Oracle实现数据导入和导出

要使用Oracle实现数据导入和导出,可以使用以下方法:数据导入:使用Oracle SQL Developer工具:可以通过Oracle SQL Developer工具来导入数据。在SQL Developer中,选择要导入数据的表,然后使
如何使用Oracle实现数据导入和导出
2024-03-02

MySQL 实现点餐系统的数据导入导出功能

MySQL 实现点餐系统的数据导入导出功能,需要具体代码示例近年来,随着外卖和点餐平台的发展,点餐系统的使用越来越广泛。在这样的背景下,许多餐厅和餐饮企业都需要一个方便高效的数据导入导出功能,以便管理他们的菜单、订单以及客户信息。本文将介绍
MySQL 实现点餐系统的数据导入导出功能
2023-11-01

如何使用MongoDB实现数据的批量导入、导出功能

如何使用MongoDB实现数据的批量导入、导出功能MongoDB是一种NoSQL数据库,作为一种非关系型数据库,其在数据存储和查询上有着很大的灵活性和性能优势。对于需要批量导入和导出数据的应用场景,MongoDB也提供了相应的工具和接口。本
2023-10-22

redis中使用redis-dump导出、导入、还原数据实例

redis的备份和还原,借助了第三方的工具,redis-dump 1、安装redis-dump[root@localhost tank]# yum install ruby rubygems ruby-devel //安装rubygem
2022-06-04

Oracle数据泵实现不同用户导入导出表级

前言:先认识一个单词,schema:模式。再来了解一个概念。当创建一个用户的时候,会同时创建一个与用户同名的schema,这个schema的官方解释是对象的集合。举个例子,比如说我就是一个用户,叫A,住在某个公寓里,假如我住在4-4
2022-07-19

如何使用SQL语句在MySQL中进行数据导入和导出?

如何使用SQL语句在MySQL中进行数据导入和导出?MySQL是一种广泛使用的关系型数据库管理系统,它提供了一种简洁且强大的 SQL 语言来操作和管理数据。在实际应用中,我们经常需要将数据导入到MySQL数据库中或者将数据库中的数据导出到外
如何使用SQL语句在MySQL中进行数据导入和导出?
2023-12-17

编程热搜

目录