草庐IT

PySpark3

全部标签

python - 如何在 PySpark 中创建自定义 Estimator

我正在尝试在PySparkMLlib中构建一个简单的自定义Estimator。我有here可以编写自定义Transformer,但我不确定如何在Estimator上执行此操作。我也不明白@keyword_only的作用以及为什么我需要这么多的setter和getter。Scikit-learn似乎有适合自定义模型的文档(seehere),但PySpark没有。示例模型的伪代码:classNormalDeviation():def__init__(self,threshold=3):deffit(x,y=None):self.model={'mean':x.mean(),'std':x.

python - 在 Spark\PySpark 中保存\加载模型的正确方法是什么

我正在使用PySpark和MLlib处理Spark1.3.0,我需要保存和加载我的模型。我使用这样的代码(取自官方documentation)frompyspark.mllib.recommendationimportALS,MatrixFactorizationModel,Ratingdata=sc.textFile("data/mllib/als/test.data")ratings=data.map(lambdal:l.split(',')).map(lambdal:Rating(int(l[0]),int(l[1]),float(l[2])))rank=10numIterati

python - 将 RDD 转换为可迭代的 : PySpark?

我有一个RDD,我通过加载一个文本文件并对其进行预处理来创建它。我不想收集它并将其保存到磁盘或内存(整个数据),而是想将它传递给python中的其他函数,这些函数一个接一个地使用可迭代的数据。这怎么可能?data=sc.textFile('file.txt').map(lambdax:some_func(x))an_iterable=data.##whatshouldIdoheretomakeitgivemeoneelementatatime?defmodel1(an_iterable):foriinan_iterable:do_that(i)model(an_iterable)

python - 文本列上的 Pyspark DataFrame UDF

我正在尝试对PySparkDataFrame中的某些Unicode列进行一些NLP文本清理。我已经在Spark1.3、1.5和1.6中尝试过,但似乎无法让事情在我的生活中发挥作用。我也尝试过使用Python2.7和Python3.4。我创建了一个非常简单的udf,如下所示,它应该只为新列中的每条记录返回一个字符串。其他函数将操作文本,然后将更改后的文本返回到新列中。importpysparkfrompyspark.sqlimportSQLContextfrompyspark.sql.typesimport*frompyspark.sqlimportSQLContextfrompyspa

python - Pyspark StructType 未定义

我正在尝试构建用于数据库测试的架构,而StructType显然由于某种原因无法正常工作。我正在关注tut,它不会导入任何额外的模块。,NameError("name'StructType'isnotdefined",),)我使用的是spark1.4.0,如果这与问题有关,则使用Ubuntu12。我将如何解决这个问题?提前谢谢你。 最佳答案 您是否导入了StructType?如果不是frompyspark.sql.typesimportStructType应该可以解决问题。 关于pytho

python - 在 PySpark 中使用 Apache Spark 数据帧删除重音的最佳方法是什么?

我需要从不同数据集中删除西类牙语和其他语言字符的重音。我已经根据此post中提供的代码做了一个函数删除特殊的口音。问题在于该函数运行缓慢,因为它使用了UDF。我只是想知道我是否可以提高函数的性能以在更短的时间内获得结果,因为这对小数据帧有好处,但对大数据帧不利。提前致谢。这里是代码,您将能够按照显示的方式运行它:#Importingsqltypesfrompyspark.sql.typesimportStringType,IntegerType,StructType,StructFieldfrompyspark.sql.functionsimportudf,colimportunico

python - PySpark 抛出错误方法 __getnewargs__([]) 不存在

我有一组文件。文件的路径保存在一个文件中,例如all_files.txt。使用apachespark,我需要对所有文件进行操作并合并结果。我想做的步骤是:通过读取all_files.txt创建一个RDD对于all_files.txt中的每一行(每一行都是某个文件的路径),将每个文件的内容读入一个RDD然后对所有内容做一个操作这是我为此编写的代码:defreturn_contents_from_file(file_name):returnspark.read.text(file_name).rdd.map(lambdar:r[0])defrun_spark():file_name='pa

python - pyspark:ValueError:推断后无法确定某些类型

我有一个pandas数据框my_df,my_df.dtypes给我们:tsint64fieldAobjectfieldBobjectfieldCobjectfieldDobjectfieldEobjectdtype:object然后我尝试通过以下操作将pandas数据框my_df转换为spark数据框:spark_my_df=sc.createDataFrame(my_df)但是,我遇到了以下错误:ValueErrorTraceback(mostrecentcalllast)in()---->1spark_my_df=sc.createDataFrame(my_df)2spark_my

python - 如何在不更改 log4j.properties 的情况下关闭 PySpark 中日志的信息?

我在一个集群中工作,在该集群中我无权更改文件log4j.properties以在使用pyspark时停止信息记录(如第一个答案here中所述)。以下解决方案如上述问题中所述spark-shell(scala)的第一个答案工作importorg.apache.log4j.Loggerimportorg.apache.log4j.Level但对于sparkwithpython(即pyspark),它没有工作,也没有以下Logger.getLogger("org").setLevel(Level.OFF)Logger.getLogger("akka").setLevel(Level.OFF)

python - 来自执行程序的 PySpark 日志记录

在执行器上使用pyspark访问Spark的log4j记录器的正确方法是什么?在驱动程序中这样做很容易,但我似乎无法理解如何访问执行程序上的日志记录功能,以便我可以在本地记录并让YARN收集本地日志。有什么方法可以访问本地记录器吗?标准的日志记录程序是不够的,因为我无法从执行程序访问spark上下文。 最佳答案 您不能在执行器上使用本地log4j记录器。由执行器jvms生成的Pythonworker没有与java的“回调”连接,它们只是接收命令。但是有一种方法可以使用标准python日志记录从执行程序记录并通过YARN捕获它们。在您