我的编程空间,编程开发者的网络收藏夹
学习永远不晚

Apache Hudi异步Clustering部署操作的方法

短信预约 -IT技能 免费直播动态提醒
省份

北京

  • 北京
  • 上海
  • 天津
  • 重庆
  • 河北
  • 山东
  • 辽宁
  • 黑龙江
  • 吉林
  • 甘肃
  • 青海
  • 河南
  • 江苏
  • 湖北
  • 湖南
  • 江西
  • 浙江
  • 广东
  • 云南
  • 福建
  • 海南
  • 山西
  • 四川
  • 陕西
  • 贵州
  • 安徽
  • 广西
  • 内蒙
  • 西藏
  • 新疆
  • 宁夏
  • 兵团
手机号立即预约

请填写图片验证码后获取短信验证码

看不清楚,换张图片

免费获取短信验证码

Apache Hudi异步Clustering部署操作的方法

本文小编为大家详细介绍“Apache Hudi异步Clustering部署操作的方法”,内容详细,步骤清晰,细节处理妥当,希望这篇“Apache Hudi异步Clustering部署操作的方法”文章能帮助大家解决疑惑,下面跟着小编的思路慢慢深入,一起来学习新知识吧。

1. 摘要

Clustering(聚簇)的表服务来重新组织数据来提供更好的查询性能,而不用降低摄取速度,并且我们已经知道如何部署同步Clustering,本篇博客中,我们将讨论近期社区做的一些改进以及如何通过HoodieClusteringJobDeltaStreamer工具来部署异步Clustering。

2. 介绍

通常讲,Clustering根据可配置的策略创建一个计划,根据特定规则对符合条件的文件进行分组,然后执行该计划。Hudi支持并发写入,并在多个表服务之间提供快照隔离,从而允许写入程序在后台运行Clustering时继续摄取。有关Clustering的体系结构的更详细概述请查看上一篇博文。

3. Clustering策略

如前所述Clustering计划和执行取决于可插拔的配置策略。这些策略大致可分为三类:计划策略、执行策略和更新策略。

3.1 计划策略

该策略在创建Clustering计划时发挥作用。它有助于决定应该对哪些文件组进行Clustering。让我们看一下Hudi提供的不同计划策略。请注意,使用此配置可以轻松地插拔这些策略。

  • SparkSizeBasedClusteringPlanStrategy:根据基本文件的小文件限制选择文件切片并创建Clustering组,最大大小为每个组允许的最大文件大小。可以使用此配置指定最大大小。此策略对于将中等大小的文件合并成大文件非常有用,以减少跨冷分区分布的大量文件。

  • SparkRecentDaysClusteringPlanStrategy:根据以前的N天分区创建一个计划,将这些分区中的小文件片进行Clustering,这是默认策略,当工作负载是可预测的并且数据是按时间划分时,它可能很有用。

  • SparkSelectedPartitionsClusteringPlanStrategy:如果只想对某个范围内的特定分区进行Clustering,那么无论这些分区是新分区还是旧分区,此策略都很有用,要使用此策略,还需要在下面设置两个配置(包括开始和结束分区):

hoodie.clustering.plan.strategy.cluster.begin.partitionhoodie.clustering.plan.strategy.cluster.end.partition

注意:所有策略都是分区感知的,后两种策略仍然受到第一种策略的大小限制的约束。

3.2 执行策略

在计划阶段构建Clustering组后,Hudi主要根据排序列和大小为每个组应用执行策略,可以使用此配置指定策略。

SparkSortAndSizeExecutionStrategy是默认策略。使用此配置进行Clustering时,用户可以指定数据排序列。除此之外我们还可以为Clustering产生的Parquet文件设置最大文件大小。该策略使用bulk_insert将数据写入新文件,在这种情况下,Hudi隐式使用一个分区器,该分区器根据指定列进行排序。通过这种策略改变数据布局,不仅提高了查询性能,而且自动平衡了重写开销。

现在该策略可以作为单个Spark作业或多个作业执行,具体取决于在计划阶段创建的Clustering组的数量。默认情况下Hudi将提交多个Spark作业并合并结果。如果要强制Hudi使用单Spark作业,请将执行策略类配置设置为SingleSparkJobExecutionStrategy

3.3 更新策略

目前只能为未接收任何并发更新的表/分区调度Clustering。默认情况下更新策略的配置设置为SparkRejectUpdateStrategy。如果某个文件组在Clustering期间有更新,则它将拒绝更新并引发异常。然而在某些用例中,更新是非常稀疏的,并且不涉及大多数文件组。简单拒绝更新的默认策略似乎不公平。在这种用例中用户可以将配置设置为SparkAllowUpdateStregy

我们讨论了关键策略配置,下面列出了与Clustering相关的所有其他配置。在此列表中一些非常有用的配置包括:

配置项解释默认值
hoodie.clustering.async.enabled启用在表上的异步运行Clustering服务。false
hoodie.clustering.async.max.commits通过指定应触发多少次提交来控制异步Clustering的频率。4
hoodie.clustering.preserve.commit.metadata重写数据时保留现有的_hoodie_commit_time。这意味着用户可以在Clustering数据上运行增量查询,而不会产生任何副作用。false

4. 异步Clustering

之前我们已经了解了用户如何设置同步Clustering。此外用户可以利用HoodiecClusteringJob设置两步异步Clustering。

4.1 HoodieClusteringJob

随着Hudi版本0.9.0的发布,我们可以在同一步骤中调度和执行Clustering。我们只需要指定-mode-m选项。有如下三种模式:

schedule(调度):制定一个Clustering计划。这提供了一个可以在执行模式下传递的instant

execute(执行):在给定的instant执行Clustering计划,这意味着这里需要instant

scheduleAndExecute(调度并执行):首先制定Clustering计划并立即执行该计划。

请注意要在原始写入程序仍在运行时运行作业请启用多写入:

hoodie.write.concurrency.mode=optimistic_concurrency_controlhoodie.write.lock.provider=org.apache.hudi.client.transaction.lock.ZookeeperBasedLockProvider

使用spark submit命令提交HoodieClusteringJob示例如下:

spark-submit \--class org.apache.hudi.utilities.HoodieClusteringJob \/path/to/hudi-utilities-bundle/target/hudi-utilities-bundle_2.12-0.9.0-SNAPSHOT.jar \--props /path/to/config/clusteringjob.properties \--mode scheduleAndExecute \--base-path /path/to/hudi_table/basePath \--table-name hudi_table_schedule_clustering \--spark-memory 1g

clusteringjob.properties配置文件示例如下

hoodie.clustering.async.enabled=truehoodie.clustering.async.max.commits=4hoodie.clustering.plan.strategy.target.file.max.bytes=1073741824hoodie.clustering.plan.strategy.small.file.limit=629145600hoodie.clustering.execution.strategy.class=org.apache.hudi.client.clustering.run.strategy.SparkSortAndSizeExecutionStrategyhoodie.clustering.plan.strategy.sort.columns=column1,column2

4.2 HoodieDeltaStreamer

接着看下如何使用HudiDeltaStreamer。现在我们可以使用DeltaStreamer触发异步Clustering。只需将hoodie.clustering.async.enabled为true,并在属性文件中指定其他Clustering配置,在启动Deltastreamer时可以将其位置设为-props(与HoodieClusteringJob配置类似)。

使用spark submit命令提交HoodieDeltaStreamer示例如下:

spark-submit \--class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer \/path/to/hudi-utilities-bundle/target/hudi-utilities-bundle_2.12-0.9.0-SNAPSHOT.jar \--props /path/to/config/clustering_kafka.properties \--schemaprovider-class org.apache.hudi.utilities.schema.SchemaRegistryProvider \--source-class org.apache.hudi.utilities.sources.AvroKafkaSource \--source-ordering-field impresssiontime \--table-type COPY_ON_WRITE \--target-base-path /path/to/hudi_table/basePath \--target-table impressions_cow_cluster \--op INSERT \--hoodie-conf hoodie.clustering.async.enabled=true \--continuous

4.3 Spark Structured Streaming

我们还可以使用Spark结构化流启用异步Clustering,如下所示。

val commonOpts = Map(   "hoodie.insert.shuffle.parallelism" -> "4",   "hoodie.upsert.shuffle.parallelism" -> "4",   DataSourceWriteOptions.RECORDKEY_FIELD.key -> "_row_key",   DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "partition",   DataSourceWriteOptions.PRECOMBINE_FIELD.key -> "timestamp",   HoodieWriteConfig.TBL_NAME.key -> "hoodie_test")def getAsyncClusteringOpts(isAsyncClustering: String,                            clusteringNumCommit: String,                            executionStrategy: String):Map[String, String] = {   commonOpts + (DataSourceWriteOptions.ASYNC_CLUSTERING_ENABLE.key -> isAsyncClustering,           HoodieClusteringConfig.ASYNC_CLUSTERING_MAX_COMMITS.key -> clusteringNumCommit,           HoodieClusteringConfig.EXECUTION_STRATEGY_CLASS_NAME.key -> executionStrategy   )}def initStreamingWriteFuture(hudiOptions: Map[String, String]): Future[Unit] = {   val streamingInput = // define the source of streaming   Future {      println("streaming starting")      streamingInput              .writeStream              .format("org.apache.hudi")              .options(hudiOptions)              .option("checkpointLocation", basePath + "/checkpoint")              .mode(Append)              .start()              .awaitTermination(10000)      println("streaming ends")   }}def structuredStreamingWithClustering(): Unit = {   val df = //generate data frame   val hudiOptions = getClusteringOpts("true", "1", "org.apache.hudi.client.clustering.run.strategy.SparkSortAndSizeExecutionStrategy")   val f1 = initStreamingWriteFuture(hudiOptions)   Await.result(f1, Duration.Inf)}

读到这里,这篇“Apache Hudi异步Clustering部署操作的方法”文章已经介绍完毕,想要掌握这篇文章的知识点还需要大家自己动手实践使用过才能领会,如果想了解更多相关内容的文章,欢迎关注编程网行业资讯频道。

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

Apache Hudi异步Clustering部署操作的方法

下载Word文档到电脑,方便收藏和打印~

下载Word文档

猜你喜欢

Apache Hudi异步Clustering部署操作的方法

本文小编为大家详细介绍“Apache Hudi异步Clustering部署操作的方法”,内容详细,步骤清晰,细节处理妥当,希望这篇“Apache Hudi异步Clustering部署操作的方法”文章能帮助大家解决疑惑,下面跟着小编的思路慢慢
2023-06-29

Spring boot部署发布到linux的操作方法

限制条件:The default script supports most Linux distributions and is tested on CentOS and Ubuntu. Other platforms, such as O
2023-05-31

Qt之简单的异步操作实现方法

这篇文章主要介绍了Qt之简单的异步操作实现方法,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
2022-11-13

JavaScript前端超时异步操作的解决方法

今天就跟大家聊聊有关JavaScript前端超时异步操作的解决方法,可能很多人都不太了解,为了让大家更加了解,小编给大家总结了以下内容,希望大家根据这篇文章可以有所收获。自从 ECMAScript 的 Promise ES2015 和 as
2023-06-21

openstack云计算组件glance部署及操作的方法

这篇文章主要介绍“openstack云计算组件glance部署及操作的方法”,在日常操作中,相信很多人在openstack云计算组件glance部署及操作的方法问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答”o
2023-06-30

React Suspense前后端IO异步操作处理的方法

这篇文章主要讲解了“React Suspense前后端IO异步操作处理的方法”,文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习“React Suspense前后端IO异步操作处理的方法”吧!简单介
2023-07-02

Linux中安装部署Docker管理工具Drone的操作方法

今天就跟大家聊聊有关Linux中安装部署Docker管理工具Drone的操作方法,可能很多人都不太了解,为了让大家更加了解,小编给大家总结了以下内容,希望大家根据这篇文章可以有所收获。Drone 是一个构建在Docker之上的开源持续集成平
2023-06-12

thinkPHP利用ajax异步上传图片并显示、删除的操作方法

这篇文章主要介绍了thinkPHP利用ajax异步上传图片并显示、删除的操作方法,具有一定借鉴价值,感兴趣的朋友可以参考下,希望大家阅读完这篇文章之后大有收获,下面让小编带着大家一起了解一下。thinkPHP利用ajax异步上传图片并显示、
2023-06-14

编程热搜

  • Python 学习之路 - Python
    一、安装Python34Windows在Python官网(https://www.python.org/downloads/)下载安装包并安装。Python的默认安装路径是:C:\Python34配置环境变量:【右键计算机】--》【属性】-
    Python 学习之路 - Python
  • chatgpt的中文全称是什么
    chatgpt的中文全称是生成型预训练变换模型。ChatGPT是什么ChatGPT是美国人工智能研究实验室OpenAI开发的一种全新聊天机器人模型,它能够通过学习和理解人类的语言来进行对话,还能根据聊天的上下文进行互动,并协助人类完成一系列
    chatgpt的中文全称是什么
  • C/C++中extern函数使用详解
  • C/C++可变参数的使用
    可变参数的使用方法远远不止以下几种,不过在C,C++中使用可变参数时要小心,在使用printf()等函数时传入的参数个数一定不能比前面的格式化字符串中的’%’符号个数少,否则会产生访问越界,运气不好的话还会导致程序崩溃
    C/C++可变参数的使用
  • css样式文件该放在哪里
  • php中数组下标必须是连续的吗
  • Python 3 教程
    Python 3 教程 Python 的 3.0 版本,常被称为 Python 3000,或简称 Py3k。相对于 Python 的早期版本,这是一个较大的升级。为了不带入过多的累赘,Python 3.0 在设计的时候没有考虑向下兼容。 Python
    Python 3 教程
  • Python pip包管理
    一、前言    在Python中, 安装第三方模块是通过 setuptools 这个工具完成的。 Python有两个封装了 setuptools的包管理工具: easy_install  和  pip , 目前官方推荐使用 pip。    
    Python pip包管理
  • ubuntu如何重新编译内核
  • 改善Java代码之慎用java动态编译

目录