我的编程空间,编程开发者的网络收藏夹
学习永远不晚

Flink SQL怎么实现数据流的Join

短信预约 -IT技能 免费直播动态提醒
省份

北京

  • 北京
  • 上海
  • 天津
  • 重庆
  • 河北
  • 山东
  • 辽宁
  • 黑龙江
  • 吉林
  • 甘肃
  • 青海
  • 河南
  • 江苏
  • 湖北
  • 湖南
  • 江西
  • 浙江
  • 广东
  • 云南
  • 福建
  • 海南
  • 山西
  • 四川
  • 陕西
  • 贵州
  • 安徽
  • 广西
  • 内蒙
  • 西藏
  • 新疆
  • 宁夏
  • 兵团
手机号立即预约

请填写图片验证码后获取短信验证码

看不清楚,换张图片

免费获取短信验证码

Flink SQL怎么实现数据流的Join

这篇文章主要介绍“Flink SQL怎么实现数据流的Join”,在日常操作中,相信很多人在Flink SQL怎么实现数据流的Join问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答”Flink SQL怎么实现数据流的Join”的疑惑有所帮助!接下来,请跟着小编一起来学习吧!

无论在 OLAP 还是 OLTP 领域,Join 都是业务常会涉及到且优化规则比较复杂的 SQL 语句。对于离线计算而言,经过数据库领域多年的积累,Join 语义以及实现已经十分成熟,然而对于近年来刚兴起的 Streaming SQL 来说 Join 却处于刚起步的状态。

其中最为关键的问题在于 Join 的实现依赖于缓存整个数据集,而 Streaming SQL Join 的对象却是无限的数据流,内存压力和计算效率在长期运行来说都是不可避免的问题。下文将结合 SQL 的发展解析 Flink SQL 是如何解决这些问题并实现两个数据流的 Join。

离线 Batch SQL Join 的实现

传统的离线 Batch SQL (面向有界数据集的 SQL)有三种基础的实现方式,分别是 Nested-loop Join、Sort-Merge Join 和 Hash Join。

  • Nested-loop Join 最为简单直接,将两个数据集加载到内存,并用内嵌遍历的方式来逐个比较两个数据集内的元素是否符合 Join 条件。Nested-loop Join 虽然时间效率以及空间效率都是最低的,但胜在比较灵活适用范围广,因此其变体 BNL 常被传统数据库用作为 Join 的默认基础选项。

  • Sort-Merge Join 顾名思义,分为两个 Sort 和 Merge 阶段。首先将两个数据集进行分别排序,然后对两个有序数据集分别进行遍历和匹配,类似于归并排序的合并。值得注意的是,Sort-Merge 只适用于 Equi-Join(Join 条件均使用等于作为比较算子)。Sort-Merge Join 要求对两个数据集进行排序,成本很高,通常作为输入本就是有序数据集的情况下的优化方案。

  • Hash Join 同样分为两个阶段,首先将一个数据集转换为 Hash Table,然后遍历另外一个数据集元素并与 Hash Table 内的元素进行匹配。第一阶段和第一个数据集分别称为 build 阶段和 build table,第二个阶段和第二个数据集分别称为 probe 阶段和 probe table。Hash Join 效率较高但对空间要求较大,通常是作为 Join 其中一个表为适合放入内存的小表的情况下的优化方案。和 Sort-Merge Join 类似,Hash Join 也只适用于 Equi-Join。

实时 Streaming SQL Join

相对于离线的 Join,实时 Streaming SQL(面向无界数据集的 SQL)无法缓存所有数据,因此 Sort-Merge Join 要求的对数据集进行排序基本是无法做到的,而 Nested-loop Join 和 Hash Join 经过一定的改良则可以满足实时 SQL 的要求。
我们通过例子来看基本的 Nested Join 在实时 Streaming SQL 的基础实现(案例及图来自 Piotr Nowojski 在 Flink Forward San Francisco 的分享[2])。

Flink SQL怎么实现数据流的Join

图1. Join-in-continuous-query-1

Table A 有 1、42 两个元素,Table B 有 42 一个元素,所以此时的 Join 结果会输出 42。

Flink SQL怎么实现数据流的Join

图2. Join-in-continuous-query-2

接着 Table B 依次接受到三个新的元素,分别是 7、3、1。因为 1 匹配到 Table A 的元素,因此结果表再输出一个元素 1。

Flink SQL怎么实现数据流的Join

图3. Join-in-continuous-query-3

随后 Table A 出现新的输入 2、3、6,3 匹配到 Table B 的元素,因此再输出 3 到结果表。

可以看到在 Nested-Loop Join 中我们需要保存两个输入表的内容,而随着时间的增长 Table A 和 Table B 需要保存的历史数据无止境地增长,导致很不合理的内存磁盘资源占用,而且单个元素的匹配效率也会越来越低。类似的问题也存在于 Hash Join 中。

那么有没有可能设置一个缓存剔除策略,将不必要的历史数据及时清理呢?答案是肯定的,关键在于缓存剔除策略如何实现,这也是 Flink SQL 提供的三种 Join 的主要区别。

Flink SQL 的 Join

  • Regular Join

Regular Join 是最为基础的没有缓存剔除策略的 Join。Regular Join 中两个表的输入和更新都会对全局可见,影响之后所有的 Join 结果。举例,在一个如下的 Join 查询里,Orders 表的新纪录会和 Product 表所有历史纪录以及未来的纪录进行匹配。

SELECT * FROM OrdersINNER JOIN ProductON Orders.productId = Product.id

因为历史数据不会被清理,所以 Regular Join 允许对输入表进行任意种类的更新操作(insert、update、delete)。然而因为资源问题 Regular Join 通常是不可持续的,一般只用做有界数据流的 Join。

  • Time-Windowed Join

Time-Windowed Join 利用窗口给两个输入表设定一个 Join 的时间界限,超出时间范围的数据则对 JOIN 不可见并可以被清理掉。值得注意的是,这里涉及到的一个问题是时间的语义,时间可以指计算发生的系统时间(即 Processing Time),也可以指从数据本身的时间字段提取的 Event Time。如果是 Processing Time,Flink 根据系统时间自动划分 Join 的时间窗口并定时清理数据;如果是 Event Time,Flink 分配 Event Time 窗口并依据 Watermark 来清理数据。

以更常用的 Event Time Windowed Join 为例,一个将 Orders 订单表和 Shipments 运输单表依据订单时间和运输时间 Join 的查询如下:

SELECT *FROM   Orders o,   Shipments sWHERE   o.id = s.orderId AND  s.shiptime BETWEEN o.ordertime AND o.ordertime + INTERVAL '4' HOUR

这个查询会为 Orders 表设置了 o.ordertime > s.shiptime- INTERVAL ‘4’ HOUR 的时间下界(图4)。

Flink SQL怎么实现数据流的Join

图4. Time-Windowed Join 的时间下界 - Orders 表

并为 Shipmenets 表设置了 s.shiptime >= o.ordertime 的时间下界(图5)。

Flink SQL怎么实现数据流的Join

图5. Time-Windowed Join 的时间下界 - Shipment 表

因此两个输入表都只需要缓存在时间下界以上的数据,将空间占用维持在合理的范围。

不过虽然底层实现上没有问题,但如何通过 SQL 语法定义时间仍是难点。尽管在实时计算领域 Event Time、Processing Time、Watermark 这些概念已经成为业界共识,但在 SQL 领域对时间数据类型的支持仍比较弱[4]。因此,定义 Watermark 和时间语义都需要通过编程 API 的方式完成,比如从 DataStream 转换至 Table ,不能单纯靠 SQL 完成。这方面的支持 Flink 社区计划通过拓展 SQL 方言来完成,感兴趣的读者可以通过 FLIP-66[7] 来追踪进度。

  • Temporal Table Join

虽然 Timed-Windowed Join 解决了资源问题,但也限制了使用场景: Join 两个输入流都必须有时间下界,超过之后则不可访问。这对于很多 Join 维表的业务来说是不适用的,因为很多情况下维表并没有时间界限。针对这个问题,Flink 提供了 Temporal Table Join 来满足用户需求。

Temporal Table Join 类似于 Hash Join,将输入分为 Build Table 和 Probe Table。前者一般是纬度表的 changelog,后者一般是业务数据流,典型情况下后者的数据量应该远大于前者。在 Temporal Table Join 中,Build Table 是一个基于 append-only 数据流的带时间版本的视图,所以又称为 Temporal Table。Temporal Table 要求定义一个主键和用于版本化的字段(通常就是 Event Time 时间字段),以反映记录在不同时间的内容。

比如典型的一个例子是对商业订单金额进行汇率转换。假设有一个 Orders 流记录订单金额,需要和 RatesHistory 汇率流进行 Join。RatesHistory 代表不同货币转为日元的汇率,每当汇率有变化时就会有一条更新记录。两个表在某一时间节点内容如下:

Flink SQL怎么实现数据流的Join

图6. Temporal Table Join Example]

我们将 RatesHistory 注册为一个名为 Rates 的 Temporal Table,设定主键为 currency,版本字段为 time。

Flink SQL怎么实现数据流的Join

图7. Temporal Table Registration]

此后给 Rates 指定时间版本,Rates 则会基于 RatesHistory 来计算符合时间版本的汇率转换内容。

Flink SQL怎么实现数据流的Join

图8. Temporal Table Content]

在 Rates 的帮助下,我们可以将业务逻辑用以下的查询来表达:

SELECT   o.amount * r.rateFROM  Orders o,  LATERAL Table(Rates(o.time)) rWHERE  o.currency = r.currency

值得注意的是,不同于在 Regular Join 和 Time-Windowed Join 中两个表是平等的,任意一个表的新记录都可以与另一表的历史记录进行匹配,在 Temporal Table Join 中,Temoparal Table 的更新对另一表在该时间节点以前的记录是不可见的。这意味着我们只需要保存 Build Side 的记录直到 Watermark 超过记录的版本字段。因为 Probe Side 的输入理论上不会再有早于 Watermark 的记录,这些版本的数据可以安全地被清理掉。

到此,关于“Flink SQL怎么实现数据流的Join”的学习就结束了,希望能够解决大家的疑惑。理论与实践的搭配能更好的帮助大家学习,快去试试吧!若想继续学习更多相关知识,请继续关注编程网网站,小编会继续努力为大家带来更多实用的文章!

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

Flink SQL怎么实现数据流的Join

下载Word文档到电脑,方便收藏和打印~

下载Word文档

猜你喜欢

Flink SQL怎么实现数据流的Join

这篇文章主要介绍“Flink SQL怎么实现数据流的Join”,在日常操作中,相信很多人在Flink SQL怎么实现数据流的Join问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答”Flink SQL怎么实现数据
2023-06-05

Flink流处理引擎之数据怎么抽取

本文小编为大家详细介绍“Flink流处理引擎之数据怎么抽取”,内容详细,步骤清晰,细节处理妥当,希望这篇“Flink流处理引擎之数据怎么抽取”文章能帮助大家解决疑惑,下面跟着小编的思路慢慢深入,一起来学习新知识吧。一、CDCCDC (Cha
2023-06-30

SQL实现数据过滤流程详解

目录数据准备student 表过滤数据过滤单个值过滤null 值过滤集合BETWEEN IN NOT IN使用通配符过滤数据LIKE使用逻辑操作符组合WHERE子句数据准备student 表CREATE TABLE `student` (
2023-01-05

sql怎么实现数据检索

SQL(Structured Query Language)是一种用于管理关系数据库系统的语言。通过使用SQL查询语句,可以实现数据检索。下面是一些常见的SQL查询语句示例:SELECT语句:用于从数据库中检索数据。示例:SELECT
sql怎么实现数据检索
2024-04-09

flink怎么使用Event_time处理实时数据

本篇内容主要讲解“flink怎么使用Event_time处理实时数据”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习“flink怎么使用Event_time处理实时数据”吧!//flink中关于时间
2023-06-02

怎么实现大数据处理引擎Spark与Flink比拼

今天就跟大家聊聊有关怎么实现大数据处理引擎Spark与Flink比拼,可能很多人都不太了解,为了让大家更加了解,小编给大家总结了以下内容,希望大家根据这篇文章可以有所收获。下一代大数据计算引擎  自从数据处理需求超过了传统数据库能有效处理的
2023-06-02

怎么使用SQL实现车流量的计算

这篇文章主要为大家展示了“怎么使用SQL实现车流量的计算”,内容简而易懂,条理清晰,希望能够帮助大家解决疑惑,下面让小编带领大家一起研究并学习一下“怎么使用SQL实现车流量的计算”这篇文章吧。卡口转换率将数据导入hive,通过SparkSq
2023-06-29

sql数据库升序排列怎么实现

在SQL中,可以使用"ORDER BY"子句对查询结果进行升序排序。以下是使用"ORDER BY"子句实现升序排序的示例:假设有一个名为"Students"的表,其中包含"Name"和"Age"两列。要按照"Age"列进行升序排列,可以使
2023-10-27

sql自动备份数据库怎么实现

可以通过编写一个定时脚本来实现自动备份数据库。以下是一个简单的示例脚本:创建一个备份脚本(backup.sh):#!/bin/bash# 设置数据库连接信息DB_USER="your_db_user"DB_PASS="your_db_p
sql自动备份数据库怎么实现
2024-04-09

MySQL与HBase在大数据流处理框架(如Flink)中的集成实践

MySQL和HBase是两个非常不同的数据库系统,分别用于不同的场景。MySQL是一个关系型数据库,适用于需要复杂查询和事务处理的场景;而HBase是一个分布式、可扩展的大数据存储系统,适用于需要高速读写和海量数据存储的场景。在大数据流处
MySQL与HBase在大数据流处理框架(如Flink)中的集成实践
2024-10-21

java怎么实现图片转化为数据流

这篇文章主要介绍了java怎么实现图片转化为数据流的相关知识,内容详细易懂,操作简单快捷,具有一定借鉴价值,相信大家阅读完这篇java怎么实现图片转化为数据流文章都会有所收获,下面我们一起来看看吧。实现图片转化为数据流方法如下/** * C
2023-06-29

编程热搜

  • Python 学习之路 - Python
    一、安装Python34Windows在Python官网(https://www.python.org/downloads/)下载安装包并安装。Python的默认安装路径是:C:\Python34配置环境变量:【右键计算机】--》【属性】-
    Python 学习之路 - Python
  • chatgpt的中文全称是什么
    chatgpt的中文全称是生成型预训练变换模型。ChatGPT是什么ChatGPT是美国人工智能研究实验室OpenAI开发的一种全新聊天机器人模型,它能够通过学习和理解人类的语言来进行对话,还能根据聊天的上下文进行互动,并协助人类完成一系列
    chatgpt的中文全称是什么
  • C/C++中extern函数使用详解
  • C/C++可变参数的使用
    可变参数的使用方法远远不止以下几种,不过在C,C++中使用可变参数时要小心,在使用printf()等函数时传入的参数个数一定不能比前面的格式化字符串中的’%’符号个数少,否则会产生访问越界,运气不好的话还会导致程序崩溃
    C/C++可变参数的使用
  • css样式文件该放在哪里
  • php中数组下标必须是连续的吗
  • Python 3 教程
    Python 3 教程 Python 的 3.0 版本,常被称为 Python 3000,或简称 Py3k。相对于 Python 的早期版本,这是一个较大的升级。为了不带入过多的累赘,Python 3.0 在设计的时候没有考虑向下兼容。 Python
    Python 3 教程
  • Python pip包管理
    一、前言    在Python中, 安装第三方模块是通过 setuptools 这个工具完成的。 Python有两个封装了 setuptools的包管理工具: easy_install  和  pip , 目前官方推荐使用 pip。    
    Python pip包管理
  • ubuntu如何重新编译内核
  • 改善Java代码之慎用java动态编译

目录