草庐IT

Presto+Alluxio 加速 Iceberg 数据湖访问

王北南 2023-03-28 原文

一、Presto & Alluxio

1、Presto Overview

​Presto 是一个里程碑式的产品,它能够让我们很简单的不需要数据的导入和导出,就可以使用标准的 SQL 来查询数据湖仓上的数据。早先是数据仓库 data warehouse 即 Hive 数据仓库,之后出现了 Hudi 和 Iceberg,有一些公司用 Presto 查询 Kafka ,还有 Druid 等等。Druid 很快,但是可能对 Join 支持不好,可以用 Presto 直接查询 Druid 一步到位,然后通过一些计算的 pushdown,能够让 Druid 中有些跑得比较困难的任务得到很好的运行。

Presto 中有一个概念叫做交互式的查询,即在几秒种最多几分钟返回一个结果。现实中很多人用 Presto 来做秒级查询,即 subsecond 的查询,一秒钟返回结果,得出一些很快很高效的 dashboard。也有人用 Presto 来处理一些几小时的 job,甚至用 Presto 来部分取代 ETL,通过 SQL 语句就能直接处理数据,简单易用。Presto 处理的数据量为 PB 级,在日常的使用中,一般一个 Presto 集群,一天处理几十个 PB 的数据,还是很容易的。当然,集群越多,处理的数据量也越大。

目前 Presto 有两个开源的社区,一个是 prestodb,此社区主要是由 Facebook 领导的社区,包括 uber、Twitter,以及国内公司 TikTok,腾讯都有参与。

另一个社区是 trinodb,prestodb 分出去之后,新建的开源社区更加的活跃。此社区背后的商用公司叫 starburst,这个社区更加活跃,用户会更多一些,但是 prestodb 背后的大厂多一些。

Presto 目前的使用场景有很多,很多数据科学家和数据工程师通过 SQL 查询想要的数据;一些公司决策使用的 BI 工具,比如 tableau 和 zeppelin;公司决策需要报表和 dashboard,这些 query 可能需要在几秒钟快速地完成,将数据展示出来,比如广告的转化率和活跃用户,这些数据需要实时或准实时的反馈出来;还有一个场景就是 A/B testing,因为它的好处就是很快,结果能够很快的反馈回来;最后一个是 ETL,ETL 是很多公司的数据仓库或者数据平台的基石,非常重要,但是 Presto 并不是特别适合在这个领域,虽然很多人使用 Presto 来处理一些 ETL 的 job,但是 Presto 并不是一个很容错的系统,如果计算过程中间坏掉,整个查询可能就要从头开始了。​

下图展示了 Presto 发展的历史。

2、Presto 主体架构

上图是 Presto 的主体架构,coordinator 如同一个 master,负责调度,当有一个查询进来时,把 SQL 解析生成查询的 plan,然后根据 plan 分发给若干个 worker 执行。根据不同的运算性质,每个 worker 去查对应的数据源,数据源可能是 Hive 数仓,也可能是数据湖 Iceberg 或者 Hudi,不同的数据源对应不同的 connector。connector 在使用的时候,其实在 Presto 里就像一个 catalog 一个 namespace。比如在 SQL 中查询 Hive 数据仓库中的部门表,通过 hive.ADS.tablename 就可以把这个 table 找到。

由于 Presto 有着多个 connector 和 catalog,天生能够提供数据的 federation,即联合。可以在 Presto 中联合不同的数据源,可以来自 Hive 、Iceberg 、Kafka 、Druid、mysql 等各式各样的数据源,并把来自多个数据源的数据 join 到一起。Presto很灵活,如很多人还把 Hive 的表跟 Google 的 spreadsheet 表格 join 到一起。

目前 presto 主要的数据来源可能 95% 甚至 99% 是来自 Hive 。当然现在也有些变化了,由于数据湖的崛起,可能越来越多流量会转向数据湖 Iceberg 和 Hudi。

3、Presto + Alluxio Overview

Presto 访问数据源就是通过直连的方式,比如要访问 HDFS 就连到 HDFS 上。有的公司可能数据源太多,可能有十几个 HDFS 的集群,这时候 presto 需要一个统一的命名空间,此时 Presto 可以提供一个联合,在物理的数据层上面提供一个抽象层,看起来就像是一个 cluster,然后在 Presto 中呈现出来的就是一个统一的命名空间,这个功能还是挺方便的。

4、Presto 与 Alluxio 结合

Presto 查数据并不是把数据给吃进来,而是访问数据的原始的存储,数据存储在 HDFS 就访问 HDFS,当 SQL 查询进来后翻译完,去到这个 Hive Metastore 中拿到元数据,通过元数据找到表数据存储在哪个目录中,将该目录分开,然后让每个 worker 读取若干的文件去计算结果。在结合 Alluxio 的工作时,改变了缓存路径。

​其实在商用版本有更好的一个功能。可以不改变这个路径,还是这个 S3 路径,但它其实使用了本地的 Alluxio,当然这在我们数据库中遇到一些麻烦,因为数据库中 expert 文件里边是 hard code 而不是死的路径,为缓存带来了一些麻烦,我们通过转换,让本来是访问原始数据的存储,通过 election 变成访问本地的数据源,得到提速的效果。

5、Co-located deployment

我们提出提供了另外一种部署的方式。我们把 Presto worker 和 Alluxio worker 部署在同一台物理机上。这样保证了数据的本地性。确保数据加载到了 Presto worker 的本地。这里 Presto DB 上有更精简的实现方式 ,在 to local cache 项目中,有 local cache 实现数据的本地化,通过数据本地化省掉网络传输。对于 Alluxio 就是 Co-located 的部署方式。它跟 HDFS 相比也省掉了一次网络的传输。

6、Disaggregated deployment

国内很多公司使用数据一体机,将 Presto、Spark、HDFS、 ClickHouse 等都放到一起。针对这种情况,推荐的实现就是用 in memory 的 Lark show 的 local cache,会有非常好的提速,即 local cache 结合 Alluxio worker ,能有百分之四五十的提速。缺点在于这种实现需要使用很多的内存,数据缓存在内存中,通过 SSD 或者内存来给 HDD 或者慢速的 SSD 做一个提速。这种方式即 Alluxio worker 跟 Presto worker 捆绑到了一起,200 个 Presto worker节点,就需要 200 个 Alluxio worker,这种方式会导致拓展的时候可能出现问题。

所以当数据量特别巨大,且跨数据中心访问的时候,更推荐分离式 disaggregated 的部署方式。

二、Alluxio & Iceberg

Hive 数据仓库已经有十几年的历史了​,但是一直存在着一些问题,对于一个表的 Schema 经常有多人的改动,且改动往往不按规律改,原来是简单类型,改成了复杂类型,导致无法保证数据的一致性,如果一条 SQL 查询两年的数据,这个表很可能两年中改了好几次,可能很多列的类型也改了,名字也改了,甚至可能删掉或者又加回来,这就会导致 Presto 报错,即使 Spark 也很难在数据 Schema 修改的过程中做到完全兼容。这是各个计算引擎的通病。

其实最早我们讨论 Iceberg 这个方案的时候,最想解决的就是 Schema 的升级变化问题,另外想解决的就是数据版本的一致性问题。众所周知,数据可能中间会出错,此时需要数据回滚从而查看上一个版本的数据,也可能要做一些 time travel 查指定时间版本的数据。有些数据是追加的,可以通过 partition 按时间来分区,通过 partition 查询指定时间分区数据。有的数据集是快照数据集,数据后一天覆盖前一天,历史数据无法保留,而 Iceberg 能解决这个问题。

其实 Iceberg 并没有提供一个新的数据存储,它更多的是提供一个数据的组织方式。数​据的存储还是像 Hive 的数仓一样,存在 parquet 或者 ORC 中,Iceberg 支持这两种数据格式。

当然很多时候为了能使用 export table,我们会把一些原始的数据 CSV 或者其他格式导进来变成一个 expert table,根据分区重新组织写入 parquet 或者 ORC 文件。

关于 Schema 的 evolution 是一个痛点,Presto 支持读和写,但是目前用 Presto 写 Iceberg 的不多,主要还是用 Presto 读,用 Spark 来写,这给我们的 Alluxio to Iceberg 结合造成了一定的麻烦。

1、Alluxio + Iceberg Architecture 方案

  • 方案一:

所有的操作都通过 Alluxio 写,Spark 和 Presto 将 Alluxio 作为一个底层存储,从而充分保证数据的一致性。

弊端是,实施该方案的公司稍微大了之后,数据直接往 S3 或 HDFS 写,不通过 Alluxio。

  • 方案二:

读写都通过 Alluxio,通过自动同步元数据,保证拿到最新数据,此方案基本可用,不过还需 Spark 社区、Iceberg 社区以及 Presto 社区继续合作来把数据一致性做得更好。

三、最佳实践

​1、Iceberg Native Catalog

目前,与 cache 结合比较好的是使用 Iceberg native catalog,在 Iceberg 叫 Hadoop catalog,在 Presto 中叫 native catalog,如果使用最原始的 Hive catalog,则 table 的元数据,即 table 位置的数据是放在 Hive-Metastore 中,Presto 或者 Spark 访问表的时候先去查询 Hive-Metastore 获取表的存储路径,然后通过 Iceberg 将数据文件加载进来,但是实际上,table 会有变更,此时需要将 Hive-Metastore 上锁,这种方案在只有一个 Hive-Metastore 的时候才有效,如果面临多个 Hive-Metastore 会出现锁失效的问题。​

更好的一个方案是 Iceberg native catalog,即完全抛弃 Hive-Metastore,使用一个目录来存储这个 table 的列表,这个目录可以在 HDFS 上或者 S3 上,我们更加推荐 HDFS,因为 HDFS 效果好一些,一致性也强一些。这一方案避免了 Hive-Metastore service 本身的很多问题,如 scalability 、延时。此方案对 cache 也比较友好,不需要做一个 metadata 的 cache,而是直接 cache 存放 metadata 的目录。

2、Iceberg Local Cache

Local Cache 的实现是 Presto DB 的 RaptorX 项目,是给 Hive connector 做 Local Cache,很容易就可以给 Iceberg connector 也来打开这个 Local Cache。相当于是 cache 了 parquet 的文件到 local 的 SSD 上,Prestoworker,worker 上的 SSD 其实本来是闲置的,通过它来缓存数据效果还是挺好的。它可以提速,但我们目前还没有特别好的官方 benchmark。

目前只是对 worker 进行 cache,metadata coordinator 是不开的,打开的话可能会有数据一致性的问题。

3、数据加密

早先 parquet 文件是不加密的,cache 了 parquet 文件,虽然不是明文,但只要你知道怎么读取这个 parquet 文件格式就能把所有数据读取出来。其 magic number 原来是 pare 1 就代表第一个版本,现在增加了一个 magic number 即 pare 加密的版本,这个加密版本把一些加密的信和 metadata 存在 footer 里边,它可以选择对一些 column 和配置进行加密。加密好后,数据便不再是明文的了,如果没有对应的 key,就无法读取出数据。

通过对 parquet 加密,我们不再需要第三方的加密,也不需要对整个文件加密,可以只对需要加密的一些数据进行加密,这个方案也解决了另外一个重要的问题,就是有的公司其实是整个文件来加密存放在 HDFS,然后 Presto 读之前把它解密好,很多文件存储系统就是存的时候是加密的。读取的时候确实拿到的解密好的数据,当 Presto 再通过 Local Cache 缓存数据的时候,cache 里存储还是明文数据,这破坏了数据加密的管理。但是采用 parquet 内部加密,local cache 就可以满足数据加密的要求了。

4、谓词下推

Iceberg 通过谓词下推(Predicate Pushdown)可以减少查询的数据量。

原来 Presto 的暴力查询,根据条件把符合条件的一条条数据挑出来,但是中间有优化。其实很多查询条件可以直接 push 到 Iceberg,Iceberg 读取文件的范围就小了。

下面是一个 benchmark,可以看到没有谓词下推前扫到了 200 万条记录,CPU time 是 62 毫秒。谓词下推后,扫到了一条记录,查询时间极大的缩短,这也是对缓存的一个优化。开谓词下推(Predicate Pushdown)功能后,我们发现,缓存层次够用,扫的文件少了很多,这意味着我们都可以缓存的下了,命中率有一个提高。

四、未来的工作

在前面的工作中我们发现系统的瓶颈在 CPU。此瓶体现在很多地方,其中很大一部分是对 parquet 文件的解析,parquet 文件解析任务太重了。由于 parquet 很节约资源,很难将 parquet 转换为更好的格式。此时,一种解决方案是将数据分为冷热数据,将较热的数据转换为更加轻量,序列化低的格式存到缓存中,通过实验,将 parquet 文件反序列好的数据直接放到内存中,效率提升 8% 到 10% 。

但这有一个问题,此方案对 Java 的 GC 压力非常大,因为缓存长时间存在。我们发现此方案并不是那么好实施,所以我们更加想用 off heap 的方式,将数据存在 heap 之外。此时不能 cache object 本身,需要 cache Arrow 或者 flat buffer 格式,这两种格式反序列成本极低,又是二进制的流存在内存中,通过 off heap 把它装进来,然后在 Java 中再反序列化,这样可以达到一个很好的提速效果。

另外我们也可以把一些算子 pushdown 到 native 实现存储。比如说 Alluxio 再增加一些实现 native 的 worker 和客户端的 cache 实现,我们将算子直接 pushdown 过去,就像前面 Iceberg pushdown 一样,有些计算 push 到存储,存储返回来的结果特别少,它帮你计算,而且格式更好,它是 Arrow 并可以有 native 的实现,也可以向量化的计算。

Java 也能向量化计算。但问题在于 Java 的版本要求比较高,需要 Java16 或 17,而现在 Presto DB 还在 Java 11,trainer 倒是可以了,但是这个效果也不是特别好,因为  Presto 和 trainer 内存中的格式对性能化计算不友好,而且这个格式基本上是不能动的,如果要动,基本上全都要重新实现,这也是为什么会有这个 vlogs 在那里的原因。

可能这个 Presto 以后会有格式转换,但是不在眼前,但是我们可以 off heap 的缓存,可以把这个 Arrow 缓存到 off heap 上,然后在那里边需要的时候把它拿出来。然后反序列化成 page,然后给 Presto 进行进一步的计算。这个开发正在进行,可能在将来会给大家展现一部分的工作。其实就是为了降低 CPU 的使用和系统的延时,降低 GC 的开销,让系统变得更加的稳定。

今天的分享就到这里,谢谢大家。

有关Presto+Alluxio 加速 Iceberg 数据湖访问的更多相关文章

  1. ruby - 为什么我可以在 Ruby 中使用 Object#send 访问私有(private)/ protected 方法? - 2

    类classAprivatedeffooputs:fooendpublicdefbarputs:barendprivatedefzimputs:zimendprotecteddefdibputs:dibendendA的实例a=A.new测试a.foorescueputs:faila.barrescueputs:faila.zimrescueputs:faila.dibrescueputs:faila.gazrescueputs:fail测试输出failbarfailfailfail.发送测试[:foo,:bar,:zim,:dib,:gaz].each{|m|a.send(m)resc

  2. ruby - 解析 RDFa、微数据等的最佳方式是什么,使用统一的模式/词汇(例如 schema.org)存储和显示信息 - 2

    我主要使用Ruby来执行此操作,但到目前为止我的攻击计划如下:使用gemsrdf、rdf-rdfa和rdf-microdata或mida来解析给定任何URI的数据。我认为最好映射到像schema.org这样的统一模式,例如使用这个yaml文件,它试图描述数据词汇表和opengraph到schema.org之间的转换:#SchemaXtoschema.orgconversion#data-vocabularyDV:name:namestreet-address:streetAddressregion:addressRegionlocality:addressLocalityphoto:i

  3. ruby-on-rails - 在混合/模块中覆盖模型的属性访问器 - 2

    我有一个包含模块的模型。我想在模块中覆盖模型的访问器方法。例如:classBlah这显然行不通。有什么想法可以实现吗? 最佳答案 您的代码看起来是正确的。我们正在毫无困难地使用这个确切的模式。如果我没记错的话,Rails使用#method_missing作为属性setter,因此您的模块将优先,阻止ActiveRecord的setter。如果您正在使用ActiveSupport::Concern(参见thisblogpost),那么您的实例方法需要进入一个特殊的模块:classBlah

  4. ruby - 续集在添加关联时访问many_to_many连接表 - 2

    我正在使用Sequel构建一个愿望list系统。我有一个wishlists和itemstable和一个items_wishlists连接表(该名称是续集选择的名称)。items_wishlists表还有一个用于facebookid的额外列(因此我可以存储opengraph操作),这是一个NOTNULL列。我还有Wishlist和Item具有续集many_to_many关联的模型已建立。Wishlist类也有:selectmany_to_many关联的选项设置为select:[:items.*,:items_wishlists__facebook_action_id].有没有一种方法可以

  5. ruby - Ruby 有 `Pair` 数据类型吗? - 2

    有时我需要处理键/值数据。我不喜欢使用数组,因为它们在大小上没有限制(很容易不小心添加超过2个项目,而且您最终需要稍后验证大小)。此外,0和1的索引变成了魔数(MagicNumber),并且在传达含义方面做得很差(“当我说0时,我的意思是head...”)。散列也不合适,因为可能会不小心添加额外的条目。我写了下面的类来解决这个问题:classPairattr_accessor:head,:taildefinitialize(h,t)@head,@tail=h,tendend它工作得很好并且解决了问题,但我很想知道:Ruby标准库是否已经带有这样一个类? 最佳

  6. ruby - 我如何添加二进制数据来遏制 POST - 2

    我正在尝试使用Curbgem执行以下POST以解析云curl-XPOST\-H"X-Parse-Application-Id:PARSE_APP_ID"\-H"X-Parse-REST-API-Key:PARSE_API_KEY"\-H"Content-Type:image/jpeg"\--data-binary'@myPicture.jpg'\https://api.parse.com/1/files/pic.jpg用这个:curl=Curl::Easy.new("https://api.parse.com/1/files/lion.jpg")curl.multipart_form_

  7. 世界前沿3D开发引擎HOOPS全面讲解——集3D数据读取、3D图形渲染、3D数据发布于一体的全新3D应用开发工具 - 2

    无论您是想搭建桌面端、WEB端或者移动端APP应用,HOOPSPlatform组件都可以为您提供弹性的3D集成架构,同时,由工业领域3D技术专家组成的HOOPS技术团队也能为您提供技术支持服务。如果您的客户期望有一种在多个平台(桌面/WEB/APP,而且某些客户端是“瘦”客户端)快速、方便地将数据接入到3D应用系统的解决方案,并且当访问数据时,在各个平台上的性能和用户体验保持一致,HOOPSPlatform将帮助您完成。利用HOOPSPlatform,您可以开发在任何环境下的3D基础应用架构。HOOPSPlatform可以帮您打造3D创新型产品,HOOPSSDK包含的技术有:快速且准确的CAD

  8. FOHEART H1数据手套驱动Optitrack光学动捕双手运动(Unity3D) - 2

    本教程将在Unity3D中混合Optitrack与数据手套的数据流,在人体运动的基础上,添加双手手指部分的运动。双手手背的角度仍由Optitrack提供,数据手套提供双手手指的角度。 01  客户端软件分别安装MotiveBody与MotionVenus并校准人体与数据手套。MotiveBodyMotionVenus数据手套使用、校准流程参照:https://gitee.com/foheart_1/foheart-h1-data-summary.git02  数据转发打开MotiveBody软件的Streaming,开始向Unity3D广播数据;MotionVenus中设置->选项选择Unit

  9. 使用canal同步MySQL数据到ES - 2

    文章目录一、概述简介原理模块二、配置Mysql使用版本环境要求1.操作系统2.mysql要求三、配置canal-server离线下载在线下载上传解压修改配置单机配置集群配置分库分表配置1.修改全局配置2.实例配置垂直分库水平分库3.修改group-instance.xml4.启动监听四、配置canal-adapter1修改启动配置2配置映射文件3启动ES数据同步查询所有订阅同步数据同步开关启动4.验证五、配置canal-admin一、概述简介canal是Alibaba旗下的一款开源项目,Java开发。基于数据库增量日志解析,提供增量数据订阅&消费。Git地址:https://github.co

  10. ruby-on-rails - 创建 ruby​​ 数据库时惰性符号绑定(bind)失败 - 2

    我正在尝试在Rails上安装ruby​​,到目前为止一切都已安装,但是当我尝试使用rakedb:create创建数据库时,我收到一个奇怪的错误:dyld:lazysymbolbindingfailed:Symbolnotfound:_mysql_get_client_infoReferencedfrom:/Library/Ruby/Gems/1.8/gems/mysql2-0.3.11/lib/mysql2/mysql2.bundleExpectedin:flatnamespacedyld:Symbolnotfound:_mysql_get_client_infoReferencedf

随机推荐