
前言
无论是在大数据处理领域,还是在消息处理领域,任务系统都有一个很关键的能力 - 任务触发去重的保障。这个能力对于一些准确性要求极高的场景中(如金融等)是必不可少的。作为 Serverless 化任务处理平台,Serverless Task 也需要提供这类保障,在用户应用层面及自身系统内部两个维度具备任务的准确触发语义。本文主要针对消息处理可靠性这一主题来介绍函数计算内部的一些技术细节,并展示如何在实际应用中使用函数计算所提供的这方面能力来增强任务执行的可靠性。
浅谈任务去重
在讨论异步消息处理系统时,消息处理的基本语义是无法绕开的话题。在一个异步的消息处理系统(任务系统)中,一条消息的处理流程简化如下图所示:

图 1
用户下发任务 - 进入队列 - 任务处理单元监听并获取消息 - 调度到实际 worker 执行
在任务消息整个的流转过程中,任何组件(环节)可能出现的宕机等问题会导致消息的错误传递。一般的任务系统会提供至多 3 个层级的消息处理语义:
●At-Most-Once:保证消息最多被传递一次。当出现网络分区、系统组件宕机时,可能出现消息丢失;
●At-Least-Once:保证消息至少被传递一次。消息传递链路支持错误重试,利用消息重发机制保证下游一定收到上游消息,但是在宕机或者网络分区的场景下,可能导致相同消息传递多次。
●Exactly-Once机制则可以保证消息精确被传送一次,精确一次并不是意味着在宕机或网络分区的场景下没有重传,而是重传对于接受方的状态不产生任何改变,与传送一次的结果一样。在实际生产中,往往是依赖重传机制 & 接收方去重(幂等)来做到 Exactly Once。
函数计算能够提供任务分发的 Exactly Once 语义,即无论在何种情况下,重复的任务将被系统认为是相同的触发,进而只进行一次的任务分发。
结合图 1,如果要做到任务去重,系统至少需要提供两个维度的保障:
1、系统侧保障:任务调度系统自身的 failover 不影响消息的传递正确性及唯一性;
2、提供给用户一种机制,可以做到整个业务逻辑的触发去重语义。
下面,我们将结合简化的 Serverless Task 系统架构,谈一谈函数计算是如何做到上面的能力的。
函数计算异步任务触发去重的实现
函数计算的任务系统架构如下图所示

图 2
首先,用户调用函数计算 API 下发一个任务(步骤 1)进入系统的 API-Server 中,API-Server 进行校验后将消息传入内部队列(步骤 2.1)。后台有一个异步模块实时监听内部队列(步骤 2.2),之后调用资源管理模块获取运行时资源(步骤 2.2-2.3)。获取运行时资源后,调度模块将任务数据下发到 VM 级别的客户端中(步骤 3.1),并由客户端将任务转发至实际的用户运行资源(步骤 3.2)。为了做到上文中所提到的两个维度的保障,我们需要在以下层面进行支持:
1、系统侧保障:在步骤 2.1 - 3.1 中,任何一个中间过程的 Failover 只能触发一次步骤 3.2 的执行,即只会调度一次用户实例的运行;
2、用户侧应用级别去重能力:能够支持用户多次反复执行步骤 1,但实际只会触发一次 步骤 3.2 的执行。
系统侧优雅升级 & Failover 时的任务分发去重保证
当用户的消息进入函数计算系统中(即完成步骤 2.1)后,用户的请求将收到 HTTP 状态码 202 的 Response,用户可以认为已经成功提交一次任务。从该任务消息进入 MQ 起,其生命周期便由 Scheduler 维护,所以 Scheduler 的稳定性及 MQ 的稳定性将直接影响系统 Exactly Once 的实现方案。
在大多数开源消息系统中(如 MQ、Kafka)一般都提供消息多副本存储及唯一消费的语义。函数计算所使用的消息队列(最底层为 RocketMQ)也是同样的,底层存储的 3 副本实现使得我们无需关注消息存储方面的稳定性。除此之外,函数计算所使用的的消息队列还具有以下特性:
1、消费的唯一性:每一个队列中的每一条消息当被消费后,会进入“不可见模式”。在此模式下,其他消费者无法获取该消息;
2、每条消息的实际消费者需要实时更新该模式的不可见时间;当消费者消费完成后,需要显示的删除该消息。
因此,消息在队列中的的整个生命周期如下图所示:

图 3
Scheduler 主要负责消息的处理,其任务主要有以下几个部分组成:
1、根据函数计算负载均衡模块的调度策略,监听自身所负责的队列;
2、当队列中出现消息后,拉取消息,并在内存中维持一个状态:直到消息消费完成(用户实例返回函数执行结果)前,不断更新消息的可见时间,确保消息不会再次在队列中出现;
3、当任务执行完成后,显示删除该消息。
在队列的调度模型方面,函数计算对于普通用户采用“单队列”的管理模式;即每一个用户的所有异步执行请求由一个独立队列相互隔离,并且由一个 Scheduler 固定负责。这个负载的映射关系由函数计算的负载均衡服务进行管理,如下图所示(我们在后续文章中还会更为详细的介绍这部分内容):

图 4
当 Scheduler 1 发生宕机或升级时,任务由两种执行状态:
1、如果消息还未传递到用户的执行实例中(图 2 中的步骤 3.1 ~ 3.2),那么当这台 Scheduler 负责的队列被其他 Scheduler 拾起后,消息将在消费可见期后再次出现,因此 Scheduler 2 将再次获取该消息,做到后续的触发。
2、如果消息已经开始执行(步骤 3.2),当消息在 Scheduler 2 中再次出现后,我们依赖用户 VM 中的 Agent 进行状态管理。此时 Scheduler 2 将向对应的 Agent 发送执行请求;此时 Agent 发现该消息已经存在于内存中,那么将直接忽略执行请求,并将执行的结果在执行后通过此链接告知 Scheduler 2,进而完成 Failover 的恢复。
用户侧业务级别的分发去重实现
函数计算系统能够做到对于单点故障下的每条消息准确的消费能力,但是如果用户侧对于同一条业务数据反复触发函数执行的话,函数计算无法识别不同消息是否在逻辑上是同一个任务。这种情况往往发生在网络分区。在图 2 中,如果用户调用 1 发生超时,此时有可能有两种情况:
1、消息未到达函数计算系统,任务未成功提交;
2、消息已经到达函数计算并入队,任务提交成功,但由于超时用户无法得知提交成功的信息。
大多数情况下用户会对此次的提交进行重试。如果是第 2 种情况,那么同一个任务将被提交并执行多次。因此函数计算需要提供一种机制,保证这种场景下业务的准确性。
函数计算提供了 TaskID 这一任务概念(StatefulAsyncInvocationID)。该 ID 全局唯一。用户每次提交任务均可以指定这样一个 ID。当发生请求超时时,用户可以进行无限次重试。所有的重复重试将在函数计算侧进行校验。函数计算内部使用 DB 对任务 Meta 数据进行存储;当有相同 ID 进入系统时该次请求将被拒绝,并返回 400 错误。此时客户端即可得知任务的提交情况。
在实际使用中以 Go SDK 为例,您可以编辑如下触发任务的代码:
import fc "github.com/aliyun/fc-go-sdk"
func SubmitJob() {
invokeInput := fc.NewInvokeFunctionInput("ServiceName", "FunctionName")
invokeInput = invokeInput.WithAsyncInvocation().WithStatefulAsyncInvocationID("TaskUUID")
invokeOutput, err := fcClient.InvokeFunction(invokeInput)
...
}
便提交了一个独一无二的任务。
总结
本文介绍了函数计算 Serverless Task 对于任务触发去重的相关技术细节,以便支持对于任务执行准确性有严格要求的场景。在使用 Serverless Task 后,您无需担心任何系统组件的 Failover,您每次提交的任务将被准确执行一次。为了支持业务侧语义的分发去重,您可以在提交任务时设置任务的全局唯一 ID,使用函数计算提供的能力帮您对任务进行去重处理。
更多内容关注 Serverless 微信公众号(ID:serverlessdevs),汇集 Serverless 技术最全内容,定期举办 Serverless 活动、直播,用户最佳实践。
我试图在一个项目中使用rake,如果我把所有东西都放到Rakefile中,它会很大并且很难读取/找到东西,所以我试着将每个命名空间放在lib/rake中它自己的文件中,我添加了这个到我的rake文件的顶部:Dir['#{File.dirname(__FILE__)}/lib/rake/*.rake'].map{|f|requiref}它加载文件没问题,但没有任务。我现在只有一个.rake文件作为测试,名为“servers.rake”,它看起来像这样:namespace:serverdotask:testdoputs"test"endend所以当我运行rakeserver:testid时
exe应该在我打开页面时运行。异步进程需要运行。有什么方法可以在ruby中使用两个参数异步运行exe吗?我已经尝试过ruby命令-system()、exec()但它正在等待过程完成。我需要用参数启动exe,无需等待进程完成是否有任何rubygems会支持我的问题? 最佳答案 您可以使用Process.spawn和Process.wait2:pid=Process.spawn'your.exe','--option'#Later...pid,status=Process.wait2pid您的程序将作为解释器的子进程执行。除
如何使用RSpec::Core::RakeTask初始化RSpecRake任务?require'rspec/core/rake_task'RSpec::Core::RakeTask.newdo|t|#whatdoIputinhere?endInitialize函数记录在http://rubydoc.info/github/rspec/rspec-core/RSpec/Core/RakeTask#initialize-instance_method没有很好的记录;它只是说:-(RakeTask)initialize(*args,&task_block)AnewinstanceofRake
请帮助我理解范围运算符...和..之间的区别,作为Ruby中使用的“触发器”。这是PragmaticProgrammersguidetoRuby中的一个示例:a=(11..20).collect{|i|(i%4==0)..(i%3==0)?i:nil}返回:[nil,12,nil,nil,nil,16,17,18,nil,20]还有:a=(11..20).collect{|i|(i%4==0)...(i%3==0)?i:nil}返回:[nil,12,13,14,15,16,17,18,nil,20] 最佳答案 触发器(又名f/f)是
我正在学习Rails,并阅读了关于乐观锁的内容。我已将类型为integer的lock_version列添加到我的articles表中。但现在每当我第一次尝试更新记录时,我都会收到StaleObjectError异常。这是我的迁移:classAddLockVersionToArticle当我尝试通过Rails控制台更新文章时:article=Article.first=>#我这样做:article.title="newtitle"article.save我明白了:(0.3ms)begintransaction(0.3ms)UPDATE"articles"SET"title"='dwdwd
我有一个rubyonrails应用程序。我按照facebook的说明添加了一个像素。但是,要跟踪转化,Facebook要求您将页面置于达到预期结果时出现的转化中。即,如果我想显示客户已注册,我会将您注册后转到的页面作为成功对象进行跟踪。我的问题是,当客户注册时,在我的应用程序中没有登陆页面。该应用程序将用户带回主页。它在主页上显示了一条消息,所以我想看看是否有一种方法可以跟踪来自Controller操作而不是实际页面的转化。我需要计数的Action没有页面,它们是ControllerAction。是否有任何人都知道的关于如何执行此操作的gem、文档或最佳实践?这是进入布局文件的像素
我写了一个非常简单的rake任务来尝试找到这个问题的根源。namespace:foodotaskbar::environmentdoputs'RUNNING'endend当在控制台中执行rakefoo:bar时,输出为:RUNNINGRUNNING当我执行任何rake任务时会发生这种情况。有没有人遇到过这样的事情?编辑上面的rake任务就是写在那个.rake文件中的所有内容。这是当前正在使用的Rakefile。requireFile.expand_path('../config/application',__FILE__)OurApp::Application.load_tasks这里
在我做的一些网络开发中,我有多个操作开始,比如对外部API的GET请求,我希望它们同时开始,因为一个不依赖另一个的结果。我希望事情能够在后台运行。我找到了concurrent-rubylibrary这似乎运作良好。通过将其混合到您创建的类中,该类的方法具有在后台线程上运行的异步版本。这导致我编写如下代码,其中FirstAsyncWorker和SecondAsyncWorker是我编写的类,我在其中混合了Concurrent::Async模块,并编写了一个名为“work”的方法来发送HTTP请求:defindexop1_result=FirstAsyncWorker.new.async.
我有一个super简单的脚本,它几乎包含了FayeWebSocketGitHub页面上用于处理关闭连接的内容:ws=Faye::WebSocket::Client.new(url,nil,:headers=>headers)ws.on:opendo|event|p[:open]#sendpingcommand#sendtestcommand#ws.send({command:'test'}.to_json)endws.on:messagedo|event|#hereistheentrypointfordatacomingfromtheserver.pJSON.parse(event.d
我以前没有使用过cron,所以我不能确定我这样做是对的。我想要自动化的任务似乎没有运行。我在终端中执行了这些步骤:sudogeminstall每当切换到应用程序目录无论何时。(这创建了文件schedule.rb)我将此代码添加到schedule.rb:every10.minutesdorunner"User.vote",environment=>"development"endevery:hourdorunner"Digest.rss",:environment=>"development"end我将此代码添加到deploy.rb:after"deploy:symlink","depl