前些天发现了一个巨牛的人工智能学习网站,通俗易懂,风趣幽默,忍不住分享一下给大家。点击跳转到网站:https://www.captainai.net/dongkelun
由于项目上主要用Hive查询Hudi,所以之前总结过一篇:Hive增量查询Hudi表。最近可能会有Spark SQL增量查询Hudi表的需求,并且我发现目前用纯Spark SQL的形式还不能直接增量查询Hudi表,于是进行学习总结一下。
先看一下官方文档上Spark SQL增量查询的方式,地址:https://hudi.apache.org/cn/docs/quick-start-guide#incremental-query 和 https://hudi.apache.org/cn/docs/querying_data#incremental-query
它是先通过spark.read中添加增量参数的形式读Hudi表为DF,然后将DF注册成临时表,最后通过Spark SQL查询临时表的形式,实现增量查询的。
import org.apache.hudi.DataSourceReadOptions.{BEGIN_INSTANTTIME, END_INSTANTTIME, INCR_PATH_GLOB, QUERY_TYPE, QUERY_TYPE_INCREMENTAL_OPT_VAL}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.TableIdentifier
val tableName = "test_hudi_incremental"
spark.sql(
s"""
|create table $tableName (
| id int,
| name string,
| price double,
| ts long,
| dt string
|) using hudi
| partitioned by (dt)
| options (
| primaryKey = 'id',
| preCombineField = 'ts',
| type = 'cow'
| )
|""".stripMargin)
spark.sql(s"insert into $tableName values (1,'hudi',10,100,'2022-11-25')")
spark.sql(s"insert into $tableName values (2,'hudi',10,100,'2022-11-25')")
spark.sql(s"insert into $tableName values (3,'hudi',10,100,'2022-11-26')")
spark.sql(s"insert into $tableName values (4,'hudi',10,100,'2022-12-26')")
spark.sql(s"insert into $tableName values (5,'hudi',10,100,'2022-12-27')")
val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier(tableName))
val basePath = table.storage.properties("path")
// incrementally query data
val incrementalDF = spark.read.format("hudi").
option(QUERY_TYPE.key, QUERY_TYPE_INCREMENTAL_OPT_VAL).
option(BEGIN_INSTANTTIME.key, beginTime).
option(END_INSTANTTIME.key, endTime).
option(INCR_PATH_GLOB.key, "/dt=2022-11*/*").
load(basePath)
// table(tableName)
incrementalDF.createOrReplaceTempView(s"temp_$tableName")
spark.sql(s"select * from temp_$tableName").show()
spark.stop()
结果:
+-------------------+--------------------+------------------+----------------------+--------------------+---+----+-----+---+----------+
|_hoodie_commit_time|_hoodie_commit_seqno|_hoodie_record_key|_hoodie_partition_path| _hoodie_file_name| id|name|price| ts| dt|
+-------------------+--------------------+------------------+----------------------+--------------------+---+----+-----+---+----------+
| 20221126165954300|20221126165954300...| id:1| dt=2022-11-25|de99b299-b9de-423...| 1|hudi| 10.0|100|2022-11-25|
| 20221126170009762|20221126170009762...| id:2| dt=2022-11-25|de99b299-b9de-423...| 2|hudi| 10.0|100|2022-11-25|
| 20221126170030470|20221126170030470...| id:5| dt=2022-12-27|75f8a760-9dc3-452...| 5|hudi| 10.0|100|2022-12-27|
| 20221126170023240|20221126170023240...| id:4| dt=2022-12-26|4751225d-4848-4dd...| 4|hudi| 10.0|100|2022-12-26|
| 20221126170017119|20221126170017119...| id:3| dt=2022-11-26|2272e513-5516-43f...| 3|hudi| 10.0|100|2022-11-26|
+-------------------+--------------------+------------------+----------------------+--------------------+---+----+-----+---+----------+
+-----------------+
| commit_time|
+-----------------+
|20221126170030470|
|20221126170023240|
|20221126170017119|
|20221126170009762|
|20221126165954300|
+-----------------+
20221126170009762
20221126170023240
+-------------------+--------------------+------------------+----------------------+--------------------+---+----+-----+---+----------+
|_hoodie_commit_time|_hoodie_commit_seqno|_hoodie_record_key|_hoodie_partition_path| _hoodie_file_name| id|name|price| ts| dt|
+-------------------+--------------------+------------------+----------------------+--------------------+---+----+-----+---+----------+
| 20221126170017119|20221126170017119...| id:3| dt=2022-11-26|2272e513-5516-43f...| 3|hudi| 10.0|100|2022-11-26|
+-------------------+--------------------+------------------+----------------------+--------------------+---+----+-----+---+----------+
注释掉INCR_PATH_GLOB,结果:
+-------------------+--------------------+------------------+----------------------+--------------------+---+----+-----+---+----------+
|_hoodie_commit_time|_hoodie_commit_seqno|_hoodie_record_key|_hoodie_partition_path| _hoodie_file_name| id|name|price| ts| dt|
+-------------------+--------------------+------------------+----------------------+--------------------+---+----+-----+---+----------+
| 20221127155346067|20221127155346067...| id:4| dt=2022-12-26|33e7a2ed-ea28-428...| 4|hudi| 10.0|100|2022-12-26|
| 20221127155339981|20221127155339981...| id:3| dt=2022-11-26|a5652ae0-942a-425...| 3|hudi| 10.0|100|2022-11-26|
+-------------------+--------------------+------------------+----------------------+--------------------+---+----+-----+---+----------+
继续注释掉END_INSTANTTIME,结果:
20221127161253433
20221127161311831
+-------------------+--------------------+------------------+----------------------+--------------------+---+----+-----+---+----------+
|_hoodie_commit_time|_hoodie_commit_seqno|_hoodie_record_key|_hoodie_partition_path| _hoodie_file_name| id|name|price| ts| dt|
+-------------------+--------------------+------------------+----------------------+--------------------+---+----+-----+---+----------+
| 20221127161320347|20221127161320347...| id:5| dt=2022-12-27|7b389e57-ca44-4aa...| 5|hudi| 10.0|100|2022-12-27|
| 20221127161311831|20221127161311831...| id:4| dt=2022-12-26|2707ce02-548a-422...| 4|hudi| 10.0|100|2022-12-26|
| 20221127161304742|20221127161304742...| id:3| dt=2022-11-26|264bc4a9-930d-4ec...| 3|hudi| 10.0|100|2022-11-26|
+-------------------+--------------------+------------------+----------------------+--------------------+---+----+-----+---+----------+
可以看到不包含起始时间,包含结束时间。
一般项目上都采用纯SQL方式进行增量查询,这样比较方便,纯SQL的方式参数和上面讲的参数是一样的,接下来看一下怎么用纯SQL方式实现
create table hudi.test_hudi_incremental (
id int,
name string,
price double,
ts long,
dt string
) using hudi
partitioned by (dt)
options (
primaryKey = 'id',
preCombineField = 'ts',
type = 'cow'
);
insert into hudi.test_hudi_incremental values (1,'a1', 10, 1000, '2022-11-25');
insert into hudi.test_hudi_incremental values (2,'a2', 20, 2000, '2022-11-25');
insert into hudi.test_hudi_incremental values (3,'a3', 30, 3000, '2022-11-26');
insert into hudi.test_hudi_incremental values (4,'a4', 40, 4000, '2022-12-26');
insert into hudi.test_hudi_incremental values (5,'a5', 50, 5000, '2022-12-27');
看一下有哪些commit_time
select distinct(_hoodie_commit_time) from test_hudi_incremental order by _hoodie_commit_time
+----------------------+
| _hoodie_commit_time |
+----------------------+
| 20221130163618650 |
| 20221130163703640 |
| 20221130163720795 |
| 20221130163726780 |
| 20221130163823274 |
+----------------------+
使用Call Procedures:copy_to_temp_view、copy_to_table,目前这两个命令已经合到master,由scxwhite 苏乘祥贡献,这俩参数差不多,建议使用copy_to_temp_view,因为copy_to_table会先将数据落盘而copy_to_temp_view是创建的临时表,效率会高一点,且数据落盘无意义,后面还要将落盘的表删掉。
支持的参数
测试SQL:
call copy_to_temp_view(table => 'test_hudi_incremental', query_type => 'incremental',
view_name => 'temp_incremental', begin_instance_time=> '20221130163703640', end_instance_time => '20221130163726780');
select _hoodie_commit_time, id, name, price, ts, dt from temp_incremental;
结果:
+----------------------+-----+-------+--------+-------+-------------+
| _hoodie_commit_time | id | name | price | ts | dt |
+----------------------+-----+-------+--------+-------+-------------+
| 20221130163726780 | 4 | a4 | 40.0 | 4000 | 2022-12-26 |
| 20221130163720795 | 3 | a3 | 30.0 | 3000 | 2022-11-26 |
+----------------------+-----+-------+--------+-------+-------------+
可以看到这种方式是可以实现增量查询的,但是需要注意,如果需要修改增量查询的起始时间,那么就需要重复执行copy_to_temp_view,但是因为临时表temp_incremental已经存在,要么新起个表名,要么先删掉,再创建新的,我建议先删掉,通过下面的命令删除
drop view if exists temp_incremental;
PR地址:https://github.com/apache/hudi/pull/7182,这个PR同样由scxwhite贡献,目前只支持Spark3.2以上的版本(目前社区未合并)
增量查询SQL
select id, name, price, ts, dt from tableName
[
'hoodie.datasource.query.type'=>'incremental',
'hoodie.datasource.read.begin.instanttime'=>'$instant1',
'hoodie.datasource.read.end.instanttime'=>'$instant2'
]
这种方式,是支持了一种新的语法,在查询SQL后通过在[]添加参数的形式,感兴趣的话可以拉一下代码,自己打包试一下。
使用 Spark SQL Hint实现,具体实现方式,请查看KnightChess的这篇文章如何使用 Spark SQL Hint 对 Hudi 进行增量查询、时间旅行
最终的效果如下
select
/*+
hoodie_prop(
'default.h1',
map('hoodie.datasource.read.begin.instanttime', '20221127083503537', 'hoodie.datasource.read.end.instanttime', '20221127083506081')
),
hoodie_prop(
'default.h2',
map('hoodie.datasource.read.begin.instanttime', '20221127083508715', 'hoodie.datasource.read.end.instanttime', '20221127083511803')
)
*/
id, name, price, ts
from (
select id, name, price, ts
from default.h1
union all
select id, name, price, ts
from default.h2
)
是在hint中添加增量查询相关的参数,先指定表名再写参数,但是文章好像未给出完整的代码地址,大家有时间可以自己试一下。
这种方式,是我按照Hive增量查询Hudi的方式修改的源码,通过set的方式实现增量查询。
PR地址:https://github.com/apache/hudi/pull/7339
关于为啥目前不能通过set参数进行增量查询,这里说明一下:根据文章Hudi Spark SQL源码学习总结-select(查询),可知Hudi的
DefaultSource.createRelation中的optParams参数为readDataSourceTable中的options = table.storage.properties ++ pathOption,也就是表本身属性中的配置参数+path,之后在createRelation并没有接收其他参数,所以不能通过set参数的形式进行查询
和Hive增量查询一样,指定具体表名的增量查询参数
set hoodie.test_hudi_incremental.datasource.query.type=incremental
set hoodie.test_hudi_incremental.datasource.read.begin.instanttime=20221130163703640;
select _hoodie_commit_time, id, name, price, ts, dt from test_hudi_incremental;
+----------------------+-----+-------+--------+-------+-------------+
| _hoodie_commit_time | id | name | price | ts | dt |
+----------------------+-----+-------+--------+-------+-------------+
| 20221130163823274 | 5 | a5 | 50.0 | 5000 | 2022-12-27 |
| 20221130163726780 | 4 | a4 | 40.0 | 4000 | 2022-12-26 |
| 20221130163720795 | 3 | a3 | 30.0 | 3000 | 2022-11-26 |
+----------------------+-----+-------+--------+-------+-------------+
如果不同的库下面有相同的表名,则可以通过库名.表名的形式:
## 需要先开启使用数据库名称限定表名的配置,开启后上面不加库名的配置就失效了
set hoodie.query.use.database = true;
set hoodie.hudi.test_hudi_incremental.datasource.query.type=incremental;
set hoodie.hudi.test_hudi_incremental.datasource.read.begin.instanttime=20221130163703640;
set hoodie.hudi.test_hudi_incremental.datasource.read.end.instanttime=20221130163726780;
set hoodie.hudi.test_hudi_incremental.datasource.read.incr.path.glob=/dt=2022-11*/*;
refresh table test_hudi_incremental;
select _hoodie_commit_time, id, name, price, ts, dt from test_hudi_incremental;
+----------------------+-----+-------+--------+-------+-------------+
| _hoodie_commit_time | id | name | price | ts | dt |
+----------------------+-----+-------+--------+-------+-------------+
| 20221130163720795 | 3 | a3 | 30.0 | 3000 | 2022-11-26 |
+----------------------+-----+-------+--------+-------+-------------+
大家可以自己试一下,不同的库表关联的情形
这里需要注意一点,更新参数后,需要先refresh table,再查询,否则查询时修改的参数不生效,因为会使用缓存中的参数。
这种方式只是简单地修改了一下源码,使set的参数对查询生效。
为了避免有些读者嫌打包麻烦,这里给大家提供了hudi-spark3.1-bundle_2.12-0.13.0-SNAPSHOT.jar的下载地址:https://download.csdn.net/download/dkl12/87221476
本文总结了Spark SQL增量查询Hudi表的一些参数设置,并给出了示例,介绍了使用纯Spark SQL实现增量查询Hudi表的几种方式,不确定未来社区会采用哪种方式,大家目前如果有这种需求的话,可以先选择一种自己喜欢的方式,等未来社区版本支持后,再升级版本。本文没有涉及增量查询的原理,暂未验证增量查询的效率,是否可以起到文件过滤的效果,以后如果有时间会单独整理一篇。
我正在用Ruby编写一个简单的程序来检查域列表是否被占用。基本上它循环遍历列表,并使用以下函数进行检查。require'rubygems'require'whois'defcheck_domain(domain)c=Whois::Client.newc.query("google.com").available?end程序不断出错(即使我在google.com中进行硬编码),并打印以下消息。鉴于该程序非常简单,我已经没有什么想法了-有什么建议吗?/Library/Ruby/Gems/1.8/gems/whois-2.0.2/lib/whois/server/adapters/base.
我知道我可以指定某些字段来使用pluck查询数据库。ids=Item.where('due_at但是我想知道,是否有一种方法可以指定我想避免从数据库查询的某些字段。某种反拔?posts=Post.where(published:true).do_not_lookup(:enormous_field) 最佳答案 Model#attribute_names应该返回列/属性数组。您可以排除其中一些并传递给pluck或select方法。像这样:posts=Post.where(published:true).select(Post.attr
我正在尝试查询我的Rails数据库(Postgres)中的购买表,我想查询时间范围。例如,我想知道在所有日期的下午2点到3点之间进行了多少次购买。此表中有一个created_at列,但我不知道如何在不搜索特定日期的情况下完成此操作。我试过:Purchases.where("created_atBETWEEN?and?",Time.now-1.hour,Time.now)但这最终只会搜索今天与那些时间的日期。 最佳答案 您需要使用PostgreSQL'sdate_part/extractfunction从created_at中提取小时
我最喜欢的Google文档功能之一是它会在我工作时不断自动保存我的文档版本。这意味着即使我在进行关键更改之前忘记在某个点进行保存,也很有可能会自动创建一个保存点。至少,我可以将文档恢复到错误更改之前的状态,并从该点继续工作。对于在MacOS(或UNIX)上运行的Ruby编码器,是否有具有等效功能的工具?例如,一个工具会每隔几分钟自动将Gitcheckin我的本地存储库以获取我正在处理的文件。也许我有点偏执,但这点小保险可以让我在日常工作中安心。 最佳答案 虚拟机有些人可能讨厌我对此的回应,但我在编码时经常使用VIM,它具有自动保存功
我在Rails上使用带有ruby的solr。一切正常,我只需要知道是否有任何现有代码来清理用户输入,比如以?开头的查询。或* 最佳答案 我不知道执行此操作的任何代码,但理论上可以通过查看parsingcodeinLucene来完成并搜索thrownewParseException(只有16个匹配!)。在实践中,我认为您最好只捕获代码中的任何solr异常并显示“无效查询”消息或类似信息。编辑:这里有几个“sanitizer”:http://pivotallabs.com/users/zach/blog/articles/937-s
我正在为锦标赛开发一个Rails应用程序。我在这个查询中使用了三个模型:classPlayertruehas_and_belongs_to_many:tournamentsclassTournament:destroyclassPlayerMatch"Player",:foreign_key=>"player_one"belongs_to:player_two,:class_name=>"Player",:foreign_key=>"player_two"在tournaments_controller的显示操作中,我调用以下查询:Tournament.where(:id=>params
我想用sunspot重现以下原始solr查询q=exact_term_text:fooORterm_textv:foo*ORalternate_text:bar*但我无法通过标准的太阳黑子界面理解这是否可能以及如何实现,因为看起来:fulltext方法似乎不接受多个文本/搜索字段参数我不知道将什么参数作为第一个参数传递给fulltext,就好像我通过了"foo"或"bar"结果不匹配如果我传递一个空参数,我得到一个q=*:*范围过滤器(例如with(:term).starting_with('foo*')(顾名思义)作为过滤器查询应用,因此不参与评分。似乎可以手动编写字符串(或者可能使
例如,假设我有一个名为Products的模型,并且在ProductsController中,我有以下代码用于product_listView以显示已排序的产品。@products=Product.order(params[:order_by])让我们想象一下,在product_listView中,用户可以使用下拉菜单按价格、评级、重量等进行排序。数据库中的产品不会经常更改。我很难理解的是,每次用户选择新的order_by过滤器时,rails是否必须查询,或者rails是否能够以某种方式缓存事件记录以在服务器端重新排序?有没有一种方法可以编写它,以便在用户排序时rails不会重新查询结果
我目前正在尝试了解RoR。我将两个字符串传递到我的Controller中。一个是随机的十六进制字符串,另一个是电子邮件。该项目用于对数据库进行简单的电子邮件验证。我遇到的问题是当我输入如下内容来测试我的页面时:http://signup.testsite.local/confirm/da2fdbb49cf32c6848b0aba0f80fb78c/bob.villa@gmailcom我在:email的参数散列中得到的全部是'bob'。我在gmail和com之间留下了.,因为那样会导致匹配根本不起作用。我的路由匹配如下:match"confirm/:code/:email"=>"conf
我正在寻找一种方便实用的方法来将编码值添加到Ruby中的URL查询字符串。目前,我有:require'open-uri'u=URI::HTTP.new("http",nil,"mydomain.example",nil,nil,"/tv",nil,"show="+URI::encode("Rosie&Jim"),nil)pu.to_s#=>"http://mydomain.example/tv?show=Rosie%20&%20Jim"这不是我要找的,因为我需要得到“http://mydomain.example/tv?show=Rosie%20%26%20Jim”,这样show=值就