草庐IT

python - 使用 pyspark 创建 spark 数据框时出现 Py4J 错误

我已经用python3.6安装了pyspark,我正在使用jupyternotebook来初始化sparksession。frompyspark.sqlimportSparkSessionspark=SparkSession.builder.appName("test").enableHieSupport.getOrCreate()运行没有任何错误但是我写,df=spark.range(10)df.show()它抛出一个错误-->Py4JError:Anerroroccurredwhilecallingo54.showString.Trace:py4j.Py4JException:Me

python - PySpark:在 RDD 中使用对象

我目前正在学习Python,并希望将其应用到Spark上或与Spark结合使用。我有这个非常简单(且无用)的脚本:importsysfrompysparkimportSparkContextclassMyClass:def__init__(self,value):self.v=str(value)defaddValue(self,value):self.v+=str(value)defgetValue(self):returnself.vif__name__=="__main__":iflen(sys.argv)!=1:print("UsageCC")exit(-1)data=[1,2

python - 如何从pyspark中的数组中提取元素

我有一个具有以下类型的数据框:col1|col2|col3|col4xxxx|yyyy|zzzz|[1111],[2222]我希望我的输出是以下类型:col1|col2|col3|col4|col5xxxx|yyyy|zzzz|1111|2222我的col4是一个数组,我想将它转换为一个单独的列。需要做什么?我看到很多关于flatMap的答案,但是它们增加了一行,我只想将元组放在另一列但在同一行中以下是我的实际架构:root|--PRIVATE_IP:string(nullable=true)|--PRIVATE_PORT:integer(nullable=true)|--DESTIN

python - PySpark 中的 PCA 分析

查看http://spark.apache.org/docs/latest/mllib-dimensionality-reduction.html.这些示例似乎只包含Java和Scala。SparkMLlib是否支持Python的PCA分析?如果是这样,请给我举个例子。如果没有,如何将Spark与scikit-learn结合? 最佳答案 星火>=1.5.0虽然PySpark1.5引入了分布式数据结构(pyspark.mllib.linalg.distributed),但API似乎相当有限,并且没有实现computePrincipal

python - PySpark - 将列表作为参数传递给 UDF

我需要将列表传递给UDF,该列表将确定距离的分数/类别。现在,我将所有距离硬编码为第4个分数。a=spark.createDataFrame([("A",20),("B",30),("D",80)],["Letter","distances"])frompyspark.sql.functionsimportudfdefcate(label,feature_list):iffeature_list==0:returnlabel[4]label_list=["Great","Good","OK","PleaseMove","Dead"]udf_score=udf(cate,StringTy

python - Pyspark 错误 : Java gateway process exited before sending its port number

我正在使用Pyspark在JupyterNotebook中运行一些命令,但它抛出错误。我尝试了此链接中提供的解决方案(Pyspark:Exception:Javagatewayprocessexitedbeforesendingthedriveritsportnumber)我尝试执行此处提供的解决方案(例如更改C:Java的路径、卸载JavaSDK10并重新安装Java8,但它仍然抛出同样的错误。我尝试卸载并重新安装pyspark,我也尝试从anaconda提示符运行,但我仍然遇到同样的错误。我使用的是Python3.7,pyspark版本是2.4.0。如果我使用这段代码,我会得到这个

python - 如何使用matplotlib绘制pyspark sql结果

我是pyspark的新手。我想使用matplotlib绘制结果,但不确定要使用哪个函数。我搜索了一种将sql结果转换为pandas然后使用plot的方法。 最佳答案 我已经找到了解决方案。我将sqldataframe转换为pandasdataframe,然后我能够绘制图表。下面是示例代码。来自pyspark.sqlimportRowfrompyspark.sqlimportHiveContextimportpysparkfromIPython.displayimportdisplayimportmatplotlibimportmat

python - 如何使用增量值向 Pyspark 中的 DataFrame 添加一列?

我有一个名为“df”的DataFrame,如下所示:+-------+-------+-------+|Atr1|Atr2|Atr3|+-------+-------+-------+|A|A|A|+-------+-------+-------+|B|A|A|+-------+-------+-------+|C|A|A|+-------+-------+-------+我想用增量值向它添加一个新列并获取以下更新的DataFrame:+-------+-------+-------+-------+|Atr1|Atr2|Atr3|Atr4|+-------+-------+-----

python - 如何通过pyspark以gzip格式保存spark RDD

所以我使用以下代码将sparkRDD保存到S3存储桶。有没有办法压缩(gz格式)并保存而不是将其另存为文本文件。help_data.repartition(5).saveAsTextFile("s3://help-test/logs/help") 最佳答案 saveAsTextFile方法采用一个可选参数,该参数指定压缩编解码器类:help_data.repartition(5).saveAsTextFile(path="s3://help-test/logs/help",compressionCodecClass="org.apa

python - Pyspark 导入 .py 文件不起作用

我的目标是将自定义.py文件导入我的spark应用程序并调用该文件中包含的一些函数这是我尝试过的:我有一个名为Test.py的测试文件,如下所示:deffunc():print"Importisworking"在我的Spark应用程序中,我执行以下操作(如文档中所述):sc=SparkContext(conf=conf,pyFiles=['/[AbsolutePathTo]/Test.py'])我也试过这个(在创建Spark上下文之后):sc.addFile("/[AbsolutePathTo]/Test.py")我什至在提交我的spark申请时尝试了以下方法:./bin/spark-