草庐IT

Hudi 概念和特性

AlienPaul 2023-09-21 原文

背景

本篇为Hudi概念和特性相关介绍。依据于官网和相关博客资料,融入了个人理解。内容可能会有疏漏,欢迎大家指正和补充。

Hudi概念

Apache Hudi是一个Data Lakes的开源方案,Hudi是Hadoop Updates and Incrementals的简写,它是由Uber开发并开源的Data Lakes解决方案。Hudi具有如下基本特性/能力:

  • Hudi能够摄入(Ingest)和管理(Manage)基于HDFS之上的大型分析数据集,主要目的是高效的减少入库延时。
  • Hudi基于Spark/Flink/Hive来对HDFS上的数据进行更新、插入、删除等。
  • Hudi在HDFS数据集上提供如下流原语:插入更新(如何改变数据集);增量拉取(如何获取变更的数据)。
  • Hudi可以对HDFS上的parquet格式数据进行插入/更新操作。
  • Hudi通过自定义InputFormat与Hadoop生态系统(Spark、Hive、Parquet)集成。
  • Hudi通过Savepoint来实现数据恢复。

来源:Hudi数据湖简介_阿福Chris的博客-CSDN博客_hudi 数据湖

时间线

https://hudi.apache.org/docs/next/timeline/

Hudi内部维护了时间线,支持按照数据的到达时间顺序来获取数据。Hudi确保时间线上的动作是原子性的。

Hudi 表类型

https://hudi.apache.org/docs/next/table_types/#table-and-query-types

Copy on Write

数据使用列存储形式存放,每次提交都会产生新版本的列存储文件。将原有数据文件copy一份,合并入变更的数据后保存一份新的数据。因此写入放大很高,读取放大基本为0。适用于读取多写入少的场景。写入存在延迟,Flink每次checkpoint或者Spark每次微批处理才会commit新数据。读取延迟很低。可以通过hoodie.cleaner.commits.retained配置项确定需要保存的最近commit个数,防止存储空间占用无限放大。可以提供较好的并发控制,因为读取数据的时候,无法读取到正在写入但尚未commit或者是写入失败的数据。

Merge on Read

数据使用行存储(avro)和列存储(parquet)共同存放。其中新变更的数据使用行存储,历史数据采用列存储。每当满足一定条件的时候(经过n个commit或者经过特定时间)Hudi开始compact操作,将行存储的数据和列存储的合并,生成新的列存储文件。适用于写多读少的场景。写入延迟很小,读取的时候需要合并新文件avro和parquet,存在一定延迟。需要使用定期的online compaction或者是手工执行的offline compaction将avro格式和parquet格式文件合并。

通过配置项table.type指定表类型。

两种表类型的特性对比:

Trade-off CopyOnWrite MergeOnRead
Data Latency Higher Lower
Query Latency Lower Higher
Update cost (I/O) Higher (rewrite entire parquet) Lower (append to delta log)
Parquet File Size Smaller (high update(I/0) cost) Larger (low update cost)
Write Amplification Higher Lower (depending on compaction strategy)

查询类型

https://hudi.apache.org/docs/next/table_types/#table-and-query-types

Snapshot Queries : Queries see the latest snapshot of the table as of a given commit or compaction action. In case of merge on read table, it exposes near-real time data(few mins) by merging the base and delta files of the latest file slice on-the-fly. For copy on write table, it provides a drop-in replacement for existing parquet tables, while providing upsert/delete and other write side features.
Incremental Queries : Queries only see new data written to the table, since a given commit/compaction. This effectively provides change streams to enable incremental data pipelines.
Read Optimized Queries : Queries see the latest snapshot of table as of a given commit/compaction action. Exposes only the base/columnar files in latest file slices and guarantees the same columnar query performance compared to a non-hudi columnar table.

总结:

  • Snapshot Queries:MOR表合并avro和parquet文件之后输出结果,存在较大的查询延迟,但是数据没有延迟。COW表直接读取现有的新版本parquet文件,查询延迟低,但是有数据延迟(最新的数据可能没有写入,还没有commit)。
  • Incremental Queries:只查询某次commit或compaction之后新插入或者修改的数据。
  • Read Optimized Queries:仅适用于MOR表,为了优化MOR表的读取性能,此方式读取的时候不会强制合并avro和parquet文件。而是只读取parquet文件。这样查询延迟很低,但是数据存在延迟(在avro但是还没有compact的数据无法被查询到)。
Table Type Supported Query types
Copy On Write Snapshot Queries + Incremental Queries
Merge On Read Snapshot Queries + Incremental Queries + Read Optimized Queries

通过hoodie.datasource.query.type参数控制查询类型。配置项对应为:

  • snapshot(默认值)
  • read_optimized
  • incremental
Trade-off Snapshot Read Optimized
Data Latency Lower Higher
Query Latency Higher (merge base / columnar file + row based delta / log files) Lower (raw base / columnar file performance)

索引

Hudi通过HoodieKey(recordKey和poartition path)和file id的对应关系来加速upsert操作。这正是Hudi的索引机制。这种对应关系在初始记录写入之后不会在改变。

对于COW表插入数据的场景,索引可以快速的过滤掉不涉及数据修改的file。对于MOR表插入数据的场景,索引能够很快定位到需要合并的文件。不需要像Hive ACID一样,合并所有的base file。

Hudi支持下面4种Index选项:

  • Bloom Index(默认值):基于recordkey构建布隆过滤器,可以快速定位record key位于哪个file中。
  • Simple:将更新/删除的数据同表中存储的数据的key做简单的join操作。
  • HBase index:将索引映射存储到HBase中。
  • 自己实现索引。

所有具有GLOBAL和非GLOBAL两种(HBase本来就是global的)。其中:

  • global index确保一个表的所有分区都不会有重复的数据。但是更新的删除的代价会随着表中数据量的增长而加大。对于较小的表可以接受。
  • 非global index只能确保一个表的同一分区之内不会存在重复数据。需要writer去确保某条记录的更新/删除操作对应的partition path始终一致。在查找index的时候性能更好。

使用场景:

  • 事实表数据迟到场景:Bloom
  • event表去重(通常只append数据):Bloom
  • 维度表(通常不会分区)变更(变更量较少):SIMPLE或HBase

hoodie.index.type用来修改Index选项。

注意:使用GLOBAL_BLOOM的时候需要留意hoodie.bloom.index.update.partition.path配置。源码给出的解释如下:

  /**
   * Only applies if index type is GLOBAL_BLOOM.
   * <p>
   * When set to true, an update to a record with a different partition from its existing one
   * will insert the record to the new partition and delete it from the old partition.
   * <p>
   * When set to false, a record will be updated to the old partition.
   */

设置为true的话,如果record更新操作修改了partition,则会在新partition插入这条数据,然后在旧partition删除这条数据。
如果设置为false,会在就parititon更新这条数据。

hoodie.simple.index.update.partition.path对于GLOBAL_SIMPLE也同理。

文件结构

https://hudi.apache.org/docs/next/file_layouts/

  • Hudi在分布式文件系统base path下的目录结构来保存表数据。
  • 表被分解为一个个分区。
  • 在每个分区中,文件以file group形式组织,每个都由独特的file id标识。
  • 每个file group包含多个file slice。
  • 每个slice包含一个base file(commit或者compact的时候创建)还有一系列的log file(MOR表的插入或修改)。

元数据表(Metadata Table)

https://hudi.apache.org/docs/next/metadata/

用于提高读写性能,避免使用list file操作。

0.10.1以后metadata table默认启用(hoodie.metadata.enable)。

为了确保metadata table保持最新,针对同一张Hudi表的写操作需要根据不同的场景增加相应配置。

单个writer同步表服务(清理,聚簇,压缩),只需配置hoodie.metadata.enable=true,重启writer。

单个writer异步表服务(同一进程内),需要配置乐观并发访问控制:

hoodie.write.concurrency.mode=optimistic_concurrency_control
hoodie.cleaner.policy.failed.writes=LAZY
hoodie.write.lock.provider=org.apache.hudi.client.transaction.lock.InProcessLockProvider

多个writer(不同进程)异步表服务,需要配置乐观并发访问控制:

hoodie.write.concurrency.mode=optimistic_concurrency_control
hoodie.cleaner.policy.failed.writes=LAZY
hoodie.write.lock.provider=<distributed-lock-provider-classname>

外部分布式锁提供方有:ZookeeperBasedLockProvider, HiveMetastoreBasedLockProviderDynamoDBBasedLockProvider

Hudi 写类型

https://hudi.apache.org/docs/next/write_operations/

  • UPSERT : This is the default operation where the input records are first tagged as inserts or updates by looking up the index. The records are ultimately written after heuristics are run to determine how best to pack them on storage to optimize for things like file sizing. This operation is recommended for use-cases like database change capture where the input almost certainly contains updates. The target table will never show duplicates.
  • INSERT : This operation is very similar to upsert in terms of heuristics/file sizing but completely skips the index lookup step. Thus, it can be a lot faster than upserts for use-cases like log de-duplication (in conjunction with options to filter duplicates mentioned below). This is also suitable for use-cases where the table can tolerate duplicates, but just need the transactional writes/incremental pull/storage management capabilities of Hudi.
  • BULK_INSERT : Both upsert and insert operations keep input records in memory to speed up storage heuristics computations faster (among other things) and thus can be cumbersome for initial loading/bootstrapping a Hudi table at first. Bulk insert provides the same semantics as insert, while implementing a sort-based data writing algorithm, which can scale very well for several hundred TBs of initial load. However, this just does a best-effort job at sizing files vs guaranteeing file sizes like inserts/upserts do.

总结:

  • UPSERT: 通过索引来区别数据是insert还是update。不会存在重复数据。
  • INSERT: 跳过index查找过程。比upsert执行要快,但无法保证数据不重复。
  • BULK_INSERT: upsert和insert使用内存缓存。upsert和insert操作对加载原始(存量)数据到Hudi中不是很合适。加载初始数据适合使用bulk insert方式。此方式使用基于排序的写入算法,适合TB级别数据的导入操作。减少序列化和数据合并操作。不能够提供数据去重功能。
  • DELETE: 删除操作。Hudi支持软删除和硬删除。软删除指的是保留Hoodie key的同时将所有其他字段设置为null。需要表的schema允许其他所有字段为null,然后将其他所有字段upsert为null。硬删除指的是物理删除。

通过write.operation配置项指定。

写入步骤

The following is an inside look on the Hudi write path and the sequence of events that occur during a write.

  1. Deduping(去重)
    同一批中的数据先去重,需要combine或者reduce by key。
  2. Index Lookup(查询索引)
    查找插入的数据位于哪些file slice中。
  3. File Sizing(文件大小控制)
    Hudi基于之前提交数据的平均大小,制定计划将小文件中插入足够的数据,使它的大小接近配置的容量上限限制。
  4. Partitioning(分区)
    确定update和insert的数据属于哪个file group,可能伴随新file group的创建。
  5. Write I/O(写入数据)
    写入数据过程。根据表类型写入base file或者是log file。
  6. Update Index(更新索引)
    Now that the write is performed, we will go back and update the index.
    写入操作已完成,更新索引。
  7. Commit(提交)
    原子提交所有的更改。
  8. Clean (if needed)(清理,如果需要的话)
    如果需要,运行清理步骤。
  9. Compaction(压缩)
    如果使用MOR表,压缩会同步运行,或者是异步调度。
  10. Archive(归档)
    最后归档步骤将老旧的timeline项目移动到归档目录。

Schema变更

https://hudi.apache.org/docs/next/schema_evolution

试验功能,Spark 3.1.x和3.2.x支持Schema变更。

Key Generation

https://hudi.apache.org/docs/next/key_generation

Primary key由RecordKey和Partition path组成。

RecordKey由hoodie.datasource.write.recordkey.field决定。Partition path由hoodie.datasource.write.partitionpath.field决定。

并发控制

https://hudi.apache.org/docs/next/concurrency_control

Hudi支持MVCC和乐观并发访问两种方式。MVCC方式所有的table service都使用同一个writer来保证没有冲突,避免竟态条件。新版本的Hudi增加了乐观并发访问控制(OCC)。支持文件级别的乐观锁。需要依赖外部组件实现乐观锁,例如Zookeeper,Hive metastore等。

启用并发控制:

hoodie.write.concurrency.mode=optimistic_concurrency_control
hoodie.cleaner.policy.failed.writes=LAZY
hoodie.write.lock.provider=<lock-provider-classname>

使用Zookeeper分布式锁:

hoodie.write.lock.provider=org.apache.hudi.client.transaction.lock.ZookeeperBasedLockProvider
hoodie.write.lock.zookeeper.url
hoodie.write.lock.zookeeper.port
hoodie.write.lock.zookeeper.lock_key
hoodie.write.lock.zookeeper.base_path

使用HiveMetastore分布式锁:

hoodie.write.lock.provider=org.apache.hudi.hive.HiveMetastoreBasedLockProvider
hoodie.write.lock.hivemetastore.database
hoodie.write.lock.hivemetastore.table

禁用并发写入:

hoodie.write.concurrency.mode=single_writer
hoodie.cleaner.policy.failed.writes=EAGER

有关Hudi 概念和特性的更多相关文章

  1. WebSocket的那些事(1-概念篇) - 2

    目录一、什么是Websocket二、WebSocket部分header介绍三、HTTPVSWebSocket四、什么时候使用WebSockets五、关于SockJS和STOMP一、什么是Websocket根据RFC6455标准,Websocket协议提供了一种标准化的方式在客户端和服务端之间通过TCP连接建立全双工、双向通信渠道。它是一种不同于HTTP的TCP协议,但是被设计为在HTTP基础上运行。Websocket交互始于HTTP请求,该请求会通过HTTPUpgrade请求头去升级请求,进而切换到Websocket协议。请求报文如下:GET/spring-websocket-portfoli

  2. 半个月狂飙1000亿,ChatGPT概念股凭什么? - 2

    ChatGPT掀起了AI股历史上最疯狂的一轮市值狂飙。自春节后至今,ChatGPT概念股开始了暴走模式,短短半月时间,海天瑞声、开普云等ChatGPT概念股市值累计增加了近1400亿。如此的爆炸效应,得益于ChatGPT所展现出商业化落地的巨大潜力。要知道,在此之前,无论是十年AI投入超千亿的百度,还是困在硬件化里的AI四小龙,都在重复着AI商业化难落地的故事。ChatGPT的出现,让AI从生产力的赋能者直接成为一种创造生产力的工具。随着订阅模式的推出,ChatGPT已经成为第一个以AI技术为核心直接变现的消费者应用。本文持有以下核心观点:1、ChatGPT是AI技术迭代的受益者。过去受限技术

  3. c# - 与 C# 相比,您会强调 Ruby 的哪些语言特性? - 2

    我正在就Ruby语言和环境向.NET(C#)开发团队进行一系列演讲。我把它当作一个机会来强调Ruby相对于C#的优势。首先,我想在进入环境之前专注于语言本身(RoR与ASPMVC等)。你会介绍Ruby语言的哪些特性? 最佳答案 我刚才在一个.NET用户组做了一个关于IronRuby的演讲,遇到了类似的问题。我关注的事情是:鸭子打字。没有什么比ListstringList=newList()更愚蠢的了;表达力强,语法简洁。简单的事情,比如省略括号、数组和散列文字等(结合鸭子类型,你会得到string_list=[]这显然更好)。所有的

  4. ruby - 你最喜欢 Ruby 的什么特性? - 2

    就目前而言,这个问题不适合我们的问答形式。我们希望答案得到事实、引用或专业知识的支持,但这个问题可能会引起辩论、争论、投票或扩展讨论。如果您觉得这个问题可以改进并可能重新打开,visitthehelpcenter为指导。9年前关闭。已向.NET提出类似问题和Java,但不适用于Ruby。所以,你最喜欢Ruby的什么特性?您可能还对hiddenfeaturesofRuby感兴趣.请具体说明,并为每个答案发布一项功能。解释或代码示例会很好。 最佳答案 块非常好:my_array.each{|element|printelement}#.

  5. ruby-on-rails - Ruby on Rails 最酷的特性是什么,为什么选择它? - 2

    关闭。这个问题是opinion-based.它目前不接受答案。想要改进这个问题?更新问题,以便editingthispost可以用事实和引用来回答它.关闭8年前。Improvethisquestion在我问这个问题之前,我浏览了SO上“RubyonRails”的搜索结果。找不到太多,但以下(foundonthispage)让我觉得很有趣Personally,Istartedusing.html,movedontophp,triedruby(hatedit),discoveredPython/DJango..andhavebeenhappyeversince.这就是交易。我个人目前还没有

  6. ruby - 在 Ruby 的正则表达式中,前瞻和后视概念如何支持这种零宽度断言概念? - 2

    我刚刚经历了这个概念Zero-WidthAssertions从文档中。我想到了一些快速的问题-为什么这样的名字Zero-WidthAssertions?Look-ahead怎么了和look-behind概念支持这样的Zero-WidthAssertions概念?什么这样的?,,=s,-4个符号在模式内指示?你能帮我集中精力了解实际发生的事情我还尝试了一些小代码来理解逻辑,但对它们的输出没有那么自信:irb(main):001:0>"foresight".sub(/(?!s)ight/,'ee')=>"foresee"irb(main):002:0>"foresight".sub(/(?

  7. ruby - 是否有任何你避免使用的 Ruby 语言特性? - 2

    在我看来,Ruby的句法非常灵活,很多东西可以用多种方式编写。作为Ruby程序员,有没有什么语言特性/语法糖/编码约定是您避免清楚的?我问的是您选择有意不使用的东西,而不是您仍然需要学习的东西。如果您的回答是“我什么都用!”,如果读者知道相关的Ruby语法,您是否曾经对代码进行注释?[我对RoR上下文中的Ruby特别感兴趣,但欢迎一切。] 最佳答案 “$”全局变量的整个范围(参见Pickaxe2pp333-336)大多继承自Perl,非常可怕,尽管我有时发现自己使用$:而不是$LOAD_PATH.

  8. ruby-on-rails - 有哪些对 Rails 开发有用的 Emacs 特性 - 2

    哪些Emacs功能、包、附加组件等可以帮助您进行日常的RubyOnRails开发? 最佳答案 两者的先前版​​本emacs-rails模式,和Rinari(Rails开发的两种最流行的模式)功能非常丰富,但又臃肿又笨重。为了保持一个小巧、干净、可靠、功能强大且可破解的核心,Rinari将避免许多“花里胡哨”的功能。然而,这并不是说这些额外的好东西可能没有用。这个页面应该作为链接到一些其他工具/包的链接点,这些工具/包通常与Rinari和Rails一起工作。如果您对添加到此列表或Rinari的新功能有任何想法,请通过http://gr

  9. ruby - Scala 缺少哪些动态语言(如 Ruby 或 Clojure)的特性? - 2

    当您选择Scala(或F#、Haskell、C#)等静态类型语言而不是Ruby、Python、Clojure、Groovy(具有宏或运行时元编程功能)等动态类型语言时,您在实践中失去了什么)?请考虑最好的静态类型语言和最好的(在您看来)动态类型的语言,而不是最差的。答案总结:恕我直言,Ruby等动态语言相对于Scala等静态类型语言的主要优势是:快速的编辑-运行周期(JavaRebel是否缩小了差距?)目前Scala/Lift社区比Ruby/Rails或Python/Django小得多可以修改类型定义(尽管动机或需要不是很清楚) 最佳答案

  10. 涡旋光束基本概念介绍 - 2

    涡旋光束及其MATLAB实现前言涡旋光束的基本概念常见的涡旋涡旋光束涡旋光束的产生方法前言笔者新开一块专栏,专门用于讨论整理总结涡旋光束的相关内容,从基本的概念出发,推导相关的公式,并结合MATLAB进行相关的仿真,不清楚这个专栏会更新多少期,我会分享部分的代码,全部的代码有需要的话可以私聊我。当然大家对这个专栏感兴趣的话,欢迎积极交流。涡旋光束的基本概念​涡旋光束(vortexbeam)是指携带光学涡旋,具有exp(imϕ)exp(im\phi)exp(imϕ)相位分布的光束,其中mmm表示相位拓扑电荷数,ϕ\phiϕ是柱坐标下的方位角。之前的分享中笔者已经说明了部分的激光光束的表达式,想要

随机推荐