我的编程空间,编程开发者的网络收藏夹
学习永远不晚

如何进行Flink Checkpoint问题排查

短信预约 -IT技能 免费直播动态提醒
省份

北京

  • 北京
  • 上海
  • 天津
  • 重庆
  • 河北
  • 山东
  • 辽宁
  • 黑龙江
  • 吉林
  • 甘肃
  • 青海
  • 河南
  • 江苏
  • 湖北
  • 湖南
  • 江西
  • 浙江
  • 广东
  • 云南
  • 福建
  • 海南
  • 山西
  • 四川
  • 陕西
  • 贵州
  • 安徽
  • 广西
  • 内蒙
  • 西藏
  • 新疆
  • 宁夏
  • 兵团
手机号立即预约

请填写图片验证码后获取短信验证码

看不清楚,换张图片

免费获取短信验证码

如何进行Flink Checkpoint问题排查

这篇文章给大家介绍如何进行Flink Checkpoint问题排查,内容非常详细,感兴趣的小伙伴们可以参考借鉴,希望对大家能有所帮助。

在 Flink 中,状态可靠性保证由 Checkpoint 支持,当作业出现 failover 的情况下,Flink 会从最近成功的

Checkpoint 恢复。在实际情况中,我们可能会遇到 Checkpoint 失败,或者 Checkpoint 慢的情况,本文会统一聊一聊Flink 中 Checkpoint 异常的情况(包括失败和慢),以及可能的原因和排查思路。

1. Checkpoint 流程简介
首先我们需要了解 Flink 中 Checkpoint 的整个流程是怎样的,在了解整个流程之后,我们才能在出问题的时候,更好的进行定位分析。

如何进行Flink Checkpoint问题排查

从上图我们可以知道,Flink 的 Checkpoint 包括如下几个部分:

1.JM trigger checkpoint
2.Source 收到 trigger checkpoint 的 PRC,自己开始做 snapshot,并往下游发送 barrier
3.下游接收 barrier(需要 barrier 都到齐才会开始做 checkpoint)
4.Task 开始同步阶段 snapshot
5.Task 开始异步阶段 snapshot
6.Task snapshot 完成,汇报给 JM
上面的任何一个步骤不成功,整个 checkpoint 都会失败。

**2 Checkpoint 异常情况排查
2.1 Checkpoint 失败**
可以在 Checkpoint 界面看到如下图所示,下图中 Checkpoint 10423 失败了。

如何进行Flink Checkpoint问题排查

点击 Checkpoint 10423 的详情,我们可以看到类系下图所示的表格(下图中将 operator 名字截取掉了)。

如何进行Flink Checkpoint问题排查

上图中我们看到三行,表示三个 operator,其中每一列的含义分别如下:
1.其中 Acknowledged 一列表示有多少个 subtask 对这个 Checkpoint 进行了 ack,从图中我们可以知道第三个 operator 总共有 5 个 subtask,但是只有 4 个进行了 ack;
2.第二列 Latest Acknowledgement 表示该 operator 的所有 subtask 最后 ack 的时间;
3.End to End Duration 表示整个 operator 的所有 subtask 中完成 snapshot 的最长时间;
4.State Size 表示当前 Checkpoint 的 state 大小 – 主要这里如果是增量 checkpoint 的话,则表示增量大小;
5.Buffered During Alignment 表示在 barrier 对齐阶段积攒了多少数据,如果这个数据过大也间接表示对齐比较慢);
Checkpoint 失败大致分为两种情况:Checkpoint Decline 和 Checkpoint Expire。

2.1.1 Checkpoint Decline
我们能从 jobmanager.log 中看到类似下面的日志
Decline checkpoint 10423 by task 0b60f08bf8984085b59f8d9bc74ce2e1 of job 85d268e6fbc19411185f7e4868a44178. 其中
10423 是 checkpointID,0b60f08bf8984085b59f8d9bc74ce2e1 是 execution id,85d268e6fbc19411185f7e4868a44178 是 job id,我们可以在 jobmanager.log 中查找 execution id,找到被调度到哪个 taskmanager 上,类似如下所示:

019-09-02 16:26:20,972 INFO  [jobmanager-future-thread-61] org.apache.flink.runtime.executiongraph.ExecutionGraph        - XXXXXXXXXXX (100/289) (87b751b1fd90e32af55f02bb2f9a9892) switched from SCHEDULED to DEPLOYING.2019-09-02 16:26:20,972 INFO  [jobmanager-future-thread-61] org.apache.flink.runtime.executiongraph.ExecutionGraph        - Deploying XXXXXXXXXXX (100/289) (attempt #0) to slot container_e24_1566836790522_8088_04_013155_1 on hostnameABCDE

从上面的日志我们知道该 execution 被调度到 hostnameABCDE 的 container_e24_1566836790522_8088_04_013155_1 slot 上,接下来我们就可以到 container container_e24_1566836790522_8088_04_013155 的 taskmanager.log 中查找 Checkpoint 失败的具体原因了。

另外对于 Checkpoint Decline 的情况,有一种情况我们在这里单独抽取出来进行介绍:Checkpoint Cancel。

当前 Flink 中如果较小的 Checkpoint 还没有对齐的情况下,收到了更大的 Checkpoint,则会把较小的 Checkpoint 给取消掉。我们可以看到类似下面的日志:

$taskNameWithSubTaskAndID: Received checkpoint barrier for checkpoint 20 before completing current checkpoint 19. Skipping current checkpoint.

这个日志表示,当前 Checkpoint 19 还在对齐阶段,我们收到了 Checkpoint 20 的 barrier。然后会逐级通知到下游的 task checkpoint 19 被取消了,同时也会通知 JM 当前 Checkpoint 被 decline 掉了。

在下游 task 收到被 cancelBarrier 的时候,会打印类似如下的日志:

DEBUG$taskNameWithSubTaskAndID: Checkpoint 19 canceled, aborting alignment.或者DEBUG$taskNameWithSubTaskAndID: Checkpoint 19 canceled, skipping alignment.或者WARN$taskNameWithSubTaskAndID: Received cancellation barrier for checkpoint 20 before completing current checkpoint 19. Skipping current checkpoint.

上面三种日志都表示当前 task 接收到上游发送过来的 barrierCancel 消息,从而取消了对应的 Checkpoint。

2.1.2 Checkpoint Expire
如果 Checkpoint 做的非常慢,超过了 timeout 还没有完成,则整个 Checkpoint 也会失败。当一个 Checkpoint 由于超时而失败是,会在 jobmanager.log 中看到如下的日志:

Checkpoint 1 of job 85d268e6fbc19411185f7e4868a44178  expired before completing.

表示 Chekpoint 1 由于超时而失败,这个时候可以可以看这个日志后面是否有类似下面的日志:

Received late message for now expired checkpoint attempt 1 from 0b60f08bf8984085b59f8d9bc74ce2e1 of job 85d268e6fbc19411185f7e4868a44178.

可以按照 2.1.1 中的方法找到对应的 taskmanager.log 查看具体信息。

下面的日志如果是 DEBUG 的话,我们会在开始处标记 DEBUG

我们按照下面的日志把 TM 端的 snapshot 分为三个阶段,开始做 snapshot 前,同步阶段,异步阶段:

DEBUGStarting checkpoint (6751) CHECKPOINT on task taskNameWithSubtasks (4/4)

这个日志表示 TM 端 barrier 对齐后,准备开始做 Checkpoint。

DEBUG2019-08-06 13:43:02,613 DEBUG org.apache.flink.runtime.state.AbstractSnapshotStrategy       - DefaultOperatorStateBackend snapshot (FsCheckpointStorageLocation {fileSystem=org.apache.flink.core.fs.SafetyNetWrapperFileSystem@70442baf, checkpointDirectory=xxxxxxxx, sharedStateDirectory=xxxxxxxx, taskOwnedStateDirectory=xxxxxx, metadataFilePath=xxxxxx, reference=(default), fileStateSizeThreshold=1024}, synchronous part) in thread Thread[Async calls on Source: xxxxxx_source -> Filter (27/70),5,Flink Task Threads] took 0 ms.

上面的日志表示当前这个 backend 的同步阶段完成,共使用了 0 ms。

DEBUGDefaultOperatorStateBackend snapshot (FsCheckpointStorageLocation {fileSystem=org.apache.flink.core.fs.SafetyNetWrapperFileSystem@7908affe, checkpointDirectory=xxxxxx, sharedStateDirectory=xxxxx, taskOwnedStateDirectory=xxxxx,  metadataFilePath=xxxxxx, reference=(default), fileStateSizeThreshold=1024}, asynchronous part) in thread Thread[pool-48-thread-14,5,Flink Task Threads] took 369 ms

上面的日志表示异步阶段完成,异步阶段使用了 369 ms

在现有的日志情况下,我们通过上面三个日志,定位 snapshot 是开始晚,同步阶段做的慢,还是异步阶段做的慢。然后再按照情况继续进一步排查问题。

2.2 Checkpoint 慢
在 2.1 节中,我们介绍了 Checkpoint 失败的排查思路,本节会分情况介绍 Checkpoint 慢的情况。

Checkpoint 慢的情况如下:比如 Checkpoint interval 1 分钟,超时 10 分钟,Checkpoint 经常需要做 9 分钟(我们希望 1 分钟左右就能够做完),而且我们预期 state size 不是非常大。

对于 Checkpoint 慢的情况,我们可以按照下面的顺序逐一检查。

2.2.0 Source Trigger Checkpoint 慢
这个一般发生较少,但是也有可能,因为 source 做 snapshot 并往下游发送 barrier 的时候,需要抢锁(这个现在社区正在进行用 mailBox 的方式替代当前抢锁的方式,详情参考[1])。如果一直抢不到锁的话,则可能导致 Checkpoint 一直得不到机会进行。如果在 Source 所在的 taskmanager.log 中找不到开始做 Checkpoint 的 log,则可以考虑是否属于这种情况,可以通过 jstack 进行进一步确认锁的持有情况。

2.2.1 使用增量 Checkpoint
现在 Flink 中 Checkpoint 有两种模式,全量 Checkpoint 和 增量 Checkpoint,其中全量 Checkpoint 会把当前的 state 全部备份一次到持久化存储,而增量 Checkpoint,则只备份上一次 Checkpoint 中不存在的 state,因此增量 Checkpoint 每次上传的内容会相对更好,在速度上会有更大的优势。

现在 Flink 中仅在 RocksDBStateBackend 中支持增量 Checkpoint,如果你已经使用 RocksDBStateBackend,可以通过开启增量 Checkpoint 来加速,具体的可以参考 [2]。

2.2.2 作业存在反压或者数据倾斜
我们知道 task 仅在接受到所有的 barrier 之后才会进行 snapshot,如果作业存在反压,或者有数据倾斜,则会导致全部的 channel 或者某些 channel 的 barrier 发送慢,从而整体影响 Checkpoint 的时间,这两个可以通过如下的页面进行检查:

如何进行Flink Checkpoint问题排查

上图中我们选择了一个 task,查看所有 subtask 的反压情况,发现都是 high,表示反压情况严重,这种情况下会导致下游接收 barrier 比较晚。

如何进行Flink Checkpoint问题排查

上图中我们选择其中一个 operator,点击所有的 subtask,然后按照 Records Received/Bytes Received/TPS 从大到小进行排序,能看到前面几个 subtask 会比其他的 subtask 要处理的数据多。

如果存在反压或者数据倾斜的情况,我们需要首先解决反压或者数据倾斜问题之后,再查看 Checkpoint 的时间是否符合预期。

2.2.2 Barrier 对齐慢
从前面我们知道 Checkpoint 在 task 端分为 barrier 对齐(收齐所有上游发送过来的 barrier),然后开始同步阶段,再做异步阶段。如果 barrier 一直对不齐的话,就不会开始做 snapshot。

barrier 对齐之后会有如下日志打印:

DEBUGStarting checkpoint (6751) CHECKPOINT on task taskNameWithSubtasks (4/4)

如果 taskmanager.log 中没有这个日志,则表示 barrier 一直没有对齐,接下来我们需要了解哪些上游的 barrier 没有发送下来,如果你使用 At Least Once 的话,可以观察下面的日志:

DEBUGReceived barrier for checkpoint 96508 from channel 5

表示该 task 收到了 channel 5 来的 barrier,然后看对应 Checkpoint,再查看还剩哪些上游的 barrier 没有接受到,对于 ExactlyOnce 暂时没有类似的日志,可以考虑自己添加,或者 jmap 查看。

2.2.3 主线程太忙,导致没机会做 snapshot
在 task 端,所有的处理都是单线程的,数据处理和 barrier 处理都由主线程处理,如果主线程在处理太慢(比如使用 RocksDBBackend,state 操作慢导致整体处理慢),导致 barrier 处理的慢,也会影响整体 Checkpoint 的进度,在这一步我们需要能够查看某个 PID 对应 hotmethod,这里推荐两个方法:

1.多次连续 jstack,查看一直处于 RUNNABLE 状态的线程有哪些;
2.使用工具 AsyncProfile dump 一份火焰图,查看占用 CPU 最多的栈;
如果有其他更方便的方法当然更好,也欢迎推荐。

2.2.4 同步阶段做的慢
同步阶段一般不会太慢,但是如果我们通过日志发现同步阶段比较慢的话,对于非 RocksDBBackend 我们可以考虑查看是否开启了异步 snapshot,如果开启了异步 snapshot 还是慢,需要看整个 JVM 在干嘛,也可以使用前一节中的工具。对于 RocksDBBackend 来说,我们可以用 iostate 查看磁盘的压力如何,另外可以查看 tm 端 RocksDB 的 log 的日志如何,查看其中 SNAPSHOT 的时间总共开销多少。

RocksDB 开始 snapshot 的日志如下:

2019/09/10-14:22:55.734684 7fef66ffd700 [utilities/checkpoint/checkpoint_impl.cc:83] Started the snapshot process -- creating snapshot in directory /tmp/flink-io-87c360ce-0b98-48f4-9629-2cf0528d5d53/XXXXXXXXXXX/chk-92729

snapshot 结束的日志如下:

2019/09/10-14:22:56.001275 7fef66ffd700 [utilities/checkpoint/checkpoint_impl.cc:145] Snapshot DONE. All is good

2.2.5 异步阶段做的慢
对于异步阶段来说,tm 端主要将 state 备份到持久化存储上,对于非 RocksDBBackend 来说,主要瓶颈来自于网络,这个阶段可以考虑观察网络的 metric,或者对应机器上能够观察到网络流量的情况(比如 iftop)。

对于 RocksDB 来说,则需要从本地读取文件,写入到远程的持久化存储上,所以不仅需要考虑网络的瓶颈,还需要考虑本地磁盘的性能。另外对于 RocksDBBackend 来说,如果觉得网络流量不是瓶颈,但是上传比较慢的话,还可以尝试考虑开启多线程上传功能[3]。

3 总结
在第二部分内容中,我们介绍了官方编译的包的情况下排查一些 Checkpoint 异常情况的主要场景,以及相应的排查方法,如果排查了上面所有的情况,还是没有发现瓶颈所在,则可以考虑添加更详细的日志,逐步将范围缩小,然后最终定位原因。

上文提到的一些 DEBUG 日志,如果 flink dist 包是自己编译的话,则建议将 Checkpoint 整个步骤内的一些 DEBUG 改为 INFO,能够通过日志了解整个 Checkpoint 的整体阶段,什么时候完成了什么阶段,也在 Checkpoint 异常的时候,快速知道每个阶段都消耗了多少时间。

关于如何进行Flink Checkpoint问题排查就分享到这里了,希望以上内容可以对大家有一定的帮助,可以学到更多知识。如果觉得文章不错,可以把它分享出去让更多的人看到。

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

如何进行Flink Checkpoint问题排查

下载Word文档到电脑,方便收藏和打印~

下载Word文档

猜你喜欢

sql运行缓慢问题如何排查

要排查SQL运行缓慢的问题,可以尝试以下几个步骤:1. 检查查询语句:确保查询语句写得正确且优化良好。可以对查询进行优化,例如使用合适的索引、避免使用不必要的连接、减少子查询等。2. 检查索引:确保表中的列上有适当的索引。可以使用数据库的索
2023-10-21

linux中如何排查Java问题

小编给大家分享一下linux中如何排查Java问题,相信大部分人都还不怎么了解,因此分享这篇文章给大家参考一下,希望大家阅读完这篇文章后大有收获,下面让我们一起去了解一下吧!Linux命令类tail最常用的tail -fgrepawk1 基
2023-06-16

maven依赖问题如何排查

在排查Maven依赖问题时,可以尝试以下几个步骤:1. 检查pom.xml文件:首先检查项目的pom.xml文件,确保依赖项正确地被添加到了标签中,并且没有任何拼写错误或者格式错误。2. 清理本地仓库:有时候Maven本地仓库中的依赖可能损
2023-09-17

mongo慢查询问题如何排查处理

这篇文章主要介绍了mongo慢查询问题如何排查处理的相关知识,内容详细易懂,操作简单快捷,具有一定借鉴价值,相信大家阅读完这篇mongo慢查询问题如何排查处理文章都会有所收获,下面我们一起来看看吧。一、简单介绍mongo语句查询条件中出现n
2023-06-29

kubernetes k8s常用问题如何排查

这篇文章主要介绍了kubernetes k8s常用问题如何排查的相关知识,内容详细易懂,操作简单快捷,具有一定借鉴价值,相信大家阅读完这篇kubernetes k8s常用问题如何排查文章都会有所收获,下面我们一起来看看吧。Pod 的那些状态
2023-07-02

java线程卡死问题如何排查

java线程卡死问题的排查可以遵循以下步骤:1. 查看线程堆栈:使用jstack命令或者在IDE中查看线程堆栈,定位到卡死的线程。查看线程的状态、持有的锁和调用栈信息,判断是否是由于死锁或者线程阻塞导致的卡死。2. 查看日志:检查应用程序的
2023-08-24

java内存溢出问题如何排查

Java内存溢出问题的排查可以按照以下步骤进行:1. 观察错误信息:当Java发生内存溢出时,通常会抛出`java.lang.OutOfMemoryError`异常,并伴随有错误堆栈跟踪信息。仔细阅读错误信息,可以发现导致内存溢出的原因和位
2023-09-29

Node.js子线程Crash问题如何排查

这篇文章主要介绍“Node.js子线程Crash问题如何排查”,在日常操作中,相信很多人在Node.js子线程Crash问题如何排查问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答”Node.js子线程Crash
2023-07-02

linux异常关机问题如何排查

要排查Linux异常关机问题,可以按照以下步骤进行:1. 检查系统日志:查看/var/log目录下的日志文件,特别是syslog和kern.log文件,看是否有任何异常或错误信息。可以使用命令如下:```sudo tail -n 100 /
2023-08-31

Linux系统CPU占用率较高问题怎么进行排查

本篇内容介绍了“Linux系统CPU占用率较高问题怎么进行排查”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!很多运维的同学遇到这种状况往往会
2023-06-16

MariaDB如何进行故障排查和修复

MariaDB 是一个流行的开源关系型数据库管理系统,可以使用一些工具和技术来进行故障排查和修复。以下是一些常见的方法:使用日志文件:MariaDB 会生成多个日志文件,包括错误日志、查询日志和慢查询日志等。通过查看这些日志文件,可以帮助确
MariaDB如何进行故障排查和修复
2024-04-09

tomcat启动时卡住问题如何排查

本篇内容主要讲解“tomcat启动时卡住问题如何排查”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习“tomcat启动时卡住问题如何排查”吧!正常项目无法访问(Linux 服务器),启动tomcat
2023-07-05

如何进行linux Nautilus问题解答

今天就跟大家聊聊有关如何进行linux Nautilus问题解答,可能很多人都不太了解,为了让大家更加了解,小编给大家总结了以下内容,希望大家根据这篇文章可以有所收获。1.linux问题解答之简述Nautilus的功能。Nautilus 是
2023-06-17

编程热搜

目录