草庐IT

Apache Pulsar——分层存储

小波同学 2023-09-29 原文

前言

在一些流数据用例场景中,用户希望将数据长时间存储在流中。虽然 Apache Pulsar 对topic backlog的大小没有限制,但将所有数据存储在 Pulsar 中较长时间,存储成本比较大。分层存储支持在不影响终端用户的条件下,将较旧的数据移动到长期存储中。

在推荐服务中,开发者不希望限制 backlog 的大小。以音乐服务为例,终端用户每听一首歌,就向 topic 中添加一条消息。使用这一 topic 训练推荐算法,根据终端用户听过的音乐推荐用户可能喜欢的音乐。然后,将计算结果推荐给用户,再循环这个过程。

推荐算法并非一成不变。音乐服务的数据科学家一直在不断优化推荐算法,以更好地预测用户喜欢的音乐,从而提高用户对推荐服务的满意度和参与度。

但是,如果每次修改算法时,都只运行修改时间点之后的用户数据,不仅预测的准确度会受到影响,判断算法的修改效果也会需要一段较长的时间。为了解决这一问题,算法需要尽可能多地运行用户历史数据。

一、Pulsar 分层存储

Pulsar 允许用户存储任意大小的 topic backlog。当集群将要耗尽空间时,用户只需添加新的存储节点,系统将会自动重新平衡数据。但是,这样的操作运行一段时间后,运维成本十分昂贵。

Pulsar 通过提供分层存储(Apache Pulsar 2.1 起新增的特性)减少了成本/大小的损失。分层存储为用户提供大小不受限制的 backlog,且无需添加存储节点;卸载较旧的 topic 数据到长期存储中,长期存储的成本比在 Pulsar 集群中存储的成本低一个数量级。对于终端用户来说,消费存储在 Pulsar 集群或分层存储中的 topic 数据没有明显差别。位于 Pulsar 集群和分层存储中的 topic 生产和消费消息的方式也完全相同。

Pulsar 通过分片架构实现了分层存储。Pulsar topic 的消息日志由一系列分片组成。序列中的最后一个分片是 Pulsar 当前写入的分片。当前序列之前的所有分片都已封装,也就是说,这些分片中的数据不可变。由于数据不可变,因此可以轻易地将数据复制到另一个存储系统,而不必担心一致性的问题。复制完成后,可以立即更新消息日志元数据中的数据指针,并且可以删除 Pulsar 在 Apache BookKeeper 中存储的数据副本。

二、在 Pulsar 中使用分层存储

Pulsar 目前支持通过 Amazon S3、GCS(Google Cloud Storage)、HDFS 进行长期存储。

整体配置操作也是非常简单的,只需要在broker.conf中配置卸载地址和路径,并开启卸载自动运行即可,详细配置大家可参考: https://pulsar.apache.org/docs/en/cookbooks-tiered-storage/

要使用 S3 进行分层存储,管理员需要先在 S3 中创建一个存储桶(bucket);然后,用存储桶和创建存储桶的区域配置 broker。

managedLedgerOffloadDriver=S3

s3ManagedLedgerOffloadRegion=eu-west-3

s3ManagedLedgerOffloadBucket=pulsar-topic-offload

用户不直接在 Pulsar 中配置身份验证。Pulsar 使用的 DefaultAWSCredentialsProviderChain 可以在多个位置查找验证信息。

配置验证信息最简单的方式是在pulsar-env.sh中设置环境变量。

关于配置身份验证方法的更多信息,参阅分层存储文档:
http://pulsar.apache.org/docs/en/concepts-tiered-storage/

配置好所有 broker 后,就可以开始使用分层存储了。可以配置分层存储的数据卸载为自动运行,也可以手动触发。

自动迁移数据到长期存储

管理员可以为命名空间设置大小阈值策略。配置大小阈值策略后,如果命名空间中的任一 topic 在 Pulsar 集群上的数据大小超过了阈值,topic 就会卸载分片到长期存储中,直到 Pulsar 集群上的数据大小在阈值之内。

例如,当 Pulsar 集群上的数据大小超过 1 GB 时,指定命名空间中的 topic 卸载分片,可以使用以下命令:

pulsar-admin namespaces set-offload-threshold --size 1G my-tenant/my-namespace

当命名空间中的任一 topic 超过阈值时,topic 将会移动数据至长期存储,释放 Pulsar 集群上的存储空间。

手动卸载

除了配置自动卸载数据外,还可以通过 REST 接口或命令行界面在单个 topic 上手动触发卸载操作。要通过命令行界面触发,用户必须指定在 Pulsar 集群上为 topic 保留的最大数据量。如果 Pulsar 集群上的 topic 数据大小超过了设置的阈值,则将此 topic 上的分片移动到长期存储中,直到 Pulsar 集群上的数据大小在阈值之内。移动数据时,先移动较旧的分片。

pulsar-admin topics offload --size-threshold 10M my-tenant/my-namespace/topic1

更多关于配置和使用分层存储的信息,参阅分层存储文档:
https://pulsar.apache.org/docs/en/cookbooks-tiered-storage/

参考:
https://blog.csdn.net/zhaijia03/article/details/108744892

有关Apache Pulsar——分层存储的更多相关文章

  1. ruby - 解析 RDFa、微数据等的最佳方式是什么,使用统一的模式/词汇(例如 schema.org)存储和显示信息 - 2

    我主要使用Ruby来执行此操作,但到目前为止我的攻击计划如下:使用gemsrdf、rdf-rdfa和rdf-microdata或mida来解析给定任何URI的数据。我认为最好映射到像schema.org这样的统一模式,例如使用这个yaml文件,它试图描述数据词汇表和opengraph到schema.org之间的转换:#SchemaXtoschema.orgconversion#data-vocabularyDV:name:namestreet-address:streetAddressregion:addressRegionlocality:addressLocalityphoto:i

  2. ruby - Rack:如何将 URL 存储为变量? - 2

    我正在编写一个简单的静态Rack应用程序。查看下面的config.ru代码:useRack::Static,:urls=>["/elements","/img","/pages","/users","/css","/js"],:root=>"archive"map'/'dorunProc.new{|env|[200,{'Content-Type'=>'text/html','Cache-Control'=>'public,max-age=6400'},File.open('archive/splash.html',File::RDONLY)]}endmap'/pages/search.

  3. ruby-on-rails - 为什么在 Rails 5.1.1 中删除了 session 存储初始化程序 - 2

    我去了这个website查看Rails5.0.0和Rails5.1.1之间的区别为什么5.1.1不再包含:config/initializers/session_store.rb?谢谢 最佳答案 这是删除它的提交:Setupdefaultsessionstoreinternally,nolongerthroughanapplicationinitializer总而言之,新应用没有该初始化器,session存储默认设置为cookie存储。即与在该初始值设定项的生成版本中指定的值相同。 关于

  4. ruby-on-rails - 尝试设置 Amazon 的 S3 存储桶 : 403 Forbidden error & setting permissions - 2

    我正在关注Hartl的railstutorial.org并已到达11.4.4:Imageuploadinproduction.我做了什么:注册亚马逊网络服务在AmazonIdentityandAccessManagement中,我创建了一个用户。用户创建成功。在AmazonS3中,我创建了一个新存储桶。设置新存储桶的权限:权限:本教程指示“授予上一步创建的用户读写权限”。但是,在存储桶的“权限”下,未提及新用户名。我只能在每个人、经过身份验证的用户、日志传送、我和亚马逊似乎根据我的名字+数字创建的用户名之间进行选择。我已经通过选择经过身份验证的用户并选中了上传/删除和查看权限的框(而不

  5. ruby - 如何打印出 Mechanized 存储的 cookie? - 2

    我正在使用mechanize登录网站,然后检索页面。我遇到了一些问题,我怀疑这是由于cookie中的某些值造成的。当Mechanize登录网站时,我假设它存储了cookie。如何通过Mechanize打印出存储在cookie中的所有数据? 最佳答案 代理有一个cookie方法。agent=Mechanize.newpage=agent.get("http://www.google.com/")agent.cookiesagent.cookies.to_scookie返回一个Mechanize::Cookiesobject

  6. ruby-on-rails - 闪存消息存储在哪里? - 2

    我以为它们存储在cookie中-但不,检查cookie没有任何结果。session也不存储它们。那么,我在哪里可以找到它们?我需要这个来直接设置它们(而不是通过flashhash)。 最佳答案 它们存储在inyoursessionstore.自rails2.0以来的默认设置是cookie存储,但请检查config/initializers/session_store.rb以检查您是否使用默认设置以外的东西。 关于ruby-on-rails-闪存消息存储在哪里?,我们在StackOverf

  7. ruby-on-rails - 在 Rails 中存储(结构化)配置数据的位置 - 2

    对于我正在编写的Rails3应用程序,我正在考虑从本地文件系统上的XML、YAML或JSON文件中读取一些配置数据。重点是:我应该把这些文件放在哪里?Rails应用程序中是否有用于存储此类内容的默认位置?附带说明一下,我的应用程序部署在Heroku上。 最佳答案 我经常做的是:如果文件是通用配置文件:我在目录/config中创建一个YAML文件,每个环境有一个上层key如果我为每个环境(大项目)创建一个文件:我为每个环境创建一个YAML并将它们存储在/config/environments/然后我在加载YAML的地方创建了一个初始化

  8. ruby - 如何存储和读取 RubyVM::InstructionSequence? - 2

    有没有办法将RubyVM::InstructionSequence存储到文件中并稍后读取?我尝试了Marshal.dump但没有成功。我收到以下错误:`dump':no_dump_dataisdefinedforclassRubyVM::InstructionSequence(TypeError) 最佳答案 是的,有办法。首先,您需要使InstructionSequence的load方法可访问,默认情况下该方法是禁用的:require'fiddle'classRubyVM::InstructionSequence#RetrieveR

  9. ruby-on-rails - 如何解析位于 Amazon S3 存储桶中的 CSV 文件 - 2

    下面是我用来从应用程序中解析CSV的代码,但我想解析位于AmazonS3存储桶中的文件。当推送到Heroku时它也需要工作。namespace:csvimportdodesc"ImportCSVDatatoInventory."task:wiwt=>:environmentdorequire'csv'csv_file_path=Rails.root.join('public','wiwt.csv.txt')CSV.foreach(csv_file_path)do|row|p=Wiwt.create!({:user_id=>row[0],:date_worn=>row[1],:inven

  10. ruby - 存储外部 API 的密码 - 最佳实践 - 2

    如果我构建了一个应用程序来访问来自Gmail、Twitter和Facebook的一些数据,并且我希望用户只需输入一次他们的身份验证信息,并且在几天或几周后重置,那会怎样是在Ruby中动态执行此操作的最佳方法吗?我看到很多人只是拥有他们客户/用户凭证的配置文件,如下所示:gmail_account:username:myClientpassword:myClientsPassword这看起来a)非常不安全,b)如果我想为成千上万的用户存储此类信息,它就无法工作。推荐的方法是什么?我希望能够在这些服务之上构建一个界面,因此每次用户进行交易时都必须输入凭据是不可行的。

随机推荐