草庐IT

python - 如何向 Airflow 添加新的 DAG?

我在名为tutorial_2.py的文件中定义了一个DAG(实际上是airflow教程中提供的tutorial.py的副本,除了dag_id更改为tutorial_2)。当我查看默认的、未修改的airflow.cfg(位于~/airflow)时,我看到dags_folder设置为/home/alex/airflow/dags。我做cd/home/alex/airflow;mkdir目录;裁谈会dags;cp[...]/tutorial_2.pytutorial_2.py。现在我有一个dags文件夹匹配airflow.cfg中设置的路径,其中包含我之前创建的tutorial_2.py文件

python - 使用 Apache airflow 存储和访问密码

我们使用Airflow作为调度器。我想在DAG中调用一个简单的bash运算符。bash脚本需要密码作为参数才能进行进一步处理。如何在Airflow(config/variables/connection)中安全地存储密码并在dag定义文件中访问它?我是Airflow和Python的新手,因此不胜感激。 最佳答案 您可以将密码存储在Hook中-只要您设置了fernetkey,密码就会被加密。以下是如何通过UI创建连接:然后:要访问此密码:fromairflow.hooks.base_hookimportBaseHook#Depreca

python - Airflow 使用 PythonOperator 的模板文件

让BashOperator或SqlOperator为其模板选取外部文件的方法在某种程度上已清楚地记录在案,但查看PythonOperator我对我从文档中理解的内容进行的测试不起作用。我不确定templates_exts和templates_dict参数如何正确交互以获取文件。在我创建的dags文件夹中:pyoptemplate.sql和pyoptemplate.t以及test_python_operator_template.py:pyoptemplate.sql:SELECT*FROM{{params.table}};pyoptemplate.t:SELECT*FROM{{para

docker - Airbnb Airflow 使用所有系统资源

我们已经使用LocalExecutor为我们的ETL设置了Airbnb/ApacheAirflow,并且随着我们开始构建更复杂的DAG,我们注意到Airflow已经开始使用大量的系统资源。这让我们感到惊讶,因为我们主要使用Airflow来编排发生在其他服务器上的任务,因此AirflowDAG大部分时间都在等待它们完成——并没有在本地发生实际执行。最大的问题是Airflow似乎总是用完100%的CPU(在AWSt2.medium上),并且使用默认的airflow.cfg设置使用超过2GB的内存。如果相关,我们使用docker-compose运行Airflow,运行容器两次;一次作为sch

docker - Airbnb Airflow 使用所有系统资源

我们已经使用LocalExecutor为我们的ETL设置了Airbnb/ApacheAirflow,并且随着我们开始构建更复杂的DAG,我们注意到Airflow已经开始使用大量的系统资源。这让我们感到惊讶,因为我们主要使用Airflow来编排发生在其他服务器上的任务,因此AirflowDAG大部分时间都在等待它们完成——并没有在本地发生实际执行。最大的问题是Airflow似乎总是用完100%的CPU(在AWSt2.medium上),并且使用默认的airflow.cfg设置使用超过2GB的内存。如果相关,我们使用docker-compose运行Airflow,运行容器两次;一次作为sch

mysql - docker-compose 未知 MySQL 服务器主机 'mysql'( Airflow )

我尝试创建我的Airflow服务。当我分别运行三个docker(没有docker-compose)时,一切正常,但是当我尝试使用docker-compose做同样的事情时,我得到了错误。Airflow似乎没有看到数据库。docker-compose.ymlversion:'2'services:mysql:image:"someregisty/mysql"environment:-MYSQL_ROOT_PASSWORD=somepasswordredis:image:"someregisty/redis"airflow:image:"someregisty/airflow"volume

python - 如何强制 Airflow 任务失败?

我有一个处理csv文件条目的python可调用process_csv_entries。我希望我的任务只有在所有条目都成功处理后才能成功完成。否则任务应该失败defprocess_csv_entries(csv_file):#Booleanfile_completely_parsed=returnnotfile_completely_parsedCSV_FILE=t1=PythonOperator(dag=dag,task_id='parse_csv_completely',python_operator=process_csv_entries,op_args=[CSV_FILE])无论

python - 如何使用 AirFlow 运行 python 文件的文件夹?

我在一个Python文件文件夹中有一系列Python任务:file1.py、file2.py、...我阅读了Airflow文档,但没有看到如何在DAG中指定python文件的文件夹和文件名?我想执行那些python文件(不是通过PythonOperator的Python函数)。任务1:执行file1.py(带有一些导入包)任务2:执行file2.py(与其他一些导入包)这会很有帮助。谢谢,问候 最佳答案 我知道您问的是“想要执行那些Python文件(而不是通过Python运算符执行的Python函数)”。但我认为这可能比您使用Air

python - 从终端触发 Airflow DAG 不起作用

我正在尝试使用Airflow来定义我想从命令行手动触发的特定工作流程。我创建了DAG并添加了一堆任务。dag=airflow.DAG("DAG_NAME",start_date=datetime(2015,1,1),schedule_interval=None,default_args=args)然后我在终端运行airflowtrigger_dagDAG_NAME然后什么也没有发生。调度程序正在另一个线程中运行。任何方向都非常感谢。谢谢你 最佳答案 我刚刚遇到了同样的问题。假设您可以在airflowlist_dags中看到您的dag

python - Airflow Python单元测试?

我想为我们的DAG添加一些单元测试,但找不到。是否有用于DAG的单元测试框架?存在一个端到端测试框架,但我猜它已经死了:https://issues.apache.org/jira/browse/AIRFLOW-79.请推荐,谢谢! 最佳答案 像这样测试你的操作符:classTestMyOperator(TestCase):deftest_execute(self):withDAG(dag_id="foo",start_date=datetime.now()):task=MyOperator(task_id="foo")ti=Tas