我的编程空间,编程开发者的网络收藏夹
学习永远不晚

基于Storm的WordCount

短信预约 信息系统项目管理师 报名、考试、查分时间动态提醒
省份

北京

  • 北京
  • 上海
  • 天津
  • 重庆
  • 河北
  • 山东
  • 辽宁
  • 黑龙江
  • 吉林
  • 甘肃
  • 青海
  • 河南
  • 江苏
  • 湖北
  • 湖南
  • 江西
  • 浙江
  • 广东
  • 云南
  • 福建
  • 海南
  • 山西
  • 四川
  • 陕西
  • 贵州
  • 安徽
  • 广西
  • 内蒙
  • 西藏
  • 新疆
  • 宁夏
  • 兵团
手机号立即预约

请填写图片验证码后获取短信验证码

看不清楚,换张图片

免费获取短信验证码

基于Storm的WordCount

Storm WordCount 工作过程

Storm 版本:
1、Spout 从外部数据源中读取数据,随机发送一个元组对象出去;
2、SplitBolt 接收 Spout 中输出的元组对象,将元组中的数据切分成单词,并将切分后的单词发射出去;
3、WordCountBolt 接收 SplitBolt 中输出的单词数组,对里面单词的频率进行累加,将累加后的结果输出。

Java 版本:
1、读取文件中的数据,一行一行的读取;
2、将读到的数据进行切割;
3、对切割后的数组中的单词进行计算。

Hadoop 版本:
1、按行读取文件中的数据;
2、在 Mapper()函数中对每一行的数据进行切割,并输出切割后的数据数组;
3、接收 Mapper()中输出的数据数组,在 Reducer()函数中对数组中的单词进行计算,将计算后的统计结果输出。

源代码

storm的配置、eclipse里maven的配置以及创建项目部分省略。

Mainclass

package com.test.stormwordcount;
import backtype.storm.Config; 
import backtype.storm.LocalCluster; 
import backtype.storm.StormSubmitter; 
import backtype.storm.generated.AlreadyAliveException; 
import backtype.storm.generated.InvalidTopologyException; 
import backtype.storm.topology.TopologyBuilder; 
import backtype.storm.tuple.Fields; 

public class MainClass { 

    public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException {         
        //创建一个 TopologyBuilder         
        TopologyBuilder tb = new TopologyBuilder();         
        tb.setSpout("SpoutBolt", new SpoutBolt(), 2);         tb.setBolt("SplitBolt", new SplitBolt(), 2).shuffleGrouping("SpoutBolt");         
        tb.setBolt("CountBolt", new CountBolt(), 4).fieldsGrouping("SplitBolt", new Fields("word"));         
        //创建配置         
        Config conf = new Config();         
        //设置 worker 数量         
        conf.setNumWorkers(2);         
        //提交任务         
        //集群提交         
        //StormSubmitter.submitTopology("myWordcount", conf, tb.createTopology());         
        //本地提交         
        LocalCluster localCluster = new LocalCluster();         
        localCluster.submitTopology("myWordcount", conf, tb.createTopology()); 
    }  
} 

SplitBolt 部分

package com.test.stormwordcount;
import java.util.Map; 
import backtype.storm.task.OutputCollector; 
import backtype.storm.task.TopologyContext; 
import backtype.storm.topology.OutputFieldsDeclarer; 
import backtype.storm.topology.base.BaseRichBolt; 
import backtype.storm.tuple.Fields; 
import backtype.storm.tuple.Tuple; 
import backtype.storm.tuple.Values; 

public class SplitBolt extends BaseRichBolt{      
    OutputCollector collector; 

         
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {         
        this.collector = collector;     
        } 

         
    public void execute(Tuple input) {         
        String line = input.getString(0);         
        String[] split = line.split(" ");         
        for (String word : split) {             
            collector.emit(new Values(word));         
            }     
        } 

         
    public void declareOutputFields(OutputFieldsDeclarer declarer) {         
        declarer.declare(new Fields("word"));     
        } 
} 

CountBolt 部分

package com.test.stormwordcount;
import java.util.HashMap; 
import java.util.Map; 
import backtype.storm.task.OutputCollector; 
import backtype.storm.task.TopologyContext; 
import backtype.storm.topology.OutputFieldsDeclarer; 
import backtype.storm.topology.base.BaseRichBolt; 
import backtype.storm.tuple.Tuple; 

public class CountBolt extends BaseRichBolt{ 

    OutputCollector collector;
    Map map = new HashMap(); 

         
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {         
        this.collector = collector;     
        } 


         
public void execute(Tuple input) {         
    String word = input.getString(0);         
    if(map.containsKey(word)){             
    Integer c = map.get(word);             
        map.put(word, c+1);         
        }else{             
        map.put(word, 1);         
        }         
    //测试输出         
    System.out.println("结果:"+map);     
    } 

         
public void declareOutputFields(OutputFieldsDeclarer declarer) {     
    
} 
} 

SpoutBolt 部分

package com.test.stormwordcount;
import java.util.Map; 
import backtype.storm.spout.SpoutOutputCollector; 
import backtype.storm.task.TopologyContext; 
import backtype.storm.topology.OutputFieldsDeclarer; 
import backtype.storm.topology.base.BaseRichSpout; 
import backtype.storm.tuple.Fields; 
import backtype.storm.tuple.Values; 

public class SpoutBolt extends BaseRichSpout{ 

    SpoutOutputCollector collector;
         
    public void open(Map map, TopologyContext context, SpoutOutputCollector collector) {         
        this.collector = collector;     
        } 

         
    public void nextTuple() {         
        collector.emit(new Values("hello world this is a test"));     
        } 

         
    public void declareOutputFields(OutputFieldsDeclarer declarer) {         
        declarer.declare(new Fields("test"));     
        } 
} 

POM.XML 文件内容


4.0.0

com.test
stormwordcount
0.9.6
jar

stormwordcount
http://maven.apache.org


    UTF-8


    
        junit
        junit
        3.8.1
        test
    
    
        org.apache.storm
        storm-core
        0.9.6
    


    
        
            maven-assembly-plugin
            
                
                    jar-with-dependencies
                
                
                    
                        com.test.stormwordcount.MainClass
                    
                
            
            
                
                    make-assembly
                    package
                    
                        single
                    
                
            
        
        
            org.apache.maven.plugins
            maven-compiler-plugin
            
                1.7
                1.7
            
        
    

遇到的问题

基于Storm的WordCount需要eclipse安装了maven插件,之前的大数据实践安装的eclipse版本为Eclipse IDE for Eclipse Committers4.5.2,这个版本不自带maven插件,后续安装失败了几次(网上很多的教程都已经失效),这里分享一下我成功安装的方法:
使用链接下载,Help->Install New SoftWare

点击Add,name输入随意,在location输入下载eclipse的maven插件,下载地址可以这样获取
点击连接:http://www.eclipse.org/m2e/index.html 进入网站后点击download,拉到最下面可以看到很多eclipse maven插件的版本和发布时间,选在适合eclipse的版本复制链接即可。建议取消选中Contack all update sites during install to find required software(耗时太久)。

但是安装成功后还是无法配置(这里原因不太清楚,没找到解决办法),就直接上官网换成自己maven插件的JavaEE IDE了...

后续的maven的配置这些都比较顺利,第一次创建maven-archetype-quickstat项目报错,试了网上很多办法都还没成功,然后打开 Windows->Preferencs->Maven->Installation发现之前配置了的maven的安装路径没了...重新配置了下就可以创建项目了。

最后运行成功的结果:

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

基于Storm的WordCount

下载Word文档到电脑,方便收藏和打印~

下载Word文档

猜你喜欢

基于Storm的WordCount

Storm WordCount 工作过程Storm 版本:1、Spout 从外部数据源中读取数据,随机发送一个元组对象出去;2、SplitBolt 接收 Spout 中输出的元组对象,将元组中的数据切分成单词,并将切分后的单词发射出去;3、WordCountB
基于Storm的WordCount
2016-06-19

如何分析基于Spark Streaming Direct方式的WordCount

这期内容当中小编将会给大家带来有关如何分析基于Spark Streaming Direct方式的WordCount,文章内容丰富且以专业的角度为大家分析和叙述,阅读完这篇文章希望大家可以有所收获。1.前提 a. flume 收集--》flu
2023-06-03

基于Django 的 FreeSwitc

YouPBX 是一个强大 FreeSwift (电话软交换系统) 的管理GUI系统,基于Django开发,功能全面,体验友好,可以基于此项目做一个完善的IPPBX系统、呼叫中心应用等 https://github.com/JoneXiong
2023-01-30

基于python2.7的opencv3.

当初一开始就是如此设想,通过opencv获取视频(摄像头)的图片帧,图像处理识别之后加工(绘制)图片,并把该图片作为视频流的一帧推送rtmp,然后远端直播,之间走了很多很多弯路(甚至想要手动实现rtmp推流)也就是了,搜索了一两周的攻略,断
2023-01-31

Django 基于 jquery 的 a

<1> $.ajax的两种写法: $.ajax("url",{}) $.ajax({})<2> $.ajax的基本使用$.ajax({ url:"//", data:{a:1,b:2}, type:"G
2023-01-31

基于 Python 和 Pandas 的

Pandas 是 Python 的一个模块(module), 我们将用 Python 完成接下来的数据分析的学习. Pandas 模块是一个高性能,高效率和高水平的数据分析库.从本质上讲,它非常像操作电子表格的无头版本,如Excel. 我们
2023-01-30

基于python 3 的selenium

本文主要是运用selenium模块模拟登陆新浪微博python webdriver环境搭建教程:http://blog.csdn.net/nanjunxiao/article/details/7957326# -*- coding: utf
2023-01-31

如何配置LINUX系统apache基于IP,基于port和基于域名的三种虚拟主机

这篇文章主要介绍“如何配置LINUX系统apache基于IP,基于port和基于域名的三种虚拟主机”,在日常操作中,相信很多人在如何配置LINUX系统apache基于IP,基于port和基于域名的三种虚拟主机问题上存在疑惑,小编查阅了各式资
2023-06-10

基于tkinter的GUI编程

tkinter:tkinter是绑定了Python的TKGUI工具集,就是Python包装的Tcl代码,通过内嵌在Python解释器内部的Tcl解释器实现的,它是Python标准库的一部分,所以使用它进行GUI编程不需要另外安装第三方库的。
2023-01-31

H3C 基于IP的限速

Eudemon系列防火墙可以设置流量监管,用QOS CAR做限速。不过好像不支持每IP限速,只能针对IP地址段 去做限制,不能做的太细致。  举个例子 — 比如上网的网段IP地址范围是“192.168.1.0/24”,防火墙连接内网的端口号
2023-01-31

基于C# 的 WinForm 开发

WinForm 是一种用于开发 Windows 应用程序的技术,可以使用 C# 语言进行开发。以下是基于 C# 的 WinForm 开发的一些关键点:1. Visual Studio:使用 Visual Studio 是进行 WinForm
2023-10-12

编程热搜

目录