草庐IT

Flink SQL --维表join

宝哥大数据 2023-04-05 原文

文章目录


本文基于flink-1.13.6

一、维表 join 介绍

维表是数仓中的一个概念,维表中的维度属性是观察数据的角度,在建设离线数仓的时候,通常是将维表与事实表进行关联构建星型模型。在实时数仓中,同样也有维表与事实表的概念,其中事实表通常存储在kafka中,维表通常存储在外部设备中(比如MySQL,HBase)。对于每条流式数据,可以关联一个外部维表数据源,为实时计算提供数据关联查询。维表可能是会不断变化的,在维表JOIN时,需指明这条记录关联维表快照的时刻。需要注意是,目前Flink SQL的维表JOIN仅支持对当前时刻维表快照的关联(处理时间语义),而不支持事实表rowtime所对应的的维表快照(事件时间语义)

二、Temporal Table Join

使用语法

SELECT column-names
FROM table1  [AS <alias1>]
[LEFT] JOIN table2 FOR SYSTEM_TIME AS OF table1.proctime [AS <alias2>]
ON table1.column-name1 = table2.key-name1

注意:目前,仅支持INNER JOIN与LEFT JOIN。在join的时候需要使用 FOR SYSTEM_TIME AS OF ,其中table1.proctime表示table1的proctime处理时间属性(计算列)。使用FOR SYSTEM_TIME AS OF table1.proctime表示当左边表的记录与右边的维表join时,只匹配当前处理时间维表所对应的的快照数据。

样例

SELECT
  o.amout, o.currency, r.rate, o.amount * r.rate
FROM
  Orders AS o
  JOIN LatestRates FOR SYSTEM_TIME AS OF o.proctime AS r
  ON r.currency = o.currency

使用说明
仅支持Blink planner
仅支持SQL,目前不支持Table API
目前不支持基于事件时间(event time)的temporal table join
维表可能会不断变化,JOIN行为发生后,维表中的数据发生了变化(新增、更新或删除),则已关联的维表数据不会被同步变化
维表和维表不能进行JOIN
维表必须指定主键。维表JOIN时,ON的条件必须包含所有主键的等值条件

三、维表Join案例

3.1、背景

Kafka中有一份用户行为数据,包括pv,buy,cart,fav行为;MySQL中有一份省份区域的维表数据。现将两种表进行JOIN,统计每个区域的购买行为数量。

3.2、实践

3.2.1、维表存储在MySQL中

-- mysql
CREATE TABLE `dim_province` (
  `province_id` bigint(20) DEFAULT NULL,
  `province_name` varchar(50) DEFAULT NULL,
  `region_name` varchar(50) DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8;


insert into dim_province (province_id, province_name, region_name) 
values 
(1, "山东", "华东"),
(2, "广东", "华南"),
(3, "河南", "华中"),
(4, "北京", "华北"),
(5, "新疆", "西北");

-- flinksql  维度表
CREATE TABLE dim_province (
    province_id BIGINT,  -- 省份id
    province_name  VARCHAR, -- 省份名称
 region_name VARCHAR -- 区域名称
) WITH (
    'connector.type' = 'jdbc',
    'connector.url' = 'jdbc:mysql://chb1:3306/chb_test?useUnicode=true&characterEncoding=UTF-8',
    'connector.table' = 'dim_province',
    'connector.username' = 'root',
    'connector.password' = '123456',
    'connector.lookup.cache.max-rows' = '5000',
    'connector.lookup.cache.ttl' = '10min'
);

注意: 加上useUnicode=true&characterEncoding=UTF-8,否则 flinksql 写到 mysql 产生乱码

3.2.2、事实数据存在 kafka

事实表存储在kafka中,数据为用户点击行为,格式为csv,具体数据样例如下:

1,1002,10002,fav,2022-10-27 16:25:00,2
1,1004,10002,cart,2022-10-27 16:25:01,3
6,1004,10004,pv,2022-10-27 16:25:01,3
3,1002,10001,cart,2022-10-27 16:25:01,1
4,1001,10004,fav,2022-10-27 16:25:01,4

创建kafka数据源表,如下:

CREATE TABLE user_behavior (
    user_id VARCHAR,
    item_id VARCHAR,
    category_id VARCHAR,
    behavior VARCHAR,
    `ts` timestamp(3),
	 province_id INT,   -- 用户所在的省份id
	`proctime` as PROCTIME(),   -- 处理时间列
    WATERMARK FOR ts as ts - INTERVAL '5' SECOND  -- 在ts上定义watermark,ts成为事件时间列
) WITH (
    'connector' = 'kafka', -- 使用 kafka connector
    'topic' = 'user_behavior',  -- kafka topic
    'scan.startup.mode' = 'latest-offset', -- 从起始 offset 开始读取
	'properties.bootstrap.servers' = 'chb1:9092',
	'properties.group.id' = 'testGroup',
	'format' = 'csv'
);

3.2.3、创建MySQL的结果表,表示区域销量

-- mysql
 CREATE TABLE top_region (
    region_name varchar(50),  -- 区域名称
    buy_cnt BIGINT  -- 销量
)ENGINE=InnoDB DEFAULT CHARSET=utf8;

-- flinksql
CREATE TABLE region_sales_sink (
    region_name STRING,  -- 区域名称
    buy_cnt BIGINT,  -- 销量
	proctime as PROCTIME()
) WITH (

    'connector.type' = 'jdbc',
    'connector.url' = 'jdbc:mysql://chb1:3306/chb_test?useUnicode=true&characterEncoding=UTF-8',
    'connector.table' = 'top_region', -- MySQL中的待插入数据的表
    'connector.username' = 'root',
    'connector.password' = '123456',
    'connector.write.flush.interval' = '1s'
);

3.2.4、用户行为数据与省份维表数据 join

CREATE VIEW user_behavior_detail AS
SELECT
  u.user_id, 
  u.item_id,
  u.category_id,
  u.behavior,  
  p.province_name,
  p.region_name
FROM user_behavior AS u LEFT JOIN dim_province FOR SYSTEM_TIME AS OF u.proctime AS p
ON u.province_id = p.province_id;

3.2.5、计算区域的销量,并将计算结果写入MySQL

-- 结果
INSERT INTO region_sales_sink
SELECT 
  region_name,
  COUNT(*) buy_cnt
FROM user_behavior_detail
WHERE behavior = 'buy'
GROUP BY region_name;

参考:
Flink SQL— CREATE语句
Flink Temporal Join Versioned Table Demo

有关Flink SQL --维表join的更多相关文章

  1. ruby-on-rails - 是否可以让 ActiveRecord 为使用 :joins option? 加载的行创建对象 - 2

    我需要做这样的事情classUser'User',:foreign_key=>'abuser_id'belongs_to:gameendclassGame['JOINabuse_reportsONusers.id=abuse_reports.abuser_id','JOINgamesONgames.id=abuse_reports.game_id'],:group=>'users.id',:select=>'users.*,count(distinctgames.id)ASgame_count,count(abuse_reports.id)asabuse_report_count',:

  2. ruby - 关于 Ruby 中 Dir[] 和 File.join() 的混淆 - 2

    我在Ruby中遇到了一个关于Dir[]和File.join()的简单程序,blobs_dir='/path/to/dir'Dir[File.join(blobs_dir,"**","*")].eachdo|file|FileUtils.rm_rf(file)ifFile.symlink?(file)我有两个困惑:首先,File.join(@blobs_dir,"**","*")中的第二个和第三个参数是什么意思?其次,Dir[]在Ruby中有什么用?我只知道它等价于Dir.glob(),但是,我对Dir.glob()确实不是很清楚。 最佳答案

  3. ruby - Dir[File.join(File.dirname(__FILE__), "subdirectory/**/*.rb")] 的较短版本? - 2

    这是一个有点微观的问题,但每次我创建一个gem并需要加载子目录下的所有文件以用于某种反射目的(或只是一个快速而肮脏的预加载)时,我问自己“肯定有更清洁的方法吗?”,引用这种常见模式:Dir[File.join(File.dirname(__FILE__),"subdirectory/**/*.rb")].each{|f|requiref}需要在__FILE__上调用File.dirname,这使得它不必要地冗长。你不能真正在gem中使用相对路径,因为你不知道你是从哪里加载的。 最佳答案 你用的是哪种ruby?在ruby​​1.9中,

  4. ruby-on-rails - 如何将 JOIN 信息添加到 rails seeds.rb 文件中? - 2

    我正在尝试构建一个seeds.rb文件以将初始管理员用户添加到数据库中。我有一个用户表和模型,以及一个角色表和模型。我有一个连接表,roles_users来加入用户角色和权限。这是架构:create_table"roles",:force=>truedo|t|t.string"name"t.datetime"created_at"t.datetime"updated_at"endcreate_table"roles_users",:id=>false,:force=>truedo|t|t.integer"role_id"t.integer"user_id"endcreate_table

  5. ruby-on-rails - Rails 4 Has_many :through join association with select - 2

    我正在尝试将Rails3.0应用程序升级到Rails4.0。我注意到的行为之一是模型之间的关系停止工作。假设我们有以下模型:classStudent:teacher_students,:select=>'teacher_students.met_with_parent,teachers.*'#TheRails4syntaxhas_many:teachers,->{select('teacher_students.met_with_parent,teachers.*')},:through=>:teacher_studentsendclassTeacher:teacher_student

  6. ruby - 不调用 Thread#join 可以吗? - 2

    可以不调用Thread#join吗?在这种情况下,我不关心线程是否爆炸-我只希望Unicorn继续处理。classMyMiddlewaredefinitialize(app)@app=appenddefcall(env)t=Thread.new{sleep1}t.join#isitokifIskipthis?@app.callenvendend我会得到“僵尸线程”或类似的东西吗? 最佳答案 不调用join完全没问题-事实上,多线程代码通常根本不需要join。如果您需要阻塞直到新线程完成,您应该只调用join。您不会得到“僵尸”线程。

  7. ruby 任务 : joining numbers to intervals - 2

    我有一组唯一编号。像这样:[1,2,3,4,7,8,10,12]。它可以是未排序的。我需要的是获取此数组的间隔:intervals_for[1,2,3,4,7,8,10,12]#=>"1-4,7-8,10,12"我有自己的解决方案:defintervals_for(array)array.sort!new_array=[]array.eachdo|a|ifnew_array.lastanda==new_array.last.last+1new_array.last1?"#{a.first}-#{a.last}":a.first}.join(",")end但我认为这里有更干净的解决方案

  8. ruby - 如何修复 Ruby 中 join() 中的死锁 - 2

    我在Ruby中从事多线程工作。代码片段是:threads_array=Array.new(num_of_threads)1.upto(num_of_threads)do|i|Thread.abort_on_exception=truethreads_array[i-1]=Thread.new{catch(:exit)doprint"s#{i}"user_id=nilloopdouser_id=user_ids.pop()ifuser_id==nilprint"a#{i}"Thread.stop()enddosomething(user_id)endend}end#puts"aftert

  9. ruby-on-rails - 如何将参数传递给 LEFT JOIN? - 2

    **为更好理解而编辑****编辑重命名为文档**我想获取给定用户的所有带有投票记录的用户文档记录,如果该用户未对某些文档记录进行投票,我仍然希望获取所有没有投票的文档记录。例如:+------------+--------------+------+|document.par1|document.par2|vote|+------------+--------------+------+|2|z|y||3|w|NULL||4|x|NULL|+------------+--------------+------+如果我尝试在RubyonRails上:我。第一次尝试:Document.jo

  10. ruby-on-rails - Rails 3 Join——只选择某些列 - 2

    下面是评论和用户之间的关系。每个评论都有一个用户,所以我在下面的代码中构建了一个连接。我想知道如何构建此代码以仅在连接中包含特定列。我不需要所有的用户信息。只是名字。任何建议。当前代码:@comments=Comment.where(:study_id=>@study.id).joins(:user) 最佳答案 你可以使用这样的东西:@comments=Comment.joins(:user).select("comments.*,users.first_name").where(study_id:@study.id)

随机推荐