[Open Source Project] Introduction to the PowerJob Task Scheduling Framework and Source Code Analysis

秋装什么 2024-01-29

Project Introduction

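PowerJob (formerly OhMyScheduler) is a new-generation distributed scheduling and computing framework that makes it easy to schedule jobs and to run complex tasks as distributed computations.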

Project Links

  • Source code: https://gitee.com/KFCFans/PowerJob
  • Official site: http://www.powerjob.tech/index.html

Quick Start

https://www.yuque.com/powerjob/guidence/nyio9g

Source Code Analysis

Server Startup

  1. PowerJobServerApplication starts.
    public static void main(String[] args) {

        pre();

        AkkaStarter.init();
        VertXStarter.init();

        // Start SpringBoot application.
        try {
            SpringApplication.run(PowerJobServerApplication.class, args);
        } catch (Throwable t) {
            log.error(TIPS);
            throw t;
        }
    }
  2. AkkaStarter.init() starts the actorSystem and registers FriendRequestHandler as a message handler. It loads the configuration file oms-server.akka.conf, and the server port is set to 10086.
    public static void init() {

        Stopwatch stopwatch = Stopwatch.createStarted();
        log.info("[PowerJob] PowerJob's akka system start to bootstrap...");

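        // An overlooked issue: the machine may have no access to the public internet unless a private NTP server is set up
        // TimeUtils.check();

        // Parse the configuration file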
        Config akkaFinalConfig = parseConfig();
        actorSystem = ActorSystem.create(RemoteConstant.SERVER_ACTOR_SYSTEM_NAME, akkaFinalConfig);
        actorSystem.actorOf(FriendRequestHandler.defaultProps(), RemoteConstant.SERVER_FRIEND_ACTOR_NAME);
        log.info("[PowerJob] PowerJob's akka system started successfully, using time {}.", stopwatch);
    }
  3. VertXStarter.init() mainly creates the Vertx object via Vertx.vertx(); the HTTP port is set to 10010.
  4. Initializer#initHandler registers the Vert.x handler WorkerRequestHttpHandler and the Akka message handler.
@Component
@ConditionalOnExpression("'${execution.env}'!='test'")
public class Initializer {

    @PostConstruct
    public void initHandler() {
        // init akka
        AkkaStarter.actorSystem.actorOf(WorkerRequestAkkaHandler.defaultProps(), RemoteConstant.SERVER_ACTOR_NAME);
        // init vert.x
        VertXStarter.vertx.deployVerticle(new WorkerRequestHttpHandler());
    }
}
  5. WorkerRequestHttpHandler creates an HttpServer and sets up its routes.
@Slf4j
public class WorkerRequestHttpHandler extends AbstractVerticle {

    @Override
    public void start() throws Exception {

        Properties properties = PropertyUtils.getProperties();
        int port = Integer.parseInt(properties.getProperty(PowerJobServerConfigKey.HTTP_PORT, String.valueOf(OmsConstant.SERVER_DEFAULT_HTTP_PORT)));

        HttpServerOptions options = new HttpServerOptions();
        HttpServer server = vertx.createHttpServer(options);

        Router router = Router.router(vertx);
        router.route().handler(BodyHandler.create());
        router.post(ProtocolConstant.SERVER_PATH_HEARTBEAT)
                .handler(ctx -> {
                    WorkerHeartbeat heartbeat = ctx.getBodyAsJson().mapTo(WorkerHeartbeat.class);
                    fetchWorkerRequestHandler().processWorkerHeartbeat(heartbeat);
                    success(ctx);
                });
        router.post(ProtocolConstant.SERVER_PATH_STATUS_REPORT)
                .blockingHandler(ctx -> {
                    TaskTrackerReportInstanceStatusReq req = ctx.getBodyAsJson().mapTo(TaskTrackerReportInstanceStatusReq.class);
                    try {
                        fetchWorkerRequestHandler().processTaskTrackerReportInstanceStatus(req);
                        out(ctx, AskResponse.succeed(null));
                    } catch (Exception e) {
                        log.error("[WorkerRequestHttpHandler] update instance status failed for request: {}.", req, e);
                        out(ctx, AskResponse.failed(ExceptionUtils.getMessage(e)));
                    }
                });
        router.post(ProtocolConstant.SERVER_PATH_LOG_REPORT)
                .blockingHandler(ctx -> {
                    WorkerLogReportReq req = ctx.getBodyAsJson().mapTo(WorkerLogReportReq.class);
                    fetchWorkerRequestHandler().processWorkerLogReport(req);
                    success(ctx);
                });
        server.requestHandler(router).listen(port);
    }

    private static void out(RoutingContext ctx, Object msg) {
        ctx.response()
                .putHeader(OmsConstant.HTTP_HEADER_CONTENT_TYPE, OmsConstant.JSON_MEDIA_TYPE)
                .end(JsonObject.mapFrom(msg).encode());
    }

    private static void success(RoutingContext ctx) {
        out(ctx, ResultDTO.success(null));
    }
}
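
The routing idea in WorkerRequestHttpHandler can be tried without Vert.x. The following is a minimal sketch that swaps in the JDK's built-in com.sun.net.httpserver.HttpServer, so it is not PowerJob's implementation. The /heartbeat path, the class and method names, and the JSON reply are assumptions made for illustration; the real path comes from ProtocolConstant.SERVER_PATH_HEARTBEAT.

```java
import com.sun.net.httpserver.HttpExchange;
import com.sun.net.httpserver.HttpServer;
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.net.HttpURLConnection;
import java.net.InetSocketAddress;
import java.net.URL;
import java.nio.charset.StandardCharsets;

public class HeartbeatRouteSketch {

    // Hypothetical path, standing in for ProtocolConstant.SERVER_PATH_HEARTBEAT.
    static final String HEARTBEAT_PATH = "/heartbeat";

    /** Starts a JDK HttpServer on the given port (0 picks any free port). */
    static HttpServer start(int port) {
        try {
            HttpServer server = HttpServer.create(new InetSocketAddress("127.0.0.1", port), 0);
            server.createContext(HEARTBEAT_PATH, HeartbeatRouteSketch::onHeartbeat);
            server.start();
            return server;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Accepts POST only, reads the body, and answers with a small JSON payload. */
    static void onHeartbeat(HttpExchange exchange) throws IOException {
        try {
            if (!"POST".equals(exchange.getRequestMethod())) {
                exchange.sendResponseHeaders(405, -1); // mirrors router.post(...): other methods are not routed
                return;
            }
            // A real handler would map this JSON body to a WorkerHeartbeat object.
            exchange.getRequestBody().readAllBytes();
            byte[] body = "{\"success\":true}".getBytes(StandardCharsets.UTF_8);
            exchange.getResponseHeaders().set("Content-Type", "application/json");
            exchange.sendResponseHeaders(200, body.length);
            try (OutputStream os = exchange.getResponseBody()) {
                os.write(body);
            }
        } finally {
            exchange.close();
        }
    }

    /** Posts one heartbeat to a freshly started server and returns the response body. */
    static String roundTrip(String json) {
        HttpServer server = start(0);
        try {
            URL url = new URL("http://127.0.0.1:" + server.getAddress().getPort() + HEARTBEAT_PATH);
            HttpURLConnection conn = (HttpURLConnection) url.openConnection();
            conn.setRequestMethod("POST");
            conn.setDoOutput(true);
            try (OutputStream os = conn.getOutputStream()) {
                os.write(json.getBytes(StandardCharsets.UTF_8));
            }
            return new String(conn.getInputStream().readAllBytes(), StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } finally {
            server.stop(0);
        }
    }

    public static void main(String[] args) {
        System.out.println(roundTrip("{\"workerAddress\":\"10.132.17.10:27777\"}"));
    }
}
```

The sketch keeps one handler per path, as the Vert.x router does, but it does not reproduce the distinction between handler and blockingHandler seen in the original code.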

Client Startup

  1. PowerJobWorker implements InitializingBean, so PowerJobWorker#init runs at startup. This method connects to the server:
        ServerDiscoveryService serverDiscoveryService = new ServerDiscoveryService(workerRuntime.getAppId(), workerRuntime.getWorkerConfig());
        serverDiscoveryService.start(timingPool);
  2. ServerDiscoveryService#start starts the service and performs server discovery. ServerDiscoveryService#acquire calls http://127.0.0.1:7700/server/acquire?appId=2&currentServer=null&protocol=AKKA and obtains the server address 10.132.17.10:10086.
  3. The client also starts its own Akka service, loading the oms-worker.akka.conf configuration with the port set to 27777.
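
The discovery step can be sketched in plain Java. This is only an illustration under stated assumptions: the URL pattern is inferred from the request shown above, the class and method names are hypothetical, and the HTTP call is passed in as a function so the example runs without a server.

```java
import java.util.List;
import java.util.Optional;
import java.util.function.Function;

public class ServerDiscoverySketch {

    // Pattern inferred from the acquire URL shown in this article (an assumption, not copied from PowerJob).
    static final String ACQUIRE_URL_PATTERN =
            "http://%s/server/acquire?appId=%d&currentServer=%s&protocol=AKKA";

    /** Builds the acquire URL; a null currentServer is rendered as the text "null". */
    static String buildAcquireUrl(String serverAddress, long appId, String currentServer) {
        return String.format(ACQUIRE_URL_PATTERN, serverAddress, appId, currentServer);
    }

    /**
     * Asks each configured server in turn and returns the first non-empty answer.
     * httpGet is a hypothetical stand-in for the real HTTP client call.
     */
    static Optional<String> discover(List<String> servers, long appId, String currentServer,
                                     Function<String, String> httpGet) {
        for (String server : servers) {
            String url = buildAcquireUrl(server, appId, currentServer);
            try {
                String result = httpGet.apply(url);
                if (result != null && !result.isEmpty()) {
                    return Optional.of(result);
                }
            } catch (RuntimeException ignore) {
                // This server is unreachable; try the next one.
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        System.out.println(buildAcquireUrl("127.0.0.1:7700", 2, null));
    }
}
```

Formatting a null currentServer with %s yields the literal text null, which matches the currentServer=null parameter seen in the first request.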

Client: Server Discovery

  1. The client calls the http://127.0.0.1:7700/server/acquire?appId=2&currentServer=null&protocol=AKKA endpoint to obtain the address of the current Akka server.
  2. The server responds in ServerController#acquireServer. It builds a Ping request and sends it to the actor at akka://oms-server@10.132.17.10:10086/user/friend_actor; if the call succeeds, the address is considered reachable.
    private String activeAddress(String serverAddress, Set<String> downServerCache, String protocol) {

        if (downServerCache.contains(serverAddress)) {
            return null;
        }
        if (StringUtils.isEmpty(serverAddress)) {
            return null;
        }

        Ping ping = new Ping();
        ping.setCurrentTime(System.currentTimeMillis());

        ActorSelection serverActor = AkkaStarter.getFriendActor(serverAddress);
        try {
            CompletionStage<Object> askCS = Patterns.ask(serverActor, ping, Duration.ofMillis(PING_TIMEOUT_MS));
            AskResponse response = (AskResponse) askCS.toCompletableFuture().get(PING_TIMEOUT_MS, TimeUnit.MILLISECONDS);
            downServerCache.remove(serverAddress);
            if (response.isSuccess()) {
                return JsonUtils.parseObject(response.getData(), JSONObject.class).getString(protocol);
            }
        }catch (Exception e) {
            log.warn("[ServerElection] server({}) was down.", serverAddress);
        }
        downServerCache.add(serverAddress);
        return null;
    }
    public static ActorSelection getFriendActor(String address) {
        String path = String.format(AKKA_PATH, RemoteConstant.SERVER_ACTOR_SYSTEM_NAME, address, RemoteConstant.SERVER_FRIEND_ACTOR_NAME);
        return actorSystem.actorSelection(path);
    }
  3. FriendRequestHandler#onReceivePing handles the Ping requests delivered to FriendRequestHandler.
    private void onReceivePing(Ping ping) {
        getSender().tell(AskResponse.succeed(TransportService.getAllAddress()), getSelf());
    }
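
The ping-with-timeout check and the down-server cache can be illustrated without Akka. The sketch below replaces Patterns.ask with a CompletableFuture supplied by the caller, so it is an analogy under stated assumptions, not PowerJob's code; the class name, the ping function, and the one-second timeout are hypothetical.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;

public class ActiveAddressSketch {

    static final long PING_TIMEOUT_MS = 1000; // assumed value for illustration

    /**
     * Returns the address reported by the server if it answers in time, otherwise null.
     * Servers that fail are remembered in downServerCache and skipped on later calls.
     */
    static String activeAddress(String serverAddress, Set<String> downServerCache,
                                Function<String, CompletableFuture<String>> ping) {
        if (serverAddress == null || serverAddress.isEmpty()
                || downServerCache.contains(serverAddress)) {
            return null;
        }
        try {
            String reply = ping.apply(serverAddress).get(PING_TIMEOUT_MS, TimeUnit.MILLISECONDS);
            if (reply != null) {
                return reply;
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
        } catch (ExecutionException | TimeoutException e) {
            // The server failed or did not answer in time; fall through and mark it as down.
        }
        downServerCache.add(serverAddress);
        return null;
    }

    public static void main(String[] args) {
        Set<String> down = new HashSet<>();
        System.out.println(activeAddress("10.132.17.10:10086", down,
                addr -> CompletableFuture.completedFuture(addr)));
    }
}
```

Caching failed addresses means that, within one election round, an unreachable server costs at most one timeout.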

Server: Running a Job

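  1. The entry point is JobController#runImmediately. The call first passes through the aspect DesignateServerAspect#execute and then runs InstanceService#create to create the job instance. It then passes through the aspect UseCacheLockAspect#execute and is dispatched by DispatchService#dispatch.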
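  2. If a suitable worker is found, the server builds the request object and sends it. If no worker is available, it calls InstanceManager#processFinishedInstance. Whether an instance completes or fails, some follow-up handling is needed, such as logging and alerting.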
        // Get the list of currently most suitable workers
        List<WorkerInfo> suitableWorkers = workerClusterQueryService.getSuitableWorkers(jobInfo);

        if (CollectionUtils.isEmpty(suitableWorkers)) {
            log.warn("[Dispatcher-{}|{}] cancel dispatch job due to no worker available", jobId, instanceId);
            instanceInfoRepository.update4TriggerFailed(instanceId, FAILED.getV(), current, current, RemoteConstant.EMPTY_ADDRESS, SystemInstanceResult.NO_WORKER_AVAILABLE, now);

            instanceManager.processFinishedInstance(instanceId, instanceInfo.getWfInstanceId(), FAILED, SystemInstanceResult.NO_WORKER_AVAILABLE);
            return;
        }
        List<String> workerIpList = suitableWorkers.stream().map(WorkerInfo::getAddress).collect(Collectors.toList());

        // Build the job scheduling request
        ServerScheduleJobReq req = constructServerScheduleJobReq(jobInfo, instanceInfo, workerIpList);

        // Send the request (unreliable; a background thread must poll the status periodically)
        WorkerInfo taskTracker = suitableWorkers.get(0);
        String taskTrackerAddress = taskTracker.getAddress();

        transportService.tell(Protocol.of(taskTracker.getProtocol()), taskTrackerAddress, req);
        log.info("[Dispatcher-{}|{}] send schedule request to TaskTracker[protocol:{},address:{}] successfully: {}.", jobId, instanceId, taskTracker.getProtocol(), taskTrackerAddress, req);

        // Update the status
        instanceInfoRepository.update4TriggerSucceed(instanceId, WAITING_WORKER_RECEIVE.getV(), current, taskTrackerAddress, now);

        // Load the cache
        instanceMetadataService.loadJobInfo(instanceId, jobInfo);
  3. Because the protocol is Akka, AkkaTransporter is used. It assembles the client address akka://oms@10.132.17.10:27777/user/worker and sends the message to it. The message object is a ServerScheduleJobReq.
    public void tell(String address, PowerSerializable object) {
        ActorSelection taskTrackerActor = AkkaStarter.getWorkerActor(address);
        taskTrackerActor.tell(object, null);
    }
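  4. The client receives the request in TaskTrackerActor#onReceiveServerScheduleJobReq, which initializes the tracker through CommonTaskTracker#initTaskTracker and starts a scheduled Dispatcher task. To distribute tasks, TaskTracker#dispatchTask uses Akka to send a TaskTrackerStartTaskReq object, which is handled by the processor_tracker actor.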
  5. ProcessorTrackerActor#onReceiveTaskTrackerStartTaskReq wraps the task information in a ProcessorRunnable and finds the matching BasicProcessor to process it.
        ClassLoader classLoader = omsContainer == null ? getClass().getClassLoader() : omsContainer.getContainerClassLoader();
        ProcessorRunnable processorRunnable = new ProcessorRunnable(instanceInfo, taskTrackerActorRef, newTask, processor, omsLogger, classLoader, statusReportRetryQueue, workerRuntime);
        try {
            threadPool.submit(processorRunnable);
            success = true;
        } catch (RejectedExecutionException ignore) {
            log.warn("[ProcessorTracker-{}] submit task(taskId={},taskName={}) to ThreadPool failed due to ThreadPool has too much task waiting to process, this task will dispatch to other ProcessorTracker.",
                    instanceId, newTask.getTaskId(), newTask.getTaskName());
        } catch (Exception e) {
            log.error("[ProcessorTracker-{}] submit task(taskId={},taskName={}) to ThreadPool failed.", instanceId, newTask.getTaskId(), newTask.getTaskName(), e);
        }
  6. After the task has been submitted successfully, a ProcessorReportTaskStatusReq is sent back.
        // 2. Acknowledge successful receipt
        if (success) {
            ProcessorReportTaskStatusReq reportReq = new ProcessorReportTaskStatusReq();
            reportReq.setInstanceId(instanceId);
            reportReq.setSubInstanceId(newTask.getSubInstanceId());
            reportReq.setTaskId(newTask.getTaskId());
            reportReq.setStatus(TaskStatus.WORKER_RECEIVED.getValue());
            reportReq.setReportTime(System.currentTimeMillis());

            taskTrackerActorRef.tell(reportReq, null);

            log.debug("[ProcessorTracker-{}] submit task(taskId={}, taskName={}) success, current queue size: {}.",
                    instanceId, newTask.getTaskId(), newTask.getTaskName(), threadPool.getQueue().size());
        }
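
The RejectedExecutionException branch in the submit step is easier to follow with a small standalone example. The sketch below uses only java.util.concurrent; the pool size, queue capacity, and all names are assumptions chosen to force a rejection and are not PowerJob's actual settings.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class SubmitTaskSketch {

    /** Returns true if the pool accepted the task, false if it was rejected. */
    static boolean trySubmit(ThreadPoolExecutor pool, Runnable task) {
        try {
            pool.submit(task);
            return true;
        } catch (RejectedExecutionException e) {
            // The queue is full; a caller could dispatch the task to another node instead.
            return false;
        }
    }

    /** Submits three blocking tasks to a pool with one thread and one queue slot. */
    static boolean[] demo() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(1));
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocked = () -> {
            try {
                release.await(); // keep the task running until the demo releases it
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        boolean first = trySubmit(pool, blocked);  // runs on the single worker thread
        boolean second = trySubmit(pool, blocked); // waits in the one-slot queue
        boolean third = trySubmit(pool, blocked);  // no thread and no queue slot left
        release.countDown();
        pool.shutdown();
        return new boolean[]{first, second, third};
    }

    public static void main(String[] args) {
        boolean[] r = demo();
        System.out.println(r[0] + " " + r[1] + " " + r[2]);
    }
}
```

With the default AbortPolicy, the third submission is rejected, which corresponds to the case where the original code logs a warning and leaves success set to false.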

Client: Sending Heartbeats

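  1. PowerJobWorker implements InitializingBean, and its init method runs timingPool.scheduleAtFixedRate(new WorkerHealthReporter(workerRuntime), 0, 15, TimeUnit.SECONDS), so a heartbeat is reported every 15 seconds. The reporter resolves the server actor address, akka://oms-server@192.168.1.5:10086/user/server_actor, and sends the heartbeat to it.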
        // Send the request
        String serverPath = AkkaUtils.getServerActorPath(currentServer);
        if (StringUtils.isEmpty(serverPath)) {
            return;
        }
        ActorSelection actorSelection = workerRuntime.getActorSystem().actorSelection(serverPath);
        actorSelection.tell(heartbeat, null);
  2. On the server, AbWorkerRequestHandler#processWorkerHeartbeat handles heartbeat requests. After receiving a heartbeat, the server updates the worker's status through ClusterStatusHolder#updateStatus.
    public void updateStatus(WorkerHeartbeat heartbeat) {

        String workerAddress = heartbeat.getWorkerAddress();
        long heartbeatTime = heartbeat.getHeartbeatTime();

        WorkerInfo workerInfo = address2WorkerInfo.computeIfAbsent(workerAddress, ignore -> {
            WorkerInfo wf = new WorkerInfo();
            wf.refresh(heartbeat);
            return wf;
        });
        long oldTime = workerInfo.getLastActiveTime();
        if (heartbeatTime < oldTime) {
            log.warn("[ClusterStatusHolder-{}] receive the expired heartbeat from {}, serverTime: {}, heartTime: {}", appName, heartbeat.getWorkerAddress(), System.currentTimeMillis(), heartbeat.getHeartbeatTime());
            return;
        }

        workerInfo.refresh(heartbeat);

        List<DeployedContainerInfo> containerInfos = heartbeat.getContainerInfos();
        if (!CollectionUtils.isEmpty(containerInfos)) {
            containerInfos.forEach(containerInfo -> {
                Map<String, DeployedContainerInfo> infos = containerId2Infos.computeIfAbsent(containerInfo.getContainerId(), ignore -> Maps.newConcurrentMap());
                infos.put(workerAddress, containerInfo);
            });
        }
    }
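
The core of updateStatus is the rule that an older heartbeat must not overwrite a newer one. A minimal sketch of that rule follows, reduced to a map from worker address to last active time; the class and method names are hypothetical, and the real WorkerInfo carries much more state than a timestamp.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class HeartbeatStatusSketch {

    // Simplified stand-in for address2WorkerInfo: only the last active time is kept.
    private final Map<String, Long> address2LastActiveTime = new ConcurrentHashMap<>();

    /** Returns true if the heartbeat was applied, false if it was older than the recorded one. */
    boolean updateStatus(String workerAddress, long heartbeatTime) {
        // The first heartbeat from a worker registers it, as computeIfAbsent does in the original.
        long oldTime = address2LastActiveTime.computeIfAbsent(workerAddress, ignore -> heartbeatTime);
        if (heartbeatTime < oldTime) {
            return false; // expired heartbeat, for example one delayed on the network
        }
        address2LastActiveTime.put(workerAddress, heartbeatTime);
        return true;
    }

    /** Returns the last accepted heartbeat time, or null for an unknown worker. */
    Long lastActiveTime(String workerAddress) {
        return address2LastActiveTime.get(workerAddress);
    }

    public static void main(String[] args) {
        HeartbeatStatusSketch holder = new HeartbeatStatusSketch();
        holder.updateStatus("10.132.17.10:27777", 100L);
        holder.updateStatus("10.132.17.10:27777", 200L);
        System.out.println(holder.updateStatus("10.132.17.10:27777", 150L));
    }
}
```

As in the original method, the comparison and the update are two separate steps, so the sketch mirrors the logic and makes no claim of atomicity under concurrent heartbeats from the same worker.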

Summary

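  1. Akka is said to be a fairly simple framework, but it is written in Scala and differs considerably between versions, so it is hard to use well without Scala experience.
  2. Vert.x is a fairly simple framework. For an introduction, see this blog post: https://blog.csdn.net/qq_42985872/article/details/128494611
  3. So far I have only gone through the source code of the five features above. As I understand it, a task scheduling framework comes down to the server telling the client when to execute. The author also added a lot of handling that checks task status to make sure tasks run to completion, which makes the framework more complete but also more complex.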
