我的编程空间,编程开发者的网络收藏夹
学习永远不晚

Kafka心跳与消费机制是什么

短信预约 -IT技能 免费直播动态提醒
省份

北京

  • 北京
  • 上海
  • 天津
  • 重庆
  • 河北
  • 山东
  • 辽宁
  • 黑龙江
  • 吉林
  • 甘肃
  • 青海
  • 河南
  • 江苏
  • 湖北
  • 湖南
  • 江西
  • 浙江
  • 广东
  • 云南
  • 福建
  • 海南
  • 山西
  • 四川
  • 陕西
  • 贵州
  • 安徽
  • 广西
  • 内蒙
  • 西藏
  • 新疆
  • 宁夏
  • 兵团
手机号立即预约

请填写图片验证码后获取短信验证码

看不清楚,换张图片

免费获取短信验证码

Kafka心跳与消费机制是什么

这篇“Kafka心跳与消费机制是什么”文章的知识点大部分人都不太理解,所以小编给大家总结了以下内容,内容详细,步骤清晰,具有一定的借鉴价值,希望大家阅读完这篇文章能有所收获,下面我们一起来看看这篇“Kafka心跳与消费机制是什么”文章吧。

Kafka是通过心跳机制来控制消费超时,心跳机制对于消费者客户端来说是无感的,它是一个异步线程,当我们启动一个消费者实例时,心跳线程就开始工作了。心跳超时会导致消息重复消费

Kafka心跳与消费机制是什么

1、Kafka消费

首先,我们来看看消费。Kafka提供了非常简单的消费API,使用者只需初始化Kafka的Broker Server地址,然后实例化KafkaConsumer类即可拿到Topic中的数据。一个简单的Kafka消费实例代码如下所示:

public class JConsumerSubscribe extends Thread {   public static void main(String[] args) {        JConsumerSubscribe jconsumer = new JConsumerSubscribe();        jconsumer.start();    }        private Properties configure() {        Properties props = new Properties();        props.put("bootstrap.servers", "dn1:9092,dn2:9092,dn3:9092");// 指定Kafka集群地址       props.put("group.id", "ke");// 指定消费者组       props.put("enable.auto.commit", "true");// 开启自动提交       props.put("auto.commit.interval.ms", "1000");// 自动提交的时间间隔       // 反序列化消息主键        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");       // 反序列化消费记录        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");       return props;   }        @Override    public void run() {        // 创建一个消费者实例对象        KafkaConsumer consumer = new KafkaConsumer(configure());        // 订阅消费主题集合        consumer.subscribe(Arrays.asList("test_kafka_topic"));       // 实时消费标识        boolean flag = true;       while (flag) {           // 获取主题消息数据            ConsumerRecords records = consumer.poll(Duration.ofMillis(100));           for (ConsumerRecord record : records)               // 循环打印消息记录                System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());       }        // 出现异常关闭消费者对象        consumer.close();   }}

上述代码我们就可以非常便捷的拿到Topic中的数据。但是,当我们调用poll方法拉取数据的时候,Kafka Broker Server做了那些事情。接下来,我们可以去看看源代码的实现细节。核心代码如下: org.apache.kafka.clients.consumer.KafkaConsumer

private ConsumerRecords poll(final long timeoutMs, final boolean includeMetadataInTimeout) {       acquireAndEnsureOpen();        try {           if (timeoutMs "Timeout must not be negative");           if (this.subscriptions.hasNoSubscriptionOrUserAssignment()) {               throw new IllegalStateException("Consumer is not subscribed to any topics or assigned any partitions");           }            // poll for new data until the timeout expires           long elapsedTime = 0L;           do {               client.maybeTriggerWakeup();                final long metadataEnd;                if (includeMetadataInTimeout) {                   final long metadataStart = time.milliseconds();                    if (!updateAssignmentMetadataIfNeeded(remainingTimeAtLeastZero(timeoutMs, elapsedTime))) {                       return ConsumerRecords.empty();                   }                    metadataEnd = time.milliseconds();                    elapsedTime += metadataEnd - metadataStart;                } else {                   while (!updateAssignmentMetadataIfNeeded(Long.MAX_VALUE)) {                       log.warn("Still waiting for metadata");                   }                    metadataEnd = time.milliseconds();                }                final Map>> records = pollForFetches(remainingTimeAtLeastZero(timeoutMs, elapsedTime));                if (!records.isEmpty()) {                   // before returning the fetched records, we can send off the next round of fetches                   // and avoid block waiting for their responses to enable pipelining while the user                   // is handling the fetched records.                   //                   // NOTE: since the consumed position has already been updated, we must not allow                   // wakeups or any other errors to be triggered prior to returning the fetched records.                   if (fetcher.sendFetches() > 0 || client.hasPendingRequests()) {                       client.pollNoWakeup();                    }                    return this.interceptors.onConsume(new ConsumerRecords(records));               }                final long fetchEnd = time.milliseconds();                elapsedTime += fetchEnd - metadataEnd;            } while (elapsedTime return ConsumerRecords.empty();       } finally {           release();        }    }

上述代码中有个方法pollForFetches,它的实现逻辑如下:

private Map>> pollForFetches(final long timeoutMs) {       final long startMs = time.milliseconds();       long pollTimeout = Math.min(coordinator.timeToNextPoll(startMs), timeoutMs);       // if data is available already, return it immediately       final Map>> records = fetcher.fetchedRecords();       if (!records.isEmpty()) {           return records;       }       // send any new fetches (won't resend pending fetches)       fetcher.sendFetches();       // We do not want to be stuck blocking in poll if we are missing some positions       // since the offset lookup may be backing off after a failure       // NOTE: the use of cachedSubscriptionHashAllFetchPositions means we MUST call       // updateAssignmentMetadataIfNeeded before this method.       if (!cachedSubscriptionHashAllFetchPositions && pollTimeout > retryBackoffMs) {           pollTimeout = retryBackoffMs;       }       client.poll(pollTimeout, startMs, () -> {           // since a fetch might be completed by the background thread, we need this poll condition           // to ensure that we do not block unnecessarily in poll()           return !fetcher.hasCompletedFetches();       });       // after the long poll, we should check whether the group needs to rebalance       // prior to returning data so that the group can stabilize faster       if (coordinator.rejoinNeededOrPending()) {           return Collections.emptyMap();       }       return fetcher.fetchedRecords();   }

上述代码中加粗的位置,我们可以看出每次消费者客户端拉取数据时,通过poll方法,先调用fetcher中的fetchedRecords函数,如果获取不到数据,就会发起一个新的sendFetches请求。而在消费数据的时候,每个批次从Kafka Broker Server中拉取数据是有最大数据量限制,默认是500条,由属性(max.poll.records)控制,可以在客户端中设置该属性值来调整我们消费时每次拉取数据的量。

**提示:**这里需要注意的是,max.poll.records返回的是一个poll请求的数据总和,与多少个分区无关。因此,每次消费从所有分区中拉取Topic的数据的总条数不会超过max.poll.records所设置的值。

而在Fetcher的类中,在sendFetches方法中有限制拉取数据容量的限制,由属性(max.partition.fetch.bytes),默认1MB。可能会有这样一个场景,当满足max.partition.fetch.bytes限制条件,如果需要Fetch出10000条记录,每次默认500条,那么我们需要执行20次才能将这一次通过网络发起的请求全部Fetch完毕。

这里,可能有同学有疑问,我们不能将默认的max.poll.records属性值调到10000吗?可以调,但是还有个属性需要一起配合才可以,这个就是每次poll的超时时间(Duration.ofMillis(100)),这里需要根据你的实际每条数据的容量大小来确定设置超时时间,如果你将最大值调到10000,当你每条记录的容量很大时,超时时间还是100ms,那么可能拉取的数据少于10000条。

而这里,还有另外一个需要注意的事情,就是会话超时的问题。session.timeout.ms默认是10s,group.min.session.timeout.ms默认是6s,group.max.session.timeout.ms默认是30min。当你在处理消费的业务逻辑的时候,如果在10s内没有处理完,那么消费者客户端就会与Kafka Broker Server断开,消费掉的数据,产生的offset就没法提交给Kafka,因为Kafka Broker Server此时认为该消费者程序已经断开,而即使你设置了自动提交属性,或者设置auto.offset.reset属性,你消费的时候还是会出现重复消费的情况,这就是因为session.timeout.ms超时的原因导致的。

2、心跳机制

上面在末尾的时候,说到会话超时的情况导致消息重复消费,为什么会有超时?有同学会有这样的疑问,我的消费者线程明明是启动的,也没有退出,为啥消费不到Kafka的消息呢?消费者组也查不到我的ConsumerGroupID呢?这就有可能是超时导致的,而Kafka是通过心跳机制来控制超时,心跳机制对于消费者客户端来说是无感的,它是一个异步线程,当我们启动一个消费者实例时,心跳线程就开始工作了。

在org.apache.kafka.clients.consumer.internals.AbstractCoordinator中会启动一个HeartbeatThread线程来定时发送心跳和检测消费者的状态。每个消费者都有个org.apache.kafka.clients.consumer.internals.ConsumerCoordinator,而每个ConsumerCoordinator都会启动一个HeartbeatThread线程来维护心跳,心跳信息存放在org.apache.kafka.clients.consumer.internals.Heartbeat中,声明的Schema如下所示:

private final int sessionTimeoutMs;   private final int heartbeatIntervalMs;   private final int maxPollIntervalMs;   private final long retryBackoffMs;   private volatile long lastHeartbeatSend;     private long lastHeartbeatReceive;   private long lastSessionReset;   private long lastPoll;   private boolean heartbeatFailed;

心跳线程中的run方法实现代码如下:

public void run() {           try {               log.debug("Heartbeat thread started");               while (true) {                   synchronized (AbstractCoordinator.this) {                       if (closed)                           return;                       if (!enabled) {                           AbstractCoordinator.this.wait();                           continue;                       }                        if (state != MemberState.STABLE) {                           // the group is not stable (perhaps because we left the group or because the coordinator                           // kicked us out), so disable heartbeats and wait for the main thread to rejoin.                           disable();                           continue;                       }                       client.pollNoWakeup();                       long now = time.milliseconds();                       if (coordinatorUnknown()) {                           if (findCoordinatorFuture != null || lookupCoordinator().failed())                               // the immediate future check ensures that we backoff properly in the case that no                               // brokers are available to connect to.                               AbstractCoordinator.this.wait(retryBackoffMs);                       } else if (heartbeat.sessionTimeoutExpired(now)) {                           // the session timeout has expired without seeing a successful heartbeat, so we should                           // probably make sure the coordinator is still healthy.                           markCoordinatorUnknown();                       } else if (heartbeat.pollTimeoutExpired(now)) {                           // the poll timeout has expired, which means that the foreground thread has stalled                           // in between calls to poll(), so we explicitly leave the group.                           maybeLeaveGroup();                       } else if (!heartbeat.shouldHeartbeat(now)) {                           // poll again after waiting for the retry backoff in case the heartbeat failed or the                           // coordinator disconnected                           AbstractCoordinator.this.wait(retryBackoffMs);                       } else {                           heartbeat.sentHeartbeat(now);                           sendHeartbeatRequest().addListener(new RequestFutureListener() {                               @Override                               public void onSuccess(Void value) {                                   synchronized (AbstractCoordinator.this) {                                       heartbeat.receiveHeartbeat(time.milliseconds());                                   }                               }                               @Override                               public void onFailure(RuntimeException e) {                                   synchronized (AbstractCoordinator.this) {                                       if (e instanceof RebalanceInProgressException) {                                           // it is valid to continue heartbeating while the group is rebalancing. This                                           // ensures that the coordinator keeps the member in the group for as long                                           // as the duration of the rebalance timeout. If we stop sending heartbeats,                                           // however, then the session timeout may expire before we can rejoin.                                           heartbeat.receiveHeartbeat(time.milliseconds());                                       } else {                                           heartbeat.failHeartbeat();                                           // wake up the thread if it's sleeping to reschedule the heartbeat                                           AbstractCoordinator.this.notify();                                       }                                   }                               }                           });                       }                   }               }           } catch (AuthenticationException e) {               log.error("An authentication error occurred in the heartbeat thread", e);               this.failed.set(e);           } catch (GroupAuthorizationException e) {               log.error("A group authorization error occurred in the heartbeat thread", e);               this.failed.set(e);           } catch (InterruptedException | InterruptException e) {               Thread.interrupted();               log.error("Unexpected interrupt received in heartbeat thread", e);               this.failed.set(new RuntimeException(e));           } catch (Throwable e) {               log.error("Heartbeat thread failed due to unexpected error", e);               if (e instanceof RuntimeException)                   this.failed.set((RuntimeException) e);               else                   this.failed.set(new RuntimeException(e));           } finally {               log.debug("Heartbeat thread has closed");           }       }

在心跳线程中这里面包含两个最重要的超时函数,它们是sessionTimeoutExpired和pollTimeoutExpired。

public boolean sessionTimeoutExpired(long now) {       return now - Math.max(lastSessionReset, lastHeartbeatReceive) > sessionTimeoutMs;}public boolean pollTimeoutExpired(long now) {       return now - lastPoll > maxPollIntervalMs;}
2.1、sessionTimeoutExpired

如果是sessionTimeout超时,则会被标记为当前协调器处理断开,此时,会将消费者移除,重新分配分区和消费者的对应关系。在Kafka Broker Server中,Consumer Group定义了5中(如果算上Unknown,应该是6种状态)状态,org.apache.kafka.common.ConsumerGroupState,如下图所示:

Kafka心跳与消费机制是什么
2.2、pollTimeoutExpired

如果触发了poll超时,此时消费者客户端会退出ConsumerGroup,当再次poll的时候,会重新加入到ConsumerGroup,触发RebalanceGroup。而KafkaConsumer Client是不会帮我们重复poll的,需要我们自己在实现的消费逻辑中不停的调用poll方法。

3.分区与3消费线程

关于消费分区与消费线程的对应关系,理论上消费线程数应该小于等于分区数。之前是有这样一种观点,一个消费线程对应一个分区,当消费线程等于分区数是最大化线程的利用率。直接使用KafkaConsumer Client实例,这样使用确实没有什么问题。但是,如果我们有富裕的CPU,其实还可以使用大于分区数的线程,来提升消费能力,这就需要我们对KafkaConsumer Client实例进行改造,实现消费策略预计算,利用额外的CPU开启更多的线程,来实现消费任务分片。

以上就是关于“Kafka心跳与消费机制是什么”这篇文章的内容,相信大家都有了一定的了解,希望小编分享的内容对大家有帮助,若想了解更多相关的知识内容,请关注编程网行业资讯频道。

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

Kafka心跳与消费机制是什么

下载Word文档到电脑,方便收藏和打印~

下载Word文档

猜你喜欢

Kafka心跳与消费机制是什么

这篇“Kafka心跳与消费机制是什么”文章的知识点大部分人都不太理解,所以小编给大家总结了以下内容,内容详细,步骤清晰,具有一定的借鉴价值,希望大家阅读完这篇文章能有所收获,下面我们一起来看看这篇“Kafka心跳与消费机制是什么”文章吧。K
2023-06-27

Kafka消费与心跳机制如何理解

Kafka消费与心跳机制如何理解,相信很多没有经验的人对此束手无策,为此本文总结了问题出现的原因和解决方法,通过这篇文章希望你能解决这个问题。导读kafka是一个分布式,分区的,多副本的,多订阅者的消息发布订阅系统(分布式MQ系统),可以用
2023-06-15

kafka核心消费逻辑是什么

这篇文章主要介绍“kafka核心消费逻辑是什么”,在日常操作中,相信很多人在kafka核心消费逻辑是什么问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答”kafka核心消费逻辑是什么”的疑惑有所帮助!接下来,请跟
2023-07-06

kafka的重试机制和ack机制是什么

Kafka的重试机制是指在消息发送过程中,如果发送失败或者出现异常,Kafka会自动尝试重新发送消息。重试机制的目的是确保消息能够成功发送到目标主题。Kafka的重试机制包括两个方面:Producer端重试:当Producer发送消息时,
2023-10-26

kafka副本同步机制是什么

Kafka副本同步机制是指Kafka集群中的副本之间的数据同步方式。在Kafka中,每个分区都有多个副本,其中一个被选为leader副本,其余副本为follower副本。副本同步机制保证了数据在分区的所有副本之间的一致性。Kafka使用的是
2023-10-12

Kafka中生产者和消费者指的是什么

在Kafka中,生产者和消费者是指Kafka消息系统中参与消息传递的两种角色。生产者是指负责向Kafka集群中的主题(topic)发布消息的客户端应用程序。生产者将消息发送到指定的主题,并且可以选择指定消息的键(key),以及消息所属的分
Kafka中生产者和消费者指的是什么
2024-03-14

Kafka消费者组和负载均衡策略是什么

Kafka消费者组是一组消费者实例的集合,它们共同消费一个或多个主题的消息。消费者组中的每个消费者实例会被分配一个或多个分区来消费消息。负载均衡策略是指Kafka消费者组中,如何分配分区给各个消费者实例,以实现消费者之间的负载均衡。Kaf
Kafka消费者组和负载均衡策略是什么
2024-04-22

Java线程核心机制是什么

这篇文章主要介绍“Java线程核心机制是什么”,在日常操作中,相信很多人在Java线程核心机制是什么问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答”Java线程核心机制是什么”的疑惑有所帮助!接下来,请跟着小编
2023-06-02

rabbitmq消息确认机制是什么

RabbitMQ消息确认机制是一种用于保证消息可靠传输的机制。它确保生产者发送的消息被正确地传递给消费者并被消费者成功处理。在RabbitMQ中,消息确认机制可以通过以下两种方式实现:1. 生产者确认:生产者发送消息后,等待RabbitMQ
2023-10-09

android消息推送机制是什么

Android消息推送机制是一种通过网络将消息推送给已经安装了应用程序的Android设备的技术。它主要依靠Google提供的Firebase Cloud Messaging(FCM)服务来实现。在Android应用程序中,开发者可以集成F
2023-09-28

Android handler异步消息机制是什么

Android中的Handler是一种基于消息机制的异步处理机制。它可以用来将消息或Runnable对象发送到主线程或者后台线程中执行。在Android中,UI操作必须在主线程中执行,否则会出现异常。如果在后台线程中执行耗时操作,就需要使用
2023-10-18

hadoop心跳时间与冗余快清除方法是什么

这篇文章主要讲解了“hadoop心跳时间与冗余快清除方法是什么”,文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习“hadoop心跳时间与冗余快清除方法是什么”吧!1.Hadoop datanode
2023-06-03

mq消息丢失补偿机制是什么

MQ消息丢失补偿机制是一种在消息中间件(MQ)中,当消息发送或消费遇到异常情况导致消息丢失时,能够进行补偿和保证消息不丢失的一种机制。常见的MQ消息丢失补偿机制有以下几种:1. 重试机制:当消息发送或消费失败时,自动进行重试操作,多次尝试发
2023-10-20

android异步消息处理机制是什么

Android异步消息处理机制是一种在主线程以外的线程中执行任务的机制。它主要包括以下几个重要的组件:1. Handler:负责发送和处理消息,它可以与Looper关联,通过Looper从消息队列中获取消息并处理。2. Message:消息
2023-09-13

RocketMQ消息存储文件的加载与恢复机制是什么

RocketMQ的消息存储文件加载与恢复机制主要包括两个方面:文件的加载和文件的恢复。文件加载:RocketMQ使用MmappedFile来加载消息存储文件。MmappedFile是一种内存映射文件,通过将文件映射到内存中,可以提高文件的读
RocketMQ消息存储文件的加载与恢复机制是什么
2024-04-09

Android线程间消息传递机制是什么

本篇内容介绍了“Android线程间消息传递机制是什么”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!1、消息是怎么发送的?这个问题还可以进行
2023-06-04

MongoDB的并发控制与锁机制是什么

MongoDB使用乐观并发控制(Optimistic Concurrency Control)来处理并发操作。在MongoDB中,并发操作通过基于文档级别的锁来实现。当一个客户端请求对一个文档进行更新时,MongoDB会先获取该文档的锁,然
MongoDB的并发控制与锁机制是什么
2024-05-07

Redis主从复制与哨兵机制是什么

这篇文章主要介绍了Redis主从复制与哨兵机制是什么的相关知识,内容详细易懂,操作简单快捷,具有一定借鉴价值,相信大家阅读完这篇Redis主从复制与哨兵机制是什么文章都会有所收获,下面我们一起来看看吧。一、Redis复制是什么?Redis复
2023-07-05

MongoDB复制集与故障恢复机制是什么

MongoDB复制集是一组维护相同数据集的MongoDB实例。其中有一个主节点(primary)负责处理所有的写操作,其他节点是从节点(secondary),负责复制主节点上的数据并处理读操作。复制集还包括一个仲裁节点(arbiter),用
MongoDB复制集与故障恢复机制是什么
2024-05-07

编程热搜

  • Python 学习之路 - Python
    一、安装Python34Windows在Python官网(https://www.python.org/downloads/)下载安装包并安装。Python的默认安装路径是:C:\Python34配置环境变量:【右键计算机】--》【属性】-
    Python 学习之路 - Python
  • chatgpt的中文全称是什么
    chatgpt的中文全称是生成型预训练变换模型。ChatGPT是什么ChatGPT是美国人工智能研究实验室OpenAI开发的一种全新聊天机器人模型,它能够通过学习和理解人类的语言来进行对话,还能根据聊天的上下文进行互动,并协助人类完成一系列
    chatgpt的中文全称是什么
  • C/C++中extern函数使用详解
  • C/C++可变参数的使用
    可变参数的使用方法远远不止以下几种,不过在C,C++中使用可变参数时要小心,在使用printf()等函数时传入的参数个数一定不能比前面的格式化字符串中的’%’符号个数少,否则会产生访问越界,运气不好的话还会导致程序崩溃
    C/C++可变参数的使用
  • css样式文件该放在哪里
  • php中数组下标必须是连续的吗
  • Python 3 教程
    Python 3 教程 Python 的 3.0 版本,常被称为 Python 3000,或简称 Py3k。相对于 Python 的早期版本,这是一个较大的升级。为了不带入过多的累赘,Python 3.0 在设计的时候没有考虑向下兼容。 Python
    Python 3 教程
  • Python pip包管理
    一、前言    在Python中, 安装第三方模块是通过 setuptools 这个工具完成的。 Python有两个封装了 setuptools的包管理工具: easy_install  和  pip , 目前官方推荐使用 pip。    
    Python pip包管理
  • ubuntu如何重新编译内核
  • 改善Java代码之慎用java动态编译

目录