我的编程空间,编程开发者的网络收藏夹
学习永远不晚

一站式Kafka平台解决方案——KafkaCenter

短信预约 信息系统项目管理师 报名、考试、查分时间动态提醒
省份

北京

  • 北京
  • 上海
  • 天津
  • 重庆
  • 河北
  • 山东
  • 辽宁
  • 黑龙江
  • 吉林
  • 甘肃
  • 青海
  • 河南
  • 江苏
  • 湖北
  • 湖南
  • 江西
  • 浙江
  • 广东
  • 云南
  • 福建
  • 海南
  • 山西
  • 四川
  • 陕西
  • 贵州
  • 安徽
  • 广西
  • 内蒙
  • 西藏
  • 新疆
  • 宁夏
  • 兵团
手机号立即预约

请填写图片验证码后获取短信验证码

看不清楚,换张图片

免费获取短信验证码

一站式Kafka平台解决方案——KafkaCenter

一站式Kafka平台解决方案——KafkaCenter

KafkaCenter是什么

KafkaCenter是一个针对Kafka的一站式,解决方案。用于Kafka集群的维护与管理,生产者和消费者的监控,以及Kafka部分生态组件的使用。

对于Kafka的平台化,一直缺少一个成熟的解决方案,之前比较流行的kafka监控方案,如kafka-manager提供了集群管理与topic管理等等功能。但是对于生产者、消费者的监控,以及Kafka的新生态,如Connect,KSQL还缺少响应的支持。Confluent Control Center功能要完整一些,但却是非开源收费的。

对于Kafka的使用,一直都是一个让人头疼的问题,由于实时系统的强运维特性,我们不得不投入大量的时间用于集群的维护,kafka的运维,比如:

  • 人工创建topic,特别费力
  • 相关kafka运维,监控孤岛化
  • 现有消费监控工具监控不准确
  • 无法拿到Kafka 集群的summay信息
  • 无法快速知晓集群健康状态
  • 无法知晓业务对team kafka使用情况
  • kafka管理,监控工具稀少,没有一个好的工具我们直接可以使用
  • 无法快速查询topic消息

功能模块介绍

  • Home-> 查看平台管理的Kafka Cluster集群信息及监控信息
  • Topic-> 用户可以在此模块查看自己的Topic,发起申请新建Topic,同时可以对Topic进行生产消费测试。
  • Monitor-> 用户可以在此模块中可以查看Topic的生产以及消费情况,同时可以针对消费延迟情况设置预警信息。
  • Connect-> 实现用户快速创建自己的Connect Job,并对自己的Connect进行维护。
  • KSQL-> 实现用户快速创建自己的KSQL Job,并对自己的Job进行维护。
  • Approve-> 此模块主要用于当普通用户申请创建Topic,管理员进行审批操作。
  • Setting-> 此模块主要功能为管理员维护User、Team以及kafka cluster信息
  • Kafka Manager-> 此模块用于管理员对集群的正常维护操作。

系统截图:

安装与入门

安装需要依赖 mysql es email server

组件 是否必须 功能
mysql 必须 配置信息存在mysql
elasticsearch(7.0+) 可选 各种监控信息的存储
email server 可选 Apply, approval, warning e-mail alert

1、初始化

在MySQL中执行sql建表

-- Dumping database structure for kafka_center
CREATE DATABASE IF NOT EXISTS `kafka_center` ;
USE `kafka_center`;


-- Dumping structure for table kafka_center.alert_group
CREATE TABLE IF NOT EXISTS `alert_group` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `cluster_id` int(11) NOT NULL,
  `topic_name` varchar(255) COLLATE utf8_bin NOT NULL DEFAULT "",
  `consummer_group` varchar(255) COLLATE utf8_bin NOT NULL DEFAULT "",
  `consummer_api` varchar(255) COLLATE utf8_bin NOT NULL DEFAULT "",
  `threshold` int(11) DEFAULT NULL,
  `dispause` int(11) DEFAULT NULL,
  `mail_to` varchar(1000) COLLATE utf8_bin NOT NULL DEFAULT "",
  `webhook` varchar(1000) COLLATE utf8_bin NOT NULL DEFAULT "",
  `create_date` datetime DEFAULT NULL,
  `owner_id` int(11) DEFAULT NULL,
  `team_id` int(11) DEFAULT NULL,
  `disable_alerta` tinyint(1) DEFAULT 0,
  `enable` tinyint(1) NOT NULL DEFAULT 1,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin;

-- Data exporting was unselected.


-- Dumping structure for table kafka_center.cluster_info
CREATE TABLE IF NOT EXISTS `cluster_info` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `name` varchar(255) COLLATE utf8_bin NOT NULL,
  `zk_address` varchar(255) COLLATE utf8_bin NOT NULL DEFAULT "",
  `broker` varchar(255) COLLATE utf8_bin NOT NULL DEFAULT "",
  `create_time` datetime DEFAULT NULL,
  `comments` varchar(255) COLLATE utf8_bin NOT NULL DEFAULT "",
  `enable` int(11) DEFAULT NULL,
  `broker_size` int(4) DEFAULT 0,
  `kafka_version` varchar(10) COLLATE utf8_bin DEFAULT "",
  `location` varchar(255) COLLATE utf8_bin NOT NULL DEFAULT "",
  `graf_addr` varchar(255) COLLATE utf8_bin DEFAULT "",
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin;

-- Data exporting was unselected.


-- Dumping structure for table kafka_center.ksql_info
CREATE TABLE IF NOT EXISTS `ksql_info` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `cluster_id` int(11) DEFAULT NULL,
  `cluster_name` varchar(255) DEFAULT NULL,
  `ksql_url` varchar(255) DEFAULT NULL,
  `ksql_serverId` varchar(255) DEFAULT NULL,
  `version` varchar(255) DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

-- Data exporting was unselected.


-- Dumping structure for table kafka_center.task_info
CREATE TABLE IF NOT EXISTS `task_info` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `cluster_ids` varchar(255) COLLATE utf8_bin NOT NULL DEFAULT "",
  `location` varchar(20) COLLATE utf8_bin NOT NULL DEFAULT "",
  `topic_name` varchar(255) COLLATE utf8_bin NOT NULL DEFAULT "",
  `partition` int(11) DEFAULT NULL,
  `replication` int(11) DEFAULT NULL,
  `message_rate` int(50) DEFAULT NULL,
  `ttl` int(11) DEFAULT NULL,
  `owner_id` int(11) DEFAULT NULL,
  `team_id` int(11) DEFAULT NULL,
  `comments` varchar(1000) COLLATE utf8_bin NOT NULL DEFAULT "",
  `create_time` datetime DEFAULT NULL,
  `approved` int(11) DEFAULT NULL,
  `approved_id` int(11) DEFAULT NULL,
  `approved_time` datetime DEFAULT NULL,
  `approval_opinions` varchar(1000) COLLATE utf8_bin DEFAULT "",
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin;

-- Data exporting was unselected.


-- Dumping structure for table kafka_center.team_info
CREATE TABLE IF NOT EXISTS `team_info` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `name` varchar(255) COLLATE utf8_bin NOT NULL DEFAULT "",
  `own` varchar(255) COLLATE utf8_bin NOT NULL DEFAULT "",
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin;

-- Data exporting was unselected.


-- Dumping structure for table kafka_center.topic_collection
CREATE TABLE IF NOT EXISTS `topic_collection` (
  `id` int(11) unsigned NOT NULL AUTO_INCREMENT,
  `cluster_id` int(11) NOT NULL,
  `user_id` int(11) NOT NULL,
  `name` varchar(255) COLLATE utf8_bin NOT NULL DEFAULT "",
  `type` varchar(255) COLLATE utf8_bin NOT NULL DEFAULT "",
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin;

-- Data exporting was unselected.


-- Dumping structure for table kafka_center.topic_info
CREATE TABLE IF NOT EXISTS `topic_info` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `cluster_id` int(11) NOT NULL,
  `topic_name` varchar(255) COLLATE utf8_bin NOT NULL DEFAULT "",
  `partition` int(11) DEFAULT NULL,
  `replication` int(11) DEFAULT NULL,
  `ttl` bigint(11) DEFAULT NULL,
  `config` varchar(512) COLLATE utf8_bin DEFAULT NULL,
  `owner_id` int(11) DEFAULT NULL,
  `team_id` int(11) DEFAULT NULL,
  `comments` varchar(1000) COLLATE utf8_bin NOT NULL DEFAULT "",
  `create_time` datetime DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin;

-- Data exporting was unselected.


-- Dumping structure for table kafka_center.user_info
CREATE TABLE IF NOT EXISTS `user_info` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `name` varchar(255) COLLATE utf8_bin NOT NULL DEFAULT "",
  `real_name` varchar(255) COLLATE utf8_bin DEFAULT "",
  `email` varchar(255) COLLATE utf8_bin NOT NULL DEFAULT "",
  `role` varchar(255) COLLATE utf8_bin NOT NULL DEFAULT "100",
  `create_time` datetime DEFAULT NULL,
  `password` varchar(255) COLLATE utf8_bin DEFAULT "",
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin;

-- Data exporting was unselected.


-- Dumping structure for table kafka_center.user_team
CREATE TABLE IF NOT EXISTS `user_team` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `user_id` int(11) DEFAULT NULL,
  `team_id` int(11) DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin;

2、配置

相关配置位于application.properties

可对端口 日志等信息做一些修改

server.port=8080
debug=false
# 设置session timeout为6小时
server.servlet.session.timeout=21600
spring.security.user.name=admin
spring.security.user.password=admin
spring.datasource.url=jdbc:mysql://127.0.0.1:3306/kafka_center?useUnicode=true&characterEncoding=utf-8
spring.datasource.username=root
spring.datasource.password=123456
spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver
spring.datasource.type=com.zaxxer.hikari.HikariDataSource
spring.datasource.hikari.minimum-idle=5
spring.datasource.hikari.maximum-pool-size=15
spring.datasource.hikari.pool-name=KafkaCenterHikariCP
spring.datasource.hikari.max-lifetime =30000
spring.datasource.hikari.connection-test-query=SELECT 1
management.health.defaults.enabled=false

public.url=http://localhost:8080
connect.url=http://localhost:8000/#/
system.topic.ttl.h=16

monitor.enable=true
monitor.collect.period.minutes=5
monitor.elasticsearch.hosts=localhost:9200
monitor.elasticsearch.index=kafka_center_monitor
#是否启用收集线程指定集群收集
monitor.collector.include.enable=false
#收集线程指定location,必须属于remote.locations之中
monitor.collector.include.location=dev
collect.topic.enable=true
collect.topic.period.minutes=10
# remote的功能是为了提高lag查询和收集,解决跨location网络延迟问题
remote.query.enable=false
remote.hosts=gqc@localhost2:8080
remote.locations=dev,gqc
#发送consumer group的lag发送给alert service
alert.enable=false
alert.dispause=2
alert.service=
alert.threshold=1000
alter.env=other
#是否开启邮件功能,true:启用,false:禁用
mail.enable=false
spring.mail.host=
spring.mail.username=KafkaCenter@xaecbd.com
# oauth2
generic.enabled=false
generic.name=oauth2 Login
generic.auth_url=
generic.token_url=
generic.redirect_utl=
generic.api_url=
generic.client_id=
generic.client_secret=
generic.scopes=

3、运行

推荐使用docker

docker run -d -p 8080:8080 --name KafkaCenter -v ${PWD}/application.properties:/opt/app/kafka-center/config/application.properties xaecbd/kafka-center:2.1.0

不用docker

$ git clone https://github.com/xaecbd/KafkaCenter.git
$ cd KafkaCenter
$ mvn clean package -Dmaven.test.skip=true
$ cd KafkaCenterKafkaCenter-Core	arget
$ java -jar KafkaCenter-Core-2.1.0-SNAPSHOT.jar

4、查看

访问http://localhost:8080 管理员用户与密码默认:admin / admin

功能介绍

Topics

用户可以在此模块完成Topic查看,已经申请新建Topic,同时可以对Topic进行生产消费测试。

Monitor

用户可以在此模块中可以查看Topic的生成以及消费情况,同时可以针对消费延迟情况设置预警信息。

Alerts

此模块用于维护预警信息。用户可以看到自己所有预警信息,管理员可以看到所有人的预警信息。

Kafka Connect

实现用户快速创建自己的Connect Job,并对自己的Connect进行维护。

KSQL

实现用户快速创建自己的KSQL Job,并对自己的Job进行维护。

Approve

此模块主要用于当普通用户申请创建Topic 或者Job时,管理员进行审批操作。

Setting

此模块主要功能为管理员维护User、Team以及kafka cluster信息

Cluster Manager

此模块用于管理员对集群的正常维护操作。

Home

这里是一些基本的统计信息

My Favorite

集群与topic列表

Topic

这里是一些topic的管理功能

Topic List

操作范围:

用户所属Team的所有Topic

  • Topic -> Topic List -> Detail 查看Topic的详细信息
  • Topic -> Topic List -> Mock 对Topic进行生产测试

申请创建topic

Important: admin不能申请task,普通用户必须先让管理员新建team后,将用户加入指定team后,才可以申请task。

操作范围:

用户所属Team的所有Task

  • Topic -> My Task -> Detail 查看申请的Task信息

  • Topic -> My Task -> Delete 删除被拒绝或待审批的Task

  • Topic -> My Task -> Edit 修改被拒绝的Task

  • Topic -> My Task -> Create Topic Task 创建Task

    • 按照表单各字段要求填写信息
    • 点击确认,提交申请

    审批结果:

    • 审批通过:Topic将会被创建在管理员指定的集群
    • 审批拒绝:用户收到邮件,返回到My Task,点击对应Task后面的Edit,针对审批意见进行修改

Topic命名规则:

只能包含:数字、大小写字母、下划线、中划线、点;长度大于等于3小于等于100。

不推荐:下划线开头;

可对所有Topic进行消费测试

Monitor

监控模块

生产者监控

消费者监控

消息积压

报警功能

Connect

这里是一些Connect的操作

KSQL

可以进行KQL的查询操作

Approve

这里主要是管理员做一些审核操作

  • Approve->check 审批用户的Task
  • 根据用户选择的location指定cluster
  • 检查用户设置的partition和replication大小是否合理,如不合理做出调整
  • 检查其他字段是否合理,如需要拒绝该申请,点击Reject并填写意见。

Kafka Manager
Topic管理

Cluster管理

broker管理

group管理

Setting

这些主要是用户的一些设置

KafkaCenter还是一个非常不错的kafka管理工具,可以满足大部分需求。
更多实时数据分析相关博文与科技资讯,欢迎关注 “实时流式计算”

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

一站式Kafka平台解决方案——KafkaCenter

下载Word文档到电脑,方便收藏和打印~

下载Word文档

猜你喜欢

一站式Kafka平台解决方案——KafkaCenter

KafkaCenter是什么KafkaCenter是一个针对Kafka的一站式,解决方案。用于Kafka集群的维护与管理,生产者和消费者的监控,以及Kafka部分生态组件的使用。对于Kafka的平台化,一直缺少一个成熟的解决方案,之前比较流行的kafka监控方
一站式Kafka平台解决方案——KafkaCenter
2019-07-25

探究有哪些企业培训平台一站式员工发展解决方案

在当今数字化、信息化的时代背景下,企业培训已成为企业提升竞争力,实现持续发展的关键要素。而企业培训平台作为企业培训的重要载体,其作用和价值也越来越受到企业的重视。本文将介绍目前市场上有哪些主要的企业培训平台,帮助企业更好地选择和使用适合自己的培训平台,提升员工能力和企业竞争力。一、企业培训平台的主要类型在线培训平
探究有哪些企业培训平台一站式员工发展解决方案
2023-11-19

windows平台的分布式微服务解决方案(4)-

本文讲述的数据库读写分离的实现方式,是基于前述“数据库的负载均衡”这篇文章的内容发展而来,请先对其阅读参考。 在某些大型应用系统中,往往需要把数据库的“读操作”与“写操作”分为两个数据库(甚至两组数据库)分开操作,以达到读写分离的目的。要实现数据库的读写分离,
windows平台的分布式微服务解决方案(4)-
2016-10-06

可道云服务器站点:一站式解决方案

1.什么是可道云服务器站点?可道云服务器站点是一种全面的解决方案,旨在为用户提供高效、可靠的云服务器托管服务。它结合了可道云的强大功能和灵活性,为用户提供了一个完整的云服务器管理平台。2.可道云服务器站点的特点2.1简单易用可道云服务器站点提供了直观的用户界面,使用户能够轻松管理和监控他们的云服务器。无论是创建新的实例、调整资源配置还是监控服务器性能,都可以通过简单的几个步骤完成。2.2高性能可道云服务器站...
2023-10-27

Java 文件操作:跨平台解决方案揭秘

Java文件操作的跨平台解决方案,助力开发人员解决文件处理的难题,本文详细阐述Java文件操作的跨平台实现方法,并提供代码示例。
Java 文件操作:跨平台解决方案揭秘
2024-02-26

阿里云托管数据库一站式解决方案

随着互联网的发展,数据量日益增大,如何高效、安全地存储和管理数据成为了企业和个人关注的焦点。阿里云作为全球领先的云计算服务提供商,提供了全面的数据库解决方案,包括云数据库服务、数据库备份、恢复等。本文将详细介绍阿里云托管数据库,帮助您了解并选择适合自己的数据库解决方案。一、什么是阿里云托管数据库?阿里云托管数据库
阿里云托管数据库一站式解决方案
2023-10-30

亚马逊云服务:一站式云计算解决方案

1.亚马逊云服务(AmazonWebServices,简称AWS)的概述亚马逊云服务(AWS)是亚马逊公司提供的一系列云计算服务,旨在帮助个人、企业和组织构建和管理各种应用程序、存储数据和进行分析。AWS提供了一种灵活、可扩展和经济高效的方式来使用云计算资源,无论是构建简单的网站还是复杂的企业级应用程序。2.亚马逊云服务的服务类型AWS提供了多种服务类型,以满足不同的需求和应用场景。以下是一些常见的服务类型:2.1.计...
2023-10-27

亚马逊服务器:一站式云计算解决方案

1.什么是亚马逊服务器?亚马逊服务器(AmazonWebServices,简称AWS)是由亚马逊公司提供的一种云计算服务。它提供了一系列的基础设施和平台服务,帮助用户构建和扩展各种应用程序,从简单的网站到复杂的企业级应用。2.为什么选择亚马逊服务器?2.1强大的可扩展性亚马逊服务器提供了强大的可扩展性,可以根据业务需求快速扩展或缩减服务器资源。无论是应对突发流量还是应用程序的持续增长,亚马逊服务器都能够满足需求。2.2可...
2023-10-27

跨平台 C++ 代码中设计模式的移植问题与解决方案

在跨平台 c++++ 开发中,设计模式移植问题包括:平台依赖性、头文件可用性、命名冲突、内存管理。解决方案包括使用跨平台库、预处理器指令、命名空间、跨平台内存管理库等。跨平台 C++ 代码中设计模式的移植问题与解决方案在跨平台 C++ 开
跨平台 C++ 代码中设计模式的移植问题与解决方案
2024-05-13

数商云汽车后市场电商平台解决方案

(一) 汽车市场发展简况自中国改革开放以来,国民收入的逐步提高,截止2016年,中国汽车市场销量已经连续8年蝉联世界第一。据最新数据显示,2017年上半年汽车销量同期上涨3.81%,达到1,335万辆,这还不含二手车销售量。中国消费者对汽车
2023-06-05

阿里云 ECS 服务区一站式云服务解决方案

阿里云ECS服务区,是阿里云推出的一种一站式的云服务解决方案,旨在帮助企业快速构建、部署和管理云应用。阿里云ECS服务区提供了一系列的云服务,包括计算服务、存储服务、网络服务和安全服务等,能够满足企业对云计算的各种需求。阿里云ECS服务区的计算服务,包括弹性计算实例、容器服务和虚拟机服务,可以根据企业的需求,提供
阿里云 ECS 服务区一站式云服务解决方案
2023-10-31

阿里云服务器下载镜像一站式解决方案

阿里云服务器是一款基于云计算技术的虚拟服务器产品,提供了多种操作系统选择,满足用户不同需求。本文将详细介绍如何在阿里云服务器上下载和使用镜像。一、什么是镜像?镜像是一个完整的操作系统实例的副本,包括操作系统内核、系统库、应用软件、文档、配置文件等。镜像文件通常以ISO或VHD格式保存,便于在不同的计算机上安装和运
阿里云服务器下载镜像一站式解决方案
2023-11-05

亚马逊AWS服务器:一站式云计算解决方案

1.什么是亚马逊AWS服务器?亚马逊AWS(AmazonWebServices)是亚马逊公司提供的一项云计算服务,它为企业和个人提供了一系列灵活、可扩展和安全的云计算解决方案。其中,AWS服务器是AWS服务中的核心组件之一。2.AWS服务器的特点和优势2.1弹性和可扩展性AWS服务器提供了弹性和可扩展性的特点,使用户能够根据实际需求灵活地调整计算资源。无论是需要增加还是减少服务器的容量,都可以通过简单的操作来实现,避免...
2023-10-27

PHP跨平台开发的安全性挑战与解决方案

在跨平台 php 开发中,主要的安全挑战包括代码注入攻击(使用预处理语句、验证用户输入和安全框架)、跨站脚本 (xss) 攻击(html 实体编码、验证用户输入和 csp 标头)、跨域请求伪造 (csrf) 攻击(同步令牌模式、第三方域请求
PHP跨平台开发的安全性挑战与解决方案
2024-05-21

SAP云平台上的Low Code Development的解决方案是什么

SAP云平台上的Low Code Development的解决方案是什么,很多新手对此不是很清楚,为了帮助大家解决这个难题,下面小编将为大家详细讲解,有这方面需求的人可以来学习下,希望你能有所收获。今天我们来简单了解SAP云平台上的快速应用
2023-06-03

编程热搜

目录