草庐IT

python - pyspark 执行器节点上的 python 进程是否在 ram 中共享广播变量?

我的Spark集群中有一个节点有24个内核和124Gb内存。当我将spark.executor.memory字段设置为4g,然后广播一个需要3.5gb存储在ram中的变量时,内核会共同持有该变量的24个副本吗?还是一份?我正在使用pyspark-v1.6.2 最佳答案 我相信PySpark不使用任何形式的共享内存来在工作人员之间共享广播变量。在类Unix系统上广播变量areloaded在worker的主要功能中,仅调用afterforking来自守护进程,因此无法从父进程空间访问。如果您想在不使用外部服务的情况下减少大变量的占用空间

python - Spark 中的分组线性回归

我在PySpark工作,我想找到一种对数据组执行线性回归的方法。特别给出这个数据框importpandasaspdpdf=pd.DataFrame({'group_id':[1,1,1,2,2,2,3,3,3,3],'x':[0,1,2,0,1,5,2,3,4,5],'y':[2,1,0,0,0.5,2.5,3,4,5,6]})df=sqlContext.createDataFrame(pdf)df.show()#+--------+-+---+#|group_id|x|y|#+--------+-+---+#|1|0|2.0|#|1|1|1.0|#|1|2|0.0|#|2|0|0.0

java - 实现一个 java UDF 并从 pyspark 调用它

我需要创建一个在pysparkpython中使用的UDF,它使用java对象进行内部计算。如果它是一个简单的python,我会做类似的事情:deff(x):return7fudf=pyspark.sql.functions.udf(f,pyspark.sql.types.IntegerType())并使用以下方式调用它:df=sqlContext.range(0,5)df2=df.withColumn("a",fudf(df.id)).show()然而,我需要的功能的实现是在java中而不是在python中。我需要以某种方式包装它,以便我可以从python中以类似的方式调用它。我的第一

python - 将 pyspark 数据框的列转换为小写

我在pyspark中有一个数据框,其中包含大写的列,例如ID、COMPANY等等我想把这些列名做成idcompany等等。根据需要将所有列基本转换为小写或大写。我想做的是让列的数据类型保持不变。我们怎样才能做到这一点? 最佳答案 使用DataFrame中的columns字段df=//loadforcolindf.columns:df=df.withColumnRenamed(col,col.lower())或者,正如@zero323建议的那样:df.toDF(*[c.lower()forcindf.columns])

python - 如何根据 PySpark 中的数组值进行过滤?

我的架构:|--Canonical_URL:string(nullable=true)|--Certifications:array(nullable=true)||--element:struct(containsNull=true)|||--Certification_Authority:string(nullable=true)|||--End:string(nullable=true)|||--License:string(nullable=true)|||--Start:string(nullable=true)|||--Title:string(nullable=true)

python - 将 PySpark 数据框列从列表转换为字符串

我有这个PySpark数据框+-----------+--------------------+|uuid|test_123|+-----------+--------------------+|1|[test,test2,test3]||2|[test4,test,test6]||3|[test6,test9,t55o]|我想将test_123列转换成这样:+-----------+--------------------+|uuid|test_123|+-----------+--------------------+|1|"test,test2,test3"||2|"test4,

python - PySpark:TypeError:条件应为字符串或列

我正在尝试过滤基于如下的RDD:spark_df=sc.createDataFrame(pandas_df)spark_df.filter(lambdar:str(r['target']).startswith('good'))spark_df.take(5)但出现以下错误:TypeErrorTraceback(mostrecentcalllast)in()1spark_df=sc.createDataFrame(pandas_df)---->2spark_df.filter(lambdar:str(r['target']).startswith('good'))3spark_df.t

python - PySpark 将列中的空值替换为其他列中的值

我想用相邻列中的值替换一列中的空值,例如,如果我有A|B0,12,null3,null4,2我希望它是:A|B0,12,23,34,2尝试过df.na.fill(df.A,"B")但是没有用,它说值应该是一个float、整数、长整型、字符串或字典有什么想法吗? 最佳答案 我们可以使用coalescefrompyspark.sql.functionsimportcoalescedf.withColumn("B",coalesce(df.B,df.A)) 关于python-PySpark将列

python - 带有 HappyBase 连接池的 PySpark dataframe.foreach() 返回 'TypeError: can' t pickle thread.lock 对象'

我有一个PySpark作业可以更新HBase中的一些对象(Sparkv1.6.0;happybasev0.9)。如果我为每一行打开/关闭一个HBase连接,它会有点工作:defprocess_row(row):conn=happybase.Connection(host=[hbase_master])#updateHBaserecordwithdatafromrowconn.close()my_dataframe.foreach(process_row)几千次更新插入后,我们开始看到这样的错误:TTransportException:Couldnotconnectto[hbase_ma

python - 大量列的性能下降。派斯帕克

我在处理spark宽数据帧(大约9000列,有时更多)时遇到了问题。任务:通过groupBy和pivot创建宽DF。将列转换为向量并处理为来自pyspark.ml的KMeans。所以我制作了广泛的框架并尝试使用VectorAssembler创建矢量,缓存它并在其上训练KMeans。在独立模式下,我的电脑上的7个不同簇数的组装大约需要11分钟,KMeans大约需要2分钟,帧为~500x9000。另一方面,pandas中的这种处理(pivotdf,并迭代7个集群)花费的时间不到一分钟。显然我理解独立模式和缓存等的开销和性能下降,但这真的让我气馁。有人可以解释一下如何避免这种开销吗?人们如何