草庐IT

PySpark3

全部标签

python - 如何在 PySpark 中读取 Avro 文件

我正在使用python编写spark作业。但是,我需要读入一大堆avro文件。This是我在Spark的示例文件夹中找到的最接近的解决方案。但是,您需要使用spark-submit提交此python脚本。在spark-submit的命令行中,你可以指定driver-class,这样的话,你所有的avrokey,avrovalue类都会被找到。avro_rdd=sc.newAPIHadoopFile(path,"org.apache.avro.mapreduce.AvroKeyInputFormat","org.apache.avro.mapred.AvroKey","org.apach

python - 如何从 RDD[PYSPARK] 中删除重复值

我有下表作为RDD:KeyValue1y1y1y1n1n2y2n2n我想从Value中删除所有重复项。输出应该是这样的:KeyValue1y1n2y2n在pyspark中工作时,输出应该是像这样的键值对列表:[(u'1',u'n'),(u'2',u'n')]我不知道如何在这里应用for循环。在普通的Python程序中,这会非常容易。我想知道pyspark中是否有一些相同的功能。 最佳答案 恐怕我对python一无所知,所以我在这个答案中提供的所有引用和代码都是与java相关的。但是,将它翻译成python代码应该不是很困难。你应该看

python - 在非 Spark 环境中加载 pyspark ML 模型

我对在python中部署机器学习模型很感兴趣,因此可以通过向服务器发出请求来进行预测。我将创建一个Cloudera集群并利用pyspark库利用Spark开发模型。我想知道如何保存模型以便在服务器上使用它。我已经看到不同的算法具有.save函数(就像在这篇文章HowtosaveandloadMLLibmodelinApacheSpark中回答的那样),但是由于服务器将在没有Spark的不同机器上而不是在Cloudera集群中,我不知道不知道是否可以使用他们的.load和.predict函数。是否可以通过使用pyspark库函数进行预测而不使用Spark?或者我是否必须进行任何转换才能保

python - pyspark 在将 rdd 转换为数据帧时对 mapPartitions 使用一个任务

我很困惑为什么在将生成的RDD转换为DataFrame时,Spark似乎对rdd.mapPartitions使用了1个任务。这对我来说是个问题,因为我想从:DataFrame-->RDD-->rdd.mapPartitions-->DataFrame这样我就可以读取数据(DataFrame),将非SQL函数应用于数据block(RDD上的mapPartitions),然后转换回DataFrame,这样我就可以使用DataFrame.write过程。我可以从DataFrame-->mapPartitions开始,然后使用像saveAsTextFile这样的RDD编写器,但这不太理想,因为

python - PySpark DataFrames - 在不转换为 Pandas 的情况下进行枚举的方法?

我有一个非常大的pyspark.sql.dataframe.DataFrame名为df。我需要一些枚举记录的方法——因此,能够访问具有特定索引的记录。(或选择具有索引范围的记录组)在Pandas中,我可以做到indexes=[2,3,6,7]df[indexes]我想要类似的东西,(并且没有将数据框转换为pandas)我能得到的最接近的是:通过以下方式枚举原始数据框中的所有对象:indexes=np.arange(df.count())df_indexed=df.withColumn('index',indexes)使用where()函数搜索我需要的值。问题:为什么它不起作用以及如何让

python - PySpark DataFrames - 在不转换为 Pandas 的情况下进行枚举的方法?

我有一个非常大的pyspark.sql.dataframe.DataFrame名为df。我需要一些枚举记录的方法——因此,能够访问具有特定索引的记录。(或选择具有索引范围的记录组)在Pandas中,我可以做到indexes=[2,3,6,7]df[indexes]我想要类似的东西,(并且没有将数据框转换为pandas)我能得到的最接近的是:通过以下方式枚举原始数据框中的所有对象:indexes=np.arange(df.count())df_indexed=df.withColumn('index',indexes)使用where()函数搜索我需要的值。问题:为什么它不起作用以及如何让

python - 如何从 PySpark 中的不同线程在一个 Sparkcontext 中运行多个作业?

从Spark文档中了解到SchedulingWithinanApplication:InsideagivenSparkapplication(SparkContextinstance),multipleparalleljobscanrunsimultaneouslyiftheyweresubmittedfromseparatethreads.By“job”,inthissection,wemeanaSparkaction(e.g.save,collect)andanytasksthatneedtoruntoevaluatethataction.Spark’sschedulerisful

python - 如何从 PySpark 中的不同线程在一个 Sparkcontext 中运行多个作业?

从Spark文档中了解到SchedulingWithinanApplication:InsideagivenSparkapplication(SparkContextinstance),multipleparalleljobscanrunsimultaneouslyiftheyweresubmittedfromseparatethreads.By“job”,inthissection,wemeanaSparkaction(e.g.save,collect)andanytasksthatneedtoruntoevaluatethataction.Spark’sschedulerisful

python - 如何在 PySpark 中使用窗口函数?

我正在尝试对数据框使用一些Windows函数(ntile和percentRank),但我不知道如何使用它们。谁能帮我解决这个问题?在PythonAPIdocumentation没有关于它的例子。具体来说,我正在尝试获取数据框中数字字段的分位数。我正在使用spark1.4.0。 最佳答案 要使用窗口函数,您必须先创建一个窗口。定义与普通SQL几乎相同,这意味着您可以定义顺序、分区或两者。首先让我们创建一些虚拟数据:importnumpyasnpnp.random.seed(1)keys=["foo"]*10+["bar"]*10val

python - 如何在 PySpark 中使用窗口函数?

我正在尝试对数据框使用一些Windows函数(ntile和percentRank),但我不知道如何使用它们。谁能帮我解决这个问题?在PythonAPIdocumentation没有关于它的例子。具体来说,我正在尝试获取数据框中数字字段的分位数。我正在使用spark1.4.0。 最佳答案 要使用窗口函数,您必须先创建一个窗口。定义与普通SQL几乎相同,这意味着您可以定义顺序、分区或两者。首先让我们创建一些虚拟数据:importnumpyasnpnp.random.seed(1)keys=["foo"]*10+["bar"]*10val