我的编程空间,编程开发者的网络收藏夹
学习永远不晚

如何进行storm1.1.3与kafka1.0.0整合

短信预约 -IT技能 免费直播动态提醒
省份

北京

  • 北京
  • 上海
  • 天津
  • 重庆
  • 河北
  • 山东
  • 辽宁
  • 黑龙江
  • 吉林
  • 甘肃
  • 青海
  • 河南
  • 江苏
  • 湖北
  • 湖南
  • 江西
  • 浙江
  • 广东
  • 云南
  • 福建
  • 海南
  • 山西
  • 四川
  • 陕西
  • 贵州
  • 安徽
  • 广西
  • 内蒙
  • 西藏
  • 新疆
  • 宁夏
  • 兵团
手机号立即预约

请填写图片验证码后获取短信验证码

看不清楚,换张图片

免费获取短信验证码

如何进行storm1.1.3与kafka1.0.0整合

本篇文章给大家分享的是有关如何进行storm1.1.3与kafka1.0.0整合,小编觉得挺实用的,因此分享给大家学习,希望大家阅读完这篇文章后可以有所收获,话不多说,跟着小编一起来看看吧。

package hgs.core.sk;import java.util.Map;import org.apache.storm.Config;import org.apache.storm.LocalCluster;import org.apache.storm.StormSubmitter;import org.apache.storm.kafka.BrokerHosts;import org.apache.storm.kafka.KafkaSpout;import org.apache.storm.kafka.SpoutConfig;import org.apache.storm.kafka.ZkHosts;import org.apache.storm.task.OutputCollector;import org.apache.storm.task.TopologyContext;import org.apache.storm.topology.OutputFieldsDeclarer;import org.apache.storm.topology.TopologyBuilder;import org.apache.storm.topology.base.BaseRichBolt;import org.apache.storm.tuple.Tuple;@SuppressWarnings("deprecation")public class StormKafkaMainTest {public static void main(String[] args) {TopologyBuilder builder = new TopologyBuilder();//zookeeper链接地址BrokerHosts hosts = new ZkHosts("bigdata01:2181,bigdata02:2181,bigdata03:2181");//KafkaSpout需要一个config,参数代表的意义1:zookeeper链接,2:消费kafka的topic,3,4:记录消费offset的zookeeper地址 ,这里会保存在 zookeeper//集群的/test7/consume下面SpoutConfig sconfig = new SpoutConfig(hosts, "test7", "/test7", "consume");//消费的时候忽略offset从头开始消费,这里可以注释掉,因为消费的offset在zookeeper中可以找到sconfig.ignoreZkOffsets=true;//sconfig.scheme = new SchemeAsMultiScheme( new StringScheme() );builder.setSpout("kafkaspout", new KafkaSpout(sconfig), 1);builder.setBolt("mybolt1", new MyboltO(), 1).shuffleGrouping("kafkaspout");     Config config = new Config();     config.setNumWorkers(1);     try {StormSubmitter.submitTopology("storm----kafka--test", config, builder.createTopology());} catch (Exception e) {e.printStackTrace();}      }}class  MyboltO extends  BaseRichBolt{private static final long serialVersionUID = 1L;OutputCollector collector = null;public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {this.collector = collector;}public void execute(Tuple input) {//这里把消息大一出来,在对应的woker下面的日志可以找到打印的内容//因为得到的内容是byte数组,所以需要转换String out = new String((byte[])input.getValue(0));System.out.println(out);collector.ack(input);}public void declareOutputFields(OutputFieldsDeclarer declarer) {}}
pom.xml文件的依赖
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">  <modelVersion>4.0.0</modelVersion>  <groupId>hgs</groupId>  <artifactId>core.sk</artifactId>  <version>1.0.0-SNAPSHOT</version>  <packaging>jar</packaging>  <name>core.sk</name>  <url>http://maven.apache.org</url>  <properties>    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>  </properties>  <dependencies>    <dependency>      <groupId>junit</groupId>      <artifactId>junit</artifactId>      <version>3.8.1</version>      <scope>test</scope>    </dependency>        <dependency>    <groupId>org.apache.storm</groupId>    <artifactId>storm-kafka</artifactId>    <version>1.1.3</version></dependency><dependency>  <groupId>org.apache.storm</groupId>  <artifactId>storm-core</artifactId>  <version>1.1.3</version>  <scope>provided</scope></dependency><dependency>    <groupId>org.apache.kafka</groupId>    <artifactId>kafka_2.11</artifactId>    <version>1.0.0</version>    <exclusions>    <exclusion>          <groupId>org.slf4j</groupId>          <artifactId>slf4j-log4j12</artifactId>        </exclusion>        <exclusion>            <groupId>org.apache.zookeeper</groupId>            <artifactId>zookeeper</artifactId>       </exclusion>    </exclusions></dependency><!-- <dependency>    <groupId>org.apache.storm</groupId>    <artifactId>storm-kafka-monitor</artifactId>    <version>1.2.2</version></dependency> --><!-- <dependency>    <groupId>org.apache.kafka</groupId>    <artifactId>kafka-clients</artifactId>    <version>0.8.2.1</version></dependency> --><dependency>    <groupId>org.clojure</groupId>    <artifactId>clojure</artifactId>    <version>1.7.0</version></dependency><!-- 尝试了很多次 都会有这个错误:java.lang.NullPointerException at org.apache.storm.kafka.monitor.KafkaOffsetLagUtil.getOffsetLags(KafkaOffsetLagUtil.java:272)最后修改为kafka相应的kafka-clients版本后问题得到解决,应该是该出的问题--><dependency>    <groupId>org.apache.kafka</groupId>    <artifactId>kafka-clients</artifactId>    <version>1.0.0</version></dependency> </dependencies>        <build>        <plugins>            <plugin>                <artifactId>maven-assembly-plugin</artifactId>                <version>2.2</version>                <configuration>                    <archive>                        <manifest>                            <!-- 我运行这个jar所运行的主类 -->                            <mainClass>hgs.core.sk.StormKafkaMainTest</mainClass>                        </manifest>                    </archive>                    <descriptorRefs>                        <descriptorRef>                            <!-- 必须是这样写 -->                            jar-with-dependencies                        </descriptorRef>                    </descriptorRefs>                </configuration>                                <executions>                    <execution>                        <id>make-assembly</id>                        <phase>package</phase>                        <goals>                            <goal>single</goal>                        </goals>                    </execution>                </executions>            </plugin>                         <plugin>                <groupId>org.apache.maven.plugins</groupId>                <artifactId>maven-compiler-plugin</artifactId>                <configuration>                    <source>1.8</source>                    <target>1.8</target>                </configuration>            </plugin>        </plugins>    </build></project>

以上就是如何进行storm1.1.3与kafka1.0.0整合,小编相信有部分知识点可能是我们日常工作会见到或用到的。希望你能通过这篇文章学到更多知识。更多详情敬请关注编程网行业资讯频道。

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

如何进行storm1.1.3与kafka1.0.0整合

下载Word文档到电脑,方便收藏和打印~

下载Word文档

猜你喜欢

如何进行storm1.1.3与kafka1.0.0整合

本篇文章给大家分享的是有关如何进行storm1.1.3与kafka1.0.0整合,小编觉得挺实用的,因此分享给大家学习,希望大家阅读完这篇文章后可以有所收获,话不多说,跟着小编一起来看看吧。package hgs.core.sk;impor
2023-06-02

如何将spring与quartz进行整合

如何将spring与quartz进行整合?相信很多没有经验的人对此束手无策,为此本文总结了问题出现的原因和解决方法,通过这篇文章希望你能解决这个问题。第0步:在spring配置包扫描以及在 pom导入包spring.xml:pom.xml1
2023-05-31

springboot如何进行整合mongodb

springboot如何进行整合mongodb,针对这个问题,这篇文章详细介绍了相对应的分析和解答,希望可以帮助更多想解决这个问题的小伙伴找到更简单易行的方法。准备工作安装 MongoDBjdk 1.8maven 3.0idea环境依赖在p
2023-06-19

如何进行SpringBoot 整合JPA

如何进行SpringBoot 整合JPA,很多新手对此不是很清楚,为了帮助大家解决这个难题,下面小编将为大家详细讲解,有这方面需求的人可以来学习下,希望你能有所收获。JPA全称Java Persistence API.JPA通过JDK 5.
2023-06-05

使用spring如何实现springmvc与mybatis进行整合

使用spring如何实现springmvc与mybatis进行整合?针对这个问题,这篇文章详细介绍了相对应的分析和解答,希望可以帮助更多想解决这个问题的小伙伴找到更简单易行的方法。1.jar包 2.引入web.xml文件
2023-05-31

如何对SSM框架进行整合

今天就跟大家聊聊有关如何对SSM框架进行整合,可能很多人都不太了解,为了让大家更加了解,小编给大家总结了以下内容,希望大家根据这篇文章可以有所收获。SSM(Spring+SpringMVC+Mybatis)是目前较为主流的企业级架构方案,不
2023-05-31

使用MongoDB如何对Spring进行整合

本篇文章给大家分享的是有关使用MongoDB如何对Spring进行整合,小编觉得挺实用的,因此分享给大家学习,希望大家阅读完这篇文章后可以有所收获,话不多说,跟着小编一起来看看吧。添加依赖 or
2023-05-31

如何进行SpringBoot整合JWT的实现

这篇文章跟大家分析一下“如何进行SpringBoot整合JWT的实现”。内容详细易懂,对“如何进行SpringBoot整合JWT的实现”感兴趣的朋友可以跟着小编的思路慢慢深入来阅读一下,希望阅读后能够对大家有所帮助。下面跟着小编一起深入学习
2023-06-26

怎么对struts、spring与hibernate进行整合

怎么对struts、spring与hibernate进行整合?相信很多没有经验的人对此束手无策,为此本文总结了问题出现的原因和解决方法,通过这篇文章希望你能解决这个问题。准备三个框架结合的lib包Spring3结合Struts2的步骤如下:
2023-05-31

使用Spring Boot如何对Mybatis进行整合

今天就跟大家聊聊有关使用Spring Boot如何对Mybatis进行整合,可能很多人都不太了解,为了让大家更加了解,小编给大家总结了以下内容,希望大家根据这篇文章可以有所收获。依赖配置结合前面的内容,这里我们要嵌入数据库的操作,这里以操作
2023-05-31

利用Android如何实现对 ToolBar进行整合

本篇文章给大家分享的是有关利用Android如何实现对 ToolBar进行整合,小编觉得挺实用的,因此分享给大家学习,希望大家阅读完这篇文章后可以有所收获,话不多说,跟着小编一起来看看吧。CustomeToolBar继承原生ToolBarp
2023-05-31

使用springboot如何实现对freemarker进行整合

这篇文章将为大家详细讲解有关使用springboot如何实现对freemarker进行整合,文章内容质量较高,因此小编分享给大家做个参考,希望大家阅读完这篇文章后对相关知识有一定的了解。前提:开发工具:idea框架:spring boot、
2023-05-31

使用springboot如何实现对 beatlsql进行整合

这篇文章给大家介绍使用springboot如何实现对 beatlsql进行整合,内容非常详细,感兴趣的小伙伴们可以参考借鉴,希望对大家能有所帮助。BeetSql是一个全功能DAO工具, 同时具有hibernate 优点 & Mybatis优
2023-05-31

如何利用VSTS跟Kubernetes整合进行CI/CD

这篇文章主要讲解了“如何利用VSTS跟Kubernetes整合进行CI/CD”,文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习“如何利用VSTS跟Kubernetes整合进行CI/CD”吧!为什么
2023-06-19

使用Spring4如何实现对Hibernate5进行整合

使用Spring4如何实现对Hibernate5进行整合?很多新手对此不是很清楚,为了帮助大家解决这个难题,下面小编将为大家详细讲解,有这方面需求的人可以来学习下,希望你能有所收获。Spring与Hiberante整合通过hibernate
2023-05-31

使用spring如何对cxf框架进行整合

使用spring如何对cxf框架进行整合?很多新手对此不是很清楚,为了帮助大家解决这个难题,下面小编将为大家详细讲解,有这方面需求的人可以来学习下,希望你能有所收获。1.创建动态web项目2.导入cxf和spring相关jar包(CXF核心
2023-05-31

使用SpringBoot如何实现对ElasticSearch进行整合

这篇文章给大家介绍使用SpringBoot如何实现对ElasticSearch进行整合,内容非常详细,感兴趣的小伙伴们可以参考借鉴,希望对大家能有所帮助。一、实体设计:Tutorial.javapublic class Tutorial i
2023-05-31

使用springboot如何实现对mongodb进行整合

使用springboot如何实现对mongodb进行整合?相信很多没有经验的人对此束手无策,为此本文总结了问题出现的原因和解决方法,通过这篇文章希望你能解决这个问题。环境依赖在pom文件引入spring-boot-starter-data-
2023-05-31

redis与ssm如何整合

这篇文章主要介绍redis与ssm如何整合,文中介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们一定要看完!SSM+redis整合ssm框架之前已经搭建过了,这里不再做代码复制工作。这里主要是利用redis去做mybatis的二级缓存,
2023-05-30

编程热搜

  • Python 学习之路 - Python
    一、安装Python34Windows在Python官网(https://www.python.org/downloads/)下载安装包并安装。Python的默认安装路径是:C:\Python34配置环境变量:【右键计算机】--》【属性】-
    Python 学习之路 - Python
  • chatgpt的中文全称是什么
    chatgpt的中文全称是生成型预训练变换模型。ChatGPT是什么ChatGPT是美国人工智能研究实验室OpenAI开发的一种全新聊天机器人模型,它能够通过学习和理解人类的语言来进行对话,还能根据聊天的上下文进行互动,并协助人类完成一系列
    chatgpt的中文全称是什么
  • C/C++中extern函数使用详解
  • C/C++可变参数的使用
    可变参数的使用方法远远不止以下几种,不过在C,C++中使用可变参数时要小心,在使用printf()等函数时传入的参数个数一定不能比前面的格式化字符串中的’%’符号个数少,否则会产生访问越界,运气不好的话还会导致程序崩溃
    C/C++可变参数的使用
  • css样式文件该放在哪里
  • php中数组下标必须是连续的吗
  • Python 3 教程
    Python 3 教程 Python 的 3.0 版本,常被称为 Python 3000,或简称 Py3k。相对于 Python 的早期版本,这是一个较大的升级。为了不带入过多的累赘,Python 3.0 在设计的时候没有考虑向下兼容。 Python
    Python 3 教程
  • Python pip包管理
    一、前言    在Python中, 安装第三方模块是通过 setuptools 这个工具完成的。 Python有两个封装了 setuptools的包管理工具: easy_install  和  pip , 目前官方推荐使用 pip。    
    Python pip包管理
  • ubuntu如何重新编译内核
  • 改善Java代码之慎用java动态编译

目录