草庐IT

PySpark3

全部标签

python - 带有虚拟变量的 pyspark 矩阵

有两列:IDText1a2b3c我怎样才能用这样的虚拟变量创建矩阵:IDabc110020103001使用pyspark库及其功能? 最佳答案 另一种解决方案是使用Spark的pivot方法,自Spark1.6.0以来一直存在。例子:frompyspark.sqlimportfunctionsasFdf=sqlContext.createDataFrame([(1,"a"),(2,"b"),(3,"c"),],["ID","Text"])pivoted=df.groupBy("ID").pivot("Text").agg(F.lit

python - 通过过滤对 Pyspark Dataframe 进行分组

我有如下数据框cust_idreqreq_met-----------------1r111r201r212r113r113r214r105r115r205r11我必须看看客户,看看他们有多少要求,看看他们是否至少满足过一次。同一客户和要求可以有多个记录,一个满足和不满足。在上述情况下,我的输出应该是cust_id-------123我做的是#sayinitialdataframeisdfdf1=df\.groupby('cust_id')\.countdistinct('req')\.alias('num_of_req')\.sum('req_met')\.alias('sum_re

python - 当 ID 匹配时,在其他 Pyspark Dataframe 中逐列划分 Pyspark Dataframe

我有一个PySparkDataFrame,df1,它看起来像:CustomerIDCustomerValue12.1714.1514.2517.5017.0117.35我有第二个PySparkDataFramedf2,它是按CustomerID分组并按求和函数聚合的df1。它看起来像这样:CustomerIDCustomerValueSum12.1714.4017.86我想向df1添加第三列,即df1['CustomerValue']除以df2['CustomerValueSum']以获得相同的CustomerID。这看起来像:CustomerIDCustomerValueNormal

python - pyspark: groupby 然后获取每个组的最大值

我想按一个值分组,然后使用PySpark找到每个组中的最大值。我有以下代码,但现在我对如何提取最大值有点困惑。#somefilecontainstuples('user','item','occurrences')data_file=sc.textData('file:///some_file.txt')#CreatethetripletsoIindexstuffdata_file=data_file.map(lambdal:l.split()).map(lambdal:(l[0],l[1],float(l[2])))#Groupbytheuseri.e.r[0]grouped=dat

python - pyspark : Convert DataFrame to RDD[string]

我想将pyspark.sql.dataframe.DataFrame转换为pyspark.rdd.RDD[String]我将DataFramedf转换为RDDdata:data=df.rddtype(data)##pyspark.rdd.RDD新的RDDdata包含Rowfirst=data.first()type(first)##pyspark.sql.types.Rowdata.first()Row(_c0=u'aaa',_c1=u'bbb',_c2=u'ccc',_c3=u'ddd')我想将Row转换为String列表,如下例所示:u'aaa',u'bbb',u'ccc',u'd

python - PySpark 按条件计算值

我有一个DataFrame,这里有一个片段:[['u1',1],['u2',0]]基本上是一个名为f的字符串字段,第二个元素为1或0(is_fav)。我需要做的是在第一个字段上分组并计算1和0的出现次数。我希望做类似的事情num_fav=count((col("is_fav")==1)).alias("num_fav")num_nonfav=count((col("is_fav")==0)).alias("num_nonfav")df.groupBy("f").agg(num_fav,num_nonfav)它不能正常工作,在这两种情况下我得到的结果相同,相当于组中项目的计数,因此过滤器

python - 使用 pyspark 创建 spark 数据框时出现 Py4J 错误

我已经用python3.6安装了pyspark,我正在使用jupyternotebook来初始化sparksession。frompyspark.sqlimportSparkSessionspark=SparkSession.builder.appName("test").enableHieSupport.getOrCreate()运行没有任何错误但是我写,df=spark.range(10)df.show()它抛出一个错误-->Py4JError:Anerroroccurredwhilecallingo54.showString.Trace:py4j.Py4JException:Me

python - PySpark:在 RDD 中使用对象

我目前正在学习Python,并希望将其应用到Spark上或与Spark结合使用。我有这个非常简单(且无用)的脚本:importsysfrompysparkimportSparkContextclassMyClass:def__init__(self,value):self.v=str(value)defaddValue(self,value):self.v+=str(value)defgetValue(self):returnself.vif__name__=="__main__":iflen(sys.argv)!=1:print("UsageCC")exit(-1)data=[1,2

python - 如何从pyspark中的数组中提取元素

我有一个具有以下类型的数据框:col1|col2|col3|col4xxxx|yyyy|zzzz|[1111],[2222]我希望我的输出是以下类型:col1|col2|col3|col4|col5xxxx|yyyy|zzzz|1111|2222我的col4是一个数组,我想将它转换为一个单独的列。需要做什么?我看到很多关于flatMap的答案,但是它们增加了一行,我只想将元组放在另一列但在同一行中以下是我的实际架构:root|--PRIVATE_IP:string(nullable=true)|--PRIVATE_PORT:integer(nullable=true)|--DESTIN

python - PySpark 中的 PCA 分析

查看http://spark.apache.org/docs/latest/mllib-dimensionality-reduction.html.这些示例似乎只包含Java和Scala。SparkMLlib是否支持Python的PCA分析?如果是这样,请给我举个例子。如果没有,如何将Spark与scikit-learn结合? 最佳答案 星火>=1.5.0虽然PySpark1.5引入了分布式数据结构(pyspark.mllib.linalg.distributed),但API似乎相当有限,并且没有实现computePrincipal