文章目录
本文基于flink-1.13.6
维表是数仓中的一个概念,维表中的维度属性是观察数据的角度,在建设离线数仓的时候,通常是将维表与事实表进行关联构建星型模型。在实时数仓中,同样也有维表与事实表的概念,其中事实表通常存储在kafka中,维表通常存储在外部设备中(比如MySQL,HBase)。对于每条流式数据,可以关联一个外部维表数据源,为实时计算提供数据关联查询。维表可能是会不断变化的,在维表JOIN时,需指明这条记录关联维表快照的时刻。需要注意是,目前Flink SQL的维表JOIN仅支持对当前时刻维表快照的关联(处理时间语义),而不支持事实表rowtime所对应的的维表快照(事件时间语义)
使用语法
SELECT column-names
FROM table1 [AS <alias1>]
[LEFT] JOIN table2 FOR SYSTEM_TIME AS OF table1.proctime [AS <alias2>]
ON table1.column-name1 = table2.key-name1
注意:目前,仅支持INNER JOIN与LEFT JOIN。在join的时候需要使用 FOR SYSTEM_TIME AS OF ,其中table1.proctime表示table1的proctime处理时间属性(计算列)。使用FOR SYSTEM_TIME AS OF table1.proctime表示当左边表的记录与右边的维表join时,只匹配当前处理时间维表所对应的的快照数据。
样例
SELECT
o.amout, o.currency, r.rate, o.amount * r.rate
FROM
Orders AS o
JOIN LatestRates FOR SYSTEM_TIME AS OF o.proctime AS r
ON r.currency = o.currency
使用说明
仅支持Blink planner
仅支持SQL,目前不支持Table API
目前不支持基于事件时间(event time)的temporal table join
维表可能会不断变化,JOIN行为发生后,维表中的数据发生了变化(新增、更新或删除),则已关联的维表数据不会被同步变化
维表和维表不能进行JOIN
维表必须指定主键。维表JOIN时,ON的条件必须包含所有主键的等值条件
Kafka中有一份用户行为数据,包括pv,buy,cart,fav行为;MySQL中有一份省份区域的维表数据。现将两种表进行JOIN,统计每个区域的购买行为数量。
-- mysql
CREATE TABLE `dim_province` (
`province_id` bigint(20) DEFAULT NULL,
`province_name` varchar(50) DEFAULT NULL,
`region_name` varchar(50) DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
insert into dim_province (province_id, province_name, region_name)
values
(1, "山东", "华东"),
(2, "广东", "华南"),
(3, "河南", "华中"),
(4, "北京", "华北"),
(5, "新疆", "西北");
-- flinksql 维度表
CREATE TABLE dim_province (
province_id BIGINT, -- 省份id
province_name VARCHAR, -- 省份名称
region_name VARCHAR -- 区域名称
) WITH (
'connector.type' = 'jdbc',
'connector.url' = 'jdbc:mysql://chb1:3306/chb_test?useUnicode=true&characterEncoding=UTF-8',
'connector.table' = 'dim_province',
'connector.username' = 'root',
'connector.password' = '123456',
'connector.lookup.cache.max-rows' = '5000',
'connector.lookup.cache.ttl' = '10min'
);
注意: 加上useUnicode=true&characterEncoding=UTF-8,否则 flinksql 写到 mysql 产生乱码
事实表存储在kafka中,数据为用户点击行为,格式为csv,具体数据样例如下:
1,1002,10002,fav,2022-10-27 16:25:00,2
1,1004,10002,cart,2022-10-27 16:25:01,3
6,1004,10004,pv,2022-10-27 16:25:01,3
3,1002,10001,cart,2022-10-27 16:25:01,1
4,1001,10004,fav,2022-10-27 16:25:01,4
创建kafka数据源表,如下:
CREATE TABLE user_behavior (
user_id VARCHAR,
item_id VARCHAR,
category_id VARCHAR,
behavior VARCHAR,
`ts` timestamp(3),
province_id INT, -- 用户所在的省份id
`proctime` as PROCTIME(), -- 处理时间列
WATERMARK FOR ts as ts - INTERVAL '5' SECOND -- 在ts上定义watermark,ts成为事件时间列
) WITH (
'connector' = 'kafka', -- 使用 kafka connector
'topic' = 'user_behavior', -- kafka topic
'scan.startup.mode' = 'latest-offset', -- 从起始 offset 开始读取
'properties.bootstrap.servers' = 'chb1:9092',
'properties.group.id' = 'testGroup',
'format' = 'csv'
);
-- mysql
CREATE TABLE top_region (
region_name varchar(50), -- 区域名称
buy_cnt BIGINT -- 销量
)ENGINE=InnoDB DEFAULT CHARSET=utf8;
-- flinksql
CREATE TABLE region_sales_sink (
region_name STRING, -- 区域名称
buy_cnt BIGINT, -- 销量
proctime as PROCTIME()
) WITH (
'connector.type' = 'jdbc',
'connector.url' = 'jdbc:mysql://chb1:3306/chb_test?useUnicode=true&characterEncoding=UTF-8',
'connector.table' = 'top_region', -- MySQL中的待插入数据的表
'connector.username' = 'root',
'connector.password' = '123456',
'connector.write.flush.interval' = '1s'
);
CREATE VIEW user_behavior_detail AS
SELECT
u.user_id,
u.item_id,
u.category_id,
u.behavior,
p.province_name,
p.region_name
FROM user_behavior AS u LEFT JOIN dim_province FOR SYSTEM_TIME AS OF u.proctime AS p
ON u.province_id = p.province_id;
-- 结果
INSERT INTO region_sales_sink
SELECT
region_name,
COUNT(*) buy_cnt
FROM user_behavior_detail
WHERE behavior = 'buy'
GROUP BY region_name;
参考:
Flink SQL— CREATE语句
Flink Temporal Join Versioned Table Demo
我需要做这样的事情classUser'User',:foreign_key=>'abuser_id'belongs_to:gameendclassGame['JOINabuse_reportsONusers.id=abuse_reports.abuser_id','JOINgamesONgames.id=abuse_reports.game_id'],:group=>'users.id',:select=>'users.*,count(distinctgames.id)ASgame_count,count(abuse_reports.id)asabuse_report_count',:
我在Ruby中遇到了一个关于Dir[]和File.join()的简单程序,blobs_dir='/path/to/dir'Dir[File.join(blobs_dir,"**","*")].eachdo|file|FileUtils.rm_rf(file)ifFile.symlink?(file)我有两个困惑:首先,File.join(@blobs_dir,"**","*")中的第二个和第三个参数是什么意思?其次,Dir[]在Ruby中有什么用?我只知道它等价于Dir.glob(),但是,我对Dir.glob()确实不是很清楚。 最佳答案
这是一个有点微观的问题,但每次我创建一个gem并需要加载子目录下的所有文件以用于某种反射目的(或只是一个快速而肮脏的预加载)时,我问自己“肯定有更清洁的方法吗?”,引用这种常见模式:Dir[File.join(File.dirname(__FILE__),"subdirectory/**/*.rb")].each{|f|requiref}需要在__FILE__上调用File.dirname,这使得它不必要地冗长。你不能真正在gem中使用相对路径,因为你不知道你是从哪里加载的。 最佳答案 你用的是哪种ruby?在ruby1.9中,
我正在尝试构建一个seeds.rb文件以将初始管理员用户添加到数据库中。我有一个用户表和模型,以及一个角色表和模型。我有一个连接表,roles_users来加入用户角色和权限。这是架构:create_table"roles",:force=>truedo|t|t.string"name"t.datetime"created_at"t.datetime"updated_at"endcreate_table"roles_users",:id=>false,:force=>truedo|t|t.integer"role_id"t.integer"user_id"endcreate_table
我正在尝试将Rails3.0应用程序升级到Rails4.0。我注意到的行为之一是模型之间的关系停止工作。假设我们有以下模型:classStudent:teacher_students,:select=>'teacher_students.met_with_parent,teachers.*'#TheRails4syntaxhas_many:teachers,->{select('teacher_students.met_with_parent,teachers.*')},:through=>:teacher_studentsendclassTeacher:teacher_student
可以不调用Thread#join吗?在这种情况下,我不关心线程是否爆炸-我只希望Unicorn继续处理。classMyMiddlewaredefinitialize(app)@app=appenddefcall(env)t=Thread.new{sleep1}t.join#isitokifIskipthis?@app.callenvendend我会得到“僵尸线程”或类似的东西吗? 最佳答案 不调用join完全没问题-事实上,多线程代码通常根本不需要join。如果您需要阻塞直到新线程完成,您应该只调用join。您不会得到“僵尸”线程。
我有一组唯一编号。像这样:[1,2,3,4,7,8,10,12]。它可以是未排序的。我需要的是获取此数组的间隔:intervals_for[1,2,3,4,7,8,10,12]#=>"1-4,7-8,10,12"我有自己的解决方案:defintervals_for(array)array.sort!new_array=[]array.eachdo|a|ifnew_array.lastanda==new_array.last.last+1new_array.last1?"#{a.first}-#{a.last}":a.first}.join(",")end但我认为这里有更干净的解决方案
我在Ruby中从事多线程工作。代码片段是:threads_array=Array.new(num_of_threads)1.upto(num_of_threads)do|i|Thread.abort_on_exception=truethreads_array[i-1]=Thread.new{catch(:exit)doprint"s#{i}"user_id=nilloopdouser_id=user_ids.pop()ifuser_id==nilprint"a#{i}"Thread.stop()enddosomething(user_id)endend}end#puts"aftert
**为更好理解而编辑****编辑重命名为文档**我想获取给定用户的所有带有投票记录的用户文档记录,如果该用户未对某些文档记录进行投票,我仍然希望获取所有没有投票的文档记录。例如:+------------+--------------+------+|document.par1|document.par2|vote|+------------+--------------+------+|2|z|y||3|w|NULL||4|x|NULL|+------------+--------------+------+如果我尝试在RubyonRails上:我。第一次尝试:Document.jo
下面是评论和用户之间的关系。每个评论都有一个用户,所以我在下面的代码中构建了一个连接。我想知道如何构建此代码以仅在连接中包含特定列。我不需要所有的用户信息。只是名字。任何建议。当前代码:@comments=Comment.where(:study_id=>@study.id).joins(:user) 最佳答案 你可以使用这样的东西:@comments=Comment.joins(:user).select("comments.*,users.first_name").where(study_id:@study.id)