点亮 ⭐️ Star · 照亮开源之路
https://github.com/apache/dolphinscheduler

1 DolphinScheduler的设计与策略
1.1 分布式设计
1.1.1 中心化
1.1.2 去中心化
1.2 DophinScheduler架构设计
1.3 容错问题
1.3.1 宕机容错
1.3.2 失败重试
1.4 远程日志访问
2 DolphinScheduler源码分析
2.1 工程模块介绍与配置文件
2.1.1 工程模块介绍
2.1.2 配置文件
2.2 Api主要任务操作接口
2.3 Quaterz架构与运行流程
2.3.1 概念与架构
2.3.2 初始化与执行流程
2.3.3 集群运转
2.4 Master启动与执行流程
2.4.1 概念与执行逻辑
2.4.2 集群与槽(slot)
2.4.3 代码执行流程
2.5 Work启动与执行流程
2.5.1 概念与执行逻辑
2.5.2 代码执行流程
2.6 rpc交互
2.6.1 Master与Worker交互
2.6.2 其他服务与Master交互
2.7 负载均衡算法
2.7.1 加权随机
2.7.2 线性负载
2.7.3 平滑轮询
2.8 日志服务
2.9 报警
3 后记
3.1 Make friends
3.2 参考文献
研究Apache Dolphinscheduler也是机缘巧合,平时负责基于xxl-job二次开发出来的调度平台,因为遇到了并发性能瓶颈,到了不得不优化重构的地步,所以搜索市面上应用较广的调度平台以借鉴优化思路。
在阅读完DolphinScheduler代码之后,便生出了将其设计与思考记录下来的念头,这便是此篇文章的来源。因为没有正式生产使用,业务理解不一定透彻,理解可能有偏差,欢迎大家交流讨论。
大家能关注DolphinScheduler那么一定对调度系统有了一定的了解,对于调度所涉及的到一些专有名词在这里就不做过多的介绍,重点介绍一下流程定义,流程实例,任务定义,任务实例。(没有作业这个概念确实也很新奇,可能是不想和Quartz的JobDetail重叠)。
任务定义:各种类型的任务,是流程定义的关键组成,如sql,shell,spark,mr,python等;
任务实例:任务的实例化,标识着具体的任务执行状态;
流程定义:一组任务节点通过依赖关系建立的起来的有向无环图(DAG);
流程实例:通过手动或者定时调度生成的流程实例;
定时调度:系统采用Quartz 分布式调度器,并同时支持cron表达式可视化的生成;
分布式系统的架构设计基本分为中心化和去中心化两种,各有优劣,凭借各自的业务选择。
中心化设计比较简单,集群中的节点安装角色可以分为Master和Slave两种,如下图:

Master: Master的角色主要负责任务分发并监督Slave的健康状态,可以动态的将任务均衡到Slave上,以致Slave节点不至于“忙死”或”闲死”的状态。
中心化设计存在一些问题。
第一点,一旦Master出现了问题,则群龙无首,整个集群就会崩溃。
为了解决这个问题,大多数Master/Slave架构模式都采用了主备Master的设计方案,可以是热备或者冷备,也可以是自动切换或手动切换,而且越来越多的新系统都开始具备自动选举切换Master的能力,以提升系统的可用性。
第二点,如果Scheduler在Master上,虽然可以支持一个DAG中不同的任务运行在不同的机器上,但是会产生Master的过负载。如果Scheduler在Slave上,一个DAG中所有的任务都只能在某一台机器上进行作业提交,在并行任务比较多的时候,Slave的压力可能会比较大。
xxl-job就是采用这种设计方式,但是存在相应的问题。管理器(admin)宕机集群会崩溃,Scheduler在管理器上,管理器负责所有任务的校验和分发,管理器存在过载的风险,需要开发者想方案解决。

在去中心化设计里,通常没有Master/Slave的概念,所有的角色都是一样的,地位是平等的,去中心化设计的核心设计在于整个分布式系统中不存在一个区别于其他节点的“管理者”,因此不存在单点故障问题。
但由于不存在“管理者”节点所以每个节点都需要跟其他节点通信才得到必须要的机器信息,而分布式系统通信的不可靠性,则大大增加了上述功能的实现难度。实际上,真正去中心化的分布式系统并不多见。
反而动态中心化分布式系统正在不断涌出。在这种架构下,集群中的管理者是被动态选择出来的,而不是预置的,并且集群在发生故障的时候,集群的节点会自发的举行会议来选举新的管理者去主持工作。
一般都是基于Raft算法实现的选举策略。Raft算法,目前社区也有相应的PR,还没合并。
PR链接:https://github.com/apache/dolphinscheduler/issues/10874
动态展示见链接:http://thesecretlivesofdata.com/
DolphinScheduler的去中心化是Master/Worker注册到注册中心,实现Master集群和Worker集群无中心。
随手盗用一张官网的系统架构图,可以看到调度系统采用去中心化设计,由UI,API,MasterServer,Zookeeper,WorkServer,Alert等几部分组成。

API: API接口层,主要负责处理前端UI层的请求。该服务统一提供RESTful api向外部提供请求服务。接口包括工作流的创建、定义、查询、修改、发布、下线、手工启动、停止、暂停、恢复、从该节点开始执行等等。
MasterServer: MasterServer采用分布式无中心设计理念,MasterServer集成了Quartz,主要负责 DAG 任务切分、任务提交监控,并同时监听其它MasterServer和WorkerServer的健康状态。MasterServer服务启动时向Zookeeper注册临时节点,通过监听Zookeeper临时节点变化来进行容错处理。WorkServer:WorkerServer也采用分布式无中心设计理念,WorkerServer主要负责任务的执行和提供日志服务。WorkerServer服务启动时向Zookeeper注册临时节点,并维持心跳。
ZooKeeper: ZooKeeper服务,系统中的MasterServer和WorkerServer节点都通过ZooKeeper来进行集群管理和容错。另外系统还基于ZooKeeper进行事件监听和分布式锁。
**Alert:**提供告警相关接口,接口主要包括两种类型的告警数据的存储、查询和通知功能,支持丰富的告警插件自由拓展配置。
容错分为服务宕机容错和任务重试,服务宕机容错又分为Master容错和Worker容错两种情况;
服务容错设计依赖于ZooKeeper的Watcher机制,实现原理如图:

其中Master监控其他Master和Worker的目录,如果监听到remove事件,则会根据具体的业务逻辑进行流程实例容错或者任务实例容错,容错流程图相对官方文档里面的流程图,人性化了些,大家可以参考一下,具体如下所示。

ZooKeeper Master容错完成之后则重新由DolphinScheduler中Scheduler线程调度,遍历 DAG 找到“正在运行”和“提交成功”的任务,对“正在运行”的任务监控其任务实例的状态,对“提交成功”的任务需要判断Task Queue中是否已经存在,如果存在则同样监控任务实例的状态,如果不存在则重新提交任务实例。

Master Scheduler线程一旦发现任务实例为” 需要容错”状态,则接管任务并进行重新提交。注意由于” 网络抖动”可能会使得节点短时间内失去和ZooKeeper的心跳,从而发生节点的remove事件。
对于这种情况,我们使用最简单的方式,那就是节点一旦和ZooKeeper发生超时连接,则直接将Master或Worker服务停掉。
这里首先要区分任务失败重试、流程失败恢复、流程失败重跑的概念:
任务失败重试是任务级别的,是调度系统自动进行的,比如一个Shell任务设置重试次数为3次,那么在Shell任务运行失败后会自己再最多尝试运行3次。
流程失败恢复是流程级别的,是手动进行的,恢复是从只能从失败的节点开始执行或从当前节点开始执行。流程失败重跑也是流程级别的,是手动进行的,重跑是从开始节点进行。
接下来说正题,我们将工作流中的任务节点分了两种类型。
一种是业务节点,这种节点都对应一个实际的脚本或者处理语句,比如Shell节点、MR节点、Spark节点、依赖节点等。
还有一种是逻辑节点,这种节点不做实际的脚本或语句处理,只是整个流程流转的逻辑处理,比如子流程节等。
每一个业务节点都可以配置失败重试的次数,当该任务节点失败,会自动重试,直到成功或者超过配置的重试次数。逻辑节点不支持失败重试。但是逻辑节点里的任务支持重试。
如果工作流中有任务失败达到最大重试次数,工作流就会失败停止,失败的工作流可以手动进行重跑操作或者流程恢复操作。
由于Web(UI)和Worker不一定在同一台机器上,所以查看日志不能像查询本地文件那样。
有两种方案:
将日志放到ES搜索引擎上;
通过netty通信获取远程日志信息;
介于考虑到尽可能的DolphinScheduler的轻量级性,所以选择了RPC实现远程访问日志信息,具体代码的实践见2.8章节。
上一章的讲解可能初步看起来还不是很清晰,本章的主要目的是从代码层面一一介绍第一张讲解的功能。关于系统的安装在这里并不会涉及,安装运行请大家自行探索。
dolphinscheduler-alert 告警模块,提供告警服务;
dolphinscheduler-api web应用模块,提供 Rest Api 服务,供 UI 进行调用;
dolphinscheduler-common 通用的常量枚举、工具类、数据结构或者基类 dolphinscheduler-dao 提供数据库访问等操作;
dolphinscheduler-remote 基于netty的客户端、服务端 ;
dolphinscheduler-server 日志与心跳服务 ;
dolphinscheduler-log-server LoggerServer 用于Rest Api通过RPC查看日志;
dolphinscheduler-master MasterServer服务,主要负责 DAG 的切分和任务状态的监控 ;
dolphinscheduler-worker WorkerServer服务,主要负责任务的提交、执行和任务状态的更新;
dolphinscheduler-service service模块,包含Quartz、Zookeeper、日志客户端访问服务,便于server模块和api模块调用 ;
dolphinscheduler-ui 前端模块;
dolphinscheduler-common common.properties
dolphinscheduler-api application.yaml
dolphinscheduler-master application.yaml
dolphinscheduler-worker application.yaml
主要关注数据库,quartz, zookeeper, masker, worker配置。
其他业务接口可以不用关注,只需要关注最最主要的流程上线功能接口,此接口可以发散出所有的任务调度相关的代码。
接口:/dolphinscheduler/projects/{projectCode}/schedules/{id}/online;此接口会将定义的流程提交到Quartz调度框架;代码如下:
public Map<String, Object> setScheduleState(User loginUser, long projectCode, Integer id, ReleaseState scheduleStatus) { Map<String, Object> result = new HashMap<>();
Project project = projectMapper.queryByCode(projectCode); // check project auth boolean hasProjectAndPerm = projectService.hasProjectAndPerm(loginUser, project, result); if (!hasProjectAndPerm) { return result; }
// check schedule exists Schedule scheduleObj = scheduleMapper.selectById(id);
if (scheduleObj == null) { putMsg(result, Status.SCHEDULE_CRON_NOT_EXISTS, id); return result; } // check schedule release state if (scheduleObj.getReleaseState() == scheduleStatus) { logger.info("schedule release is already {},needn't to change schedule id: {} from {} to {}", scheduleObj.getReleaseState(), scheduleObj.getId(), scheduleObj.getReleaseState(), scheduleStatus); putMsg(result, Status.SCHEDULE_CRON_REALEASE_NEED_NOT_CHANGE, scheduleStatus); return result; } ProcessDefinition processDefinition = processDefinitionMapper.queryByCode(scheduleObj.getProcessDefinitionCode()); if (processDefinition == null || projectCode != processDefinition.getProjectCode()) { putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, String.valueOf(scheduleObj.getProcessDefinitionCode())); return result; } List<ProcessTaskRelation> processTaskRelations = processTaskRelationMapper.queryByProcessCode(projectCode, scheduleObj.getProcessDefinitionCode()); if (processTaskRelations.isEmpty()) { putMsg(result, Status.PROCESS_DAG_IS_EMPTY); return result; } if (scheduleStatus == ReleaseState.ONLINE) { // check process definition release state if (processDefinition.getReleaseState() != ReleaseState.ONLINE) { logger.info("not release process definition id: {} , name : {}", processDefinition.getId(), processDefinition.getName()); putMsg(result, Status.PROCESS_DEFINE_NOT_RELEASE, processDefinition.getName()); return result; } // check sub process definition release state List<Long> subProcessDefineCodes = new ArrayList<>(); processService.recurseFindSubProcess(processDefinition.getCode(), subProcessDefineCodes); if (!subProcessDefineCodes.isEmpty()) { List<ProcessDefinition> subProcessDefinitionList = processDefinitionMapper.queryByCodes(subProcessDefineCodes); if (subProcessDefinitionList != null && !subProcessDefinitionList.isEmpty()) { for (ProcessDefinition subProcessDefinition : subProcessDefinitionList) { /** * if there is no online process, exit directly */ if (subProcessDefinition.getReleaseState() != ReleaseState.ONLINE) { logger.info("not release process definition id: {} , name : {}", subProcessDefinition.getId(), subProcessDefinition.getName()); putMsg(result, Status.PROCESS_DEFINE_NOT_RELEASE, String.valueOf(subProcessDefinition.getId())); return result; } } } } }
// check master server exists List<Server> masterServers = monitorService.getServerListFromRegistry(true);
if (masterServers.isEmpty()) { putMsg(result, Status.MASTER_NOT_EXISTS); return result; }
// set status scheduleObj.setReleaseState(scheduleStatus);
scheduleMapper.updateById(scheduleObj);
try { switch (scheduleStatus) { case ONLINE: logger.info("Call master client set schedule online, project id: {}, flow id: {},host: {}", project.getId(), processDefinition.getId(), masterServers); setSchedule(project.getId(), scheduleObj); break; case OFFLINE: logger.info("Call master client set schedule offline, project id: {}, flow id: {},host: {}", project.getId(), processDefinition.getId(), masterServers); deleteSchedule(project.getId(), id); break; default: putMsg(result, Status.SCHEDULE_STATUS_UNKNOWN, scheduleStatus.toString()); return result; } } catch (Exception e) { result.put(Constants.MSG, scheduleStatus == ReleaseState.ONLINE ? "set online failure" : "set offline failure"); throw new ServiceException(result.get(Constants.MSG).toString(), e); }
putMsg(result, Status.SUCCESS); return result; }
Quartz 框架主要包括如下几个部分:
SchedulerFactory:任务调度工厂,主要负责管理任务调度器;
Scheduler :任务调度器,主要负责任务调度,以及操作任务的相关接口;
Job :任务接口,实现类包含具体任务业务代码;
JobDetail:用于定义作业的实例;
Trigger:任务触发器,主要存放 Job 执行的时间策略。例如多久执行一次,什么时候执行,以什么频率执行等等;
JobBuilder :用于定义/构建 JobDetail 实例,用于定义作业的实例。
TriggerBuilder :用于定义/构建触发器实例;
Calendar:Trigger 扩展对象,可以排除或者包含某个指定的时间点(如排除法定节假日);
JobStore:存储作业和任务调度期间的状态Scheduler的生命期,从 SchedulerFactory 创建它时开始,到 Scheduler 调用Shutdown() 方法时结束;
Scheduler 被创建后,可以增加、删除和列举 Job 和 Trigger,以及执行其它与调度相关的操作(如暂停 Trigger)。但Scheduler 只有在调用 start() 方法后,才会真正地触发 trigger(即执行 job)
Quartz的基本原理就是通过Scheduler来调度被JobDetail和Trigger定义的安装Job接口规范实现的自定义任务业务对象,来完成任务的调度。基本逻辑如下图:

代码时序图如下:

基本内容就是初始化任务调度容器Scheduler,以及容器所需的线程池,数据交互对象JobStore,任务处理线程QuartzSchedulerThread用来处理Job接口的具体业务实现类。
DolphinScheduler的业务类是ProcessScheduleJob,主要功能就是根据调度信息往commond表中写数据。
需要注意的事:
当Quartz采用集群形式部署的时候,存储介质不能使用内存的形式,也就是不能使用JobStoreRAM。
Quartz集群对于对于需要被调度的Triggers实例的扫描是使用数据库锁TRIGGER_ACCESS来完成的,保障此扫描过程只能被一个Quartz实例获取到。代码如下:
public List<OperableTrigger> acquireNextTriggers(final long noLaterThan, final int maxCount, final long timeWindow) throws JobPersistenceException { String lockName; if(isAcquireTriggersWithinLock() || maxCount > 1) { lockName = LOCK_TRIGGER_ACCESS; } else { lockName = null; } return executeInNonManagedTXLock(lockName, new TransactionCallback<List<OperableTrigger>>() { public List<OperableTrigger> execute(Connection conn) throws JobPersistenceException { return acquireNextTrigger(conn, noLaterThan, maxCount, timeWindow); } }, new TransactionValidator<List<OperableTrigger>>() { public Boolean validate(Connection conn, List<OperableTrigger> result) throws JobPersistenceException { try { List<FiredTriggerRecord> acquired = getDelegate().selectInstancesFiredTriggerRecords(conn, getInstanceId()); Set<String> fireInstanceIds = new HashSet<String>(); for (FiredTriggerRecord ft : acquired) { fireInstanceIds.add(ft.getFireInstanceId()); } for (OperableTrigger tr : result) { if (fireInstanceIds.contains(tr.getFireInstanceId())) { return true; } } return false; } catch (SQLException e) { throw new JobPersistenceException("error validating trigger acquisition", e); } } }); }
3.集群失败实例恢复需要注意的是各个实例恢复各自实例对应的异常实例,因为数据库有调度容器的instanceId信息。代码如下:

关键概念:
Quartz相关:
Scheduler(任务调度容器,一般都是StdScheduler实例)。
ProcessScheduleJob:(实现Quarts调度框架的Job接口的业务类,专门生成DolphinScheduler数据库业务表t_ds_commond数据);
DolphinScheduler相关:
NettyRemotingServer(netty服务端,包含netty服务端serverBootstrap对象与netty服务端业务处理对象serverHandler), NettyServerHandler:(netty服务端业务处理类:包含各类处理器以及处理器对应的执行线程池);
TaskPluginManager(任务插件管理器,不同类型的任务以插件的形式管理,在应用服务启动的时候,通过@AutoService加载实现了TaskChannelFactory接口的工厂信息到数据库,通过工厂对象来加载各类TaskChannel实现类到缓存);
MasterRegistryClient(master操作zk的客户端,封装了master对于zk的所有操作,注册,查询,删除等);
MasterSchedulerService(扫描服务,包含业务执行线程和work包含的nettyhe护短,负责任务调度业务,slot来控制集群模式下任务不被重复调度,底层实现是zookeeper分布式锁);
WorkflowExecuteThread(真正的业务处理线程,通过插槽获取命令commond,执行之前会校验slot的变化,如果变化不执行,关键功能就是构建任务相关的参数,定义,优先级等,然后发送到队列,供队列处理线程消费);
CommonTaskProcessor(普通任务处理器,实现ITaskProcessor接口,根据业务分为普通,依赖,子任务,阻塞,条件任务类型,包含了任务的提交,运行,分发,杀死等业务,通过@AutoService加载的类,根本就是封装了对);
TaskPriorityQueueImpl(任务队列,负责任务队列的存储控制);
TaskPriorityQueueConsumer(任务队列消费线程,负责任务的根据负载均衡策略在worker之间分发与执行);
ServerNodeManager (节点信息控制器,负责节点注册信息更新与槽位(slot)变更,底层实现是zookeeper分布式锁的应用);
EventExecuteService(事件处理线程,通过缓存起来的任务处理线程,处理每个任务在处理过程中注册在线程事件队列中的事件);
FailoverExecuteThread(故障转移线程,包含Master和worker的);
MasterRegistryDataListener(托管在zk管理框架cautor的故障监听器,负责对worker和master注册在zk上的节点的新增和删除)。
主节点容错代码如下,业务解释见1.5.1Master容错解释:
其实这里的采用Zookeer分布式锁准确也不准确,为什么这么说,因为Slot是CommondId对Master列表长度取模来计算的,而Master列表长度的刷新是Zookeeper分布式锁来控制,Master节点的调度数据扫描是通过Slot来控制的。
具体代码如下:
Slot刷新
Slot应用

代码过于繁琐,此处不再一一粘贴代码解释各个类的功能,自行看代码更加清晰。
NettyRemotingServer(worker包含的netty服务端) WorkerRegistryClient(zk客户端,封装了worker与zk相关的操作,注册,查询,删除等) ;
TaskPluginManager(任务插件管理器,封装了插件加载逻辑和任务实际执行业务的抽象) ;
WorkerManagerThread(任务工作线程生成器,消费netty处理器推进队列的任务信息,并生成任务执行线程提交线程池管理) ;
TaskExecuteProcessor(Netty任务执行处理器,生成master分发到work的任务信息,并推送到队列) ;
TaskExecuteThread(任务执行线程) ;
TaskCallbackService(任务回调线程,与master包含的netty client通信);
AbstractTask(任务实际业务的抽象类,子类包含实际的任务执行业务,SqlTask,DataXTask等) ;
RetryReportTaskStatusThread(不关注)
Worker节点代码时序图如下:

代码过于繁琐,此处不再一一粘贴代码解释各个类的功能,自行看代码更加清晰。
因为节点和应用服务之间的RPC通信都是基于Netty实现的,Netty相关知识不在这里过多的讲解,当前章节只涉及Master与Worker之间的交互模式的设计与实现。
整体设计如下

Master与worker之间的业务逻辑的交互是基于Netty服务端与客户端来实现Rpc通信的,Master和Worker启动的时候会将自己的Netty服务端信息注册到ZK相应的节点上,Master的任务分发线程和任务杀死等业务运行时,拉取ZK上的Worker节点信息,根据负载均衡策略选择一个节点(下章介绍负载均衡),构建Netty客户端与Worker的Netty服务端通信,Worker收到Master的RPC请求之后会缓存Channel信息并处理对应业务,同时Callback回调线程会获取缓存的通道来执行回调操作,这样就形成的闭环。
任务的执行杀死,以及回调状态处理等操作都是通过Netty客户端与服务端绑定的Processer处理器来进行的。
Master部分具体代码如下:
Master启动的时候会初始化Nettyserver,注册对应的请求处理器到NettyHandler并启动:
Master的NettyExecutorManager初始化的时候会将NettyRemotingClient也初始化,并且会注册处理Worker回调请求的处理器,真正的端口绑定是在获取到执行器端口之后:
任务分发代码如下:
Worker部分具体代码如下:
同理Woker在启动的时候会初始化NettyServer,注册对应处理器并启动:
回调线程对象初始化的时候,会将包含的Nettyremotingclient一起初始化,并注册好对应的业务处理器:
回调线程会通过其他执行器中缓存下来的Chanel与Master的客户端进行通信:
以日志服务为例,前端触发请求日志的接口,通过参数与数据库交互获取到Master的NettyServer信息,然后构建Netty客户端与Master进行通信获取日志并返回。具体代码如下
Nettyclient随着日志业务对象初始化而初始化:
Master在选择执行器的时候DolphinScheduler提供了三种负载均衡算法,且所有的算法都用到了节点权重:加权随机(random),平滑轮询(roundrobin),线性负载(lowerweight)。通过配置文件来控制到底使用哪一个负载均衡策略,默认配置是权重策略:host-selector: lower_weight。
看代码更好理解:按照全部权重值求和,然后取汇总结果的随机整数,随机整数对原先所有host的权重累差,返回小于零的时候的host,没有就随机返回一个。
权重计算逻辑:利用注册的Cpu占用、内存占用以及加载因子还有启动时间消耗做计算。
获取权重最小的节点,并把节点权重置为最大。
这个算法不是很好的能够理解,所以我不知道我的理解是否正确,它有一个预热的过程,之前都是取第一个,等到累计的权重超过最大就整数就开始按权重轮询。
2.6.2已经介绍不在做过多的说明。
暂未研究,目测基本就是根据规则筛选数据,然后调用指定类型的报警服务接口做报警操作,比如邮件,微信,短信通知等。
因为没有正式生产使用,业务理解不一定透彻,理解可能有偏差,欢迎大家一起进入社区交流讨论。
Apache DolphinScheduler Slack群链接:https://join.slack.com/t/asf-dolphinscheduler/shared_invite/zt-1e36toy4n-5n9U2R__FDM05R~MJFFVBg
https://dolphinscheduler.apache.org/zh-cn/development/architecture-design.html;
https://www.w3cschool.cn/quartz_doc/quartz_doc-1xbu2clr.html.
最后,感谢社区蔡顺峰、钟嘉杰和阮文俊对本文整理和修改提出建设性意见,以及对本文发布提供的帮助。

非常欢迎大家加入 DolphinScheduler 大家庭,融入开源世界!
我们鼓励任何形式的参与社区,最终成为 Committer 或 PPMC,如:
将遇到的问题通过 GitHub 上 issue 的形式反馈出来。
回答别人遇到的 issue 问题。
帮助完善文档。
帮助项目增加测试用例。
为代码添加注释。
提交修复 Bug 或者 Feature 的 PR。
发表应用案例实践、调度流程分析或者与调度相关的技术文章。
帮助推广 DolphinScheduler,参与技术大会或者 meetup 的分享等。
欢迎加入贡献的队伍,加入开源从提交第一个 PR 开始。
注:贡献不仅仅限于 PR 哈,对促进项目发展的都是贡献。
相信参与 DolphinScheduler,一定会让您从开源中受益!
随着国内开源的迅猛崛起,Apache DolphinScheduler 社区迎来蓬勃发展,为了做更好用、易用的调度,真诚欢迎热爱开源的伙伴加入到开源社区中来,为中国开源崛起献上一份自己的力量,让本土开源走向全球。
参与 DolphinScheduler 社区有非常多的参与贡献的方式,包括:
贡献第一个PR(文档、代码) 我们也希望是简单的,第一个PR用于熟悉提交的流程和社区协作以及感受社区的友好度。
社区汇总了以下适合新手的问题列表:https://github.com/apache/dolphinscheduler/issues/5689
非新手问题列表:https://github.com/apache/dolphinscheduler/issues?q=is%3Aopen+is%3Aissue+label%3A"volunteer+wanted"
如何参与贡献链接:https://dolphinscheduler.apache.org/zh-cn/community/development/contribute.html
来吧,DolphinScheduler开源社区需要您的参与,为中国开源崛起添砖加瓦吧,哪怕只是小小的一块瓦,汇聚起来的力量也是巨大的。
参与开源可以近距离与各路高手切磋,迅速提升自己的技能,如果您想参与贡献,我们有个贡献者种子孵化群,可以添加社区小助手,手把手教会您( 贡献者不分水平高低,有问必答,关键是有一颗愿意贡献的心 )。
添加小助手微信时请说明想参与贡献。来吧,开源社区非常期待您的参与。
我有一个字符串input="maybe(thisis|thatwas)some((nice|ugly)(day|night)|(strange(weather|time)))"Ruby中解析该字符串的最佳方法是什么?我的意思是脚本应该能够像这样构建句子:maybethisissomeuglynightmaybethatwassomenicenightmaybethiswassomestrangetime等等,你明白了......我应该一个字符一个字符地读取字符串并构建一个带有堆栈的状态机来存储括号值以供以后计算,还是有更好的方法?也许为此目的准备了一个开箱即用的库?
我主要使用Ruby来执行此操作,但到目前为止我的攻击计划如下:使用gemsrdf、rdf-rdfa和rdf-microdata或mida来解析给定任何URI的数据。我认为最好映射到像schema.org这样的统一模式,例如使用这个yaml文件,它试图描述数据词汇表和opengraph到schema.org之间的转换:#SchemaXtoschema.orgconversion#data-vocabularyDV:name:namestreet-address:streetAddressregion:addressRegionlocality:addressLocalityphoto:i
我正在使用ruby1.9解析以下带有MacRoman字符的csv文件#encoding:ISO-8859-1#csv_parse.csvName,main-dialogue"Marceu","Giveittohimóhe,hiswife."我做了以下解析。require'csv'input_string=File.read("../csv_parse.rb").force_encoding("ISO-8859-1").encode("UTF-8")#=>"Name,main-dialogue\r\n\"Marceu\",\"Giveittohim\x97he,hiswife.\"\
简而言之错误:NOTE:Gem::SourceIndex#add_specisdeprecated,useSpecification.add_spec.Itwillberemovedonorafter2011-11-01.Gem::SourceIndex#add_speccalledfrom/opt/local/lib/ruby/site_ruby/1.8/rubygems/source_index.rb:91./opt/local/lib/ruby/gems/1.8/gems/rails-2.3.8/lib/rails/gem_dependency.rb:275:in`==':und
一、引擎主循环UE版本:4.27一、引擎主循环的位置:Launch.cpp:GuardedMain函数二、、GuardedMain函数执行逻辑:1、EnginePreInit:加载大多数模块int32ErrorLevel=EnginePreInit(CmdLine);PreInit模块加载顺序:模块加载过程:(1)注册模块中定义的UObject,同时为每个类构造一个类默认对象(CDO,记录类的默认状态,作为模板用于子类实例创建)(2)调用模块的StartUpModule方法2、FEngineLoop::Init()1、检查Engine的配置文件找出使用了哪一个GameEngine类(UGame
我正在使用ruby2.1.0我有一个json文件。例如:test.json{"item":[{"apple":1},{"banana":2}]}用YAML.load加载这个文件安全吗?YAML.load(File.read('test.json'))我正在尝试加载一个json或yaml格式的文件。 最佳答案 YAML可以加载JSONYAML.load('{"something":"test","other":4}')=>{"something"=>"test","other"=>4}JSON将无法加载YAML。JSON.load("
我想用Nokogiri解析HTML页面。页面的一部分有一个表,它没有使用任何特定的ID。是否可以提取如下内容:Today,3,455,34Today,1,1300,3664Today,10,100000,3444,Yesterday,3454,5656,3Yesterday,3545,1000,10Yesterday,3411,36223,15来自这个HTML:TodayYesterdayQntySizeLengthLengthSizeQnty345534345456563113003664354510001010100000344434113622315
我使用的第一个解析器生成器是Parse::RecDescent,它的指南/教程很棒,但它最有用的功能是它的调试工具,特别是tracing功能(通过将$RD_TRACE设置为1来激活)。我正在寻找可以帮助您调试其规则的解析器生成器。问题是,它必须用python或ruby编写,并且具有详细模式/跟踪模式或非常有用的调试技术。有人知道这样的解析器生成器吗?编辑:当我说调试时,我并不是指调试python或ruby。我指的是调试解析器生成器,查看它在每一步都在做什么,查看它正在读取的每个字符,它试图匹配的规则。希望你明白这一点。赏金编辑:要赢得赏金,请展示一个解析器生成器框架,并说明它的
我有这样的HTML代码:Label1Value1Label2Value2...我的代码不起作用。doc.css("first").eachdo|item|label=item.css("dt")value=item.css("dd")end显示所有首先标记,然后标记标签,我需要“标签:值” 最佳答案 首先,您的HTML应该有和中的元素:Label1Value1Label2Value2...但这不会改变您解析它的方式。你想找到s并遍历它们,然后在每个你可以使用next_element得到;像这样:doc=Nokogiri::HTML(
我想禁用HTTP参数的自动XML解析。但我发现命令仅适用于Rails2.x,它们都不适用于3.0:config.action_controller.param_parsers.deleteMime::XML(application.rb)ActionController::Base.param_parsers.deleteMime::XMLRails3.0中的等价物是什么? 最佳答案 根据CVE-2013-0156的最新安全公告你可以将它用于Rails3.0。3.1和3.2ActionDispatch::ParamsParser::