我有以下任务,对此我有 Java/Executors 解决方案运行良好,但我想在 Akka 中实现相同的功能并寻找最佳实践建议。
问题:
从多个 URL 中并行获取/解析数据,阻塞直到所有数据被获取并返回聚合结果。应该重试错误(IOException 等)达到一定次数。
到目前为止,我的实现非常简单——创建知道应该获取哪些 URL 的 Fetcher actor,它创建一堆 Worker actor 并向它们发送 URL,每条消息一个。完成特定 URL Worker 后,将消息连同结果发送回 Fetcher。 Fetcher 保持结果状态,Workers 无状态。下面是简化的代码。
getter :
class Fetcher extends UntypedActor {
private ActorRef worker;
public void onReceive(Object message) throws Exception {
if (message instanceof FetchMessage) {
this.worker = context().actorOf(SpringExtension.SpringExtProvider.get(actorSystem).props("Worker")
.withRouter(new RoundRobinPool(4)), "worker");
for(URL u: urls) {
this.worker.tell(new WorkUnit(u), getSelf());
}
}
else if (message instanceof Result) {
// accumulate results
}
}
worker :
class Worker extends UntypedActor {
public void onReceive(Object message) throws Exception {
if (message instanceof WorkUnit) {
// fetch URL, parse etc
// send result back to sender
getSender().tell(new Result(...), null);
}
}
到目前为止一切顺利,在没有异常(exception)的情况下,一切都按预期进行。
但是,如果在 Worker 中获取 URL 时出现 IOException,那么 Akka 将重启 Worker actor,但 Worker 正在处理的消息会丢失。即使我使用不同的 SupervisorStrategy,结果也是一样的——一些消息实际上“丢失”了。当然,我可以使用 try/catch 将代码包装在 Worker.onReceive() 中,但我觉得这违背了 Akka 的理念。我想我可以使用持久消息传递,但我认为在这种情况下增加消息持久性的复杂性是不合理的。
我可能需要某种方式让 Fetcher 发现 Worker 未能获取某些 URL 并再次重新发送 WorkUnit 或检测到某些结果在太长时间内没有返回。处理这种情况的最佳方法是什么?
谢谢,
最佳答案
我们在我们的项目中遇到了类似的问题,我们找到了一个适合我们的解决方案 - 无论异常、工作人员故障、网络故障等如何执行任务。尽管我必须承认代码最终变得有点复杂。
所以我们的设置如下:
WorkerControl actor 处理任务管理和与 worker 的通信WorkerControl 接收一些要处理的数据并在 worker 之间调度任务我们或多或少地尝试遵循描述的指南 here
但我们也提高了设计的容错能力。
在 WorkerControl 中,我们保留以下数据结构:
Map<ActorPath, ActorRef> registeredWorkers // registry of workers
Deque<TaskInfo> todoList // tasks that have not been yet processed
Map<ActorRef, TaskInfo> assignedTasks // tasks assigned to the workers
Map<ActorPath, ActorRef> deadWorkers // registry of dead workers
对于每个要执行的任务,我们保留一个数据结构
class TaskInfo {
private final WorkerTask task;
private int failureCount = 0;
private int restartCount = 1;
private Date latestResultDelivery;
}
我们处理以下可能的故障列表
Worker 通过抛出异常(即您的情况下的 IOException)而使任务失败
我们向工作控件发送一条 new Failure(caughtException) 消息。看到它后,工作控件会增加 failureCount 并将任务放在 todoList 队列的头部。当达到给定的失败次数时,任务将被视为永久失败并且永远不会重试。 (之后,可以以自定义方式记录、处置和处理永久失败的任务)。
Worker在给定的时间段内没有交付任何结果(例如,他陷入死循环,worker机器上的资源争用,worker神秘消失,任务处理时间过长)
我们为此做了两件事
taskInfo 的 latestResultDelivery 字段,并将任务分配存储在 assignedTasks 映射中。
for (ActorRef busyWorker : assignedTasks.keySet()) {
Date now = new Date();
if (now.getTime()
- assignedTasks.get(busyWorker).getLatestResultDeliveryTime() >= 0) {
logger.warn("{} has failed to deliver the data processing result in time", nameOf(busyWorker));
logger.warn("{} will be marked as dead", nameOf(busyWorker));
getSelf().tell(new Failure(new IllegalStateException("Worker did not deliver any result in time")),
busyWorker);
registeredWorkers.remove(busyWorker.path());
deadWorkers.put(busyWorker.path(), busyWorker);
}
}
网络断开,工作进程死亡
我们又做了两件事:
在 worker 向 worker 控件注册后,我们开始观察 worker Actor
registeredWorkers.put(worker.path(), worker); context().watch(worker);
如果我们在工作控件中收到一条 Terminated 消息,我们会增加 restartCount 并将任务返回到 todoList。同样,重启次数过多的任务最终会永久失败,并且再也不会重试。这是在任务本身成为远程工作人员死亡的原因(例如,由于 OutOfMemoryError 而导致远程系统关闭)的情况下完成的。我们为失败和重启保留单独的计数器,以便能够更好地精确重试策略。
我们还做了一些尝试,使 worker 本身具有容错能力。例如。工作人员控制他的任务的执行时间,并监控他最近是否一直在做任何事情。
根据您需要处理的故障类型,您可以实现所列策略的一部分。
底线:正如其中一条评论中提到的那样:为了重新安排任务,您需要在 Fetcher 中保留一些数据结构来映射工作人员和分配的任务。
关于java - 在 Akka actor 中处理异常的最佳实践,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/23945464/
很好奇,就使用rubyonrails自动化单元测试而言,你们正在做什么?您是否创建了一个脚本来在cron中运行rake作业并将结果邮寄给您?git中的预提交Hook?只是手动调用?我完全理解测试,但想知道在错误发生之前捕获错误的最佳实践是什么。让我们理所当然地认为测试本身是完美无缺的,并且可以正常工作。下一步是什么以确保他们在正确的时间将可能有害的结果传达给您? 最佳答案 不确定您到底想听什么,但是有几个级别的自动代码库控制:在处理某项功能时,您可以使用类似autotest的内容获得关于哪些有效,哪些无效的即时反馈。要确保您的提
我主要使用Ruby来执行此操作,但到目前为止我的攻击计划如下:使用gemsrdf、rdf-rdfa和rdf-microdata或mida来解析给定任何URI的数据。我认为最好映射到像schema.org这样的统一模式,例如使用这个yaml文件,它试图描述数据词汇表和opengraph到schema.org之间的转换:#SchemaXtoschema.orgconversion#data-vocabularyDV:name:namestreet-address:streetAddressregion:addressRegionlocality:addressLocalityphoto:i
Rackup通过Rack的默认处理程序成功运行任何Rack应用程序。例如:classRackAppdefcall(environment)['200',{'Content-Type'=>'text/html'},["Helloworld"]]endendrunRackApp.new但是当最后一行更改为使用Rack的内置CGI处理程序时,rackup给出“NoMethodErrorat/undefinedmethod`call'fornil:NilClass”:Rack::Handler::CGI.runRackApp.newRack的其他内置处理程序也提出了同样的反对意见。例如Rack
我真的很习惯使用Ruby编写以下代码:my_hash={}my_hash['test']=1Java中对应的数据结构是什么? 最佳答案 HashMapmap=newHashMap();map.put("test",1);我假设? 关于java-等价于Java中的RubyHash,我们在StackOverflow上找到一个类似的问题: https://stackoverflow.com/questions/22737685/
我正在学习Rails,并阅读了关于乐观锁的内容。我已将类型为integer的lock_version列添加到我的articles表中。但现在每当我第一次尝试更新记录时,我都会收到StaleObjectError异常。这是我的迁移:classAddLockVersionToArticle当我尝试通过Rails控制台更新文章时:article=Article.first=>#我这样做:article.title="newtitle"article.save我明白了:(0.3ms)begintransaction(0.3ms)UPDATE"articles"SET"title"='dwdwd
在Cooper的书BeginningRuby中,第166页有一个我无法重现的示例。classSongincludeComparableattr_accessor:lengthdef(other)@lengthother.lengthenddefinitialize(song_name,length)@song_name=song_name@length=lengthendenda=Song.new('Rockaroundtheclock',143)b=Song.new('BohemianRhapsody',544)c=Song.new('MinuteWaltz',60)a.betwee
我早就知道Ruby中的“常量”(即大写的变量名)不是真正常量。与其他编程语言一样,对对象的引用是唯一存储在变量/常量中的东西。(侧边栏:Ruby确实具有“卡住”引用对象不被修改的功能,据我所知,许多其他语言都没有提供这种功能。)所以这是我的问题:当您将一个值重新分配给常量时,您会收到如下警告:>>FOO='bar'=>"bar">>FOO='baz'(irb):2:warning:alreadyinitializedconstantFOO=>"baz"有没有办法强制Ruby抛出异常而不是打印警告?很难弄清楚为什么有时会发生重新分配。 最佳答案
我正在尝试使用boilerpipe来自JRuby。我看过guide从JRuby调用Java,并成功地将它与另一个Java包一起使用,但无法弄清楚为什么同样的东西不能用于boilerpipe。我正在尝试基本上从JRuby中执行与此Java等效的操作:URLurl=newURL("http://www.example.com/some-location/index.html");Stringtext=ArticleExtractor.INSTANCE.getText(url);在JRuby中试过这个:require'java'url=java.net.URL.new("http://www
我只想对我一直在思考的这个问题有其他意见,例如我有classuser_controller和classuserclassUserattr_accessor:name,:usernameendclassUserController//dosomethingaboutanythingaboutusersend问题是我的User类中是否应该有逻辑user=User.newuser.do_something(user1)oritshouldbeuser_controller=UserController.newuser_controller.do_something(user1,user2)我
什么是ruby的rack或python的Java的wsgi?还有一个路由库。 最佳答案 来自Python标准PEP333:Bycontrast,althoughJavahasjustasmanywebapplicationframeworksavailable,Java's"servlet"APImakesitpossibleforapplicationswrittenwithanyJavawebapplicationframeworktoruninanywebserverthatsupportstheservletAPI.ht