草庐IT

airflow+k8s 多用户-分布式-跨集群-容器化调度

wx636261b2d66e0 2023-03-28 原文

对于考虑使用拖拉拽编排使用云原生调度的可以参考
​​​https://github.com/tencentmusic/argo-workflow​

全栈工程师开发手册 (作者:栾鹏)
架构系列文章


最开始采用airflow+k8s分布式容器化调度的方案主要是为了解决下面的问题:

1、特有环境/特有脚本调度的调度问题

2、大数据量任务或大算力任务节点故障和调度管理问题

目前已经基于airflow+k8s改造成多用户-分布式-跨集群-容器化调度的平台。

airflow官网:https://airflow.apache.org/docs/stable/

airflow介绍

airflow 是一个编排、调度和监控workflow的平台,由Airbnb开源,现在在Apache Software Foundation 孵化。airflow 将workflow编排为tasks组成的DAGs,调度器在一组workers上按照指定的依赖关系执行tasks。同时,airflow 提供了丰富的命令行工具和简单易用的用户界面以便用户查看和操作,并且airflow提供了监控和报警系统。

airflow源码改造,镜像封装

airflow的官方git在https://github.com/apache/airflow,而源码部分在airflow/airflow/目录下面,对应我们二次开源的版本地址在airflow/docker/mine/airflow-1.10.9/airflow 目录下面。前后端代码在www和www_rbac下面。

为了能方便的开源改造后部署,我们使用源码安装的方法部署airflow:

源码安装的方法比较简单,先使用pip安装一遍airflow,是为了使用pip将airflow的依赖包都安装上,再将源码目录复制进去,并且设置PYTHONPATH环境变量添加源码的目录,这样就可以使用我们的airflow库进行启动了。

RUN pip install apache-airflow==1.10.9
COPY airflow-1.10.9 /home/airflow/airflow
ENV PYTHONPATH $PYTHONPATH:/home/airflow/airflow
镜像构建分装分为两层:为避免频繁修改代码打包比较麻烦,所以先统一封装好环境镜像,再在环境镜像的基础上封装airflow代码。

airflow web改造

airflow rbac版本是基于flask appbuilder改造的,需要改造的可以先学习一下flask appbuilder。我们基于airflow1.10.9版本,对airflow web进行了部分改造。这样方便更多的用户进行使用。注意:不同版本的airflow,dag文件的语法不太一样。我们使用的airflow为1.10.9版本。在web上我们主要增加了如下的内容:

1、ioa登录认证
2、rbac鉴权
3、在线上传/删除dag文件
4、修复时区问题
5、插件化开发部署

有了多用户就可以针对每个人不同的权限来控制对调度任务的管理,实现多用户权限隔离。

插件化开发

了解airflow的同学应该知道,要想让airflow识别我们的pipeline,需要我们自己编写dag文件,是一个py文件。大体的结构可以参考:https://airflow.apache.org/docs/stable/concepts.html

如果直接推广给用户,就需要用户学习python和airflow的dag书写规则,这个成本还是比较高的。所以我们在airflow dag的基础上封装了一层工具的概念,也可以理解为插件的概念(注意不是airflow官方的插件概念)。这个工具组可以将很多通用 dag进行模板化,封装为工具,这些工具的使用只需要用户在界面上填写该工具需要使用的参数,保存这些参数后就可以自动渲染为dag。避免了用户自己编写dag和用户自己封装镜像。现在支持nrt插件、flinkx插件、datax插件、bash命令插件。

docker-compose部署调试

由于airflow包含多个功能组件,需要启动多个容器才能正常进行调试。多容器本地调试最好的方法就是使用docker-compose。

airflow的配置涉及到多个文件:.env、airflow.cfg、airflow.env、bootstrap.sh、entrypoint.sh,可以根据自己的情况修改里面的内容。

使用docker-compose up启动airflow会启动redis、webserver、scheduler、worker等组件,并且连接外置的mysql,所以还需要你自己搭建mysql数据库,这是因为大部分开发者都有自己的mysql数据库,所以没有包含在部署文件中,如果你没有自己的mysql测试库,可以使用下面的命令自己部署一个

linux
docker run --network host --restart always --name mysql -e MYSQL_ROOT_PASSWORD=admin -d mysql:5.7

mac
docker run -p 3306:3306 --restart always --name mysql -e MYSQL_ROOT_PASSWORD=admin -d mysql:5.7

进入数据库创建一个db
CREATE DATABASE IF NOT EXISTS airflow DEFAULT CHARACTER SET utf8 DEFAULT COLLATE utf8_general_ci;

这些组件中redis做任务队列和任务结果存储。mysql存储平台元数据。scheduler每5分钟扫描指定目录检测可用的dag,并根据dag的调度时间产生后面5分钟需要产生的任务,并推送给redis。worker从redis中领取任务,并执行任务。webserver为airflow web的前后台部分,可以在线管理dag和任务实例。如果本地调试,redis和mysql是必须要启动的,其他的组件可以根据自己当前调试的内容来选择启动。

Airflow k8s高可用部署

部署k8s集群

这是airflow的原生架构,虽然我们在本地使用docker-compose进行调试,但是为了保证高可用,我们在生产线上肯定是使用多台机器。我们采用k8s模式部署airflow的原生架构,另外我们使用ceph分布式存储来挂载dags目录到schedule,web,worker中。当然scheduler的高可用这里没有涉及(如果是多个scheduler要注意避免任务重复),redis使用的组从模式(公司的ckv,ckv+部分功能不支持),mysql可以用cdb,多个worker容器,分布在不同的机器上。

在部署airflow前需要先部署依赖组件redis和mysql。然后就可以在k8s上部署airflow了。

airflow+k8s调度结构

airflow的原有架构是部署在k8s之上的,不过按照原有架构,所有任务都是在worker容器里执行的。虽然worker可以进行伸缩容来满足不同数量的任务,或者配置参数使得worker进程数进行动态伸缩容。

但是在worker执行的任务需要资源非常大的时候,worker就有可能oom,或者造成机器死机,这样在该worker容器或机器上的所有任务就都会受影响。我们在worker基础上添加k8s。让worker只进行k8s pod的管理,不具体进行大任务的执行,具体的任务在k8s pod中执行。

注意:在这里两个地方涉及到k8s,一个是airflow原生组件是使用k8s部署的。另外一个是每个worker节点也在向各种k8s集群发起pod部署指令。这些pod是执行具体业务任务的。

kubernetes-operator

上面架构的实现方法是使用了airflow中的kubernetes-operator。这里是一个包含了kubernetes-operator的示例https://git.code.oa.com/tme-data-infra/airflow/blob/master/dags/k8s-demo.py

通过KubernetesPodOperator方法控制pod的部署。其中

  • namespace:为pod部署的命名空间
  • image:为pod的镜像
  • cmds:为pod的启动命令 数组格式
  • labels::为pod的label,字典格式
  • name:为pod的名称
  • task_id:为pipeline中当前task的名册个
  • get_logs:为是否实时获取pod的日志在airflow中显示
  • dag:为当前task所属的dag的名称
  • in_cluster:为是否在当前集群部署pod
  • service_account_name:为pod部署时所绑定的账户名
  • affinity:为pod部署的亲密度,字典格式,可以参考git中的写法
  • env_vars:为pod内的环境变量,字典格式
  • volumes=[]:为pod的数据卷,数组格式,参考git中写法
  • volume_mounts=[]:为pod部署的挂载点,参考git中写法
  • hostnetwork:为pod部署是否使用host模式
  • resources:为pod部署的资源限制
  • is_delete_operator_pod:为pod完成后是否删除pod
  • startup_timeout_seconds:为创建task多久后pod都没有成功启动
  • config_file:为部署pod的k8s集群的kubeconfig文件
  • image_pull_secrets:为pod拉取镜像的hubsecret
  • image_pull_policy:为pod拉取镜像的策略。
    通过上面的参数,我们就可以在选定的集群,选定的机器下进行并发的容器化任务调度了。

k8s安全升级airflow

当我们修改了参数或者修改了源码重新封装了镜像需要进行升级的时候,我们就需要重启容器。但是直接删除worker 会把里面job直接kill掉,然后schedule会重新调度。有时候我们的任务需要运行很长时间才会完成,我们并不想直接删除worker的pod。下面介绍一种优雅升级的方法,修改airflow的worker的升级模式设置为手动删除才升级。这样我们在修改了配置信息后,pod也不会自动重启。

updateStrategy:
type: OnDelete # OnDelete为手动升级 RollingUpdate
# podManagementPolicy: Parallel # 会同时终止所有pod

配置以后,进入任何一个worker容器,通过celery api设置为不再继续添加新任务。

celery control -b redis://:admin@redis-master.infra:6379/1 cancel_consumer default -d celery@airflow-worker-1

这样worker会消费当前已经接收的任务,不再继续接收新任务了,待已存在的任务全部结束后,就可以直接手动重启容器,重启后容器后就会继续接收新任务按照新的参数运行了。

因为airflow是基于celery来实现的,我们可以使用celery的命名来查看当前任务的调度运行情况。

airflow平台的使用

目前平台调度管理着dau各项指标监控、clickhosue读写分离构建、nrt/tdsql容器化调度、脚本容器化调度、abt数据各种任务容器。

有关airflow+k8s 多用户-分布式-跨集群-容器化调度的更多相关文章

  1. ruby-on-rails - 使用 rails 4 设计而不更新用户 - 2

    我将应用程序升级到Rails4,一切正常。我可以登录并转到我的编辑页面。也更新了观点。使用标准View时,用户会更新。但是当我添加例如字段:name时,它​​不会在表单中更新。使用devise3.1.1和gem'protected_attributes'我需要在设备或数据库上运行某种更新命令吗?我也搜索过这个地方,找到了许多不同的解决方案,但没有一个会更新我的用户字段。我没有添加任何自定义字段。 最佳答案 如果您想允许额外的参数,您可以在ApplicationController中使用beforefilter,因为Rails4将参数

  2. ruby-on-rails - 简单的 Ruby on Rails 问题——如何将评论附加到用户和文章? - 2

    我意识到这可能是一个非常基本的问题,但我现在已经花了几天时间回过头来解决这个问题,但出于某种原因,Google就是没有帮助我。(我认为部分问题在于我是一个初学者,我不知道该问什么......)我也看过O'Reilly的RubyCookbook和RailsAPI,但我仍然停留在这个问题上.我找到了一些关于多态关系的信息,但它似乎不是我需要的(尽管如果我错了请告诉我)。我正在尝试调整MichaelHartl'stutorial创建一个包含用户、文章和评论的博客应用程序(不使用脚手架)。我希望评论既属于用户又属于文章。我的主要问题是:我不知道如何将当前文章的ID放入评论Controller。

  3. ruby - RVM "ERROR: Unable to checkout branch ."单用户 - 2

    我在新的Debian6VirtualBoxVM上安装RVM时遇到问题。我已经安装了所有需要的包并使用下载了安装脚本(curl-shttps://rvm.beginrescueend.com/install/rvm)>rvm,但以单个用户身份运行时bashrvm我收到以下错误消息:ERROR:Unabletocheckoutbranch.安装在这里停止,并且(据我所知)没有安装RVM的任何文件。如果我以root身份运行脚本(对于多用户安装),我会收到另一条消息:Successfullycheckedoutbranch''安装程序继续并指示成功,但未添加.rvm目录,甚至在修改我的.bas

  4. ruby - 分布式事务和队列,ruby,erlang,scala - 2

    我有一个涉及多台机器、消息队列和事务的问题。因此,例如用户点击网页,点击将消息发送到另一台机器,该机器将付款添加到用户的帐户。每秒可能有数千次点击。事务的所有方面都应该是容错的。我以前从未遇到过这样的事情,但一些阅读表明这是一个众所周知的问题。所以我的问题。我假设安全的方法是使用两阶段提交,但协议(protocol)是阻塞的,所以我不会获得所需的性能,我是否正确?我通常写Ruby,但似乎Redis之类的数据库和Rescue、RabbitMQ等消息队列系统对我的帮助不大——即使我实现某种两阶段提交,如果Redis崩溃,数据也会丢失,因为它本质上只是内存。所有这些让我开始关注erlang和

  5. ruby - 在没有基准或时间的情况下用 Ruby 测量用户时间或系统时间 - 2

    因为我现在正在做一些时间测量,我想知道是否可以在不使用Benchmark类或命令行实用程序time的情况下测量用户时间或系统时间。使用Time类只显示挂钟时间,而不显示系统和用户时间,但是我正在寻找具有相同灵active的解决方案,例如time=TimeUtility.now#somecodeuser,system,real=TimeUtility.now-time原因是我有点不喜欢Benchmark,因为它不能只返回数字(编辑:我错了-它可以。请参阅下面的答案。)。当然,我可以解析输出,但感觉不对。*NIX系统的time实用程序也应该可以解决我的问题,但我想知道是否已经在Ruby中实

  6. ruby-on-rails - 使用 javascript 更改数据方法不会更改 ajax 调用用户的什么方法? - 2

    我遇到了一个非常奇怪的问题,我很难解决。在我看来,我有一个与data-remote="true"和data-method="delete"的链接。当我单击该链接时,我可以看到对我的Rails服务器的DELETE请求。返回的JS代码会更改此链接的属性,其中包括href和data-method。再次单击此链接后,我的服务器收到了对新href的请求,但使用的是旧的data-method,即使我已将其从DELETE到POST(它仍然发送一个DELETE请求)。但是,如果我刷新页面,HTML与"new"HTML相同(随返回的JS发生变化),但它实际上发送了正确的请求类型。这就是这个问题令我困惑的

  7. ruby - HTTP 请求中的用户代理,Ruby - 2

    我是Ruby的新手。我试过查看在线文档,但没有找到任何有效的方法。我想在以下HTTP请求botget_response()和get()中包含一个用户代理。有人可以指出我正确的方向吗?#PreliminarycheckthatProggitisupcheck=Net::HTTP.get_response(URI.parse(proggit_url))ifcheck.code!="200"puts"ErrorcontactingProggit"returnend#Attempttogetthejsonresponse=Net::HTTP.get(URI.parse(proggit_url)

  8. ruby-on-rails - capybara poltergeist - 覆盖用户代理 - 2

    有人知道如何将capybarapoltergeist的用户代理覆盖到移动用户代理以进行测试吗?我发现了一些有关为seleniumwebdriver配置它的信息:http://blog.plataformatec.com.br/2011/03/configuring-user-agents-with-capybara-selenium-webdriver/这在capybara闹鬼中怎么可能? 最佳答案 请参阅poltergeistgithub页面上的链接:https://github.com/teampoltergeist/polte

  9. ruby-on-rails - 如何用不同的用户运行nginx主进程 - 2

    A/ctohttp://wiki.nginx.org/CoreModule#usermaster进程曾经以root用户运行,是否可以以不同的用户运行nginxmaster进程? 最佳答案 只需以非root身份运行init脚本(即/etc/init.d/nginxstart),就可以用不同的用户运行nginxmaster进程。如果这真的是你想要做的,你将需要确保日志和pid目录(通常是/var/log/nginx&/var/run/nginx.pid)对该用户是可写的,并且您所有的listen调用都是针对大于1024的端口(因为绑定(

  10. ruby-on-rails - 验证电子邮件地址是 Paypal 用户 - 2

    我想验证一个电子邮件地址是否是PayPal用户。是否有API调用来执行此操作?是否有执行此操作的ruby​​库?谢谢 最佳答案 GetVerifiedStatus来自PayPal'sAdaptiveAccounts平台会为您做这件事。PayPal没有任何codesamples或SDKs用于Ruby中的自适应帐户,但我确实找到了编写codeforGetVerifiedStatusinRuby的人.您需要更改该代码以检查他们拥有的帐户类型的唯一更改是更改if@xml['accountStatus']!=nilaccount_status

随机推荐