【Flink】第二十三篇:join 之 temporal join 您所在的位置:网站首页 joins的意思 【Flink】第二十三篇:join 之 temporal join

【Flink】第二十三篇:join 之 temporal join

#【Flink】第二十三篇:join 之 temporal join| 来源: 网络整理| 查看: 265

相关推荐:

【Flink】第十篇:join 之 regular join

【Flink】第十一篇:join 之 interval join

继以上 Flink Join 两篇文章之后探讨最后一类Flink的Join:temporal join。

传统 join 方式

传统的离线 Batch SQL (面向有界数据集的 SQL) 有三种基础的实现方式,分别是 Nested-loop Join、Sort-Merge Join 和 Hash Join。

1. Nested-loop Join 最为简单直接,将两个数据集加载到内存,并用内嵌遍历的方式来逐个比较两个数据集内的元素是否符合 Join 条件。Nested-loop Join 的时间效率以及空间效率都是最低的,可以使用:table.exec.disabled-operators:NestedLoopJoin 来禁用。

2. Sort-Merge Join 分为 Sort 和 Merge 两个阶段:首先将两个数据集进行分别排序,然后再对两个有序数据集分别进行遍历和匹配,类似于归并排序的合并。(Sort-Merge Join 要求对两个数据集进行排序,但是如果两个输入是有序的数据集,则可以作为一种优化方案)。

3. Hash Join 同样分为两个阶段:首先将一个数据集转换为 Hash Table,然后遍历另外一个数据集元素并与 Hash Table 内的元素进行匹配。

第一阶段和第一个数据集分别称为 build 阶段和 build table; 第二个阶段和第二个数据集分别称为 probe 阶段和 probe table。

Hash Join 效率较高但是对空间要求较大,通常是作为 Join 其中一个表为适合放入内存的小表的情况下的优化方案 (并不是不允许溢写磁盘)。

注意:Sort-Merge Join 和 Hash Join 只适用于 Equi-Join ( Join 条件均使用等于作为比较算子)。

Flink SQL 流批一体的核心是:流表二象性。围绕这一核心有若干概念,例如,动态表(Dynamic Table)/时态表(Temporal Table)、版本(Version)、版本表(Version Table)、普通表、连续查询、物化视图/虚拟视图、CDC(Change Data Capture)、Changelog Stream。

将流转换为动态表。 在动态表上计算一个连续查询,生成一个新的动态表。 生成的动态表被转换回流。

理解:流和表只是数据在特定场景下的两种形态(联想到光的波粒二象性?笔者已经傻傻分不清)

temporal join

Flink Join 主要包含:

Event Time Temporal JoinProcessing Time Temporal Join

语法(SQL 2011 标准):

SELECT [column_list] FROM table1 [AS ] [LEFT] JOIN table2 FOR SYSTEM_TIME AS OF table1.{ proctime | rowtime } [AS ] ON table1.column-name1 = table2.column-name1

其中,

左表:任意表(探针侧,probe site)右表:版本表(versioned table)/普通表(构建侧,build side)

本文主要探索Event Time temporal join的一些设计特性,即右侧是版本表的join。

Event Time Temporal Join

一个典型的场景是订单和汇率,下方示例展示了一个append-only 订单表Orders 与一个不断改变的汇率表 RatesHistory 的 Join 操作:

SELECT * FROM Orders; rowtime amount currency ======= ====== ========= 10:15 2 Euro 10:30 1 US Dollar 10:32 50 Yen 10:52 3 Euro 11:04 5 US Dollar

RatesHistory 表示不断变化的汇率信息。汇率以日元为基准(即 Yen 永远为 1)。

SELECT * FROM RatesHistory; rowtime currency rate ======= ======== ====== 09:00 US Dollar 102 09:00 Euro 114 09:00 Yen 1 10:45 Euro 116 11:15 Euro 119 11:49 Pounds 108

基于上述信息,欲计算 Orders 表中所有交易量并全部转换成日元。

例如,09:00 到 10:45 间欧元对日元的汇率是 114,10:45 到 11:15 间为 116,10:45 以后是119。如果要将10:52的这笔订单进行汇率转换,最终选择 10:45这个版本,

由于temporal join设计很多特定的影响因素,以以下测试用例探索join规则:

左流(主表、探针侧):

create table left_upsert ( id string, op_ts timestamp(3), primary key(id) not enforced, watermark for op_ts as op_ts - intervcal '0' second ) with ( 'connector' = 'upsert-kafka', 'properties.bootstrap.servers' = '...', 'topic' = '...' 'key.format' = 'json', 'value.format' = 'json', 'properties.group.id' = '...' )

右流(维表、构建侧):

create table right_upsert ( id string, op_ts timestamp(3), primary key(id) not enforced, watermark for op_ts as op_ts - intervcal '0' second ) with ( 'connector' = 'upsert-kafka', 'properties.bootstrap.servers' = '...', 'topic' = '...' 'key.format' = 'json', 'value.format' = 'json', 'properties.group.id' = '...' )

1. 支持inner join, left join

2. 右流版本表既要定义为事件时间(水位线)也要定义主键;左流需要定义为事件时间(水位线)。

其实,版本表的特点是可以追溯历史版本,所以,时间和主键是必须要同时具备的。

3. 关联等式条件必须有维表的主键,但是可以加入其它辅助条件,例如,

on left_upsert.id = right_upsert.id and left_upsert.id '2'

4. 水位线起到一个触发写出的作用,在写出之前,左右流的元素在缓存中join。

例如,测试数据及 join sql, 程序 join 结果如下,

Left: key value produce seq {"id":"1"} {"id":"1","op_ts":"1970-01-03 00:00:00"} 1 --- watermark {"id":"2"} {"id":"2","op_ts":"1970-01-01 01:00:00"} 3 {"id":"3"} {"id":"3","op_ts":"1970-01-04 00:00:00"} 6 --- watermark Right: key value produce seq {"id":"1"} {"id":"1","op_ts":"1970-01-03 00:00:00"} 2 --- watermark {"id":"2"} {"id":"2","op_ts":"1970-01-01 00:00:00"} 4 {"id":"2"} {"id":"2","op_ts":"1970-01-01 02:00:00"} 5 {"id":"3"} {"id":"3","op_ts":"1970-01-04 00:00:00"} 7 --- watermark join sql: select * from left_upsert as l left join right_upsert for system_time as of l.op_ts as r on l.id = r.id 结果: +----+------+-------------------+-------+-------------------+ | op | id | op_ts | id0 | op_ts0 | +----+------+-------------------+-------+-------------------+ | +I | 1 | 1970-01-03T00:00 | 1 | 1970-01-03T00:00 | | +I | 2 | 1970-01-01T01:00 | 2 | 1970-01-01T00:00 | | +I | 3 | 1970-01-04T00:00 | 3 | 1970-01-04T00:00 |

1和2消息将水位线提升到先生产1970-01-03 00:00:00,会触发join写出

| +I | 1 | 1970-01-03T00:00 | 1 | 1970-01-03T00:00 |

紧接着按照测试数据的produce seq顺序发出测试数据,当在6和7测试数据发出后,又触发一次写出:

| +I | 2 | 1970-01-01T01:00 | 2 | 1970-01-01T00:00 | | +I | 3 | 1970-01-04T00:00 | 3 | 1970-01-04T00:00 |

此时,会将内存中缓存的以下join结果也写出,

| +I | 2 | 1970-01-01T01:00 | 2 | 1970-01-01T00:00 |

并且可以看到join的时间版本也符合之前的规则。

5. 左流元素才会触发join的作用,join的结果只会看到从左流探针侧触发的join。

例如,测试数据及 join sql, 程序 join 结果如下,

Left: key value produce seq {"id":"1"} {"id":"1","op_ts":"1970-01-03 00:00:00"} 1 --- watermark {"id":"2"} {"id":"2","op_ts":"1970-01-01 01:00:00"} 3 {"id":"3"} {"id":"3","op_ts":"1970-01-04 00:00:00"} 7 --- watermark Right: key value produce seq {"id":"1"} {"id":"1","op_ts":"1970-01-03 00:00:00"} 2 --- watermark {"id":"2"} {"id":"2","op_ts":"1970-01-01 00:00:00"} 4 {"id":"2"} {"id":"2","op_ts":"1970-01-01 02:00:00"} 5 {"id":"2"} null 6 {"id":"3"} {"id":"3","op_ts":"1970-01-04 00:00:00"} 8 --- watermark join sql: select * from left_upsert as l left join right_upsert for system_time as of l.op_ts as r on l.id = r.id 结果: +----+------+-------------------+------+-------------------+ | op | id | op_ts | id0 | op_ts0 | +----+------+-------------------+------+-------------------+ | +I | 1 | 1970-01-03T00:00 | 1 | 1970-01-03T00:00 | | +I | 2 | 1970-01-01T01:00 | 2 | 1970-01-01T00:00 | | +I | 3 | 1970-01-04T00:00 | 3 | 1970-01-04T00:00 |

在produce seq为6的数据发出之前,内存中左流的id=2的元素与右流的id=2的1970-01-01 00:00:00版本join,当右流{"id":"2"} null发出后,语义上理解{"id":"2"} {"id":"2","op_ts":"1970-01-01 02:00:00"}这条数据应该被撤回,但是从join结果看,并不是这样的:

| +I | 2 | 1970-01-01T01:00 | 2 | 1970-01-01T00:00 |

说明,{"id":"2"} null并没有触发join结果的更新,这也说明了右流是不会触发join结果的更新的。

如果将上述左流测试数据{"id":"3"} {"id":"3","op_ts":"1970-01-04 00:00:00"}改为,

{"id":"2"} {"id":"2","op_ts":"1970-01-04 00:00:00"}

测试结果中,id=2的join结果变为,

+----+-----+------------------+--------+-------------------+ | op | id | op_ts | id0 | op_ts0 | +----+-----+------------------+--------+-------------------+ | +I | 1 | 1970-01-03T00:00 | 1 | 1970-01-03T00:00 | | +I | 2 | 1970-01-01T01:00 | 2 | 1970-01-01T00:00 | | -U | 2 | 1970-01-01T01:00 | 2 | 1970-01-01T00:00 | | +U | 2 | 1970-01-04T00:00 | (NULL) | (NULL) |

从以上这个输出也可看出,

6. 在缓存中的join结果没有merge,而是将每次触发join的结果依次输出。

7. 当触发写出后,在缓存中只保留元素最新版本,过期版本将删除。

总结

支持inner join, left join。 右流版本表既要定义为事件时间(水位线)也要定义主键;左流需要定义为事件时间(水位线)。 关联等式条件必须有维表的主键,但是可以加入其它辅助条件。 水位线起到一个触发写出的作用,在写出之前,左右流的元素在缓存中join。 左流元素才会触发join的作用,join的结果只会看到从左流探针侧触发的join。 在缓存中的join结果没有merge,而是将每次触发join的结果依次输出。当触发写出后,在缓存中只保留元素最新版本,过期版本将删除。

以上,可以看出Event Time Temporal Join的适用场景比较特殊,因为构建侧的维表的数据流必须是【缓慢变化维】,否则无法确join的合适的时间版本,并且水位线无法推进。

Processing Time Temporal Join

Processing Time Temporal Join用于和以处理时间作为时间属性的构建侧流表进行Join,这种维表通常我们用HBase、MySQL此类具有Lookup能力的表进行Join。

语法同Event Time Temporal Join,在此不做赘述。



【本文地址】

公司简介

联系我们

今日新闻

    推荐新闻

    专题文章
      CopyRight 2018-2019 实验室设备网 版权所有