草庐IT

Pyspark-Cluster

全部标签

python - Pyspark RDD .filter() 带通配符

我有一个PysparkRDD,其中有一个我想用作过滤器的文本列,所以我有以下代码:table2=table1.filter(lambdax:x[12]=="*TEXT*")问题是...如您所见,我正在使用*试图告诉他将其解释为通配符,但没有成功。没有人有帮助吗? 最佳答案 lambda函数是纯python函数,所以像下面这样的东西就可以了table2=table1.filter(lambdax:"TEXT"inx[12]) 关于python-PysparkRDD.filter()带通配符

python - pyspark 中减少数据帧的最有效方法是什么?

我有以下数据框,第一行看起来像这样:['station_id','country','temperature','time']['12','usa','22','12:04:14']我想按“法国”前100个站点的降序显示平均温度。在pyspark中最好(最有效)的方法是什么? 最佳答案 我们通过以下方式将您的查询转换为SparkSQL:frompyspark.sql.functionsimportmean,descdf.filter(df["country"]=="france")\#onlyfrenchstations.group

python - 为什么对 rand() 生成的列进行操作的 PySpark UDF 会失败?

给定以下Python函数:deff(col):returncol如果我将其转换为UDF并将其应用于列对象,它就可以工作...frompyspark.sqlimportfunctionsasFfrompyspark.sql.typesimportDoubleTypedf=spark.range(10)udf=F.udf(f,returnType=DoubleType()).asNondeterministic()df.withColumn('new',udf(F.lit(0))).show()...除非该列是由rand生成的:df.withColumn('new',udf(F.rand(

python - 从 Pyspark 中的 RDD 中提取字典

这是一道作业题:我有一个RDD,它是元组集合。我还有从每个输入元组返回字典的函数。不知何故,与reduce函数相反。有了map,我可以很容易地从元组的RDD到字典的RDD。但是,由于字典是(key,value)对的集合,我想将字典的RDD转换为(key,value)元组的RDD每个字典的内容。那样的话,如果我的RDD包含10个元组,那么我会得到一个RDD包含10个字典和5个元素(例如),最后我得到一个RDD50个元组。我认为这一定是可能的,但是如何实现呢?(可能是我不知道这个操作英文怎么叫的问题) 最佳答案 我的2美分:有一个名为“

python - PySpark 中的列过滤

我有一个从Hive表加载的数据帧df,它有一个时间戳列,比如ts,字符串类型的格式为dd-MMM-yyhh.mm.ss.MSa(转换为python日期时间库,这是%d-%b-%y%I.%M.%S.%f%p)。现在我想过滤数据框中最近五分钟的行:only_last_5_minutes=df.filter(datetime.strptime(df.ts,'%d-%b-%y%I.%M.%S.%f%p')>datetime.now()-timedelta(minutes=5))但是,这不起作用,我收到了这条消息TypeError:strptime()argument1mustbestring,

python - 将 RDD 写入 PySpark 中的多个文件

我有一个包含键值对的rdd。只有3个键,我想将给定键的所有元素写入文本文件。目前我分3次完成此操作,但我想看看我是否可以一次完成。这是我目前所拥有的:#Ihaveanrdd(calledmy_rdd)suchthatarecordisakeyvaluepair,e.g.:#('data_set_1','value1,value2,value3,...,value100')my_rdd.cache()my_keys=['data_set_1','data_set_2','data_set_3']forkeyinmy_keys:my_rdd.filter(lambdal:l[0]==key

python - 在 pyspark 中运行脚本时缺少应用程序资源

我一直在尝试通过pyspark执行脚本.py但我一直收到此错误:11:55$./bin/spark-submit--jarsspark-cassandra-connector-2.0.0-M2-s_2.11.jar--py-filesexample.pyExceptioninthread"main"java.lang.IllegalArgumentException:Missingapplicationresource.atorg.apache.spark.launcher.CommandBuilderUtils.checkArgument(CommandBuilderUtils.ja

python - 为 PySpark 捆绑 Python3 包导致缺少导入

我正在尝试运行依赖于某些python3库的PySpark作业。我知道我可以在Spark集群上安装这些库,但由于我正在为多个作业重用该集群,所以我宁愿捆绑所有依赖项并通过--py-files指令。为此,我使用:pip3install-rrequirements.txt--target./build/dependenciescd./build/dependencieszip-qrm.../dependencies.zip它有效地压缩了所需包中的所有代码,以便在根级别使用。在我的main.py中,我可以导入依赖项ifos.path.exists('dependencies.zip'):sys

python - PySpark 窗口函数 : multiple conditions in orderBy on rangeBetween/rowsBetween

是否可以为rangeBetween或rowsBetween创建一个可以在orderBy中具有多个条件的窗口函数。假设我有一个如下所示的数据框。user_idtimestampdateevent0040b5f02018-01-2213:04:322018-01-2210040b5f02018-01-2213:04:352018-01-2200040b5f02018-01-2518:55:082018-01-2510040b5f02018-01-2518:56:172018-01-2510040b5f02018-01-2520:51:432018-01-2510040b5f02018-01

python - 使用 spark(PySpark) 进行多处理

这个问题在这里已经有了答案:HowtorunindependenttransformationsinparallelusingPySpark?(1个回答)关闭5年前。用例如下:我有一个大数据框,其中有一个“user_id”列(每个user_id可以出现在很多行中)。我有一个用户列表my_users我需要对其进行分析。Groupby、filter和aggregate可能是个好主意,但pyspark中包含的可用聚合函数不符合我的需要。在pysparkver中,userdefinedaggregationfunctions仍然没有得到完全支持,我决定暂时保留它..相反,我只是迭代my_use