我的编程空间,编程开发者的网络收藏夹
学习永远不晚

kafka安装部署超详细步骤

短信预约 -IT技能 免费直播动态提醒
省份

北京

  • 北京
  • 上海
  • 天津
  • 重庆
  • 河北
  • 山东
  • 辽宁
  • 黑龙江
  • 吉林
  • 甘肃
  • 青海
  • 河南
  • 江苏
  • 湖北
  • 湖南
  • 江西
  • 浙江
  • 广东
  • 云南
  • 福建
  • 海南
  • 山西
  • 四川
  • 陕西
  • 贵州
  • 安徽
  • 广西
  • 内蒙
  • 西藏
  • 新疆
  • 宁夏
  • 兵团
手机号立即预约

请填写图片验证码后获取短信验证码

看不清楚,换张图片

免费获取短信验证码

kafka安装部署超详细步骤

概述

Kafka是最初由Linkedin公司开发,是一个分布式、分区的、多副本的、多订阅者,基于zookeeper协调的分布式日志系统(也可以当做MQ系统),常见可以用于web/nginx日志、访问日志,消息服务等等,Linkedin于2010年贡献给了Apache基金会并成为顶级开源项目。

主要应用场景是:日志收集系统和消息系统。

Kafka主要设计目标如下:

  • 以时间复杂度为O(1)的方式提供消息持久化能力,即使对TB级以上数据也能保证常数时间的访问性能。
  • 高吞吐率。即使在非常廉价的商用机器上也能做到单机支持每秒100K条消息的传输。
  • 支持Kafka Server间的消息分区,及分布式消费,同时保证每个partition内的消息顺序传输。
  • 同时支持离线数据处理和实时数据处理。
  • Scale out:支持在线水平扩展

Step 1: 下载代码

你可以登录Apache kafka 官方下载。
http://kafka.apache.org/downloads.html
备注:2.11-1.1.0版本才与JDK1.7兼容,否则更高版本需要JDK1.8

在这里插入图片描述

Step 2: 启动服务

运行kafka需要使用Zookeeper,所以你需要先启动Zookeeper,如果你没有Zookeeper,你可以使用kafka自带打包和配置好的Zookeeper(PS:在kafka包里)。


//这是前台启动,启动以后,当前就无法进行其他操作(不推荐)
./zookeeper-server-start.sh ../config/zookeeper.properties

//后台启动(推荐)
./zookeeper-server-start.sh ../config/zookeeper.properties 1>/dev/null 2>&1 &

现在启动kafka


config/server1.properties:
	broker.id=0
	listeners=PLAINTEXT://192.168.10.130:9092
	log.dirs=kafka-logs
	zookeeper.connect=localhost:2181

//后台启动kafka
./kafka-server-start.sh ../config/server.properties 1>/dev/null 2>&1 &

Step 3:创建一个主题

  创建一个名为“test”的Topic,只有一个分区和备份(2181是zookeeper的默认端口)


./kafka-topics.sh --create --zookeeper localhost:2181 --config max.message.bytes=12800000 --config flush.messages=1 --replication-factor 1 --partitions 1 --topic test

命令解析:
--create: 指定创建topic动作

--topic:指定新建topic的名称

--zookeeper: 指定kafka连接zk的连接url,该值和server.properties文件中的配置项{zookeeper.connect}一样

--config:指定当前topic上有效的参数值,参数列表参考文档为: http://kafka.apache.org/082/documentation.html#brokerconfigs

--partitions:指定当前创建的kafka分区数量,默认为1个

--replication-factor:指定每个分区的复制因子个数,默认1个

创建好之后,可以通过运行以下命令,查看已创建的topic信息:


>./kafka-topics.sh --list --zookeeper localhost:2181
test

  或者,除了手工创建topic外,你也可以配置你的broker,当发布一个不存在的topic时自动创建topic。

  补充:
(1)查看对应topic的描述信息


./kafka-topics.sh --describe --zookeeper localhost:2181  --topic test0

命令解析:
--describe: 指定是展示详细信息命令

--zookeeper: 指定kafka连接zk的连接url,该值和server.properties文件中的配置项{zookeeper.connect}一样

--topic:指定需要展示数据的topic名称

这里写图片描述

  (2)Topic信息修改


bin/kafka-topics.sh --zookeeper 192.168.187.146:2181 --alter --topic test0 --config max.message.bytes=128000
bin/kafka-topics.sh --zookeeper 192.168.187.146:2181 --alter --topic test0 --delete-config max.message.bytes
bin/kafka-topics.sh --zookeeper 192.168.187.146:2181 --alter --topic test0 --partitions 10 
bin/kafka-topics.sh --zookeeper 192.168.187.146:2181 --alter --topic test0 --partitions 3 ## Kafka分区数量只允许增加,不允许减少

  (3)Topic删除
默认情况下Kafka的Topic是没法直接删除的,需要进行相关参数配置


bin/kafka-topics.sh --delete --topic test0 --zookeeper 192.168.187.146:2181

加粗样式
Note: This will have no impact if delete.topic.enable is not set to true.## 默认情况下,删除是标记删除,没有实际删除这个Topic;如果运行删除Topic,两种方式:
方式一:通过delete命令删除后,手动将本地磁盘以及zk上的相关topic的信息删除即可
方式二:配置server.properties文件,给定参数delete.topic.enable=true,重启kafka服务,此时执行delete命令表示允许进行Topic的删除

Step 4: 发送消息

  Kafka提供了一个命令行的工具,可以从输入文件或者命令行中读取消息并发送给Kafka集群。每一行是一条消息。

  运行producer(生产者),然后在控制台输入几条消息到服务器。
  备注:这里的localhost:9092不是固定的,需要根据server.properties中配置的地址来写这里的地址!


[root@administrator bin]# ./kafka-console-producer.sh --broker-list localhost:9092 --topic test
>this is a message
>this is another message
//按`Ctrl+C`终止输入

Step 5: 消费消息

  Kafka也提供了一个消费消息的命令行工具,将存储的信息输出出来。
  备注:这里的localhost:9092不是固定的,需要根据server.properties中配置的地址来写这里的地址!


[root@administrator bin]# ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
this is a message
this is another message
//按`Ctrl+C`终止读取消息

  如果你有2台不同的终端上运行上述命令,那么当你在运行生产者时,消费者就能消费到生产者发送的消息。

Step 6: 设置多个broker集群(单机伪集群的配置)

  到目前,我们只是单一的运行一个broker,没什么意思。对于Kafka,一个broker仅仅只是一个集群的大小,所有让我们多设几个broker。

  首先为每个broker创建一个配置文件:


cp config/server.properties config/server-1.properties 
cp config/server.properties config/server-2.properties 

现在编辑这些新建的文件,设置以下属性:


vim config/server.properties 

config/server1.properties:
	broker.id=0
	listeners=PLAINTEXT://192.168.10.130:9092
	log.dirs=kafka-logs
	zookeeper.connect=localhost:2181
	
config/server-1.properties: 
    broker.id=1
	listeners=PLAINTEXT://192.168.10.130:9093
	log.dirs=kafka-logs-1
	zookeeper.connect=localhost:2181

config/server-2.properties: 
    broker.id=2
	listeners=PLAINTEXT://192.168.10.130:9094
	log.dirs=kafka-logs-2
	zookeeper.connect=localhost:2181

  备注1listeners一定要配置成为IP地址;如果配置为localhost或服务器的hostname,在使用java发送数据时就会抛出异 常:org.apache.kafka.common.errors.TimeoutException: Batch Expired 。因为在没有配置advertised.host.name 的情况下,Kafka并没有像官方文档宣称的那样改为广播我们配置的host.name,而是广播了主机配置的hostname。远端的客户端并没有配置 hosts,所以自然是连接不上这个hostname的。

  备注2:当使用java客户端访问远程的kafka时,一定要把集群中所有的端口打开,否则会连接超时


/sbin/iptables -I INPUT -p tcp --dport 9092 -j ACCEPT
/sbin/iptables -I INPUT -p tcp --dport 9093 -j ACCEPT
/sbin/iptables -I INPUT -p tcp --dport 9094 -j ACCEPT
/etc/rc.d/init.d/iptables save

  broker.id是集群中每个节点的唯一且永久的名称,我们修改端口和日志目录是因为我们现在在同一台机器上运行,我们要防止broker在同一端口上注册和覆盖对方的数据。

  我们已经运行了zookeeper和刚才的一个kafka节点,所有我们只需要在启动2个新的kafka节点。


./kafka-server-start.sh ../config/server-1.properties 1>/dev/null 2>&1 &
./kafka-server-start.sh ../config/server-2.properties 1>/dev/null 2>&1 &

  现在,我们创建一个新topic,把备份设置为:3


./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topic

  好了,现在我们已经有了一个集群了,我们怎么知道每个集群在做什么呢?运行命令“describe topics”


> ./kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
//所有分区的摘要
Topic:my-replicated-topic    PartitionCount:1    ReplicationFactor:3    Configs:
//提供一个分区信息,因为我们只有一个分区,所以只有一行。
Topic: my-replicated-topic    Partition: 0    Leader: 1    Replicas: 1,2,0    Isr: 1,2,0
  • “leader”:该节点负责该分区的所有的读和写,每个节点的leader都是随机选择的。
  • “replicas”:备份的节点列表,无论该节点是否是leader或者目前是否还活着,只是显示。
  • “isr”:“同步备份”的节点列表,也就是活着的节点并且正在同步leader

  其中ReplicasIsr中的1,2,0就对应着3个broker他们的broker.id属性!

  我们运行这个命令,看看一开始我们创建的那个节点:


> ./kafka-topics.sh --describe --zookeeper localhost:2181 --topic test
Topic:test    PartitionCount:1    ReplicationFactor:1    Configs:
Topic: test    Partition: 0    Leader: 0    Replicas: 0    Isr: 0

  这并不奇怪,刚才创建的主题没有Replicas,并且在服务器“0”上,我们创建它的时候,集群中只有一个服务器,所以是“0”。

Step 7: 测试集群的容错能力

7.1发布消息到集群


[root@administrator bin]# ./kafka-console-producer.sh --broker-list 192.168.10.130:9092 --topic my-replicated-topic
>cluster message 1
>cluster message 2
//Ctrl+C终止产生消息

7.2消费消息


[root@administrator bin]# ./kafka-console-consumer.sh --bootstrap-server 192.168.10.130:9093 --from-beginning --topic my-replicated-topic
cluster message 1
cluster message 2
//Ctrl+C终止消费消息

7.3干掉leader,测试集群容错

首先查询谁是leader


> ./kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
//所有分区的摘要
Topic:my-replicated-topic    PartitionCount:1    ReplicationFactor:3    Configs:
//提供一个分区信息,因为我们只有一个分区,所以只有一行。
Topic: my-replicated-topic    Partition: 0    Leader: 1    Replicas: 1,2,0    Isr: 1,2,0

  可以看到Leaderbroker.id1,找到对应的Broker


[root@administrator bin]# jps -m
5130 Kafka ../config/server.properties
4861 QuorumPeerMain ../config/zookeeper.properties
1231 Bootstrap start start
7420 Kafka ../config/server-2.properties
7111 Kafka ../config/server-1.properties
9139 Jps -m

  通过以上查询到LeaderPIDKafka ../config/server-1.properties)为7111,杀掉该进程


//杀掉该进程
kill -9 7111
//再查询一下,确认新的Leader已经产生,新的Leader为broker.id=0
[root@administrator bin]# ./kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
Topic:my-replicated-topic       PartitionCount:1        ReplicationFactor:3    Configs:
//备份节点之一成为新的leader,而broker1已经不在同步备份集合里了
Topic: my-replicated-topic      Partition: 0    Leader: 0       Replicas: 1,0,2 Isr: 0,2


7.4再次消费消息,确认消息没有丢失


[root@administrator bin]# ./kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning --topic my-replicated-topic
cluster message 1
cluster message 2

  消息依然存在,故障转移成功!!

到此这篇关于kafka安装部署的文章就介绍到这了,更多相关kafka安装部署内容请搜索编程网以前的文章或继续浏览下面的相关文章希望大家以后多多支持编程网!

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

kafka安装部署超详细步骤

下载Word文档到电脑,方便收藏和打印~

下载Word文档

猜你喜欢

在docker上部署postgreSQL主从的超详细步骤

目录一、主从规划二、创建PostgresSQL的docker镜像三、主库部署1、建立pgsql主库的data地址2、启动docker镜像3、docker内操作4、修改配置文件四、部署从数据库1、建立psql备库的data地址2、启动dock
在docker上部署postgreSQL主从的超详细步骤
2024-08-23

CentOS下MySQL 8.0安装部署,超详细!

MySQL 8正式版8.0.11已发布,官方表示MySQL8要比MySQL 5.7快2倍,还带来了大量的改进和更快的性能!

超详细JDK下载与安装步骤

文章目录 😋开发环境-JDK安装😆 1. 下载地址😛 2. 安装JDK😚 3. 配置系统环境 😋开发环境-JDK安装 无论在我们开始学习Ja
2023-08-16

Docker安装mysql超详细步骤记录

查看需要安装的镜像版本dockerHub官网地址 1.搜索mysql2.点击标签3.点击Tags,查看想要的版本号在安装好docker的linux中执行命令拉取mysql最新版本docphpker pull mysql 拉取mysq
2022-07-12

KubeSphere部署mysql的详细步骤

目录1.创建mysql基础配置2.创建pvc挂载3.创建工作负载4.创建服务(创建服务后才能进行外部访问)5.测试mysql是否能正常访问演示示例使用的是3.4.1,各版本有名字差异 功能是一样的 由于mysql需要做数据持久化所以需要挂载
KubeSphere部署mysql的详细步骤
2024-10-14

Docker安装Kafka教程(超详细)

首先创建一个网络 app-tier:网络名称 –driver:网络类型为bridge docker network create app-tier --driver bridge 1、安装zookeeper Kafka依赖zook
2023-08-19

Centos7安装部署免费confluencewiki(知识库)详细操作步骤

Centos7安装部署免费confluence wiki(知识库)详细操作步骤 前言:confluence是团队协作软件,改变团队工作方式,作为现代化办公不可缺少的工具 wiki所需的安装包: 链接: https://pan.baidu.c
2022-05-19

ASP.NET安装详细步骤

这篇文章主要介绍“ASP.NET安装详细步骤”,在日常操作中,相信很多人在ASP.NET安装详细步骤问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答”ASP.NET安装详细步骤”的疑惑有所帮助!接下来,请跟着小编
2023-06-18

pip安装详细步骤

安装步骤:1、确保Python已经正确安装在您的计算机上;2、下载“get-pip.py”脚本;3、按下Win + R键,然后输入cmd并按下Enter键来打开命令行窗口;4、在命令行窗口中,使用cd命令切换到“get-pip.py”所在的
2023-10-22

编程热搜

  • Python 学习之路 - Python
    一、安装Python34Windows在Python官网(https://www.python.org/downloads/)下载安装包并安装。Python的默认安装路径是:C:\Python34配置环境变量:【右键计算机】--》【属性】-
    Python 学习之路 - Python
  • chatgpt的中文全称是什么
    chatgpt的中文全称是生成型预训练变换模型。ChatGPT是什么ChatGPT是美国人工智能研究实验室OpenAI开发的一种全新聊天机器人模型,它能够通过学习和理解人类的语言来进行对话,还能根据聊天的上下文进行互动,并协助人类完成一系列
    chatgpt的中文全称是什么
  • C/C++中extern函数使用详解
  • C/C++可变参数的使用
    可变参数的使用方法远远不止以下几种,不过在C,C++中使用可变参数时要小心,在使用printf()等函数时传入的参数个数一定不能比前面的格式化字符串中的’%’符号个数少,否则会产生访问越界,运气不好的话还会导致程序崩溃
    C/C++可变参数的使用
  • css样式文件该放在哪里
  • php中数组下标必须是连续的吗
  • Python 3 教程
    Python 3 教程 Python 的 3.0 版本,常被称为 Python 3000,或简称 Py3k。相对于 Python 的早期版本,这是一个较大的升级。为了不带入过多的累赘,Python 3.0 在设计的时候没有考虑向下兼容。 Python
    Python 3 教程
  • Python pip包管理
    一、前言    在Python中, 安装第三方模块是通过 setuptools 这个工具完成的。 Python有两个封装了 setuptools的包管理工具: easy_install  和  pip , 目前官方推荐使用 pip。    
    Python pip包管理
  • ubuntu如何重新编译内核
  • 改善Java代码之慎用java动态编译

目录