我的编程空间,编程开发者的网络收藏夹
学习永远不晚

流式图表拒绝增删改查之kafka核心消费逻辑下篇

短信预约 -IT技能 免费直播动态提醒
省份

北京

  • 北京
  • 上海
  • 天津
  • 重庆
  • 河北
  • 山东
  • 辽宁
  • 黑龙江
  • 吉林
  • 甘肃
  • 青海
  • 河南
  • 江苏
  • 湖北
  • 湖南
  • 江西
  • 浙江
  • 广东
  • 云南
  • 福建
  • 海南
  • 山西
  • 四川
  • 陕西
  • 贵州
  • 安徽
  • 广西
  • 内蒙
  • 西藏
  • 新疆
  • 宁夏
  • 兵团
手机号立即预约

请填写图片验证码后获取短信验证码

看不清楚,换张图片

免费获取短信验证码

流式图表拒绝增删改查之kafka核心消费逻辑下篇

前篇回顾

  • 流式图表框架搭建
  • kafka核心消费逻辑线程池搭建

kafka消费者线程

突击检查八股文,实现线程的方法有哪些?嗯?没复习是吧,行没关系,那感谢参加本次面试哈。

常用的几种方式分别是:

  • 继承Thread类,重写run方法
  • 实现Runbale接口,重写run方法
  • 实现Callable接口,重写call方法

这里我们直接创捷出一个任务类实现Runable方法,重写run方法,一个线程当作一个kafka client,所以要在任务类中声明一个KafkaConsumer的成员变量,另外创建任务需要指定当前任务的名称也就是线程名,还有要监听的topic主题。

private KafkaConsumer<String, String> consumer;
private String topic;
private String threadName;

name和topic通过构造方法传进来,同时在构造方法里完成对client的初始化操作。


   public KafkaConsumerRunnable(String bootServer, String groupId, String topic) {
       this.topic = topic;
       Properties props = new Properties();
       props.put("bootstrap.servers", bootServer);
       props.put("group.id", groupId);
       props.put("enable.auto.commit", "false");
       props.put("auto.offset.reset", "latest");
       props.put("max.poll.records", 5);
       props.put("session.timeout.ms", "60000");
       props.put("max.poll.interval.ms", 300000);
       props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");  //键反序列化方式
       props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
       this.consumer = new KafkaConsumer&lt;&gt;(props);
   }

这里封装kafka client的必要信息,入参bootServer为kafka集群ip,groupId为threadName,我们规定一个线程为一个kafka消费链接,消费一个topic。

上一篇线程池保证了任务不会轻易挂掉,就算挂掉了也会重新提交,所以为了节省资源不做所谓的同groupId的负载操作。session.timeout.ms和max.poll.interval.ms可以根据当前的kafka资源灵活配置,不然可能会引发一些reblance。

enable.auto.commit设置为false,手动提交offset,auto.offset.reset这块由于业务特殊,本来就是流式图表瞬时的展示,如果真的出现了数据丢失那就丢了吧,从最新的数据读取。

接下来只需要处理下消费逻辑,consumer.subscribe(Collections.singletonList(this.topic))开始订阅监听kafka数据,搞一个while true不断的消费数据,try catch只需要对WakeupException做处理,kafka客户端会在关闭的时候抛出WakeupException异常。

finally里提交offset,无论这条offset对应的数据消费成功还是失败都是消费过了,失败了就过去了。

   @Override
   public void run() {
   consumer.subscribe(Collections.singletonList(this.topic));
   String key = "stream_chart:" + this.name;
   Thread.currentThread().setName(key);
   try {
      while (true) {
         ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
         // 如果队列中没有消息 等待KAFKA_TIME_OUT后调用poll,如果有消息立即消费
         for (ConsumerRecord<String, String> record : records) {
            String value = record.value();
            log.info("线程 {} 消费kafka数据 -> {} \n", Thread.currentThread().getName(), value);
            RedisConfig.getRedisTemplate().opsForZSet().add(key, value, Instant.now().getEpochSecond() * 1000);
         }
      }
   } catch (WakeupException e) {
      log.info("ignore for shutdown", e);
   } finally {
      consumer.commitAsync();
   }
}

我们消费到数据直接放到redis的zset结构里,当前的时间戳作为score,最后留一个关闭客户端的后门

// 退出后关掉客户端
public void shutDown() {
   consumer.wakeup();
}

任务提交

任务提交这块只需要在业务service中注入线程池,创建对应的KafkaRunable任务封装对应的信息,执行execute即可。

这里有个坑需要注意下,第二次突击检查八股文,线程池提交方法submitexecute的区别说一下。不知道的立刻去熟读并背诵。

public class TestTheadPool {
    public static void main(String[] args) {
        ExecutorService executorService= Executors.newFixedThreadPool(1);
        executorService.submit(new task("submit"));
        executorService.execute(new task("execute"));
    }
}
class task implements  Runnable{
    private String name;
    public task(String name) {
        this.name = name;
    }
    @Override
    public void run() {
        System.out.println(this.name + " start task");
        int i=1/0;
    }
}

熟悉的同学通过示例代码可以看出来,submit提交的线程不会抛出异常代码,只有获取Future返回值并执行get方法才会捕获到异常。这块涉及到异步的东西不再赘述

try {
    Future<?> submit = executorService.submit(new task("submit"));
    submit.get();
} catch (InterruptedException e) {
    e.printStackTrace();
} catch (ExecutionException e) {
    e.printStackTrace();
}

所以我们要使用execute执行,不然kafka消费线程里消费失败了拦截不到就不会被重新提交,导致线程挂掉。

以上就是流式图表拒绝增删改查之kafka核心消费逻辑下篇的详细内容,更多关于kafka消费流式图表的资料请关注编程网其它相关文章!

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

流式图表拒绝增删改查之kafka核心消费逻辑下篇

下载Word文档到电脑,方便收藏和打印~

下载Word文档

猜你喜欢

流式图表拒绝增删改查之kafka核心消费逻辑下篇

这篇文章主要为大家介绍了流式图表拒绝增删改查之kafka核心消费逻辑讲解的下篇,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪
2023-05-15

流式图表拒绝增删改查之kafka核心消费逻辑上篇

这篇文章主要为大家介绍了流式图表拒绝增删改查之kafka核心消费逻辑详解的上篇,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪
2023-05-15

编程热搜

  • Python 学习之路 - Python
    一、安装Python34Windows在Python官网(https://www.python.org/downloads/)下载安装包并安装。Python的默认安装路径是:C:\Python34配置环境变量:【右键计算机】--》【属性】-
    Python 学习之路 - Python
  • chatgpt的中文全称是什么
    chatgpt的中文全称是生成型预训练变换模型。ChatGPT是什么ChatGPT是美国人工智能研究实验室OpenAI开发的一种全新聊天机器人模型,它能够通过学习和理解人类的语言来进行对话,还能根据聊天的上下文进行互动,并协助人类完成一系列
    chatgpt的中文全称是什么
  • C/C++中extern函数使用详解
  • C/C++可变参数的使用
    可变参数的使用方法远远不止以下几种,不过在C,C++中使用可变参数时要小心,在使用printf()等函数时传入的参数个数一定不能比前面的格式化字符串中的’%’符号个数少,否则会产生访问越界,运气不好的话还会导致程序崩溃
    C/C++可变参数的使用
  • css样式文件该放在哪里
  • php中数组下标必须是连续的吗
  • Python 3 教程
    Python 3 教程 Python 的 3.0 版本,常被称为 Python 3000,或简称 Py3k。相对于 Python 的早期版本,这是一个较大的升级。为了不带入过多的累赘,Python 3.0 在设计的时候没有考虑向下兼容。 Python
    Python 3 教程
  • Python pip包管理
    一、前言    在Python中, 安装第三方模块是通过 setuptools 这个工具完成的。 Python有两个封装了 setuptools的包管理工具: easy_install  和  pip , 目前官方推荐使用 pip。    
    Python pip包管理
  • ubuntu如何重新编译内核
  • 改善Java代码之慎用java动态编译

目录