草庐IT

【flink】 各种join类型对比

矛始 2023-05-30 原文

表定义

  • 动态表(dynamic table):动态表是流的另一种表达方式,动态表作为一个逻辑的抽象概念,使我们更容易理解flink中将streaming发展到table这个层次的设计,本质都是对无边界、持续变更数据的表示形式,所以动态表与流之间可以相互转换。

  • 版本表(dynamic table):动态表之上的定义,版本是一个拥有主键和时间属性的动态表(建表语句必需包含PRIMARY KEY和WATERMARK),所以版本表可以追踪每个key在某时间点/时间内的变化情况。版本表可以直接从changelog格式的source创建,或者基于append-only的源创建版本视图。

  • 时态表(temporal table):时态表是随着时间变化而变化的,也就是动态表,时态表包含一个或多个版本表的快照;当它能追踪所有记录的历史变更(来自数据库的changelog)时,就是个版本表;如果它只能表示所有记录经过物化后的最新快照(直接一个数据库表),那就是个普通表。

Regular Joins

Regular Joins是flink这么多join类型中最普通的,任意一侧的数据流有变更或者新增,都会影响到join结果。Regular joins是通过把双流的输入数据都保存在flink的状态中,存在state过度膨胀的隐患,所以我们在使用时要合理设置table状态的TTL(table.exec.state.ttl),这要结合具体的业务场景,否则会影响join结果的正确性。

有两种join类型,内连接(INNER Equi-JOIN)和外连接(OUTER Equi-JOIN),两者都只支持等值连接,且至少一个连接条件。

Interval Joins

Interval join要求至少有一个等值谓词连接和一个时间约束条件,这个时间属性定义了流的时间范围,且作为WATERMARK

ltime = rtime
ltime >= rtime AND ltime < rtime + INTERVAL ‘10’ MINUTE
ltime BETWEEN rtime - INTERVAL ‘10’ SECOND AND rtime + INTERVAL ‘5’ SECOND

  • 与Regular join一样是双流,但是它加上了时间区间的概念,可以清理状态中较旧的数据,而不会影响join结果的正确性
  • 通过InnerJoin算子实现,水位线来控制join的数据区间以及清理数据,所以两个输入流都要定义WATERMARK,否则会变回Regular join
  • WATERMARK可以定义为event-time或process-time
  • 只支持append-only的输入流,当尝试使用cdc作为输入源(Retract)时出报错
Exception in thread "main" org.apache.flink.table.api.TableException: StreamPhysicalIntervalJoin doesn't support consuming update and delete changes which is produced by node TableSourceScan(table=[[ tb_order]], fields=[order_id, price, currency, order_time])

Temporal Joins

与时态表的join,通过上述时态表的描述可得知,可以关联得到记录的历史版本或只能得到最新版本,flink sql遵循SQL:2011的标准语法

    SELECT [column_list]
    FROM table1 [AS <alias1>]
    [LEFT] JOIN table2 FOR SYSTEM_TIME AS OF table1.{ proctime | rowtime } [AS <alias2>]
    ON table1.column-name1 = table2.column-name1

从使用形式划分,可以分为3种:Event Time Temporal Join、Processing Time Temporal Join、Temporal Table Function Join

Event Time Temporal Join

Event Time temporal join的是一个版本表,意味着可以根据主表的事件时间关联到当时维表的具体版本。

Temporal table join currently only supports ‘FOR SYSTEM_TIME AS OF’ left table’s time attribute field
Event-Time Temporal Table Join requires both primary key and row time attribute in versioned table, but no row time attribute can be found.
Temporal table’s primary key [currency] must be included in the equivalence condition of temporal join

  • 左表必需定义事件时间,右表除了定义事件时间外,还需要定义主键(即版本表)
  • 水位线替触发join,所以两侧流都需要设置正常的水位线
  • 右表的主键必需作为等值谓词连接
  • 与regular join相比,结果不会受到右侧流影响,即输出结果以左流为主,右流只是补充了左流的信息,效果与left join相似
  • 与interval join相比,不需要定义时间窗口,即可以关联到更久之前的维度,因为版本表会保存全量维度最新版本以及上一个水位线之后的变更
  • 一个水位线之前的版本数据将会被清理,因为随着水位线的推移,这些数据将不会再被用到

Processing Time Temporal Join

Processing-time temporal join is not supported yet

从flink 1.14开始已经不再支持这种方式,可以使用temporal table function语法替换

右表定义process-time属性,总是能关联到最新的右表记录,其实这和lookup join差不多,只不过右表的记录是以HashMap全部保存着最新的版本在状态中。这种方式的强大之处在于,可以直接对接不能变成flink动态表的外部表(例如hbase)

  • 与regular joins相比,右表的数据变化不会影响到之前已经join出的结果
  • 与interval joins相比,不需要定义时间窗口,且所有旧版本数据都不会保存在状态中

Temporal Table Function Join

Join key must be the same as temporal table’s primary key

    create table if not exists tb_order (
    `order_id` int,
    `price` int,
    `currency` string,
    `order_time` timestamp(3),
    `proc_time` AS PROCTIME(),
    primary key(order_id) not enforced)
    WITH (
    'connector'='mysql-cdc',
    'table-name'='tb_order',
    )
    create table if not exists tb_currency (
    `currency` string,
    `rate` int,
    `update_time` timestamp(3),
    `proc_time` AS PROCTIME(),
    )
    WITH (
    'connector'='mysql-cdc',
    'table-name'='tb_currency',
    )
    TemporalTableFunction rate = tEnv.from("tb_currency").createTemporalTableFunction("proc_time", "currency");
    tEnv.createTemporarySystemFunction("rate",rate);
    select * from tb_order o , LATERAL TABLE(rate(o.order_time)) c where o.currency=c.currency

上面例子实现了process time temporal join,两建表语句都不指定事件时间,且tb_currency无需指定primary key(即非版本表),但是在定义TemporalTableFunction可以指定任意字段为主键,所以如果建表语句指定了事件时间,且TemporalTableFunction也使用事件时间,那么相当于间接创建了版本表。

  • 先要定义table funtion,指定一个时间属性(event-time或process-time)和主键
  • TemporalTableFunction定义的主键必须作为等值谓词连接
  • 除了可以和版本表join,还能和普通的表示最新版本的表/视图join,即它包含了event-time/processing-time temporal join两种

Lookup Join

通过查询外部存储系统的数据以丰富流的属性,这种join方式要求流表必须有一个processing time属性,外部数据表的connector要实现LookupTableSource接口

    CREATE TEMPORARY TABLE Customers (
      id INT,
      name STRING,
      country STRING,
      zip STRING
    ) WITH (
      'connector' = 'jdbc',
      'table-name' = 'customers'
    );

    SELECT o.order_id, o.total, c.country, c.zip
    FROM Orders AS o
      JOIN Customers FOR SYSTEM_TIME AS OF o.proc_time AS c
        ON o.customer_id = c.id;
  • 外部表的数据变化不会影响到已经join出的结果
  • 上面所有join都是双流,而lookup join是单流

有关【flink】 各种join类型对比的更多相关文章

  1. ruby - Infinity 和 NaN 的类型是什么? - 2

    我可以得到Infinity和NaNn=9.0/0#=>Infinityn.class#=>Floatm=0/0.0#=>NaNm.class#=>Float但是当我想直接访问Infinity或NaN时:Infinity#=>uninitializedconstantInfinity(NameError)NaN#=>uninitializedconstantNaN(NameError)什么是Infinity和NaN?它们是对象、关键字还是其他东西? 最佳答案 您看到打印为Infinity和NaN的只是Float类的两个特殊实例的字符串

  2. ruby - 检查方法参数的类型 - 2

    我不确定传递给方法的对象的类型是否正确。我可能会将一个字符串传递给一个只能处理整数的函数。某种运行时保证怎么样?我看不到比以下更好的选择:defsomeFixNumMangler(input)raise"wrongtype:integerrequired"unlessinput.class==FixNumother_stuffend有更好的选择吗? 最佳答案 使用Kernel#Integer在使用之前转换输入的方法。当无法以任何合理的方式将输入转换为整数时,它将引发ArgumentError。defmy_method(number)

  3. ruby - Ruby 有 `Pair` 数据类型吗? - 2

    有时我需要处理键/值数据。我不喜欢使用数组,因为它们在大小上没有限制(很容易不小心添加超过2个项目,而且您最终需要稍后验证大小)。此外,0和1的索引变成了魔数(MagicNumber),并且在传达含义方面做得很差(“当我说0时,我的意思是head...”)。散列也不合适,因为可能会不小心添加额外的条目。我写了下面的类来解决这个问题:classPairattr_accessor:head,:taildefinitialize(h,t)@head,@tail=h,tendend它工作得很好并且解决了问题,但我很想知道:Ruby标准库是否已经带有这样一个类? 最佳

  4. ruby - 查找字符串中的内容类型(数字、日期、时间、字符串等) - 2

    我正在尝试解析一个CSV文件并使用SQL命令自动为其创建一个表。CSV中的第一行给出了列标题。但我需要推断每个列的类型。Ruby中是否有任何函数可以找到每个字段中内容的类型。例如,CSV行:"12012","Test","1233.22","12:21:22","10/10/2009"应该产生像这样的类型['integer','string','float','time','date']谢谢! 最佳答案 require'time'defto_something(str)if(num=Integer(str)rescueFloat(s

  5. ruby-on-rails - 在 Rails 开发环境中为 .ogv 文件设置 Mime 类型 - 2

    我正在玩HTML5视频并且在ERB中有以下片段:mp4视频从在我的开发环境中运行的服务器很好地流式传输到chrome。然而firefox显示带有海报图像的视频播放器,但带有一个大X。问题似乎是mongrel不确定ogv扩展的mime类型,并且只返回text/plain,如curl所示:$curl-Ihttp://0.0.0.0:3000/pr6.ogvHTTP/1.1200OKConnection:closeDate:Mon,19Apr201012:33:50GMTLast-Modified:Sun,18Apr201012:46:07GMTContent-Type:text/plain

  6. ruby - 是否有用于序列化和反序列化各种格式的对象层次结构的模式? - 2

    给定一个复杂的对象层次结构,幸运的是它不包含循环引用,我如何实现支持各种格式的序列化?我不是来讨论实际实现的。相反,我正在寻找可能会派上用场的设计模式提示。更准确地说:我正在使用Ruby,我想解析XML和JSON数据以构建复杂的对象层次结构。此外,应该可以将该层次结构序列化为JSON、XML和可能的HTML。我可以为此使用Builder模式吗?在任何提到的情况下,我都有某种结构化数据-无论是在内存中还是文本中-我想用它来构建其他东西。我认为将序列化逻辑与实际业务逻辑分开会很好,这样我以后就可以轻松支持多种XML格式。 最佳答案 我最

  7. ruby-on-rails - Rails 迁移中的 PostgreSQL 点类型 - 2

    我想使用PostgreSQL中的point类型。我已经完成了:railsgmodelTestpoint:point最终的迁移是:classCreateTests当我运行时:rakedb:migrate结果是:==CreateTests:migrating====================================================--create_table(:tests)rakeaborted!Anerrorhasoccurred,thisandalllatermigrationscanceled:undefinedmethod`point'for#/hom

  8. ruby-on-rails - 我可以用鸭子类型(duck typing)改进这种方法吗? - 2

    希望我没有误解“ducktyping”的含义,但从我读到的内容来看,这意味着我应该根据对象如何响应方法而不是它是什么类型/类来编写代码。代码如下:defconvert_hash(hash)ifhash.keys.all?{|k|k.is_a?(Integer)}returnhashelsifhash.keys.all?{|k|k.is_a?(Property)}new_hash={}hash.each_pair{|k,v|new_hash[k.id]=v}returnnew_hashelseraise"CustomattributekeysshouldbeID'sorPropertyo

  9. ruby-on-rails - 是否可以让 ActiveRecord 为使用 :joins option? 加载的行创建对象 - 2

    我需要做这样的事情classUser'User',:foreign_key=>'abuser_id'belongs_to:gameendclassGame['JOINabuse_reportsONusers.id=abuse_reports.abuser_id','JOINgamesONgames.id=abuse_reports.game_id'],:group=>'users.id',:select=>'users.*,count(distinctgames.id)ASgame_count,count(abuse_reports.id)asabuse_report_count',:

  10. ruby - 关于 Ruby 中 Dir[] 和 File.join() 的混淆 - 2

    我在Ruby中遇到了一个关于Dir[]和File.join()的简单程序,blobs_dir='/path/to/dir'Dir[File.join(blobs_dir,"**","*")].eachdo|file|FileUtils.rm_rf(file)ifFile.symlink?(file)我有两个困惑:首先,File.join(@blobs_dir,"**","*")中的第二个和第三个参数是什么意思?其次,Dir[]在Ruby中有什么用?我只知道它等价于Dir.glob(),但是,我对Dir.glob()确实不是很清楚。 最佳答案

随机推荐