我正在尝试弄清楚哪种方式是使用Airflow和Spark/Hadoop的最佳方式。我已经有一个Spark/Hadoop集群,我正在考虑为Airflow创建另一个集群,它将远程提交作业到Spark/Hadoop集群。有什么建议吗?看起来从另一个集群远程部署spark有点复杂,并且会创建一些文件配置重复。 最佳答案 你真的只需要配置一个yarn-site.xml文件,我相信,为了spark-submit--masteryarn--deploy-modeclient上类。(您可以尝试集群部署模式,但我认为让Airflow管理驱动程序并不是
有没有办法在任何运算符之外使用Airflow宏?例如,在DAG中我有一个Action:datestamp='{{ds}}'print(datestamp)#printsstringnotthedatewhenIrunitforanydatescanner=S3KeySensor(task_id='scanner',poke_interval=60,timeout=24*60*60,soft_fail=False,wildcard_match=True,bucket_key=getPath()+datestamp,#datestampcorrectlyreplacedwithexecut
Airflow安装在EC2上,它在EMR上触发脚本。如果我使用UI中的“清除”选项,UI会显示任务处于关闭状态,但我仍然可以看到任务在EMR上运行。我正在使用的Airflow正在运行LocalExecutor,我想知道如何终止正在运行的任务。我应该使用UI中的“清除”选项来停止正在运行的任务吗?还是使用清除任务以及一些代码更改下面是我的代码defexecute_on_emr(cmd):f=open(file,'r')s=f.read()keyfile=StringIO.StringIO(s)mykey=paramiko.RSAKey.from_private_key(keyfile)s
我使用root帐户在我的集群上安装了ApacheAirflow。我知道这是不好的做法,但这只是测试环境。我创建了一个简单的DAG:fromairflowimportDAGfromairflow.operators.bash_operatorimportBashOperatorfromdatetimeimportdatetime,timedeltadag=DAG('create_directory',description='simplecreatedirectoryworkflow',start_date=datetime(2017,6,1))t1=BashOperator(task_
我是Airflow和Spark的新手,我正在努力使用SparkSubmitOperator。我们的Airflow调度器和我们的hadoop集群没有设置在同一台机器上(第一个问题:这是一个好的做法吗?)。我们有很多自动化程序需要调用pyspark脚本。这些pyspark脚本存储在hadoop集群(10.70.1.35)中。Airflow数据存储在Airflow机器(10.70.1.22)中。目前,当我们想要使用airflowspark-submit一个pyspark脚本时,我们使用一个简单的BashOperator,如下所示:cmd="sshhadoop@10.70.1.35spark-
我的后端是redis,但是airflow包中的airflow.cfg文件只给出了使用mysql作为后端的示例celery执行器。基本上不确定如何为redis后端设置/替换airflow.cfg中的以下sql_alchemy_conn。sql_alchemy_conn=sqlite:////Users/myself/airflow/airflow.db尝试谷歌搜索但没有成功。是否有使用redis作为CeleryExecutor后端的示例airflow.cfg? 最佳答案 “sql_alchemy_conn”用于处理DAG运行等的元数据
我在使用airflow1.9.0和CeleryExecutor使用redis作为代理时遇到了一些问题。我需要运行一项需要6个多小时才能完成的工作,而且我正在失去我的celeryworker。查看GitHub中的Airflow代码,有一个硬编码配置:https://github.com/apache/incubator-airflow/blob/d760d63e1a141a43a4a43daee9abd54cf11c894b/airflow/config_templates/default_celery.py#L31我怎样才能绕过这个问题? 最佳答案
defmysql_operator_test():DEFAULT_DATE=datetime(2017,10,9)t=MySqlOperator(task_id='basic_mysql',sql="SELECTcount(*)fromtable1whereid>100;",mysql_conn_id='mysql_default',dag=dag)t.run(start_date=DEFAULT_DATE,end_date=DEFAULT_DATE,ignore_ti_state=False)run_this=PythonOperator(task_id='getRecoReq',p
我在airflow中有一个SubDAG,它有一个长时间运行的步骤(通常大约2小时,但它会根据正在运行的单元而有所不同)。在1.7.1.3下,此步骤将始终导致AIRFLOW-736当其中的所有步骤都成功时,SubDAG将停止在“运行”状态。我们可以通过在数据库中手动将SubDagOperator标记为成功(而不是运行)来解决此问题,因为我们在SubDAG之后没有步骤。我们现在正在测试Airflow1.8.1,通过执行以下操作进行升级:关闭我们的调度器和工作器通过pip,卸载airflow并安装apache-airflow(版本1.8.1)运行Airflow升级b运行Airflow调度器和
我正在通过命令安装airflow:python3setup.py安装。它接收需求文件requirements/athena.txt,即:apache-airflow[celery,postgres,hive,password,crypto]==1.10.1我遇到了一个错误:RuntimeError:BydefaultoneofAirflow'sdependenciesinstallsaGPLdependency(unidecode).ToavoidthisdependencysetSLUGIFY_USES_TEXT_UNIDECODE=yesinyourenvironmentwheny