Structured Streaming怎么使用checkpoint进行故障恢复
本篇文章给大家分享的是有关Structured Streaming怎么使用checkpoint进行故障恢复,小编觉得挺实用的,因此分享给大家学习,希望大家阅读完这篇文章后可以有所收获,话不多说,跟着小编一起来看看吧。
使用checkpoint进行故障恢复
如果发生故障或关机,可以恢复之前的查询的进度和状态,并从停止的地方继续执行。这是使用Checkpoint和预写日志完成的。您可以使用检查点位置配置查询,那么查询将将所有进度信息(即,每个触发器中处理的偏移范围)和运行聚合(例如,示例中的wordcount)保存到检查点位置。此检查点位置必须是HDFS兼容文件系统中的路径,并且可以在启动查询时将其设置为DataStreamWriter中的选项。
aggDF
.writeStream
.outputMode("complete")
.option("checkpointLocation", "path/to/HDFS/dir")
.format("memory")
.start()
具体测试代码如下:
val lines = spark.readStream.format("socket").option("host", "localhost").option("port", 9999).load()
val words = lines.as[String].flatMap(_.split(" "))
val wordCounts = words.groupBy("value").count()
val query = wordCounts.writeStream.queryName("aggregates").outputMode("complete").option("checkpointLocation", "memory/").format("memory").start()
spark.sql("select * from aggregates").show()
kill掉submit进行测试
以上就是Structured Streaming怎么使用checkpoint进行故障恢复,小编相信有部分知识点可能是我们日常工作会见到或用到的。希望你能通过这篇文章学到更多知识。更多详情敬请关注编程网行业资讯频道。
免责声明:
① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。
② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341