草庐IT

Airflow2

全部标签

python - 并行运行 Airflow 任务/dags

我正在使用airflow编排一些python脚本。我有一个“主”dag,从中运行了几个subdags。我的主要dag应该根据以下概述运行:我已经通过使用以下几行在我的主dag中找到了这个结构:etl_internal_sub_dag1>>etl_internal_sub_dag2>>etl_internal_sub_dag3etl_internal_sub_dag3>>etl_adzuna_sub_dagetl_internal_sub_dag3>>etl_adwords_sub_dagetl_internal_sub_dag3>>etl_facebook_sub_dagetl_int

python - 如何将 Airflow dag 配置为每天在特定时间运行?

如何将Airflowdag配置为在每天的指定时间执行,无论发生什么,就像crons一样。我知道使用TimeSensor可以获得类似的行为,但在这种情况下,它取决于传感器任务,并且可能与dag执行时间冲突。示例:使用传感器方法,如果我有传感器在第0时第15分钟运行,但如果dag稍后执行,那么我的任务将被延迟,所以即使对于传感器方法,我也需要确保Dag正确执行时间。那么如何保证Dag在指定的时间执行呢? 最佳答案 例如,要在每天凌晨2:30启动一个DAG,您可以执行以下操作:DAG(dag_id='dag_id',#startdate:

python - 如何在单元测试中测试 Airflow dag?

我正在尝试在测试环境中测试具有多个任务的dag。我能够测试与dag关联的单个任务,但我想在dag中创建多个任务并启动第一个任务。为了测试我正在使用的dag中的一项任务task1.run()正在执行。但是,当我在dag的下游一个接一个地执行许多任务时,这种方法就不起作用了。fromairflowimportDAGfromairflow.operators.bash_operatorimportBashOperatorfromdatetimeimportdatetime,timedeltadefault_args={'owner':'airflow','depends_on_past':F

python - 如何获取 Airflow dag 运行的 JobID?

当我们执行dagrun时,在AirflowUI上的“图TableView”中,我们会获得每个作业运行的详细信息。JobID类似于“scheduled__2017-04-11T10:47:00”。我需要这个JobID来跟踪和创建日志,我在其中维护每个任务/dagrun花费的时间。所以我的问题是如何在正在运行的同一个dag中获取JobID。谢谢,切坦 最佳答案 这个值实际上叫做run_id,可以通过上下文或宏访问。在python运算符中,这是通过上下文访问的,而在bash运算符中,这是通过bash_command字段上的jinja模板访

airflow - dag 中 Airflow 任务的状态

我需要任务的状态,比如它是在运行还是正在重试或在同一个dag中失败。所以我尝试使用下面的代码获取它,尽管我没有输出...Auto=PythonOperator(task_id='test_sleep',python_callable=execute_on_emr,op_kwargs={'cmd':'python/home/hadoop/test/testsleep.py'},dag=dag)logger.info(Auto)目的是在Airflow上的特定任务完成后终止某些正在运行的任务。问题是我如何获取任务的状态,比如它是处于运行状态还是失败或成功 最佳答案

python - DAG 在 Google Cloud Composer 网络服务器上不可点击,但在本地 Airflow 上运行良好

我正在使用GoogleCloudComposer(谷歌云平台上的托管Airflow)图像版本composer-0.5.3-airflow-1.9.0和Python2.7,我面临一个奇怪的问题:导入我的DAG后,它们是不可从WebUI中点击(并且没有“TriggerDAG”、“Graphview”等按钮),而在运行本地Airflow时一切正常。即使无法从Composer上的网络服务器使用,我的DAG仍然存在。我可以使用CLI(list_dags)列出它们,描述它们(list_tasks),甚至触发它们(trigger_dag)。重现问题的最小示例我用来重现该问题的最小示例如下所示。使用钩

python - 如何处理 Airflow 中的DAG lib?

我有一个关于python操作符中使用的包的依赖管理的小问题我们在工业化模式下使用Airflow来运行预定的python作业。它运行良好,但我们面临着处理每个DAG所需的不同python库的问题。您是否知道如何让开发人员为他们的工作安装他们自己的依赖项,而无需管理员身份并确保这些依赖项不会与其他工作冲突?你会推荐一个bash任务在工作开始时加载一个虚拟环境吗?有任何官方建议吗?谢谢!罗曼。 最佳答案 一般来说,我认为您的问题有两种可能的解决方案:Airflow有一个PythonVirtualEnvOperator这允许任务在自动创建和

python - Airflow ExternalTask​​Sensor 卡住了

我正在尝试使用ExternalTask​​Sensor,它卡在戳另一个DAG的任务,而该任务已经成功完成。在这里,第一个DAG“a”完成了它的任务,然后应该触发通过ExternalTask​​Sensor的第二个DAG“b”。相反,它会卡在查找a.first_task上。第一个DAG:importdatetimefromairflowimportDAGfromairflow.operators.python_operatorimportPythonOperatordag=DAG(dag_id='a',default_args={'owner':'airflow','start_date

python - 在 Airflow DAG 中导入本地模块(python 脚本)

我正在尝试将本地模块(python脚本)导入到我的DAG。目录结构:airflow/├──dag│  ├──__init__.py│  └──my_DAG.py└──script└──subfolder├──__init__.py└──local_module.pymy_DAG.py中的示例代码:#tryingtoimportfromlocalmodulefromscript.subfolderimportlocal_module#callingafunctioninlocal_module.pya=some_function()我在Airflow中收到一条错误消息,提示“损坏的DAG

python - Airflow 安装成功,但无法运行

C:\Python27\Scripts>airflowinitdb'airflow'isnotrecognizedasaninternalorexternalcommand,operableprogramorbatchfile.C:\Python27\Scripts>airflowinit'airflow'isnotrecognizedasaninternalorexternalcommand,operableprogramorbatchfile.C:\Python27\Scripts>airflowwebserver-p8080'airflow'isnotrecognizedasan