我的编程空间,编程开发者的网络收藏夹
学习永远不晚

Java Kafka 消费积压监控的示例代码

短信预约 -IT技能 免费直播动态提醒
省份

北京

  • 北京
  • 上海
  • 天津
  • 重庆
  • 河北
  • 山东
  • 辽宁
  • 黑龙江
  • 吉林
  • 甘肃
  • 青海
  • 河南
  • 江苏
  • 湖北
  • 湖南
  • 江西
  • 浙江
  • 广东
  • 云南
  • 福建
  • 海南
  • 山西
  • 四川
  • 陕西
  • 贵州
  • 安徽
  • 广西
  • 内蒙
  • 西藏
  • 新疆
  • 宁夏
  • 兵团
手机号立即预约

请填写图片验证码后获取短信验证码

看不清楚,换张图片

免费获取短信验证码

Java Kafka 消费积压监控的示例代码

后端代码:

Monitor.java代码:


package com.suncreate.kafkaConsumerMonitor.service;

import com.suncreate.kafkaConsumerMonitor.model.ConsumerInfo;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.text.DecimalFormat;
import java.text.SimpleDateFormat;
import java.util.*;


public class Monitor {
    private static final Logger log = LoggerFactory.getLogger(Monitor.class);

    private String servers;

    private String topic;

    private String groupId;

    private long lastTime;

    private long lastTotalLag = 0L;

    private long lastLogSize = 0L;

    private long lastOffset = 0L;

    private double lastRatio = 0;

    private long speedLogSize = 0L;

    private long speedOffset = 0L;

    private String time;

    private List<ConsumerInfo> list;

    private final SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

    public String getTime() {
        return time;
    }

    public void setTime(String time) {
        this.time = time;
    }

    public long getLastTotalLag() {
        return lastTotalLag;
    }

    public double getLastRatio() {
        return lastRatio;
    }

    public String getTopic() {
        return topic;
    }

    public String getGroupId() {
        return groupId;
    }

    public long getSpeedLogSize() {
        return speedLogSize;
    }

    public long getSpeedOffset() {
        return speedOffset;
    }

    public List<ConsumerInfo> getList() {
        return list;
    }

    public void setList(List<ConsumerInfo> list) {
        this.list = list;
    }

    private KafkaConsumer<String, String> consumer;

    private List<TopicPartition> topicPartitionList;

    private final DecimalFormat decimalFormat = new DecimalFormat("0.00");

    public Monitor(String servers, String topic, String groupId) {
        this.servers = servers;
        this.topic = topic;
        this.groupId = groupId;

        this.list = new ArrayList<>();

        //消费者
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.servers);
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, this.groupId);
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

        consumer = new KafkaConsumer<String, String>(properties);

        //查询 topic partitions
        topicPartitionList = new ArrayList<>();
        List<PartitionInfo> partitionInfoList = consumer.partitionsFor(topic);
        for (PartitionInfo partitionInfo : partitionInfoList) {
            TopicPartition topicPartition = new TopicPartition(partitionInfo.topic(), partitionInfo.partition());
            topicPartitionList.add(topicPartition);
        }
    }

    public void monitor(boolean addToList) {
        try {
            long startTime = System.currentTimeMillis();

            //查询 log size
            Map<Integer, Long> endOffsetMap = new HashMap<>();
            Map<TopicPartition, Long> endOffsets = consumer.endOffsets(topicPartitionList);
            for (TopicPartition partitionInfo : endOffsets.keySet()) {
                endOffsetMap.put(partitionInfo.partition(), endOffsets.get(partitionInfo));
            }

            //查询消费 offset
            Map<Integer, Long> commitOffsetMap = new HashMap<>();
            for (TopicPartition topicAndPartition : topicPartitionList) {
                OffsetAndMetadata committed = consumer.committed(topicAndPartition);
                commitOffsetMap.put(topicAndPartition.partition(), committed.offset());
            }

            long endTime = System.currentTimeMillis();
            log.info("查询logSize和offset耗时:" + (new DecimalFormat("0.000")).format((endTime - startTime) / 1000.0) + " 秒");
            startTime = System.currentTimeMillis();

            //累加lag
            long totalLag = 0L;
            long logSize = 0L;
            long offset = 0L;
            if (endOffsetMap.size() == commitOffsetMap.size()) {
                for (Integer partition : endOffsetMap.keySet()) {
                    long endOffset = endOffsetMap.get(partition);
                    long commitOffset = commitOffsetMap.get(partition);
                    long diffOffset = endOffset - commitOffset;
                    totalLag += diffOffset;
                    logSize += endOffset;
                    offset += commitOffset;
                }

            } else {
                log.error("Topic:" + topic + "  consumer:" + consumer + "  topic partitions lost");
            }

            log.info("Topic:" + topic + "  logSize:" + logSize + "  offset:" + offset + "  totalLag:" + totalLag);

            if (lastTime > 0) {
                if (System.currentTimeMillis() - lastTime > 0) {
                    speedLogSize = (long) ((logSize - lastLogSize) / ((System.currentTimeMillis() - lastTime) / 1000.0));
                    speedOffset = (long) ((offset - lastOffset) / ((System.currentTimeMillis() - lastTime) / 1000.0));
                }

                if (speedLogSize > 0) {
                    String strRatio = decimalFormat.format(speedOffset * 100 / (speedLogSize * 1.0));
                    lastRatio = Double.parseDouble(strRatio);
                    log.info("Topic:" + topic + "  speedLogSize:" + speedLogSize + "  speedOffset:" + speedOffset + "  百分比:" + strRatio + "%");
                }
            }

            lastTime = System.currentTimeMillis();
            lastTotalLag = totalLag;
            lastLogSize = logSize;
            lastOffset = offset;

            endTime = System.currentTimeMillis();
            log.info("计算耗时:" + (new DecimalFormat("0.000")).format((endTime - startTime) / 1000.0) + " 秒");

            if (addToList) {
                this.setTime(simpleDateFormat.format(new Date()));
                this.list.add(new ConsumerInfo(this.getTopic(), this.getGroupId(), this.getLastTotalLag(), this.getLastRatio(), this.getSpeedLogSize(), this.getSpeedOffset(), this.getTime()));
                if (this.list.size() > 500) {
                    this.list.remove(0);
                }
            }

        } catch (Exception e) {
            log.error("Monitor error", e);
        }
    }

}

MonitorService.java代码:


package com.suncreate.kafkaConsumerMonitor.service;

import com.suncreate.kafkaConsumerMonitor.model.ConsumerInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;

import javax.annotation.PostConstruct;
import java.util.*;

@Service
public class MonitorService {
    private static final Logger log = LoggerFactory.getLogger(MonitorService.class);

    @Value("${kafka.consumer.servers}")
    private String servers;

    private Monitor monitor;

    private List<Monitor> monitorList;

    @PostConstruct
    private void Init() {
        monitorList = new ArrayList<>();

        monitorList.add(new Monitor(servers, "wifiData", "wifi-kafka-hbase"));
        monitorList.add(new Monitor(servers, "KK_PASS_INFO_TYCC", "EXTRACT-SAMPLE"));
        monitorList.add(new Monitor(servers, "KK_PASS_INFO_TYCC", "dblrecog-upload2vcn"));
        monitorList.add(new Monitor(servers, "KK_PASS_INFO_TYCC_FILTER", "yisa20210521000001"));
        monitorList.add(new Monitor(servers, "KK_PASS_INFO_TYCC_FILTER", "kafka-filter-check-19"));
        monitorList.add(new Monitor(servers, "motorVehicle", "unifiedstorage-downloader"));
        monitorList.add(new Monitor(servers, "motorVehicle", "full-vehicle-data-storage-kafka2ch"));
        monitorList.add(new Monitor(servers, "motorVehicle", "vehicle_store"));
        monitorList.add(new Monitor(servers, "motorVehicle", "vcn-sk-upload-luyang"));
        monitorList.add(new Monitor(servers, "motorVehicle", "vcn-sk-upload-yaohai"));
        monitorList.add(new Monitor(servers, "motorVehicle", "vcn-sk-upload-baohe"));
        monitorList.add(new Monitor(servers, "peopleFace", "kafka-filter-check-19"));
    }

    public void monitorOnce(boolean addToList) {
        for (Monitor monitor : monitorList) {
            monitor.monitor(addToList);
        }
    }

    public List<ConsumerInfo> getConsumerList() {
        List<ConsumerInfo> list = new ArrayList<>();

        for (Monitor monitor : monitorList) {
            list.add(new ConsumerInfo(monitor.getTopic(), monitor.getGroupId(), monitor.getLastTotalLag(), monitor.getLastRatio(), monitor.getSpeedLogSize(), monitor.getSpeedOffset(), monitor.getTime()));
        }

        return list;
    }

    public List<ConsumerInfo> getDetails(String topic, String groupId) {
        for (Monitor monitor : monitorList) {
            if (monitor.getTopic().equals(topic) && monitor.getGroupId().equals(groupId)) {
                return monitor.getList();
            }
        }
        return new ArrayList<>();
    }

}

MonitorConfig.java代码:


package com.suncreate.kafkaConsumerMonitor.task;

import com.suncreate.kafkaConsumerMonitor.service.MonitorService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.SchedulingConfigurer;
import org.springframework.scheduling.config.ScheduledTaskRegistrar;
import org.springframework.scheduling.support.CronTrigger;

import java.text.SimpleDateFormat;

@Configuration
@EnableScheduling
public class MonitorConfig implements SchedulingConfigurer {
    private static final Logger logger = LoggerFactory.getLogger(MonitorConfig.class);

    private String cronExpression = "0 */3 * * * ?";

    //private String cronExpression = "*/20 * * * * ?";

    private final SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");

    @Autowired
    private MonitorService monitorService;

    @Override
    public void configureTasks(ScheduledTaskRegistrar taskRegistrar) {
        taskRegistrar.addTriggerTask(() -> {

            monitorService.monitorOnce(true);

        }, triggerContext -> new CronTrigger(cronExpression).nextExecutionTime(triggerContext));
    }
}

MonitorController.java代码:


package com.suncreate.kafkaConsumerMonitor.controller;

import com.suncreate.kafkaConsumerMonitor.model.ConsumerInfo;
import com.suncreate.kafkaConsumerMonitor.model.LayuiData;
import com.suncreate.kafkaConsumerMonitor.service.MonitorService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.List;

@RestController
@RequestMapping("/monitor")
public class MonitorController {

    @Autowired
    private MonitorService monitorService;

    @GetMapping("/getConsumers")
    public LayuiData getConsumers() {

        List<ConsumerInfo> list = monitorService.getConsumerList();

        LayuiData data = new LayuiData(list);
        return data;
    }

    @GetMapping("/monitorOnce")
    public void monitorOnce() {
        monitorService.monitorOnce(false);
    }

    @GetMapping("/getDetails")
    public LayuiData getDetails(String topic, String groupId) {

        List<ConsumerInfo> list = monitorService.getDetails(topic, groupId);

        LayuiData data = new LayuiData(list);
        return data;
    }
}

pom.xml文件(有些东西没用到或者备用,没有删):


<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.6.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.suncreate</groupId>
    <artifactId>kafka-consumer-monitor</artifactId>
    <version>1.0</version>
    <name>kafka-consumer-monitor</name>
    <description>Kafka消费积压监控预警</description>

    <properties>
        <java.version>1.8</java.version>
        <elasticsearch.version>6.1.4</elasticsearch.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.12</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.54</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>com.google.code.gson</groupId>
            <artifactId>gson</artifactId>
            <version>2.8.0</version>
        </dependency>
        <dependency>
            <groupId>org.postgresql</groupId>
            <artifactId>postgresql</artifactId>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-jdbc</artifactId>
        </dependency>

        <dependency>
            <groupId>org.elasticsearch.client</groupId>
            <artifactId>elasticsearch-rest-high-level-client</artifactId>
            <version>6.1.4</version>
        </dependency>

        <dependency>
            <groupId>com.oracle</groupId>
            <artifactId>ojdbc6</artifactId>
            <version>11.1.0.7.0</version>
        </dependency>

        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.11</artifactId>
            <version>0.11.0.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>0.11.0.1</version>
        </dependency>

    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>8</source>
                    <target>8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>

</project>

前端使用了 Layui 和 ECharts 展示表格和图表

index.css代码:


.div-title {
    font-size: 18px;
    margin-top: 10px;
    margin-left: 10px;
}

.div-right {
    text-align: right;
}

.span-red {
    color: #ff0000;
}

index.html代码(展示topic、消费者组Consumer GroupId、Total Lag、Kafka数据生产速度、Kafka数据消费速度等):


<!DOCTYPE html>
<html lang="zh">
<head>
    <meta charset="UTF-8">
    <title>Title</title>
    <link rel="stylesheet" href="css/index.css" rel="external nofollow"  rel="external nofollow" >
    <link rel="stylesheet" href="js/layui-v2.6.8/css/layui.css" rel="external nofollow"  rel="external nofollow"  media="all">
    <script type="text/javascript" class="lazy" data-src="js/jquery-1.7.1.js"></script>
    <script type="text/javascript" class="lazy" data-src="js/layui-v2.6.8/layui.js" charset="utf-8"></script>
</head>
<body>

<div class="div-title">Kafka 监控
    <button type="button" class="layui-btn layui-btn-sm" onclick="refreshTable()">刷新</button>
</div>
<table class="layui-hide" id="myTable"></table>

<script type="text/javascript">
    var myTable;

    layui.use('table', function () {
        var table = layui.table;

        myTable = table.render({
            elem: '#myTable',
            url: '/home/monitor/getConsumers',
            cellMinWidth: 80, //全局定义常规单元格的最小宽度
            cols: [[
                {field: 'topic', width: 300, title: 'topic', sort: true},
                {field: 'groupId', width: 300, title: 'groupId'},
                {
                    field: 'totalLag', width: 150, title: 'Total Lag', sort: true, templet: function (d) {
                        if (d.delayDay * 24 > 2) {
                            return '<div class="div-right"><span class="span-red">' + d.totalLag + '</span></div>'
                        } else {
                            return '<div class="div-right"><span>' + d.totalLag + '</span></div>'
                        }
                    }
                },
                {
                    field: 'speedLogSize', width: 150, title: '生产速度(条/秒)', templet: function (d) {
                        return '<div class="div-right">' + d.speedLogSize + '</div>'
                    }
                },
                {
                    field: 'speedOffset', width: 150, title: '消费速度(条/秒)', templet: function (d) {
                        return '<div class="div-right">' + d.speedOffset + '</div>'
                    }
                },
                {
                    field: 'ratio', width: 100, title: '消费/生产', templet: function (d) {
                        if (d.ratio < 90) {
                            return '<div class="div-right"><span class="span-red">' + d.ratio + '%</span></div>'
                        } else {
                            return '<div class="div-right"><span>' + d.ratio + '%</span></div>'
                        }
                    }
                },
                {
                    field: 'delayDay', width: 150, title: '积压(天)', sort: true, templet: function (d) {
                        if (d.delayDay * 24 > 2) {
                            return '<div class="div-right"><span class="span-red">' + d.delayDay + '</span></div>'
                        } else {
                            return '<div class="div-right"><span>' + d.delayDay + '</span></div>'
                        }
                    }
                },
                {
                    field: 'ope', width: 100, title: '操作', templet: function (d) {
                        return '<a href="/home/detail.html?topic=' + d.topic + '&groupId=' + d.groupId + '" rel="external nofollow"  target="_blank" class="layui-btn layui-btn-sm" >详细</a>';
                    }
                }
            ]]
        });
    });

    function refreshTable() {
        if (myTable) {
            myTable.reload();
        }
    }

    setInterval(function () {
        refreshTable();
    }, 30000);

    // setInterval(function () {
    //     $.get("/home/monitor/monitorOnce");
    // }, 30000);
</script>

</body>
</html>

detail.html代码(展示单个消费者组的Total Lag、生产速度、消费速度以及Total Lag趋势图):


<!DOCTYPE html>
<html lang="zh">
<head>
    <meta charset="UTF-8">
    <title>Title</title>
    <link rel="stylesheet" href="css/index.css" rel="external nofollow"  rel="external nofollow" >
    <link rel="stylesheet" href="js/layui-v2.6.8/css/layui.css" rel="external nofollow"  rel="external nofollow"  media="all">
    <script type="text/javascript" class="lazy" data-src="js/jquery-1.7.1.js"></script>
    <script type="text/javascript" class="lazy" data-src="js/layui-v2.6.8/layui.js" charset="utf-8"></script>
    <script type="text/javascript" class="lazy" data-src="js/echarts-v4.7.0/echarts.min.js"></script>
</head>
<body>

<div class="div-title"><span id="detailTitle"></span> 明细
    <button type="button" class="layui-btn layui-btn-sm" onclick="refreshTable()">刷新</button>
</div>
<div id="main" style="height:400px;"></div>
<table class="layui-hide" id="test"></table>

<script type="text/javascript">
    var myTable;
    var topic = getQueryVariable("topic");
    var groupId = getQueryVariable("groupId");

    $("#detailTitle").html(topic + "  " + groupId);

    layui.use('table', function () {
        var table = layui.table;

        myTable = table.render({
            elem: '#test',
            url: '/home/monitor/getDetails?topic=' + topic + '&groupId=' + groupId,
            cellMinWidth: 80, //全局定义常规单元格的最小宽度
            initSort: {
                field: 'time', //排序字段,对应 cols 设定的各字段名
                type: 'desc' //排序方式  asc: 升序、desc: 降序、null: 默认排序
            },
            cols: [[
                {field: 'topic', width: 300, title: 'topic'},
                {field: 'groupId', width: 300, title: 'groupId'},
                {field: 'time', width: 180, title: '时间', sort: true},
                {
                    field: 'totalLag', width: 150, title: 'Total Lag', templet: function (d) {
                        if (d.delayDay * 24 > 2) {
                            return '<div class="div-right"><span class="span-red">' + d.totalLag + '</span></div>'
                        } else {
                            return '<div class="div-right"><span>' + d.totalLag + '</span></div>'
                        }
                    }
                },
                {
                    field: 'speedLogSize', width: 150, title: '生产速度(条/秒)', templet: function (d) {
                        return '<div class="div-right">' + d.speedLogSize + '</div>'
                    }
                },
                {
                    field: 'speedOffset', width: 150, title: '消费速度(条/秒)', templet: function (d) {
                        return '<div class="div-right">' + d.speedOffset + '</div>'
                    }
                },
                {
                    field: 'ratio', width: 100, title: '消费/生产', templet: function (d) {
                        if (d.ratio < 90) {
                            return '<div class="div-right"><span class="span-red">' + d.ratio + '%</span></div>'
                        } else {
                            return '<div class="div-right"><span>' + d.ratio + '%</span></div>'
                        }
                    }
                },
                {
                    field: 'delayDay', width: 150, title: '积压(天)', templet: function (d) {
                        if (d.delayDay * 24 > 2) {
                            return '<div class="div-right"><span class="span-red">' + d.delayDay + '</span></div>'
                        } else {
                            return '<div class="div-right"><span>' + d.delayDay + '</span></div>'
                        }
                    }
                }
            ]]
        });
    });

    function refreshTable() {
        if (myTable) {
            myTable.reload();
        }
        showChart();
    }

    setInterval(function () {
        refreshTable();
    }, 30000);

    function getQueryVariable(variable) {
        var query = window.location.search.substring(1);
        var vars = query.split("&");
        for (var i = 0; i < vars.length; i++) {
            var pair = vars[i].split("=");
            if (pair[0] == variable) {
                return pair[1];
            }
        }
        return (false);
    }

    function showChart() {
        $.ajax({
            type: "GET",
            url: '/home/monitor/getDetails?topic=' + topic + '&groupId=' + groupId,
            success: function (data) {
                if (data && data.data && data.data.length > 1) {
                    debugger;
                    var chartDom = document.getElementById('main');
                    var myChart = echarts.init(chartDom);
                    var option;

                    var xAxis = [];
                    var serseis = [];
                    for (var i = 0; i < data.data.length; i++) {
                        xAxis.push(data.data[i].time);
                        serseis.push(data.data[i].totalLag);

                    }

                    option = {
                        title: {
                            show: true,
                            text: "Total Lag 趋势图",
                            x: 'center'
                        },
                        xAxis: {
                            type: 'category',
                            data: xAxis
                        },
                        yAxis: {
                            type: 'value'
                        },
                        series: [{
                            data: serseis,
                            type: 'line'
                        }]
                    };

                    myChart.setOption(option);
                }
            }
        });
    }

    showChart();

</script>

</body>
</html>

效果图:

消费者组列表:

消费者组明细:

到此这篇关于Java Kafka 消费积压监控的文章就介绍到这了,更多相关Java Kafka 消费监控内容请搜索编程网以前的文章或继续浏览下面的相关文章希望大家以后多多支持编程网!

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

Java Kafka 消费积压监控的示例代码

下载Word文档到电脑,方便收藏和打印~

下载Word文档

猜你喜欢

Java中如何实现Kafka消费积压监控

小编给大家分享一下Java中如何实现Kafka消费积压监控,相信大部分人都还不怎么了解,因此分享这篇文章给大家参考一下,希望大家阅读完这篇文章后大有收获,下面让我们一起去了解一下吧!后端代码:Monitor.java代码:package c
2023-06-20

Kafka Java客户端代码的示例分析

这篇文章将为大家详细讲解有关Kafka Java客户端代码的示例分析,文章内容质量较高,因此小编分享给大家做个参考,希望大家阅读完这篇文章后对相关知识有一定的了解。kafka是一种高吞吐量的分布式发布订阅消息系统kafka是linkedin
2023-06-17

Redis shake实现可视化监控的示例代码

使用RedisShake可视化监控示例代码,可轻松实现RedisShake的运行状况和性能监控。该代码涵盖了RedisShakePrometheusExporter的安装、配置和使用,以及在Prometheus和Grafana中设置RedisShake目标、仪表盘和关键监控指标。通过可视化监控,可以实时了解RedisShake的运行状况,及时识别问题并采取纠正措施,从而提高其可用性和可靠性。
Redis shake实现可视化监控的示例代码
2024-04-02

SpringBoot集成Druid实现监控功能的示例代码

本文详细介绍了如何在SpringBoot应用程序中集成Druid以实现监控功能。Druid提供了一个控制台,用于监控数据库性能、SQL查询和系统资源。通过添加Druid依赖项、配置数据源、创建配置文件、启用控制台和运行应用程序,开发人员可以轻松启用Druid监控。Druid提供实时监控、SQL查询监控和系统资源监控等优点,帮助识别和优化数据库性能。
SpringBoot集成Druid实现监控功能的示例代码
2024-04-02

Java利用Redis实现消息队列的示例代码

本文介绍了Java利用Redis实现消息队列的示例代码,分享给大家,具体如下:应用场景为什么要用redis二进制存储、java序列化传输、IO连接数高、连接频繁一、序列化这里编写了一个java序列化的工具,主要是将对象转化为byte数组,和
2023-05-31

编程热搜

  • Python 学习之路 - Python
    一、安装Python34Windows在Python官网(https://www.python.org/downloads/)下载安装包并安装。Python的默认安装路径是:C:\Python34配置环境变量:【右键计算机】--》【属性】-
    Python 学习之路 - Python
  • chatgpt的中文全称是什么
    chatgpt的中文全称是生成型预训练变换模型。ChatGPT是什么ChatGPT是美国人工智能研究实验室OpenAI开发的一种全新聊天机器人模型,它能够通过学习和理解人类的语言来进行对话,还能根据聊天的上下文进行互动,并协助人类完成一系列
    chatgpt的中文全称是什么
  • C/C++中extern函数使用详解
  • C/C++可变参数的使用
    可变参数的使用方法远远不止以下几种,不过在C,C++中使用可变参数时要小心,在使用printf()等函数时传入的参数个数一定不能比前面的格式化字符串中的’%’符号个数少,否则会产生访问越界,运气不好的话还会导致程序崩溃
    C/C++可变参数的使用
  • css样式文件该放在哪里
  • php中数组下标必须是连续的吗
  • Python 3 教程
    Python 3 教程 Python 的 3.0 版本,常被称为 Python 3000,或简称 Py3k。相对于 Python 的早期版本,这是一个较大的升级。为了不带入过多的累赘,Python 3.0 在设计的时候没有考虑向下兼容。 Python
    Python 3 教程
  • Python pip包管理
    一、前言    在Python中, 安装第三方模块是通过 setuptools 这个工具完成的。 Python有两个封装了 setuptools的包管理工具: easy_install  和  pip , 目前官方推荐使用 pip。    
    Python pip包管理
  • ubuntu如何重新编译内核
  • 改善Java代码之慎用java动态编译

目录