Java Kafka 消费积压监控的示例代码
短信预约 -IT技能 免费直播动态提醒
后端代码:
Monitor.java代码:
package com.suncreate.kafkaConsumerMonitor.service;
import com.suncreate.kafkaConsumerMonitor.model.ConsumerInfo;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.text.DecimalFormat;
import java.text.SimpleDateFormat;
import java.util.*;
/**
 * Samples the consumption backlog (lag) of one consumer group on one Kafka topic.
 *
 * <p>Each call to {@link #monitor(boolean)} reads the log-end offsets and the group's committed
 * offsets for the partitions discovered at construction time, derives the total lag and the
 * produce/consume rates since the previous call, and optionally appends the sample to a bounded
 * in-memory history.
 *
 * <p>Thread-safety: {@code KafkaConsumer} must not be used by several threads at once, so
 * {@link #monitor(boolean)} is synchronized. The published results are volatile and the history
 * has its own lock, so getters never wait for a running Kafka query.
 */
public class Monitor {
    private static final Logger log = LoggerFactory.getLogger(Monitor.class);

    /** Maximum number of samples retained in the history list. */
    private static final int MAX_HISTORY = 500;

    private final String servers;
    private final String topic;
    private final String groupId;

    // Bookkeeping from the previous sample; only touched inside the synchronized monitor().
    private long lastTime;
    private long lastLogSize = 0L;
    private long lastOffset = 0L;

    // Results of the latest sample; volatile because they are read from other threads.
    private volatile long lastTotalLag = 0L;
    private volatile double lastRatio = 0;
    private volatile long speedLogSize = 0L;
    private volatile long speedOffset = 0L;
    private volatile String time;

    // Guards every access to the history list.
    private final Object historyLock = new Object();
    private List<ConsumerInfo> list;

    // The formatters are not thread-safe; they are only used inside the synchronized monitor().
    private final SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
    private final DecimalFormat decimalFormat = new DecimalFormat("0.00");
    private final DecimalFormat secondsFormat = new DecimalFormat("0.000");

    private final KafkaConsumer<String, String> consumer;
    private final List<TopicPartition> topicPartitionList;

    /**
     * Creates a monitor and looks up the partitions of {@code topic}.
     *
     * @param servers Kafka bootstrap servers
     * @param topic topic whose backlog is measured
     * @param groupId consumer group whose committed offsets are read
     * @throws IllegalStateException if no partition metadata is available for the topic
     */
    public Monitor(String servers, String topic, String groupId) {
        this.servers = servers;
        this.topic = topic;
        this.groupId = groupId;
        this.list = new ArrayList<>();

        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.servers);
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, this.groupId);
        // This consumer shares the monitored group's id but only reads offsets;
        // make sure it can never commit anything on the group's behalf.
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        this.consumer = new KafkaConsumer<>(properties);

        List<PartitionInfo> partitionInfoList;
        try {
            partitionInfoList = consumer.partitionsFor(topic);
        } catch (RuntimeException e) {
            // Do not leak the consumer's sockets/threads when construction fails.
            consumer.close();
            throw e;
        }
        if (partitionInfoList == null) {
            // NOTE(review): some client versions return null for an unknown topic - confirm.
            consumer.close();
            throw new IllegalStateException("No partition metadata for topic " + topic);
        }
        List<TopicPartition> partitions = new ArrayList<>(partitionInfoList.size());
        for (PartitionInfo partitionInfo : partitionInfoList) {
            partitions.add(new TopicPartition(partitionInfo.topic(), partitionInfo.partition()));
        }
        this.topicPartitionList = partitions;
    }

    /** @return timestamp of the last sample that was added to the history, or null if none */
    public String getTime() {
        return time;
    }

    public void setTime(String time) {
        this.time = time;
    }

    /** @return total lag (sum of end offset minus committed offset) of the last sample */
    public long getLastTotalLag() {
        return lastTotalLag;
    }

    /** @return consume rate as a percentage of the produce rate, rounded to two decimals */
    public double getLastRatio() {
        return lastRatio;
    }

    public String getTopic() {
        return topic;
    }

    public String getGroupId() {
        return groupId;
    }

    /** @return growth of the summed end offsets per second between the last two samples */
    public long getSpeedLogSize() {
        return speedLogSize;
    }

    /** @return growth of the summed committed offsets per second between the last two samples */
    public long getSpeedOffset() {
        return speedOffset;
    }

    /**
     * Returns a snapshot of the history. A copy is handed out so that a caller iterating it
     * (for example while serializing an HTTP response) cannot collide with a concurrent sample.
     */
    public List<ConsumerInfo> getList() {
        synchronized (historyLock) {
            return new ArrayList<>(list);
        }
    }

    /** Replaces the history with a copy of {@code list}; {@code null} clears it. */
    public void setList(List<ConsumerInfo> list) {
        List<ConsumerInfo> copy = new ArrayList<>();
        if (list != null) {
            copy.addAll(list);
        }
        synchronized (historyLock) {
            this.list = copy;
        }
    }

    /**
     * Takes one sample: queries Kafka, updates lag/speed/ratio and optionally records history.
     * Any exception is logged and swallowed so that one failing sample does not stop the caller.
     *
     * @param addToList whether the sample is appended to the history (capped at 500 entries)
     */
    public synchronized void monitor(boolean addToList) {
        try {
            long startTime = System.currentTimeMillis();
            // Log-end offsets ("log size") per partition.
            Map<TopicPartition, Long> endOffsets = consumer.endOffsets(topicPartitionList);
            // Committed offsets per partition; committed() is null when the group never committed.
            Map<TopicPartition, Long> committedOffsets = new HashMap<>();
            for (TopicPartition partition : topicPartitionList) {
                OffsetAndMetadata committed = consumer.committed(partition);
                if (committed != null) {
                    committedOffsets.put(partition, committed.offset());
                }
            }
            long endTime = System.currentTimeMillis();
            log.info("查询logSize和offset耗时:{} 秒", secondsFormat.format((endTime - startTime) / 1000.0));

            startTime = System.currentTimeMillis();
            long totalLag = 0L;
            long logSize = 0L;
            long offset = 0L;
            for (TopicPartition partition : topicPartitionList) {
                Long endOffset = endOffsets.get(partition);
                Long commitOffset = committedOffsets.get(partition);
                if (endOffset == null || commitOffset == null) {
                    // Lag is unknown for this partition; leave it out instead of failing the sample.
                    log.warn("Topic:{} groupId:{} partition:{} has no end offset or committed offset, skipped",
                            topic, groupId, partition.partition());
                    continue;
                }
                totalLag += endOffset - commitOffset;
                logSize += endOffset;
                offset += commitOffset;
            }
            log.info("Topic:{} logSize:{} offset:{} totalLag:{}", topic, logSize, offset, totalLag);

            updateSpeeds(logSize, offset);
            lastTotalLag = totalLag;
            endTime = System.currentTimeMillis();
            log.info("计算耗时:{} 秒", secondsFormat.format((endTime - startTime) / 1000.0));

            if (addToList) {
                appendSample();
            }
        } catch (Exception e) {
            log.error("Monitor error", e);
        }
    }

    /** Derives produce/consume rates and their ratio from the previous sample, then stores this one. */
    private void updateSpeeds(long logSize, long offset) {
        long now = System.currentTimeMillis();
        if (lastTime > 0) {
            long elapsedMillis = now - lastTime;
            if (elapsedMillis > 0) {
                double elapsedSeconds = elapsedMillis / 1000.0;
                speedLogSize = (long) ((logSize - lastLogSize) / elapsedSeconds);
                speedOffset = (long) ((offset - lastOffset) / elapsedSeconds);
            }
            if (speedLogSize > 0) {
                double ratio = speedOffset * 100 / (speedLogSize * 1.0);
                // Round numerically: parsing DecimalFormat output back fails in locales
                // whose decimal separator is not '.'.
                lastRatio = Math.round(ratio * 100.0) / 100.0;
                log.info("Topic:{} speedLogSize:{} speedOffset:{} 百分比:{}%",
                        topic, speedLogSize, speedOffset, decimalFormat.format(ratio));
            }
        }
        lastTime = now;
        lastLogSize = logSize;
        lastOffset = offset;
    }

    /** Stamps the current results with the current time and appends them to the bounded history. */
    private void appendSample() {
        this.setTime(simpleDateFormat.format(new Date()));
        ConsumerInfo sample = new ConsumerInfo(this.getTopic(), this.getGroupId(), this.getLastTotalLag(),
                this.getLastRatio(), this.getSpeedLogSize(), this.getSpeedOffset(), this.getTime());
        synchronized (historyLock) {
            list.add(sample);
            if (list.size() > MAX_HISTORY) {
                list.remove(0);
            }
        }
    }
}
MonitorService.java代码:
package com.suncreate.kafkaConsumerMonitor.service;
import com.suncreate.kafkaConsumerMonitor.model.ConsumerInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct;
import java.util.*;
/**
 * Owns one {@link Monitor} per monitored (topic, consumer group) pair and exposes their
 * latest results and history to the web layer.
 */
@Service
public class MonitorService {
    private static final Logger log = LoggerFactory.getLogger(MonitorService.class);

    /** Monitored pairs, each entry being {topic, consumer group id}. */
    private static final String[][] MONITORED_GROUPS = {
            {"wifiData", "wifi-kafka-hbase"},
            {"KK_PASS_INFO_TYCC", "EXTRACT-SAMPLE"},
            {"KK_PASS_INFO_TYCC", "dblrecog-upload2vcn"},
            {"KK_PASS_INFO_TYCC_FILTER", "yisa20210521000001"},
            {"KK_PASS_INFO_TYCC_FILTER", "kafka-filter-check-19"},
            {"motorVehicle", "unifiedstorage-downloader"},
            {"motorVehicle", "full-vehicle-data-storage-kafka2ch"},
            {"motorVehicle", "vehicle_store"},
            {"motorVehicle", "vcn-sk-upload-luyang"},
            {"motorVehicle", "vcn-sk-upload-yaohai"},
            {"motorVehicle", "vcn-sk-upload-baohe"},
            {"peopleFace", "kafka-filter-check-19"},
    };

    @Value("${kafka.consumer.servers}")
    private String servers;

    private List<Monitor> monitorList;

    /** Builds the monitors once the bootstrap servers have been injected. */
    @PostConstruct
    private void init() {
        monitorList = new ArrayList<>(MONITORED_GROUPS.length);
        for (String[] pair : MONITORED_GROUPS) {
            monitorList.add(new Monitor(servers, pair[0], pair[1]));
        }
    }

    /**
     * Takes one sample on every monitor, in registration order.
     *
     * @param addToList whether each sample is also appended to the monitor's history
     */
    public void monitorOnce(boolean addToList) {
        for (Monitor each : monitorList) {
            each.monitor(addToList);
        }
    }

    /** @return one entry per monitor, built from that monitor's most recent results */
    public List<ConsumerInfo> getConsumerList() {
        List<ConsumerInfo> result = new ArrayList<>(monitorList.size());
        for (Monitor each : monitorList) {
            result.add(new ConsumerInfo(each.getTopic(), each.getGroupId(), each.getLastTotalLag(),
                    each.getLastRatio(), each.getSpeedLogSize(), each.getSpeedOffset(), each.getTime()));
        }
        return result;
    }

    /**
     * Looks up the history of one monitor.
     *
     * @param topic topic name to match exactly
     * @param groupId consumer group id to match exactly
     * @return the history of the first matching monitor, or an empty list when none matches
     */
    public List<ConsumerInfo> getDetails(String topic, String groupId) {
        for (Monitor each : monitorList) {
            if (each.getTopic().equals(topic) && each.getGroupId().equals(groupId)) {
                return each.getList();
            }
        }
        return new ArrayList<>();
    }
}
MonitorConfig.java代码:
package com.suncreate.kafkaConsumerMonitor.task;
import com.suncreate.kafkaConsumerMonitor.service.MonitorService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.SchedulingConfigurer;
import org.springframework.scheduling.config.ScheduledTaskRegistrar;
import org.springframework.scheduling.support.CronTrigger;
import java.text.SimpleDateFormat;
/**
 * Registers the periodic sampling task: every run calls
 * {@code MonitorService.monitorOnce(true)}, so scheduled samples are added to the history.
 */
@Configuration
@EnableScheduling
public class MonitorConfig implements SchedulingConfigurer {
    private static final Logger logger = LoggerFactory.getLogger(MonitorConfig.class);

    /** Cron schedule of the sampling task: second 0 of every third minute. */
    private static final String CRON_EXPRESSION = "0 */3 * * * ?";

    @Autowired
    private MonitorService monitorService;

    /**
     * Adds the sampling task to the registrar.
     *
     * @param taskRegistrar registrar supplied by Spring's scheduling infrastructure
     */
    @Override
    public void configureTasks(ScheduledTaskRegistrar taskRegistrar) {
        // Parse the cron expression once instead of on every trigger evaluation.
        CronTrigger trigger = new CronTrigger(CRON_EXPRESSION);
        taskRegistrar.addTriggerTask(() -> monitorService.monitorOnce(true), trigger);
    }
}
MonitorController.java代码:
package com.suncreate.kafkaConsumerMonitor.controller;
import com.suncreate.kafkaConsumerMonitor.model.ConsumerInfo;
import com.suncreate.kafkaConsumerMonitor.model.LayuiData;
import com.suncreate.kafkaConsumerMonitor.service.MonitorService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.List;
/**
 * HTTP endpoints under {@code /monitor} that expose the monitoring results, wrapped in
 * {@code LayuiData}, and allow an extra sample to be triggered.
 */
@RestController
@RequestMapping("/monitor")
public class MonitorController {
    @Autowired
    private MonitorService monitorService;

    /** Returns the latest result of every monitored consumer group. */
    @GetMapping("/getConsumers")
    public LayuiData getConsumers() {
        return new LayuiData(monitorService.getConsumerList());
    }

    /** Takes one sample on every monitor without adding it to the history. */
    @GetMapping("/monitorOnce")
    public void monitorOnce() {
        monitorService.monitorOnce(false);
    }

    /**
     * Returns the recorded history of one consumer group.
     *
     * @param topic topic name of the monitor to look up
     * @param groupId consumer group id of the monitor to look up
     */
    @GetMapping("/getDetails")
    public LayuiData getDetails(String topic, String groupId) {
        return new LayuiData(monitorService.getDetails(topic, groupId));
    }
}
pom.xml文件(有些东西没用到或者备用,没有删):
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<!-- Build descriptor of the Kafka consumer backlog monitor (Spring Boot web application). -->
<!-- The author notes that some dependencies below are unused or kept as spares. -->
<modelVersion>4.0.0</modelVersion>
<!-- Spring Boot parent: dependencies declared below without a <version> rely on it. -->
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.1.6.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.suncreate</groupId>
<artifactId>kafka-consumer-monitor</artifactId>
<version>1.0</version>
<name>kafka-consumer-monitor</name>
<description>Kafka消费积压监控预警</description>
<properties>
<java.version>1.8</java.version>
<!-- NOTE(review): the Elasticsearch client dependency below repeats 6.1.4 literally instead of using this property. -->
<elasticsearch.version>6.1.4</elasticsearch.version>
</properties>
<dependencies>
<!-- NOTE(review): none of the classes shown with this POM use Lombok; the model classes are not shown - confirm before removing. -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.12</version>
</dependency>
<!-- NOTE(review): no fastjson usage is visible in the classes shown - presumably one of the spare dependencies. -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.54</version>
</dependency>
<!-- Web starter: the controller uses @RestController / @GetMapping from Spring MVC. -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<!-- NOTE(review): test-scoped and no test code is shown - confirm it is needed. -->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-test</artifactId>
<scope>test</scope>
</dependency>
<!-- NOTE(review): no Gson usage is visible in the classes shown. -->
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
<version>2.8.0</version>
</dependency>
<!-- NOTE(review): no database access is visible in the classes shown (applies to postgresql, jdbc starter and ojdbc6). -->
<dependency>
<groupId>org.postgresql</groupId>
<artifactId>postgresql</artifactId>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-jdbc</artifactId>
</dependency>
<!-- NOTE(review): no Elasticsearch usage is visible in the classes shown. -->
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-high-level-client</artifactId>
<version>6.1.4</version>
</dependency>
<dependency>
<groupId>com.oracle</groupId>
<artifactId>ojdbc6</artifactId>
<version>11.1.0.7.0</version>
</dependency>
<!-- NOTE(review): the classes shown import only org.apache.kafka.clients / org.apache.kafka.common types, -->
<!-- which presumably come from kafka-clients below; confirm whether kafka_2.11 is required. -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.11</artifactId>
<version>0.11.0.1</version>
</dependency>
<!-- Kafka client library used by the Monitor class (KafkaConsumer, TopicPartition, ...). -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.11.0.1</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
<!-- Compiles for Java 8 source and target level. -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>8</source>
<target>8</target>
</configuration>
</plugin>
</plugins>
</build>
</project>
前端使用了 Layui 和 ECharts 展示表格和图表
index.css代码:
/* Page heading row; used by the title <div> of index.html and detail.html. */
.div-title {
font-size: 18px;
margin-top: 10px;
margin-left: 10px;
}
/* Right-aligns the content of table cells rendered by the column templates. */
.div-right {
text-align: right;
}
/* Red text for values the column templates flag as over their threshold. */
.span-red {
color: #ff0000;
}
index.html代码(展示topic、消费者组Consumer GroupId、Total Lag、Kafka数据生产速度、Kafka数据消费速度等):
<!DOCTYPE html>
<html lang="zh">
<head>
    <meta charset="UTF-8">
    <title>Title</title>
    <link rel="stylesheet" href="css/index.css">
    <link rel="stylesheet" href="js/layui-v2.6.8/css/layui.css" media="all">
    <!-- Scripts must use "src": with "data-src" the browser never loads them. -->
    <script type="text/javascript" src="js/jquery-1.7.1.js"></script>
    <script type="text/javascript" src="js/layui-v2.6.8/layui.js" charset="utf-8"></script>
</head>
<body>
<div class="div-title">Kafka 监控
    <button type="button" class="layui-btn layui-btn-sm" onclick="refreshTable()">刷新</button>
</div>
<table class="layui-hide" id="myTable"></table>
<script type="text/javascript">
    // Overview page: one table row per monitored (topic, consumer group) pair,
    // reloaded every 30 seconds.
    var myTable;

    // True when the backlog of a row exceeds two hours (delayDay is expressed in days).
    function isBacklogged(d) {
        return d.delayDay * 24 > 2;
    }

    // Right-aligned cell whose value is wrapped in a span; painted red when `highlight` is true.
    function markedCell(value, highlight) {
        if (highlight) {
            return '<div class="div-right"><span class="span-red">' + value + '</span></div>';
        }
        return '<div class="div-right"><span>' + value + '</span></div>';
    }

    // Plain right-aligned cell.
    function plainCell(value) {
        return '<div class="div-right">' + value + '</div>';
    }

    layui.use('table', function () {
        var table = layui.table;
        myTable = table.render({
            elem: '#myTable',
            url: '/home/monitor/getConsumers',
            cellMinWidth: 80, // minimum width applied to every regular cell
            cols: [[
                {field: 'topic', width: 300, title: 'topic', sort: true},
                {field: 'groupId', width: 300, title: 'groupId'},
                {
                    field: 'totalLag', width: 150, title: 'Total Lag', sort: true, templet: function (d) {
                        return markedCell(d.totalLag, isBacklogged(d));
                    }
                },
                {
                    field: 'speedLogSize', width: 150, title: '生产速度(条/秒)', templet: function (d) {
                        return plainCell(d.speedLogSize);
                    }
                },
                {
                    field: 'speedOffset', width: 150, title: '消费速度(条/秒)', templet: function (d) {
                        return plainCell(d.speedOffset);
                    }
                },
                {
                    // Consumption below 90% of production is highlighted.
                    field: 'ratio', width: 100, title: '消费/生产', templet: function (d) {
                        return markedCell(d.ratio + '%', d.ratio < 90);
                    }
                },
                {
                    field: 'delayDay', width: 150, title: '积压(天)', sort: true, templet: function (d) {
                        return markedCell(d.delayDay, isBacklogged(d));
                    }
                },
                {
                    field: 'ope', width: 100, title: '操作', templet: function (d) {
                        // Encode both values so characters such as '&', '#' or quotes cannot
                        // break the query string or the surrounding attribute.
                        var href = '/home/detail.html?topic=' + encodeURIComponent(d.topic) +
                            '&groupId=' + encodeURIComponent(d.groupId);
                        return '<a href="' + href + '" target="_blank" class="layui-btn layui-btn-sm" >详细</a>';
                    }
                }
            ]]
        });
    });

    // Reloads the table data; a no-op until the table has been rendered.
    function refreshTable() {
        if (myTable) {
            myTable.reload();
        }
    }

    setInterval(refreshTable, 30000);
</script>
</body>
</html>
detail.html代码(展示单个消费者组的Total Lag、生产速度、消费速度以及Total Lag趋势图):
<!DOCTYPE html>
<html lang="zh">
<head>
    <meta charset="UTF-8">
    <title>Title</title>
    <link rel="stylesheet" href="css/index.css">
    <link rel="stylesheet" href="js/layui-v2.6.8/css/layui.css" media="all">
    <!-- Scripts must use "src": with "data-src" the browser never loads them. -->
    <script type="text/javascript" src="js/jquery-1.7.1.js"></script>
    <script type="text/javascript" src="js/layui-v2.6.8/layui.js" charset="utf-8"></script>
    <script type="text/javascript" src="js/echarts-v4.7.0/echarts.min.js"></script>
</head>
<body>
<div class="div-title"><span id="detailTitle"></span> 明细
    <button type="button" class="layui-btn layui-btn-sm" onclick="refreshTable()">刷新</button>
</div>
<div id="main" style="height:400px;"></div>
<table class="layui-hide" id="test"></table>
<script type="text/javascript">
    // Detail page: history table and Total Lag trend chart of one consumer group,
    // selected through the "topic" and "groupId" query parameters.
    var myTable;
    var myChart; // created on first use and reused on every refresh
    var topic = getQueryVariable("topic") || "";
    var groupId = getQueryVariable("groupId") || "";
    var detailUrl = '/home/monitor/getDetails?topic=' + encodeURIComponent(topic) +
        '&groupId=' + encodeURIComponent(groupId);

    // text() instead of html(): the values come from the URL and must not be parsed as markup.
    $("#detailTitle").text(topic + " " + groupId);

    // True when the backlog of a row exceeds two hours (delayDay is expressed in days).
    function isBacklogged(d) {
        return d.delayDay * 24 > 2;
    }

    // Right-aligned cell whose value is wrapped in a span; painted red when `highlight` is true.
    function markedCell(value, highlight) {
        if (highlight) {
            return '<div class="div-right"><span class="span-red">' + value + '</span></div>';
        }
        return '<div class="div-right"><span>' + value + '</span></div>';
    }

    // Plain right-aligned cell.
    function plainCell(value) {
        return '<div class="div-right">' + value + '</div>';
    }

    layui.use('table', function () {
        var table = layui.table;
        myTable = table.render({
            elem: '#test',
            url: detailUrl,
            cellMinWidth: 80, // minimum width applied to every regular cell
            initSort: {
                field: 'time', // sort column, must match a field name in cols
                type: 'desc' // asc, desc or null (default order)
            },
            cols: [[
                {field: 'topic', width: 300, title: 'topic'},
                {field: 'groupId', width: 300, title: 'groupId'},
                {field: 'time', width: 180, title: '时间', sort: true},
                {
                    field: 'totalLag', width: 150, title: 'Total Lag', templet: function (d) {
                        return markedCell(d.totalLag, isBacklogged(d));
                    }
                },
                {
                    field: 'speedLogSize', width: 150, title: '生产速度(条/秒)', templet: function (d) {
                        return plainCell(d.speedLogSize);
                    }
                },
                {
                    field: 'speedOffset', width: 150, title: '消费速度(条/秒)', templet: function (d) {
                        return plainCell(d.speedOffset);
                    }
                },
                {
                    // Consumption below 90% of production is highlighted.
                    field: 'ratio', width: 100, title: '消费/生产', templet: function (d) {
                        return markedCell(d.ratio + '%', d.ratio < 90);
                    }
                },
                {
                    field: 'delayDay', width: 150, title: '积压(天)', templet: function (d) {
                        return markedCell(d.delayDay, isBacklogged(d));
                    }
                }
            ]]
        });
    });

    // Reloads the table (once rendered) and redraws the chart.
    function refreshTable() {
        if (myTable) {
            myTable.reload();
        }
        showChart();
    }

    setInterval(refreshTable, 30000);

    // Returns the decoded value of a query-string parameter, or false when it is absent.
    function getQueryVariable(variable) {
        var query = window.location.search.substring(1);
        var vars = query.split("&");
        for (var i = 0; i < vars.length; i++) {
            var separator = vars[i].indexOf("=");
            var name = separator < 0 ? vars[i] : vars[i].substring(0, separator);
            if (name == variable) {
                // Keep everything after the first '=' so values containing '=' survive.
                var raw = separator < 0 ? "" : vars[i].substring(separator + 1);
                return decodeURIComponent(raw.replace(/\+/g, " "));
            }
        }
        return (false);
    }

    // Fetches the history and draws the Total Lag trend; needs at least two samples.
    function showChart() {
        $.ajax({
            type: "GET",
            url: detailUrl,
            success: function (data) {
                if (data && data.data && data.data.length > 1) {
                    if (!myChart) {
                        // Initialise once: calling echarts.init again on the same element
                        // on every refresh is unnecessary.
                        myChart = echarts.init(document.getElementById('main'));
                    }
                    var times = [];
                    var lags = [];
                    for (var i = 0; i < data.data.length; i++) {
                        times.push(data.data[i].time);
                        lags.push(data.data[i].totalLag);
                    }
                    myChart.setOption({
                        title: {
                            show: true,
                            text: "Total Lag 趋势图",
                            x: 'center'
                        },
                        xAxis: {
                            type: 'category',
                            data: times
                        },
                        yAxis: {
                            type: 'value'
                        },
                        series: [{
                            data: lags,
                            type: 'line'
                        }]
                    });
                }
            }
        });
    }

    showChart();
</script>
</body>
</html>
效果图:
消费者组列表:
消费者组明细:
到此这篇关于 Java Kafka 消费积压监控的文章就介绍到这了。更多相关 Java Kafka 消费监控内容，请搜索编程网以前的文章或继续浏览下面的相关文章，希望大家以后多多支持编程网！
免责声明:
① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。
② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341