草庐IT

incubator-airflow

全部标签

大数据调度平台 Airflow(五):Airflow 使用

Airflow使用上文说到使用Airflow进行任务调度大体步骤如下:创建python文件,根据实际需要,使用不同的Operator在python文件不同的Operator中传入具体参数,定义一系列task在python文件中定义Task之间的关系,形成DAG将python文件上传执行,调度DAG,每个task会形成一个Instance使用命令行或者WEBUI进行查看和管理以上python文件就是Airflowpython脚本,使用代码方式指定DAG的结构一、Airflow调度Shell命令下面我们以调度执行shell命令为例,来讲解Airflow使用。1.首先我们需要创建一个python文件

redis - 在 Airflow 的不同机器上运行一个 DAG 的多个任务

我需要创建一个看起来像这样的dag-print_date任务需要从服务器A运行,模板化任务需要从服务器B运行。从文档中可以清楚地看出,将需要带有Redis或RabbitMq的celery。我正在使用celery和Redis(puckel/docker-airflow)。我已经在带有celery执行器的服务器B中运行了Airflow。我是否也需要在服务器A中进行相同的设置?另外,我如何将这两个任务连接到一个实际存在于不同服务器中的dag中?非常感谢此类用例的示例框架。 最佳答案 使用AirflowQueues.当您定义任务时,添加一个

python - Apache Airflow -mysql 'Specified key was too long; max key length is 1000 bytes'

尝试为MySQL设置Airflow,当Airflow尝试在“airflowinitdb”期间设置主键时出现错误File"/usr/lib/pymodules/python2.7/MySQLdb/cursors.py",line166,inexecuteself.errorhandler(self,exc,value)File"/usr/lib/pymodules/python2.7/MySQLdb/connections.py",line35,indefaulterrorhandlerraiseerrorclass,errorvaluesqlalchemy.exc.Operationa

mysql - 与 MySQL 的 Airflow 连接

我想对目前位于awsrds上的mysql数据库进行一些临时查询。我在AirflowUI上创建了一个具有所有必要凭据的连接,但是数据库没有显示在“数据分析”>“临时查询”部分下。感谢任何帮助。谢谢! 最佳答案 对于最初的问题,OP可能只需要安装一个python-mysql适配器。我刚遇到类似的问题。对我来说,这个问题是因为我的系统上没有安装依赖项。当我尝试连接到Postgres数据库时,我安装了python-postgres适配器psycopg2:pip安装psycopg2我重新启动了Airflow网络服务器,Postgres连接开始

mysql - apache airflow initdb 在 mysql 的 kubernetes_resource_checkingpoint 失败

我想使用MySQL作为apacheairflow的后端数据库在我运行时安装依赖项之后airflowinitdbAirflow开始设置数据库,但随后失败并显示以下日志shahbaz@OpenSource:~$airflowinitdb[2019-07-1112:01:13,726]{settings.py:182}INFO-settings.configure_orm():Usingpoolsettings.pool_size=5,pool_recycle=1800,pid=17492[2019-07-1112:01:13,917]{__init__.py:51}INFO-Usingex

Python - Airflow再会

1.前言近期计划做一个任务调度系统,于是,重拾airflow,借机深入学习下。主要调研和测试具体使用方法、能否满足我们的项目需求,以及可能存在哪些坑。不了解airflow的朋友,可以参考我的上篇文章:Python-Airflow任务调度系统初识简单回顾一下两组关键名词:Dag->DagRun(DagInstance)Operator->Task->TaskInstanceCoreConcepts—AirflowDocumentation(apache.org)2.使用方法2.1.编写Dag文件,并测试image.png梳理实际用户需求,是否存在选择分支,是否存在人工审核步骤,是否存在任务重跑,

OSCS开源安全周报第 56 期:Apache Airflow Spark Provider 任意文件读取漏洞

本周安全态势综述OSCS社区共收录安全漏洞3个,公开漏洞值得关注的是ApacheNiFi连接URL验证绕过漏洞(CVE-2023-40037)、PowerJob未授权访问漏洞(CVE-2023-36106)、ApacheAirflowSparkProvider任意文件读取漏洞(CVE-2023-40272)。针对NPM、PyPI仓库,共监测到81个不同版本的毒组件,其中NPM组件包mall-front-babel-directive等携带远控木马,该系列的组件包具有持续性威胁行为。重要安全漏洞列表1.ApacheNiFi连接URL验证绕过漏洞(CVE-2023-40037)ApacheNiFi

python - Airflow XCOM KeyError : 'task_instance'

我正在尝试设置动态序列etl作业,它将使用XCOM从运行的第一个任务中获取数据。这是当前代码:fromairflowimportDAGfromairflow.operators.bash_operatorimportBashOperatorfromdatetimeimportdatetimeasdt,timedeltaastd,datefromairflow.modelsimportBaseOperatorfromairflow.operators.sensorsimportExternalTaskSensorfromairflow.operators.dummy_operatorim

python - Airflow 相对导入外部/dag 目录

我无法将通用代码移出Airflow使用的dag目录。我看过airflowsource并找到imp.load_source.是否可以使用imp.load_source加载存在于dag目录之外的模块?在下面的示例中,这将从公共(public)目录导入foo或bar。──airflow_home|────dags│├──dag_1.py│└──dag_2.py├──common├──foo.py└──bar.py 最佳答案 只需在所有3个文件夹中添加__init__.py文件。它应该工作。事实上,我的文件夹结构中的每个文件夹都有__ini

python - 来自 Airflow 调度程序的奇怪类型错误——在 v1.9 中是否更改了 @once 用于调度程序间隔的用法?

我有一个super简单的测试DAG,如下所示:fromdatetimeimportdatetimefromairflow.modelsimportDAGfromairflow.operators.python_operatorimportPythonOperatorDAG=DAG(dag_id='scheduler_test_dag',start_date=datetime(2017,9,9,4,0,0,0),#..EC2time.Equalto11pmhoraMéxicomax_active_runs=1,schedule_interval='@once'#externallytri