Hadoop生态之分析MapReduce及Hive
这篇文章主要讲解了“Hadoop生态之分析MapReduce及Hive”,文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习“Hadoop生态之分析MapReduce及Hive”吧!
1.计算框架
Hadoop 是一个计算框架,目前大型数据计算框架常用的大致有五种:
仅批处理框架:Apache hadoop.
仅流处理框架:Apache Storm、Apache Samza.
混合框架:Apache Spark、Apache Flink.
这其中名气最大、使用最广的当属 Hadoop 和 Spark。
虽然两者都被称为大数据框架,但实际层级不同。Hadoop 是一个分布式数据基础设施,包括计算框架 MapReduce、分布式文件系统 HDFS、YARN 等。而Spark 是专门用来对分布式存储的大数据的处理工具,并不会进行数据存储,更像是 MapReduce 的替代。
在使用场景上,Hadoop 主要用于离线数据计算,Spark更适用于需要精准实时的场景。
2. MapReduce
2.1 MapReduce 是什么
一个基于 Java 的并行分布式计算框架。
前文有提到 HDFS 提供了基于主从结构的分布式文件系统,基于此存储服务支持,MapReduce 可以实现任务的分发、跟踪、执行等工作,并收集结果。
2.2 MapReduce 组成
MapReduce 主要思想讲的通俗一点就是将一个大的计算拆分成 Map(映射)和 Reduce(化简)。说到这里,其实 JAVA8 在引入 Lambda 后,也有 map 和 reduce 方法。下面是一段 Java 中的用法:
List<Integer> nums = Arrays.asList(1, 2, 3); List<Integer> doubleNums = nums.stream().map(number -> number * 2).collect(Collectors.toList()); 结果:[2,4,6] Optional<Integer> sum = nums.stream().reduce(Integer::sum); 结果:[6]
代码很简单,map 负责归类,reduce 负责计算。而 Hadoop 中的 MapReduce 也有异曲同工之处。
下面结合官方案例 WordCount 进行分析:
public class WordCount { // Mapper泛型类,4个参数分别代表输入键、值,输出键、值类型 public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable>{ private final static IntWritable one = new IntWritable(1); private Text word = new Text(); public void map(Object key, Text value, Context context) throws IOException, InterruptedException { // 字符解析 StringTokenizer itr = new StringTokenizer(value.toString()); while (itr.hasMoreTokens()) { // nextToken():返回从当前位置到下一个分隔符的字符串 word.set(itr.nextToken()); context.write(word, one); } } } // Reducer同样也是四个参数 public static class IntSumReducer extends Reducer<Text,IntWritable,Text,IntWritable> { private IntWritable result = new IntWritable(); public void reduce(Text key, Iterable<IntWritable> values,Context context) throws IOException,InterruptedException { int sum = 0; // 循环values,并记录“单词”个数 for (IntWritable val : values) { sum += val.get(); } result.set(sum); context.write(key, result); } }
在这段代码中,不难看出程序核心是 map 函数和 reduce 函数。是否 MapReduce 就是由这两者组成的?接着往下看。
2.3 Map 和 Reduce
2.3.1 Map
在 WordCount 案例中,明显看到 map 函数的输入主要是一个
Context 在这里暂时性忽略,其是 Mapper 类的内部抽象类,一般计算中不会用到,可以先当做“上下文”理解。
map 函数计算过程是: 将这行文本中的单词提取出来,针对每个单词输出一个
2.3.2 Reduce
接着就来看看 reduce ,这里输入参数 Values 就是上面提到的由很多个 1 组成的集合,而 Key 就是具体“单词” word。
它的计算过程是: 将集合里的1求和,再将单词(word)与这个和(sum)组成一个
假设有两个数据块的文本数据需要进行词频统计,MapReduce 计算过程如下图所示:
到这都很容易理解,毕竟只是个 HelloWorld 的例子~,但整个MapReduce过程中最关键的部分其实是在 map 到 reduce 之间。
还拿上面例子来说:统计相同单词在所有输入数据中出现的次数,一个 Map 只能处理一部分数据,而热点单词就很可能会出现在所有 Map 中了,意味着同一单词必须要合并到一起统计才能得到正确结果。这种数据关联几乎在所有的大数据计算场景都需要处理,如果是例子这种的当然只对 Key 合并就OK了,但类似数据库 join 操作这种较复杂的,就需对两种类型(或更多)的数据依据 Key 关联。
这个数据关联操作在 MapReduce中的叫做:shuffle。
2.4 shuffle
shuffle 从字面意思来看,洗牌。下面是一个完整的MR过程,看一看如何洗牌。
先看左半边
1. 从 HDFS 中读取数据,输入数据块到一个个的 map,其中 map 完成计算时,计算结果会存储到本地文件系统。而当 map 快要进行完时,就会启动 shuffle 过程。
2. 如图,shuffle 也可分为两种,在Map端的是 Map shuffle。大致过程为:Map 任务进程会调用一个 Partitioner 接口,对 Map 产生的每个
这里就实现了对 Map 结果的分区、排序、分割,以及将同一分区的输出合并写入磁盘,得到一个分区有序的文件。这样不管 Map 在哪个服务器节点,相同的 Key 一定会被发送给相同 Reduce 进程。Reduce 进程对收到的
再看右半边
1. Reduce shuffle,又可分为复制 Map 输出、排序合并两阶段。
Copy:Reduce 任务从各个 Map 任务拖取数据后,通知父 TaskTracker 状态已更新,TaskTracker 通知 JobTracker。Reduce 会定期向JobTracker 获取 Map 的输出位置,一旦拿到位置,Reduce 任务会从此输出对应的 TaskTracker 上复制输出到本地,不会等到所有的Map任务结束。
Merge sort:
Copy 的数据先放入内存缓冲区,若缓冲区放得下就把数据写入内存,即内存到内存 merge。
Reduce 向每个 Map 去拖取数据,内存中每个 Map 对应一块数据,当内存缓存区中存储的数据达到一定程度,开启内存中 merge,把内存中数据merge 输出到磁盘文件中,即内存到磁盘 merge。
当属于该 reduce 的 map 输出全部拷贝完成,会在 reduce 上生成多个文件,执行合并操作,即磁盘到磁盘 merge。此刻 Map 的输出数据已经是有序的,Merge 进行一次合并排序,所谓 Reduce 端的 sort 过程就是这个合并的过程。
2. 经过上一步Reduce shuffle后,reduce进行最后的计算,将输出写入HDFS中。
以上便是 shuffle 大致四个步骤,关键是 map 输出的 shuffle 到哪个 Reduce 进程,它由 Partitioner 来实现,MapReduce 框架默认的 Partitioner 用 Key 哈希值对 Reduce 任务数量取模,相同 Key 会落在相同的 Reduce 任务 ID 上。
public int getPartition(K2 key, V2 value, int numReduceTasks) { return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks; }
如果对 Shuffle 总结一句话: 分布式计算将不同服务器中的数据合并到一起进行后续计算的过程。
shuffle 是大数据计算过程中神奇的地方,不管是 MapReduce 还是 Spark,只要是大数据批处理计算,一定会有 shuffle 过程,只有让数据关联起来,它的内在关系和价值才会呈现。
3. Hive
上一部分介绍了 MapReduce,接下来简单谈谈 Hive .
我觉得任何一项技术的出现都是为了解决某类问题, MapReduce 毫无疑问简化了大数据开发的编程难度。但实际上进行数据计算更常用的手段可能是 SQL,那么有没有办法直接运行 SQL ?
3.1 Hive是什么
基于Hadoop的一个数据仓库系统,定义了一种类SQL查询语言:Hive SQL。
这里有一个名词 数据仓库,数据仓库是指:面向主题(Subject Oriented)、集成(Integrated)、相对稳定(Non-Volatile)、反应历史变化(Time Variant)的数据集合,用于支持管理决策。
这么说可能有点抽象,分解一下:
主题:数据仓库针对某个主题来进行组织,指使用数据仓库决策时所关心的重点方面。比如订阅分析就可以当做一个主题。
集成:数据仓库要将多个数据源数据存到一起,但数据以前的存储方式不同,要经过抽取、清洗、转换。(也就是 ETL)
稳定:保存的数据是一系列历史快照,不允许修改,只能分析。
时变:会定期接收到新的数据,反应出新的数据变化。
现在再看下定义:数据仓库是将多个数据源的数据按照一定的主题集成,进行抽取、清洗、转换。且处理整合后的数据不允许随意修改,只能分析,还需定期更新。
3.2 为什么是 Hive
了解了 Hive 的基础定义,想一下:一个依赖于 HDFS 的数据仓库在 Hadoop 环境中可以扮演什么角色?
前面说到,可不可以让 SQL 直接运行在 Hadoop 平台,这里的答案便是 Hive。它可以将 Hive SQL 转换为 MapReduce 程序运行。
Hive 初期版本默认 Hive on Mapreduce
启动 hive 前通常要先启动 hdfs 和 yarn, 同时一般需要配置 MySQL,Hive 依赖于 HDFS 的数据存储,但为了能操作 HDFS 上的数据集,要知道数据切分格式、存储类型、地址等。这些信息通过一张表存储,称为元数据,可以存储到 MySQL 中。
现在来看下 Hive 的部分命令
新建数据库:create database xxx;
删除数据库:drop database xxx;
建表:
create table table_name(col_name data_type);
Hive 的表有两个概念:**内部表和外部表**。默认内部表,简单来说,内部表数据存储在每个表相应的HDFS目录下。外部表的数据存在别处,要删除这个外部表,该外部表所指向的数据是不会被删除的,只会删除外部表对应的元数据。
查询:
select * from t_table **where** a<100 **and** b>1000;
连接查询:
select a.*,b.* from t_a a join t_b b on a.name=b.name;
看到这里,可能会觉得我在写 SQL, 没错,对于熟悉 SQL 的人来说,Hive 是非常易于上手的。
3.3 HIVE SQL To MapReduce
前面说到 HQL 可以‘转换’为 MapReduce, 下面就来看看:一个 HQL 是如何转化为 MapReduce 的Hive的基础架构:
通过 Client 向 Hive 提交 SQL 命令。如果是 DDL,Hive 就会通过执行引擎 Driver 将数据表的信息记录在 Metastore 元数据组件中,这个组件通常用一个关系数据库实现,记录表名、字段名、字段类型、关联 HDFS 文件路径等 Meta 信息(元信息)。
如果是DQL,Driver 就会将该语句提交给自己的编译器 进行语法分析、解析、优化等一系列操作,最后生成一个 MapReduce 执行计划。再根据执行计划生成一个 MapReduce 的作业,提交给 Hadoop 的 MapReduce 计算框架处理。
比如输入一条 select xxx from a ; 其执行顺序为:首先在 metastore 查询--> sql 解析--> 查询优化---> 物理计划--> 执行 MapReduce。
感谢各位的阅读,以上就是“Hadoop生态之分析MapReduce及Hive”的内容了,经过本文的学习后,相信大家对Hadoop生态之分析MapReduce及Hive这一问题有了更深刻的体会,具体使用情况还需要大家实践验证。这里是编程网,小编将为大家推送更多相关知识点的文章,欢迎关注!
免责声明:
① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。
② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341