defmysql_operator_test():DEFAULT_DATE=datetime(2017,10,9)t=MySqlOperator(task_id='basic_mysql',sql="SELECTcount(*)fromtable1whereid>100;",mysql_conn_id='mysql_default',dag=dag)t.run(start_date=DEFAULT_DATE,end_date=DEFAULT_DATE,ignore_ti_state=False)run_this=PythonOperator(task_id='getRecoReq',p
我在airflow中有一个SubDAG,它有一个长时间运行的步骤(通常大约2小时,但它会根据正在运行的单元而有所不同)。在1.7.1.3下,此步骤将始终导致AIRFLOW-736当其中的所有步骤都成功时,SubDAG将停止在“运行”状态。我们可以通过在数据库中手动将SubDagOperator标记为成功(而不是运行)来解决此问题,因为我们在SubDAG之后没有步骤。我们现在正在测试Airflow1.8.1,通过执行以下操作进行升级:关闭我们的调度器和工作器通过pip,卸载airflow并安装apache-airflow(版本1.8.1)运行Airflow升级b运行Airflow调度器和
我正在通过命令安装airflow:python3setup.py安装。它接收需求文件requirements/athena.txt,即:apache-airflow[celery,postgres,hive,password,crypto]==1.10.1我遇到了一个错误:RuntimeError:BydefaultoneofAirflow'sdependenciesinstallsaGPLdependency(unidecode).ToavoidthisdependencysetSLUGIFY_USES_TEXT_UNIDECODE=yesinyourenvironmentwheny
我正在使用airflow编排一些python脚本。我有一个“主”dag,从中运行了几个subdags。我的主要dag应该根据以下概述运行:我已经通过使用以下几行在我的主dag中找到了这个结构:etl_internal_sub_dag1>>etl_internal_sub_dag2>>etl_internal_sub_dag3etl_internal_sub_dag3>>etl_adzuna_sub_dagetl_internal_sub_dag3>>etl_adwords_sub_dagetl_internal_sub_dag3>>etl_facebook_sub_dagetl_int
如何将Airflowdag配置为在每天的指定时间执行,无论发生什么,就像crons一样。我知道使用TimeSensor可以获得类似的行为,但在这种情况下,它取决于传感器任务,并且可能与dag执行时间冲突。示例:使用传感器方法,如果我有传感器在第0时第15分钟运行,但如果dag稍后执行,那么我的任务将被延迟,所以即使对于传感器方法,我也需要确保Dag正确执行时间。那么如何保证Dag在指定的时间执行呢? 最佳答案 例如,要在每天凌晨2:30启动一个DAG,您可以执行以下操作:DAG(dag_id='dag_id',#startdate:
我正在尝试在测试环境中测试具有多个任务的dag。我能够测试与dag关联的单个任务,但我想在dag中创建多个任务并启动第一个任务。为了测试我正在使用的dag中的一项任务task1.run()正在执行。但是,当我在dag的下游一个接一个地执行许多任务时,这种方法就不起作用了。fromairflowimportDAGfromairflow.operators.bash_operatorimportBashOperatorfromdatetimeimportdatetime,timedeltadefault_args={'owner':'airflow','depends_on_past':F
当我们执行dagrun时,在AirflowUI上的“图TableView”中,我们会获得每个作业运行的详细信息。JobID类似于“scheduled__2017-04-11T10:47:00”。我需要这个JobID来跟踪和创建日志,我在其中维护每个任务/dagrun花费的时间。所以我的问题是如何在正在运行的同一个dag中获取JobID。谢谢,切坦 最佳答案 这个值实际上叫做run_id,可以通过上下文或宏访问。在python运算符中,这是通过上下文访问的,而在bash运算符中,这是通过bash_command字段上的jinja模板访
我需要任务的状态,比如它是在运行还是正在重试或在同一个dag中失败。所以我尝试使用下面的代码获取它,尽管我没有输出...Auto=PythonOperator(task_id='test_sleep',python_callable=execute_on_emr,op_kwargs={'cmd':'python/home/hadoop/test/testsleep.py'},dag=dag)logger.info(Auto)目的是在Airflow上的特定任务完成后终止某些正在运行的任务。问题是我如何获取任务的状态,比如它是处于运行状态还是失败或成功 最佳答案
我正在使用GoogleCloudComposer(谷歌云平台上的托管Airflow)图像版本composer-0.5.3-airflow-1.9.0和Python2.7,我面临一个奇怪的问题:导入我的DAG后,它们是不可从WebUI中点击(并且没有“TriggerDAG”、“Graphview”等按钮),而在运行本地Airflow时一切正常。即使无法从Composer上的网络服务器使用,我的DAG仍然存在。我可以使用CLI(list_dags)列出它们,描述它们(list_tasks),甚至触发它们(trigger_dag)。重现问题的最小示例我用来重现该问题的最小示例如下所示。使用钩
我有一个关于python操作符中使用的包的依赖管理的小问题我们在工业化模式下使用Airflow来运行预定的python作业。它运行良好,但我们面临着处理每个DAG所需的不同python库的问题。您是否知道如何让开发人员为他们的工作安装他们自己的依赖项,而无需管理员身份并确保这些依赖项不会与其他工作冲突?你会推荐一个bash任务在工作开始时加载一个虚拟环境吗?有任何官方建议吗?谢谢!罗曼。 最佳答案 一般来说,我认为您的问题有两种可能的解决方案:Airflow有一个PythonVirtualEnvOperator这允许任务在自动创建和