草庐IT

java - 在 Akka actor 中处理异常的最佳实践

coder 2024-03-08 原文

我有以下任务,对此我有 Java/Executors 解决方案运行良好,但我想在 Akka 中实现相同的功能并寻找最佳实践建议。

问题:

从多个 URL 中并行获取/解析数据,阻塞直到所有数据被获取并返回聚合结果。应该重试错误(IOException 等)达到一定次数。

到目前为止,我的实现非常简单——创建知道应该获取哪些 URL 的 Fetcher actor,它创建一堆 Worker actor 并向它们发送 URL,每条消息一个。完成特定 URL Worker 后,将消息连同结果发送回 Fetcher。 Fetcher 保持结果状态,Workers 无状态。下面是简化的代码。

getter :

class Fetcher extends UntypedActor {
  private ActorRef worker;

  public void onReceive(Object message) throws Exception {
    if (message instanceof FetchMessage) {
      this.worker = context().actorOf(SpringExtension.SpringExtProvider.get(actorSystem).props("Worker")
              .withRouter(new RoundRobinPool(4)), "worker");
      for(URL u: urls) {
        this.worker.tell(new WorkUnit(u), getSelf());
      }
   }
   else if (message instanceof Result) {
     // accumulate results
   }
}

worker :

class Worker extends UntypedActor {

  public void onReceive(Object message) throws Exception {
    if (message instanceof WorkUnit) {
      // fetch URL, parse etc
      // send result back to sender
      getSender().tell(new Result(...), null);
    }
}

到目前为止一切顺利,在没有异常(exception)的情况下,一切都按预期进行。

但是,如果在 Worker 中获取 URL 时出现 IOException,那么 Akka 将重启 Worker actor,但 Worker 正在处理的消息会丢失。即使我使用不同的 SupervisorStrategy,结果也是一样的——一些消息实际上“丢失”了。当然,我可以使用 try/catch 将代码包装在 Worker.onReceive() 中,但我觉得这违背了 Akka 的理念。我想我可以使用持久消息传递,但我认为在这种情况下增加消息持久性的复杂性是不合理的。

我可能需要某种方式让 Fetcher 发现 Worker 未能获取某些 URL 并再次重新发送 WorkUnit 或检测到某些结果在太长时间内没有返回。处理这种情况的最佳方法是什么?

谢谢,

最佳答案

我们在我们的项目中遇到了类似的问题,我们找到了一个适合我们的解决方案 - 无论异常、工作人员故障、网络故障等如何执行任务。尽管我必须承认代码最终变得有点复杂。

所以我们的设置如下:

  1. 有一个 WorkerControl actor 处理任务管理和与 worker 的通信
  2. 有许多 Worker actors 位于不同的 VM 中(可能位于不同的物理机器上)
  3. WorkerControl 接收一些要处理的数据并在 worker 之间调度任务

我们或多或少地尝试遵循描述的指南 here

但我们也提高了设计的容错能力。

在 WorkerControl 中,我们保留以下数据结构:

Map<ActorPath, ActorRef> registeredWorkers // registry of workers
Deque<TaskInfo> todoList                   // tasks that have not been yet processed
Map<ActorRef, TaskInfo> assignedTasks      // tasks assigned to the workers
Map<ActorPath, ActorRef> deadWorkers       // registry of dead workers

对于每个要执行的任务,我们保留一个数据结构

class TaskInfo {
    private final WorkerTask task;
    private int failureCount = 0;
    private int restartCount = 1;
    private Date latestResultDelivery;
}

我们处理以下可能的故障列表

Worker 通过抛出异常(即您的情况下的 IOException)而使任务失败

我们向工作控件发送一条 new Failure(caughtException) 消息。看到它后,工作控件会增加 failureCount 并将任务放在 todoList 队列的头部。当达到给定的失败次数时,任务将被视为永久失败并且永远不会重试。 (之后,可以以自定义方式记录、处置和处理永久失败的任务)。

Worker在给定的时间段内没有交付任何结果(例如,他陷入死循环,worker机器上的资源争用,worker神秘消失,任务处理时间过长)

我们为此做了两件事

  1. 我们初始化 taskInfolatestResultDelivery 字段,并将任务分配存储在 assignedTasks 映射中。
  2. 我们会定期对工作人员控件运行“健康检查”,以确定工作人员是否在某项任务上工作的时间过长。

    for (ActorRef busyWorker : assignedTasks.keySet()) {
        Date now = new Date();
        if (now.getTime()
                - assignedTasks.get(busyWorker).getLatestResultDeliveryTime() >= 0) {
            logger.warn("{} has failed to deliver the data processing result in time", nameOf(busyWorker));
            logger.warn("{} will be marked as dead", nameOf(busyWorker));
            getSelf().tell(new Failure(new IllegalStateException("Worker did not deliver any result in time")),
                    busyWorker);
            registeredWorkers.remove(busyWorker.path());
            deadWorkers.put(busyWorker.path(), busyWorker);
        }
    }

网络断开,工作进程死亡

我们又做了两件事:

  1. 在 worker 向 worker 控件注册后,我们开始观察 worker Actor

    registeredWorkers.put(worker.path(), worker);
    context().watch(worker);

  2. 如果我们在工作控件中收到一条 Terminated 消息,我们会增加 restartCount 并将任务返回到 todoList。同样,重启次数过多的任务最终会永久失败,并且再也不会重试。这是在任务本身成为远程工作人员死亡的原因(例如,由于 OutOfMemoryError 而导致远程系统关闭)的情况下完成的。我们为失败和重启保留单独的计数器,以便能够更好地精确重试策略。

我们还做了一些尝试,使 worker 本身具有容错能力。例如。工作人员控制他的任务的执行时间,并监控他最近是否一直在做任何事情。

根据您需要处理的故障类型,您可以实现所列策略的一部分。

底线:正如其中一条评论中提到的那样:为了重新安排任务,您需要在 Fetcher 中保留一些数据结构来映射工作人员和分配的任务。

关于java - 在 Akka actor 中处理异常的最佳实践,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/23945464/

有关java - 在 Akka actor 中处理异常的最佳实践的更多相关文章

  1. ruby-on-rails - 使用 Ruby on Rails 进行自动化测试 - 最佳实践 - 2

    很好奇,就使用ruby​​onrails自动化单元测试而言,你们正在做什么?您是否创建了一个脚本来在cron中运行rake作业并将结果邮寄给您?git中的预提交Hook?只是手动调用?我完全理解测试,但想知道在错误发生之前捕获错误的最佳实践是什么。让我们理所当然地认为测试本身是完美无缺的,并且可以正常工作。下一步是什么以确保他们在正确的时间将可能有害的结果传达给您? 最佳答案 不确定您到底想听什么,但是有几个级别的自动代码库控制:在处理某项功能时,您可以使用类似autotest的内容获得关于哪些有效,哪些无效的即时反馈。要确保您的提

  2. ruby - 解析 RDFa、微数据等的最佳方式是什么,使用统一的模式/词汇(例如 schema.org)存储和显示信息 - 2

    我主要使用Ruby来执行此操作,但到目前为止我的攻击计划如下:使用gemsrdf、rdf-rdfa和rdf-microdata或mida来解析给定任何URI的数据。我认为最好映射到像schema.org这样的统一模式,例如使用这个yaml文件,它试图描述数据词汇表和opengraph到schema.org之间的转换:#SchemaXtoschema.orgconversion#data-vocabularyDV:name:namestreet-address:streetAddressregion:addressRegionlocality:addressLocalityphoto:i

  3. ruby - 如何指定 Rack 处理程序 - 2

    Rackup通过Rack的默认处理程序成功运行任何Rack应用程序。例如:classRackAppdefcall(environment)['200',{'Content-Type'=>'text/html'},["Helloworld"]]endendrunRackApp.new但是当最后一行更改为使用Rack的内置CGI处理程序时,rackup给出“NoMethodErrorat/undefinedmethod`call'fornil:NilClass”:Rack::Handler::CGI.runRackApp.newRack的其他内置处理程序也提出了同样的反对意见。例如Rack

  4. java - 等价于 Java 中的 Ruby Hash - 2

    我真的很习惯使用Ruby编写以下代码:my_hash={}my_hash['test']=1Java中对应的数据结构是什么? 最佳答案 HashMapmap=newHashMap();map.put("test",1);我假设? 关于java-等价于Java中的RubyHash,我们在StackOverflow上找到一个类似的问题: https://stackoverflow.com/questions/22737685/

  5. ruby-on-rails - Rails - 乐观锁定总是触发 StaleObjectError 异常 - 2

    我正在学习Rails,并阅读了关于乐观锁的内容。我已将类型为integer的lock_version列添加到我的articles表中。但现在每当我第一次尝试更新记录时,我都会收到StaleObjectError异常。这是我的迁移:classAddLockVersionToArticle当我尝试通过Rails控制台更新文章时:article=Article.first=>#我这样做:article.title="newtitle"article.save我明白了:(0.3ms)begintransaction(0.3ms)UPDATE"articles"SET"title"='dwdwd

  6. ruby - #之间? Cooper 的 *Beginning Ruby* 中的错误或异常 - 2

    在Cooper的书BeginningRuby中,第166页有一个我无法重现的示例。classSongincludeComparableattr_accessor:lengthdef(other)@lengthother.lengthenddefinitialize(song_name,length)@song_name=song_name@length=lengthendenda=Song.new('Rockaroundtheclock',143)b=Song.new('BohemianRhapsody',544)c=Song.new('MinuteWaltz',60)a.betwee

  7. ruby - 在 Ruby 中重新分配常量时抛出异常? - 2

    我早就知道Ruby中的“常量”(即大写的变量名)不是真正常量。与其他编程语言一样,对对象的引用是唯一存储在变量/常量中的东西。(侧边栏:Ruby确实具有“卡住”引用对象不被修改的功能,据我所知,许多其他语言都没有提供这种功能。)所以这是我的问题:当您将一个值重新分配给常量时,您会收到如下警告:>>FOO='bar'=>"bar">>FOO='baz'(irb):2:warning:alreadyinitializedconstantFOO=>"baz"有没有办法强制Ruby抛出异常而不是打印警告?很难弄清楚为什么有时会发生重新分配。 最佳答案

  8. java - 从 JRuby 调用 Java 类的问题 - 2

    我正在尝试使用boilerpipe来自JRuby。我看过guide从JRuby调用Java,并成功地将它与另一个Java包一起使用,但无法弄清楚为什么同样的东西不能用于boilerpipe。我正在尝试基本上从JRuby中执行与此Java等效的操作:URLurl=newURL("http://www.example.com/some-location/index.html");Stringtext=ArticleExtractor.INSTANCE.getText(url);在JRuby中试过这个:require'java'url=java.net.URL.new("http://www

  9. java - 我的模型类或其他类中应该有逻辑吗 - 2

    我只想对我一直在思考的这个问题有其他意见,例如我有classuser_controller和classuserclassUserattr_accessor:name,:usernameendclassUserController//dosomethingaboutanythingaboutusersend问题是我的User类中是否应该有逻辑user=User.newuser.do_something(user1)oritshouldbeuser_controller=UserController.newuser_controller.do_something(user1,user2)我

  10. java - 什么相当于 ruby​​ 的 rack 或 python 的 Java wsgi? - 2

    什么是ruby​​的rack或python的Java的wsgi?还有一个路由库。 最佳答案 来自Python标准PEP333:Bycontrast,althoughJavahasjustasmanywebapplicationframeworksavailable,Java's"servlet"APImakesitpossibleforapplicationswrittenwithanyJavawebapplicationframeworktoruninanywebserverthatsupportstheservletAPI.ht

随机推荐