我的编程空间,编程开发者的网络收藏夹
学习永远不晚

大数据之使用Spark全量抽取MySQL的数据到Hive数据库

短信预约 -IT技能 免费直播动态提醒
省份

北京

  • 北京
  • 上海
  • 天津
  • 重庆
  • 河北
  • 山东
  • 辽宁
  • 黑龙江
  • 吉林
  • 甘肃
  • 青海
  • 河南
  • 江苏
  • 湖北
  • 湖南
  • 江西
  • 浙江
  • 广东
  • 云南
  • 福建
  • 海南
  • 山西
  • 四川
  • 陕西
  • 贵州
  • 安徽
  • 广西
  • 内蒙
  • 西藏
  • 新疆
  • 宁夏
  • 兵团
手机号立即预约

请填写图片验证码后获取短信验证码

看不清楚,换张图片

免费获取短信验证码

大数据之使用Spark全量抽取MySQL的数据到Hive数据库

文章目录

前言

一、读题分析

二、使用步骤

1.导入配置文件到pom.xml

2.代码部分

三、重难点分析

总结


前言

本题来源于全国职业技能大赛之大数据技术赛项赛题-离线数据处理-数据抽取(其他暂不透露)

题目:编写Scala代码,使用Spark将MySQL的shtd_industry库中表EnvironmentData,ChangeRecord,BaseMachine,MachineData,ProduceRecord全量抽取到Hive的ods库(需自建)中对应表environmentdata,changerecord,basemachine, machinedata, producerecord中。

以下面题目为例:

抽取MySQL的shtd_industry库中EnvironmentData表的全量数据进入Hive的ods库中表environmentdata,字段排序、类型不变,同时添加静态分区,分区字段类型为String,且值为当前日期的前一天日期(分区字段格式为yyyyMMdd)。并在hive cli执行show partitions ods.environmentdata命令,将结果截图粘贴至对应报告中;


提示:以下是本篇文章正文内容,下面案例可供参考(使用Scala语言编写)

一、读题分析

涉及组件:Spark,Mysql,Hive

涉及知识点:

  1. Spark读取数据库数据
  2. DataFrameAPI的使用(重点)
  3. Spark写入数据库数据
  4. Hive数据库的基本操作

二、使用步骤

1.导入配置文件到pom.xml

                            org.apache.spark            spark-sql_2.11            ${spark.version}                                    org.apache.spark            spark-hive_2.11            ${spark.version}                                    mysql            mysql-connector-java            5.1.37        

2.代码部分

由于不是很难,直接上代码,代码如下(示例):

package A.offlineDataProcessing.shtd_industry.task1_dataExtractionimport org.apache.spark.sql.functions.litimport org.apache.spark.sql.{DataFrame, SparkSession}import java.text.SimpleDateFormatimport java.util.{Calendar, Properties}object SparkToMysqlToHive {  def main(args: Array[String]): Unit = {    // 创建Spark对象会话    val spark = SparkSession.builder()      .appName("MySQL to Hive")      .master("spark://bigdata1:7077")      .enableHiveSupport().getOrCreate()    // 连接MySQL数据库并设置属性    val jdbcUrl = "jdbc:mysql://bigdata1:3306/shtd_industry"    val table = "EnvironmentData"    val properties = new Properties    properties.put("user", "root")    properties.put("password", "123456")    // Read data from MySQL    val df: DataFrame = spark.read.jdbc(jdbcUrl, table, properties)    println("-------------------自定义操作-------------------------")    // Add partition column    val dateFormat = new SimpleDateFormat("yyyyMMdd")    //    第一个getTime返回的是一个 Date 对象    //    第二个 getTime 方法返回的是一个整数值,表示此 Date 对象表示的时间距离标准基准时间(1970年1月1日00:00:00 GMT)的毫秒数。    val yesterday = dateFormat.format(Calendar.getInstance().getTime.getTime - 24 * 60 * 60 * 1000)    //对MySQL来的数据进行withCoulum操作,有就修改,没有就添加    val dfWithPartition: DataFrame = df.withColumn("etldate", lit(yesterday))    println("-------------------写入数据-------------------------")    // Write data to Hive    //    mode模式为覆盖,还有append为追加    //    partitionBy 根据指定列进行分区    //    saveAsTable保存表    dfWithPartition.write.mode("overwrite")      .partitionBy("etldate")      .saveAsTable("ods.environmentdata")  }}

hive数据库相关的操作在这不做演示


三、重难点分析

没有难点,主要涉及能否自定义函数完成任务需求

val dateFormat = new SimpleDateFormat("yyyyMMdd")    //    第一个getTime返回的是一个 Date 对象    //    第二个 getTime 方法返回的是一个整数值,表示此 Date 对象表示的时间距离标准基准时间(1970年1月1日00:00:00 GMT)的毫秒数。    val yesterday = dateFormat.format(Calendar.getInstance().getTime.getTime - 24 * 60 * 60 * 1000)    //对MySQL来的数据进行withCoulum操作,有就修改,没有就添加    val dfWithPartition: DataFrame = df.withColumn("etldate", lit(yesterday))

总结

本文仅仅介绍了Spark读取MySQL的数据到hive数据库的操作,spark提供了许多方法,我们不必写SQL语法就可以直接对数据进行操作,还是很方便的,并且难度也不高(比flink简单)。

如转载请标明出处

来源地址:https://blog.csdn.net/qq_36920766/article/details/129774263

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

大数据之使用Spark全量抽取MySQL的数据到Hive数据库

下载Word文档到电脑,方便收藏和打印~

下载Word文档

猜你喜欢

使用.Net Core怎么将大量数据导入至Mysql数据库

这期内容当中小编将会给大家带来有关使用.Net Core怎么将大量数据导入至Mysql数据库,文章内容丰富且以专业的角度为大家分析和叙述,阅读完这篇文章希望大家可以有所收获。订单测试表CREATE TABLE `trade` ( `id`
2023-06-15

如何使用PDO获取MySQL数据库中的数据

使用PDO获取MySQL数据库中的数据有以下几个步骤:建立数据库连接:$host = 'localhost';$dbname = 'my_database';$username = 'm
如何使用PDO获取MySQL数据库中的数据
2024-04-29

使用Neo4j的apoc插件,实现数据从MySQL抽取到Neo4j

1、准备下载apoc插件:apoc-3.5.0.15-all.jar下载地址:https://github.com/neo4j-contrib/neo4j-apoc-procedures/releases/3.4.0.7注意:apoc版本必须和你Neo4j的版
使用Neo4j的apoc插件,实现数据从MySQL抽取到Neo4j
2019-04-19

MySQL数据库备份之mysqldump的使用

原文:https://www.cnblogs.com/tiaopixiaonaofu/p/13976681.html
MySQL数据库备份之mysqldump的使用
2016-12-02

ETL工具之kettle的使用之异构数据抽取MongoDB——>DB2

DB2" src="/upload/image/images/imgsql/4.jpg">环境简介:数据库版本:MongoDB 5.0.4  (查询语句:db.version(); ); DB2 9.7  ;     kettle版本: 9.2实现目标:将Mo
ETL工具之kettle的使用之异构数据抽取MongoDB——>DB2
2016-03-18

Python中如何建立与MySQL数据库的安全连接并进行数据交互?(如何使用Python安全地连接到MySQL数据库并执行数据操作?)

本文章详细讲解了如何使用Python安全地连接MySQL数据库并进行数据操作。为了建立安全连接,需配置数据库访问权限、使用SSL加密和密码。Python中的连接配置可通过mysql.connector库实现。数据交互操作包括创建游标、执行查询、获取结果、修改数据和提交更改。安全性最佳实践建议遵循最小权限原则、定期更改密码、使用防火墙、IDS和保持软件更新。
Python中如何建立与MySQL数据库的安全连接并进行数据交互?(如何使用Python安全地连接到MySQL数据库并执行数据操作?)
2024-04-02

mysql数据库操作_高手进阶常用的sql命令语句大全

mysql数据库操作sql命令语句大全:三表连表查询、更新时批量替换字段部分字符、判断某一张表是否存在、自动增长恢复从1开始、查询重复记录、更新时字段值等于原值加上一个字符串、更新某字段为随机值、复制表数据到另一个表、创建表时拷贝其他表的数
2022-11-20

【MySQL】使用LOAD DATA INFILE命令加载数据文件到MySQL数据库的方法和常见错误及解决方法

文章目录 【MySQL】使用LOAD DATA INFILE命令加载数据文件到MySQL数据库的方法和常见错误及解决方法LOAD DATA INFILE的语法详细1.创建(选择)目标数据库和表2.将数据从 CSV 文件导入已创建的表
2023-08-17

mysql数据库操作_高手进阶常用的sql命令语句大全 原创

mysql数据库操作sql命令语句大全:三表连表查询、更新时批量替换字段部分字符、判断某一张表是否存在、自动增长恢复从1开始、查询重复记录、更新时字段值等于原值加上一个字符串、更新某字段为随机值、复制表数据到另一个表、创建表时拷贝其他表的数据和结构...
2022-11-21

如何使用 JDBC API 选择或转移到 MySQL 中的另一个数据库?

一般来说,您可以使用 USE 查询更改 MySQL 中的当前数据库。语法Use DatabaseName;要使用 JDBC API 更改当前数据库,您需要:注册驱动程序 :使用DriverManager类的registerDriver()方
2023-10-22

编程热搜

目录