我的编程空间,编程开发者的网络收藏夹
学习永远不晚

Java大数据处理的核心技术MapReduce框架

短信预约 -IT技能 免费直播动态提醒
省份

北京

  • 北京
  • 上海
  • 天津
  • 重庆
  • 河北
  • 山东
  • 辽宁
  • 黑龙江
  • 吉林
  • 甘肃
  • 青海
  • 河南
  • 江苏
  • 湖北
  • 湖南
  • 江西
  • 浙江
  • 广东
  • 云南
  • 福建
  • 海南
  • 山西
  • 四川
  • 陕西
  • 贵州
  • 安徽
  • 广西
  • 内蒙
  • 西藏
  • 新疆
  • 宁夏
  • 兵团
手机号立即预约

请填写图片验证码后获取短信验证码

看不清楚,换张图片

免费获取短信验证码

Java大数据处理的核心技术MapReduce框架

MapReduce框架

1、框架图

Input→Mapper→shuffle→Reducer→Output

2、Input数据输入

2.1概念

(1)数据块(Block),物理存储,Block是HDFS物理上把文件分成一块一块。数据块是HDFS存储数据单位。

(2)数据切片,逻辑存储,数据切片是MapReduce程序j最小计算输入数据的单位。一个切片会启动一个MapTask

2.2数据切片与MapTask并行度

(1)一个Job的Map阶段并行度由客户端在提交job时的切片数决定;

(2)每一个split切片分配一个MapTask并行实例片

(3)切片是针对每一个文件单独切片

(4)默认情况下,切片大小等于Block Size块大小

MapTask数据=输入文件切片数据

2.3切片过程

(1)程序先找到数据存储目录

(2)开始遍历处理目录下的每一个文件

A、按每个文件进行切片

B、判断文件是否可以切片(snappy、Gzip压缩不能切)

(3)遍历第一个文件

获取文件大小→计算切片大小→开始切片→将切片信息写入切片规划文件中→提交切片规划文件到yarn

A、获取文件大小:fs.size(文件)

B、计算切片大小:设置minsize、maxsize、blocksize

mapreduce.input.fileinputformat.split.minsize=1 默认值为1
mapreduce.input.fileinputformat.split.maxsize=Long.MAXValue 默认值Long.MAXValue

计算公式 :computeSliteSize(Math.max(minSize,Math.min(maxSize,blocksize)))

最大取最小,最小取最大。因此切片大小默认与 HDFS 的 block 保持一致。

maxsize(切片最大值): 参数如果调到比 blocksize 小,则会让切片变小,而且就等于配置的这个参数的值。minsize(切片最小值): 参数调的比 blockSize 大,则可以让切片变得比 blocksize 还大。

C、开始切片:getSplit()

每次切片时,都要判断剩下的是否大于块的1.1倍,不大于1.1倍就切分成一块切片

D、将切片信息写入切片规划文件中:job.split

记录起始位置、长度、所在切点列表等

E、提交切片规划文件到yarn

yarn上MRAppMaster根据切片规划计算MapTask数

三个文件:切片规则文件(job.split)、参数配置文件(job.xml)、程序jar包

2.4类图

2.5TextInputFormat

(1)是FileInputFormat默认的实现类

(2)按行读取每条记录,Key为该行在整个文件的超始字节偏移量,LongWritable型。Value为行内容,不包括任何终止符(换行符、回车符),Text型。

2.6CombineTextInputFormat

(1)应用场景:用于小文件过多的场景,将多个小文件从逻辑上规划到一个切片中, 这样多个小文件交给一个MapTask处理;

(2)虚拟存储切片最大值默认4M,最好根据实际的小文件大小来设置

CombineTextInputFormat.setMaxInputSplitSize(job, 4194304);

2.7Read阶段

MapTask通过InputFormat获得RecordReader,从输入InputSplit中解析出一个个Key/Value

3、Map阶段

将解析出来的Key/Value交给用户编写的map()函数处理,并产生一系列新的Key/Value

4、Collect收集阶段

(1)map()函数中,当数据处理完成后,一般会调用OutputCollector.collect()输出结果

(2)在该函数内部,它会将生成的Key/Value分区(调用Partitioner),并写入一个环形内存缓冲区中

5、Shuffle阶段

(1)map方法之后 ,reduce方法之前的数据处理过程称之为Shuffle;

(2)环形内存缓冲区

(3)Partition分区-默认分区

A、根据需求按照条件输出到不同分区

B、默认分区:根据key的hashcode对ReduceTask数理取模

C、默认的ReduceTask的数量为1,对应参数mapreduce.job.reduces

(4)Partition分区-自定义Partitioner

A、自定义类继承Partitioner<key,value>,重写getPartition()方法

public class ProvincePartitioner extends Partitioner<Text, FlowBean> {
    @Override
    public int getPartition(Text text, FlowBean flowBean, int i) {
        // 获取手机号前三位prePhone
        String phone = text.toString();
        String prePhone = phone.substring(0, 3);
        //定义一个分区号变量partition,根据prePhone 设置分区号
        int partition;
        if ("136".equals(prePhone)) {
            partition = 0;
        } else if ("137".equals(prePhone)) {
            partition = 1;
        } else if ("138".equals(prePhone)) {
            partition = 2;
        } else if ("139".equals(prePhone)) {
            partition = 3;
        } else {
            partition = 4;
        }
        //最后返回分区号partition
        return partition;
    }
}

B、在job驱动中,设置自定义partitioner,job.setPartitionerClass(自定义分区类.class)

C、自定义Partition后,要根据自定义Partitioner的逻辑设置相应的数量的ReduceTask:job.setNumReduceTasks(数量)

public class FlowDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // 1.获取job对象
        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration);
        // 2.关联本Driver类
        job.setJarByClass(FlowDriver.class);
        // 3.关联Mapper和Reducer类
        job.setMapperClass(FlowMapper.class);
        job.setReducerClass(FlowReducer.class);
        // 4.设置Map端输出KV类型
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(FlowBean.class);
        // 5.设置最终输出KV类型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);
        // 6.设置程序的输入和输出路径
        FileInputFormat.setInputPaths(job, new Path("C:\\install\\temp\\input\\input02\\phone_data.txt"));
        FileOutputFormat.setOutputPath(job, new Path("C:\\install\\temp\\output\\output06"));
        // 8.指定自定义分区器
        job.setPartitionerClass(ProvincePartitioner.class);
        // 9.同时也指定相应数量的ReduceTask--对应的参数mapreduce.job.reduces,默认为1
        job.setNumReduceTasks(5);
        // 7.提交job
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }
}

(5)Partition分区总结

A、如果ReduceTask数量 > getPartition()结果数,则会多产生几个空的输出文件

B、如果 1 <ReduceTask数量 <getPartition()结果数,则有一部分分区数据无处安放,会异常

C、如果ReduceTask数量=1,则不管MapTask输出多少个分区文件,最终结果只有一个ReduceTask,只会产生一个结果文件。(分区数不大于1,不会走默认hash分区器和自定义分区器,直接返回)

D、分区号必须从0开始,逐一累加

(6)排序

A、排序是MapReduce框架中最重要的操作之一

B、MapTask和ReduceTask均会对数据按key进行排序,该 操作属于Hadoop的默认行为 。任务应用程序中的数据均会被排序,而不管逻辑上是否需要。

C、默认排序是按照字典顺序排序,排序的方法为快速排序

D、排序分类:部分排序、全排序、辅助排序、二次排序

(7)溢写

A、当环形缓冲区满后,MapReduce会将数据写到本地磁盘上,生成一个临时文件

(8)Combiner

A、Combiner是MR程序中Mapper和Reducer之外的一种组件

B、Combiner的父类是Reducer

C、Combiner与Reducer区别:在于运行的位置 ,Combiner是在每一个MapTask所在节点运行,即在分区、排序后准备溢写前可以进行combiner。Reducer是接收全局所有MapTask输出结果。

D、Combiner的意义是对每一个MapTask的输出进行局部汇总,以减少网络传输量

E、Combiner应用前提是不影响最终的业务逻辑

public class WordCountCombiner extends Reducer<Text, IntWritable, Text,IntWritable> {
    private IntWritable outV = new IntWritable();
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable value : values) {
            sum += value.get();
        }
        outV.set(sum);
        context.write(key,outV);
    }
}
public class WordCountDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // 一获取二关联三设置一提交
        // 1.获取配置信息及Job对象
        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration);
        // 2.关联本Driver程序的类
        job.setJarByClass(WordCountDriver.class);
        // 3.关联Mapper和Reducer的业务类
        job.setMapperClass(WordCountMapper.class);
        job.setReducerClass(WordCountReducer.class);
        // 4.设置Mapper输出的KV类型
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        // 5.设置最终输出的KV类型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        // 6.设置输入和输出路径
        //FileInputFormat.setInputPaths(job, new Path(args[0]));
        //FileOutputFormat.setOutputPath(job, new Path(args[1]));
        FileInputFormat.setInputPaths(job, new Path("C:\\install\\temp\\input\\hadoop.txt"));
        FileOutputFormat.setOutputPath(job, new Path("C:\\install\\temp\\output\\output01-2"));
        // 8.设置Combiner类--方式一
        //job.setCombinerClass(WordCountCombiner.class);
        // 方式二:其新建的WordCountCombiner的reduce方法处理与正常的WordCountReducer中的reduce方法处理逻辑是一样
        // 因此可以直接用此类作为combiner类
        job.setCombinerClass(WordCountReducer.class);
        // 9.设置ReduceTasks的数量--这样就没有reduce阶段,就不会有shuffle,Combiner也就没有用,直接由map输出,
        // 文件名为part-m-00000,就是不part-r-00000,两者结果是不一样的
        // 即如果没有reduce阶段,即使设置了combiner也不起作用
        // job.setNumReduceTasks(0);
        // 7.提交job
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1); // 0-正常退出 非0(1)异常终止(结束)
    }
}

(9)Meger

A、MapTask以分区为单位进行合并,对所有临时文件合并成一个大文件(output/file.out),同时生成相应索引文件(output/file.out.index)

B、对某个分区采用多轮递归合并的方式,每次合并默认10个文件,每个MapTask最终得到一个大文件

6、ReduceTask

(1)Copy阶段

ReduceTask从各个MapTask上远程拷贝一片数据,如大小超过阀值,则写到磁盘上,否则直接放在内存中

(2)Sort阶段

由于各个MapTask已经实现了对自己处理结果进行了局部排序,因此ReduceTask只需要对所有数据进行一次归并排序即可

(3)Reducer阶段

reduce()函数将计算结果写到HDFS上

(4)其他

A、ReduceTask数量默认是1,可手动设置job.setNumReduceTasks(数量)

B、ReduceTask=0,表示没有reduce阶段,输出文件个数和Map个数一致

C、如果数据分布不均匀,就会在reduce阶段产生数据倾斜

D、ReduceTask数量并不能任意设置,要考虑业务逻辑需求,具体多少个ReduceTask,需要根据集群性能确定

E、如果分区数不是1,但ReduceTask为1,不执行分区过程(执行分区的前提是判断ReduceNum个数是否大于1)

到此这篇关于Java大数据处理的核心技术MapReduce框架的文章就介绍到这了,更多相关Java MapReduce框架内容请搜索编程网以前的文章或继续浏览下面的相关文章希望大家以后多多支持编程网!

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

Java大数据处理的核心技术MapReduce框架

下载Word文档到电脑,方便收藏和打印~

下载Word文档

猜你喜欢

Java大数据处理的核心技术MapReduce框架

MapReduce是一种分布式计算框架,适用于大规模的数据处理。它将大数据分成多个小数据块,通过Map和Reduce两个阶段对数据进行处理和分析。MapReduce框架具有可靠、高效、可扩展等特点,已经成为大数据处理的核心技术
2023-05-19

C++技术中的大数据处理:如何使用MapReduce框架进行分布式大数据处理?

通过使用 c++++ 中的 hadoop mapreduce 框架,可以实现以下大数据处理步骤:1. 将数据映射到键值对;2. 汇总或处理具有相同键的值。该框架包括 mapper 和 reducer 类,用于分别执行映射和汇总阶段。C++
C++技术中的大数据处理:如何使用MapReduce框架进行分布式大数据处理?
2024-05-12

大数据处理领域的经典框架:MapReduce详解与应用

MapReduce是一种经典的大数据处理框架,最早由Google提出,并在后来由Apache Hadoop项目开发和推广。MapReduce的设计目标是为了方便并行处理大规模数据集。MapReduce框架分为两个主要步骤:Map和Reduc
2023-10-11

C++技术中的大数据处理:如何使用第三方库和框架简化大数据处理?

使用第三方库(如 apac++he hadoop 和 apache spark)以及框架在 c++ 中处理大数据变得更加容易,从而提高了开发效率、性能和可扩展性。具体来说:第三方库提供处理海量数据集的强大功能,例如 hadoop 和 spa
C++技术中的大数据处理:如何使用第三方库和框架简化大数据处理?
2024-05-11

C++技术中的大数据处理:如何采用流处理技术处理大数据流?

流处理技术用于大数据处理流处理是一种即时处理数据流的技术。在 c++++ 中,apache kafka 可用于流处理。流处理提供实时数据处理、可伸缩性和容错性。本例使用 apache kafka 从 kafka 主题读取数据并计算平均值。C
C++技术中的大数据处理:如何采用流处理技术处理大数据流?
2024-05-11

​能处理大数据的技术有哪些

小编给大家分享一下能处理大数据的技术有哪些,相信大部分人都还不怎么了解,因此分享这篇文章给大家参考一下,希望大家阅读完这篇文章后大有收获,下面让我们一起去了解一下吧!能处理大数据的技术:Hadoop离线计算 spark实时计算 stro
2023-06-02

C++技术中的大数据处理:如何利用人工智能技术增强大数据处理能力?

在 c++++ 中处理大数据时,我们可以利用人工智能 (ai) 技术来增强处理能力,包括集成机器学习 (ml)、深度学习 (dl) 和自然语言处理 (nlp) 算法。通过集成 ai,我们可以提高预测和分类的准确性、自动化繁琐任务并增强对数据
C++技术中的大数据处理:如何利用人工智能技术增强大数据处理能力?
2024-05-11

iterate在大数据批处理框架中的优化

在大数据批处理框架中,如Apache Hadoop和Spark,iterate函数或类似的概念通常用于迭代数据集。优化iterate函数的性能可以显著提高数据处理速度。以下是一些建议:使用更高效的数据结构:根据你的需求选择合适的数据结构。例
iterate在大数据批处理框架中的优化
2024-09-22

Java Map技术大揭秘,高效处理数据的不二法门

Java Map技术是一种高效处理数据的数据结构,它允许用户根据键来快速查找和检索值。它广泛应用于各种场景,例如缓存、数据库索引、查找表等。
Java Map技术大揭秘,高效处理数据的不二法门
2024-02-05

编程热搜

  • Python 学习之路 - Python
    一、安装Python34Windows在Python官网(https://www.python.org/downloads/)下载安装包并安装。Python的默认安装路径是:C:\Python34配置环境变量:【右键计算机】--》【属性】-
    Python 学习之路 - Python
  • chatgpt的中文全称是什么
    chatgpt的中文全称是生成型预训练变换模型。ChatGPT是什么ChatGPT是美国人工智能研究实验室OpenAI开发的一种全新聊天机器人模型,它能够通过学习和理解人类的语言来进行对话,还能根据聊天的上下文进行互动,并协助人类完成一系列
    chatgpt的中文全称是什么
  • C/C++中extern函数使用详解
  • C/C++可变参数的使用
    可变参数的使用方法远远不止以下几种,不过在C,C++中使用可变参数时要小心,在使用printf()等函数时传入的参数个数一定不能比前面的格式化字符串中的’%’符号个数少,否则会产生访问越界,运气不好的话还会导致程序崩溃
    C/C++可变参数的使用
  • css样式文件该放在哪里
  • php中数组下标必须是连续的吗
  • Python 3 教程
    Python 3 教程 Python 的 3.0 版本,常被称为 Python 3000,或简称 Py3k。相对于 Python 的早期版本,这是一个较大的升级。为了不带入过多的累赘,Python 3.0 在设计的时候没有考虑向下兼容。 Python
    Python 3 教程
  • Python pip包管理
    一、前言    在Python中, 安装第三方模块是通过 setuptools 这个工具完成的。 Python有两个封装了 setuptools的包管理工具: easy_install  和  pip , 目前官方推荐使用 pip。    
    Python pip包管理
  • ubuntu如何重新编译内核
  • 改善Java代码之慎用java动态编译

目录