草庐IT

python - 在 Airflow DAG 中导入本地模块(python 脚本)

我正在尝试将本地模块(python脚本)导入到我的DAG。目录结构:airflow/├──dag│  ├──__init__.py│  └──my_DAG.py└──script└──subfolder├──__init__.py└──local_module.pymy_DAG.py中的示例代码:#tryingtoimportfromlocalmodulefromscript.subfolderimportlocal_module#callingafunctioninlocal_module.pya=some_function()我在Airflow中收到一条错误消息,提示“损坏的DAG

python - Airflow 安装成功,但无法运行

C:\Python27\Scripts>airflowinitdb'airflow'isnotrecognizedasaninternalorexternalcommand,operableprogramorbatchfile.C:\Python27\Scripts>airflowinit'airflow'isnotrecognizedasaninternalorexternalcommand,operableprogramorbatchfile.C:\Python27\Scripts>airflowwebserver-p8080'airflow'isnotrecognizedasan

python - Airflow DAG 中的外部文件

我正在尝试访问Airflow任务中的外部文件以读取一些sql,但出现“找不到文件”。有人遇到过这个吗?fromairflowimportDAGfromairflow.operators.python_operatorimportPythonOperatorfromdatetimeimportdatetime,timedeltadag=DAG('my_dat',start_date=datetime(2017,1,1),catchup=False,schedule_interval=timedelta(days=1))defrun_query():#readthequeryquery=o

python - Airflow 没有正确安排 Python

代码:Python版本2.7.x和Airflow版本1.5.1我的dag脚本是这样的fromairflowimportDAGfromairflow.operatorsimportBashOperatorfromdatetimeimportdatetime,timedeltadefault_args={'owner':'xyz','depends_on_past':False,'start_date':datetime(2015,10,13),'email':['xyz@email.in'],'schedule_interval':timedelta(minutes=5),'email_

python - Airflow scheduler 是否有可能在开始下一天之前先完成前一天的周期?

现在,我的DAG中的节点会在该DAG的其余节点完成之前继续执行第二天的任务。有没有办法让它在进入第二天的DAG周期之前等待DAG的其余部分完成?(我确实将depends_on_past设为true,但在这种情况下不起作用)我的DAG看起来像这样:OlVO->O->O->O->O此外,dag的TreeView图片] 最佳答案 这个答案可能有点晚了,但我遇到了同样的问题,我解决它的方法是在每个dag中添加两个额外的任务。开头为“Previous”,结尾为“Complete”。上一个任务是监视上一个作业的外部任务传感器。Complete只

python - 作为守护进程运行 Airflow 调度程序的问题

我有一个使用LocalExecutor运行airflow1.8.0的EC2实例。根据文档,我预计以下两个命令之一会在守护进程模式下启动调度程序:airflowscheduler--daemon--num_runs=20或Airflow调度程序--daemon=True--num_runs=5但事实并非如此。第一个命令似乎可以正常工作,但它只是在返回终端之前返回以下输出,而不产生任何后台任务:[2017-09-2818:15:02,794]{__init__.py:57}INFO-UsingexecutorLocalExecutor[2017-09-2818:15:03,064]{dri

python - 有没有办法通过 Airflow API 创建/修改连接

通过Admin->Connections,我们可以创建/修改连接的参数,但我想知道我是否可以通过API做同样的事情,这样我就可以以编程方式设置连接airflow.models.Connection似乎只处理实际连接到实例而不是将其保存到列表中。这似乎是一个应该实现的功能,但我不确定在哪里可以找到这个特定功能的文档。 最佳答案 连接实际上是一个模型,您可以使用它来查询和插入新连接fromairflowimportsettingsfromairflow.modelsimportConnectionconn=Connection(conn

python - 无法启动 Airflow worker/flower,需要澄清 Airflow 架构以确认安装正确

在不同的机器上运行worker会导致下面指定的错误。我关注了theconfigurationinstructions并同步dags文件夹。我还要确认一下,RabbitMQ和PostgreSQL只需要安装在Airflow核心机上,不需要安装在worker上(worker只连接core)。设置规范详述如下:Airflow核心/服务器计算机已安装以下内容:Python2.7与Airflow(AIRFLOW_HOME=~/airflow)celery心理治疗师2RabbitMQPostgreSQLairflow.cfg中的配置:sql_alchemy_conn=postgresql+psyco

python - 如何运行一个简单的 Airflow DAG

我是Airflow的新手。我想在指定日期运行一个简单的DAG。我正在努力区分开始日期、执行日期和回填日期。运行DAG的命令是什么?这是我从那以后尝试过的:airflowrundag_1task_12017-1-23我第一次运行该命令时,任务执行正确,但当我再次尝试时它没有工作。这是我运行的另一个命令:airflowbackfilldag_1-s2017-1-23-e2017-1-24我不知道该命令会带来什么。DAG会在每天23点到24点执行吗?在运行上面的两个命令之前,我这样做了:airflowinitdbairflowschedulerairflowwebserver-p8085--

python - 运行时的 Airflow 动态任务

关于“动态任务”的其他问题似乎涉及在计划或设计时动态构建DAG。我对在执行期间向DAG动态添加任务很感兴趣。fromairflowimportDAGfromairflow.operators.dummy_operatorimportDummyOperatorfromairflow.operators.python_operatorimportPythonOperatorfromdatetimeimportdatetimedag=DAG('test_dag',description='atest',schedule_interval='00***',start_date=datetime