我的编程空间,编程开发者的网络收藏夹
学习永远不晚

java中Pulsar InterruptedException异常怎么解决

短信预约 -IT技能 免费直播动态提醒
省份

北京

  • 北京
  • 上海
  • 天津
  • 重庆
  • 河北
  • 山东
  • 辽宁
  • 黑龙江
  • 吉林
  • 甘肃
  • 青海
  • 河南
  • 江苏
  • 湖北
  • 湖南
  • 江西
  • 浙江
  • 广东
  • 云南
  • 福建
  • 海南
  • 山西
  • 四川
  • 陕西
  • 贵州
  • 安徽
  • 广西
  • 内蒙
  • 西藏
  • 新疆
  • 宁夏
  • 兵团
手机号立即预约

请填写图片验证码后获取短信验证码

看不清楚,换张图片

免费获取短信验证码

java中Pulsar InterruptedException异常怎么解决

本篇内容主要讲解“java中Pulsar InterruptedException异常怎么解决”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习“java中Pulsar InterruptedException异常怎么解决”吧!

背景

java中Pulsar InterruptedException异常怎么解决

今天收到业务团队反馈线上有个应用往 Pulsar 中发送消息失败了,经过日志查看得知是发送消息时候抛出了 java.lang.InterruptedException 异常。

和业务沟通后得知是在一个 gRPC 接口中触发的消息发送,大约持续了半个小时的异常后便恢复正常了,这是整个问题的背景。

前置排查

拿到该问题后首先排查下是否是共性问题,查看了其他的应用没有发现类似的异常;同时也查看了 Pulsar broker 的监控大盘,在这个时间段依然没有波动和异常;

这样可以初步排除是 Pulsar 服务端的问题。

接着便是查看应用那段时间的负载情况,从应用 QPS 到 JVM 的各个内存情况依然没发现有什么明显的变化。

Pulsar 源码排查

既然看起来应用本身和 Pulsar broker 都没有问题的话那就只能从异常本身来排查了。

首先第一步要得知具体使用的是 Pulsar-client 是版本是多少,因为业务使用的是内部基于官方 SDK 封装 springboot starter 所以第一步还得排查这个 starter 是否有影响。

通过查看源码基本排除了 starter 的嫌疑,里面只是简单的封装了 SDK 的功能而已。

org.apache.pulsar.client.api.PulsarClientException: java.util.concurrent.ExecutionException: org.apache.pulsar.client.api.PulsarClientException: java.lang.InterruptedException at org.apache.pulsar.client.api.PulsarClientException.unwrap(PulsarClientException.java:1027) at org.apache.pulsar.client.impl.TypedMessageBuilderImpl.send(TypedMessageBuilderImpl.java:91) at java.base/java.lang.Thread.run(Thread.java:834) Caused by: java.util.concurrent.ExecutionException: org.apache.pulsar.client.api.PulsarClientException: java.lang.InterruptedException at java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:395) at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1999) at org.apache.pulsar.client.impl.TypedMessageBuilderImpl.send(TypedMessageBuilderImpl.java:89) ... 49 common frames omitted Caused by: org.apache.pulsar.client.api.PulsarClientException: java.lang.InterruptedException at org.apache.pulsar.client.impl.ProducerImpl.canEnqueueRequest(ProducerImpl.java:775) at org.apache.pulsar.client.impl.ProducerImpl.sendAsync$original$BWm7PPlZ(ProducerImpl.java:393) at org.apache.pulsar.client.impl.ProducerImpl.sendAsync$original$BWm7PPlZ$accessor$i7NYMN6i(ProducerImpl.java) at org.apache.pulsar.client.impl.ProducerImpl$auxiliary$EfuVvJLT.call(Unknown Source) at org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstMethodsInter.intercept(InstMethodsInter.java:86) at org.apache.pulsar.client.impl.ProducerImpl.sendAsync(ProducerImpl.java) at org.apache.pulsar.client.impl.ProducerImpl.internalSendAsync(ProducerImpl.java:292) at org.apache.pulsar.client.impl.ProducerImpl.internalSendWithTxnAsync(ProducerImpl.java:363) at org.apache.pulsar.client.impl.PartitionedProducerImpl.internalSendWithTxnAsync(PartitionedProducerImpl.java:191) at org.apache.pulsar.client.impl.PartitionedProducerImpl.internalSendAsync(PartitionedProducerImpl.java:167) at org.apache.pulsar.client.impl.TypedMessageBuilderImpl.sendAsync(TypedMessageBuilderImpl.java:103) at org.apache.pulsar.client.impl.TypedMessageBuilderImpl.send(TypedMessageBuilderImpl.java:82) ... 49 common frames omitted Caused by: java.lang.InterruptedException: nullat java.base/java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1343) at java.base/java.util.concurrent.Semaphore.acquire(Semaphore.java:318) at org.apache.pulsar.client.impl.ProducerImpl.canEnqueueRequest(ProducerImpl.java:758)

接下来便只能是分析堆栈了,因为 Pulsar-client 的部分实现源码是没有直接打包到依赖中的,反编译的话许多代码行数对不上,所以需要将官方的源码拉到本地,切换到对于的分支进行查看。

这一步稍微有点麻烦,首先是代码库还挺大的,加上之前如果没有准备好 Pulsar 的开发环境的话估计会劝退一部分人;

但其实大部分问题都是网络造成的,只要配置一些 Maven 镜像多试几次总会编译成功。

我这里直接将分支切换到 branch-2.8

从堆栈的顶部开始排查 TypedMessageBuilderImpl.java:91

java中Pulsar InterruptedException异常怎么解决

看起来是内部异步发送消息的时候抛了异常。

接着往下看到这里:

java.lang.InterruptedException at org.apache.pulsar.client.impl.ProducerImpl.canEnqueueRequest(ProducerImpl.java:775) at

java中Pulsar InterruptedException异常怎么解决

看起来是这里没错,但是代码行数明显不对;因为 2.8 这个分支也是修复过几个版本,所以中间有修改导致代码行数与最新代码对不上也正常。

semaphore.get().acquire();

不过初步来看应该是这行代码抛出的线程终端异常,这里看起来只有他最有可能了。

java中Pulsar InterruptedException异常怎么解决

为了确认是否是真的是这行代码,这个文件再往前翻了几个版本最终确认了就是这行代码没错了。

我们点开java.util.concurrent.Semaphore#acquire()的源码,

        public void acquire() throws InterruptedException {        sync.acquireSharedInterruptibly(1);    }    public final void acquireSharedInterruptibly(int arg)        throws InterruptedException {        if (Thread.interrupted() ||            (tryAcquireShared(arg) < 0 &&             acquire(null, arg, true, true, false, 0L) < 0))            throw new InterruptedException();    }

通过源码会发现 acquire() 函数确实会响应中断,一旦检测到当前线程被中断后便会抛出 InterruptedException 异常。

定位问题

所以问题的原因基本确定了,就是在 Pulsar 的发送消息线程被中断了导致的,但为啥会被中断还需要继续排查。

我们知道线程中断是需要调用 Thread.currentThread().interrupt(); API的,首先猜测是否 Pulsar 客户端内部有个线程中断了这个发送线程。

于是我在 pulsar-client 这个模块中搜索了相关代码:

java中Pulsar InterruptedException异常怎么解决

排除掉和 producer 不相关的地方,其余所有中断线程的代码都是在有了该异常之后继续传递而已;所以初步来看 pulsar-client 内部没有主动中断的操作。

既然 Pulsar 自己没有做,那就只可能是业务做的了?

于是我在业务代码中搜索了一下:

java中Pulsar InterruptedException异常怎么解决

果然在业务代码中搜到了唯一一处中断的地方,而且通过调用关系得知这段代码是在消息发送前执行的,并且和 Pulsar 发送函数处于同一线程。

大概的伪代码如下:

        List.of(1, 2, 3).stream().map(e -> {                    return CompletableFuture.supplyAsync(() -> {                        try {                            TimeUnit.MILLISECONDS.sleep(10);                        } catch (InterruptedException ex) {                            throw new RuntimeException(ex);                        }                        return e;                    });                }        ).collect(Collectors.toList()).forEach(f -> {            try {                Integer integer = f.get();                log.info("====" + integer);                if (integer==3){                    TimeUnit.SECONDS.sleep(10);                    Thread.currentThread().interrupt();                }            } catch (InterruptedException e) {                throw new RuntimeException(e);            } catch (ExecutionException e) {                throw new RuntimeException(e);            }        });   MessageId send = producer.newMessage().value(msg.getBytes()).send();

执行这段代码可以完全复现同样的堆栈。

幸好中断这里还打得有日志:

java中Pulsar InterruptedException异常怎么解决

java中Pulsar InterruptedException异常怎么解决

通过日志搜索发现异常的时间和这个中断的日志时间点完全重合,这样也就知道根本原因了。

因为业务线程和消息发送线程是同一个,在某些情况下会执行 Thread.currentThread().interrupt();,其实单纯执行这行函数并不会发生什么,只要没有去响应这个中断,也就是 Semaphore 源码中的判断了线程中断的标记:

    public final void acquireSharedInterruptibly(int arg)        throws InterruptedException {        if (Thread.interrupted() ||            (tryAcquireShared(arg) < 0 &&             acquire(null, arg, true, true, false, 0L) < 0))            throw new InterruptedException();    }

但恰好这里业务中断后自己并没有去判断这个标记,导致 Pulsar 内部去判断了,最终抛出了这个异常。

到此,相信大家对“java中Pulsar InterruptedException异常怎么解决”有了更深的了解,不妨来实际操作一番吧!这里是编程网网站,更多相关内容可以进入相关频道进行查询,关注我们,继续学习!

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

java中Pulsar InterruptedException异常怎么解决

下载Word文档到电脑,方便收藏和打印~

下载Word文档

猜你喜欢

java中Pulsar InterruptedException异常怎么解决

本篇内容主要讲解“java中Pulsar InterruptedException异常怎么解决”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习“java中Pulsar InterruptedExce
2023-07-05

java中Pulsar InterruptedException 异常

这篇文章主要为大家介绍了java中Pulsar InterruptedException 异常分析,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪
2023-02-23

Java中RuntimeException异常怎么解决

在Java中,RuntimeException是一种unchecked异常,它们在编译时不需要显式地处理。然而,为了提高程序的健壮性和可靠性,可以采取以下几种方式来解决RuntimeException异常:1. 使用try-catch块捕获
2023-08-08

Java常见异常怎么解决

本篇内容介绍了“Java常见异常怎么解决”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!1、java.lang.NullPointerExce
2023-06-02

Java NegativeArraySizeException异常怎么解决

Java中的NegativeArraySizeException异常表示数组大小为负数,这是因为在创建数组时指定了一个负数大小。要解决NegativeArraySizeException异常,您需要确保您的代码中不会出现负数大小的数组。以
Java NegativeArraySizeException异常怎么解决
2024-03-06

怎么在java中使用BigDecimal解决异常

本篇文章为大家展示了怎么在java中使用BigDecimal解决异常,内容简明扼要并且容易理解,绝对能使你眼前一亮,通过这篇文章的详细介绍希望你能有所收获。Java的优点是什么1. 简单,只需理解基本的概念,就可以编写适合于各种情况的应用程
2023-06-14

java中jdbctemplate空指针异常怎么解决

在Java中使用JdbcTemplate时,可能会出现空指针异常。要解决空指针异常,可以按照以下步骤进行:1. 检查JdbcTemplate对象是否已正确初始化。确保在使用JdbcTemplate之前,已经正确实例化了JdbcTemplat
java中jdbctemplate空指针异常怎么解决
2023-10-30

java connection使用异常怎么解决

在Java中,使用java.sql.Connection对象连接数据库时可能会出现异常。以下是处理异常的一些常见方法:1. 检查数据库驱动程序是否正确加载:在使用数据库连接之前,确保已正确加载数据库驱动程序。可以使用Class.forNam
2023-10-23

java integer.parseint使用异常怎么解决

在使用Integer.parseInt()方法时,如果字符串不能转换为整数,则会抛出NumberFormatException异常。为了解决这个异常,你可以使用异常处理机制来捕获并处理异常。下面是一个使用try-catch块来处理Numb
java integer.parseint使用异常怎么解决
2024-02-29

java中ClassCastException异常如何解决

在Java中,ClassCastException异常是指在强制类型转换时发生的异常。这通常发生在试图将一个对象转换为与其不兼容的类型时。要解决ClassCastException异常,你可以采取以下几种方法:1. 检查强制类型转换前对象的
2023-08-23

java中EOFException异常如何解决

在Java中,EOFException是指在读取输入流的过程中遇到了文件的结尾。通常,EOFException发生在使用DataInputStream或ObjectInputStream读取数据时,当读取到文件的结尾时,就会抛出EOFExc
2023-09-04

java类型转换异常怎么解决

Java类型转换异常通常是由于将一个不兼容的类型赋给另一个类型造成的。解决这个异常的方法有以下几种:1. 检查类型是否兼容:在进行类型转换之前,先检查两个类型是否兼容。可以使用`instanceof`关键字来检查对象是否属于某个类型。```
2023-08-17

java concurrent集合类异常怎么解决

在Java中,Concurrent集合类是线程安全的,可以在多个线程之间进行并发操作。但是,使用Concurrent集合类时仍然可能会发生异常。解决这些异常的方法如下:使用正确的并发集合类:确保使用正确的并发集合类来满足你的并发需求。Jav
2023-10-26

在Java项目中运行异常处理时出现异常怎么解决

这篇文章将为大家详细讲解有关在Java项目中运行异常处理时出现异常怎么解决,文章内容质量较高,因此小编分享给大家做个参考,希望大家阅读完这篇文章后对相关知识有一定的了解。 Java异常处理运行时异常(RuntimeException)详解
2023-05-31

java.lang.ArrayStoreException异常怎么解决

本篇内容主要讲解“java.lang.ArrayStoreException异常怎么解决”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习“java.lang.ArrayStoreException异
2023-06-22

java mockito模拟抛出异常怎么解决

在使用Mockito模拟方法抛出异常时,可以使用Mockito的doThrow()方法来模拟方法抛出异常。下面是一个简单的示例:假设有一个UserService类,其中有一个方法getUserById(),当传入的id为null时会抛出N
java mockito模拟抛出异常怎么解决
2024-03-13

编程热搜

  • Python 学习之路 - Python
    一、安装Python34Windows在Python官网(https://www.python.org/downloads/)下载安装包并安装。Python的默认安装路径是:C:\Python34配置环境变量:【右键计算机】--》【属性】-
    Python 学习之路 - Python
  • chatgpt的中文全称是什么
    chatgpt的中文全称是生成型预训练变换模型。ChatGPT是什么ChatGPT是美国人工智能研究实验室OpenAI开发的一种全新聊天机器人模型,它能够通过学习和理解人类的语言来进行对话,还能根据聊天的上下文进行互动,并协助人类完成一系列
    chatgpt的中文全称是什么
  • C/C++中extern函数使用详解
  • C/C++可变参数的使用
    可变参数的使用方法远远不止以下几种,不过在C,C++中使用可变参数时要小心,在使用printf()等函数时传入的参数个数一定不能比前面的格式化字符串中的’%’符号个数少,否则会产生访问越界,运气不好的话还会导致程序崩溃
    C/C++可变参数的使用
  • css样式文件该放在哪里
  • php中数组下标必须是连续的吗
  • Python 3 教程
    Python 3 教程 Python 的 3.0 版本,常被称为 Python 3000,或简称 Py3k。相对于 Python 的早期版本,这是一个较大的升级。为了不带入过多的累赘,Python 3.0 在设计的时候没有考虑向下兼容。 Python
    Python 3 教程
  • Python pip包管理
    一、前言    在Python中, 安装第三方模块是通过 setuptools 这个工具完成的。 Python有两个封装了 setuptools的包管理工具: easy_install  和  pip , 目前官方推荐使用 pip。    
    Python pip包管理
  • ubuntu如何重新编译内核
  • 改善Java代码之慎用java动态编译

目录