草庐IT

python - PySpark 抛出错误方法 __getnewargs__([]) 不存在

我有一组文件。文件的路径保存在一个文件中,例如all_files.txt。使用apachespark,我需要对所有文件进行操作并合并结果。我想做的步骤是:通过读取all_files.txt创建一个RDD对于all_files.txt中的每一行(每一行都是某个文件的路径),将每个文件的内容读入一个RDD然后对所有内容做一个操作这是我为此编写的代码:defreturn_contents_from_file(file_name):returnspark.read.text(file_name).rdd.map(lambdar:r[0])defrun_spark():file_name='pa

python - pyspark:ValueError:推断后无法确定某些类型

我有一个pandas数据框my_df,my_df.dtypes给我们:tsint64fieldAobjectfieldBobjectfieldCobjectfieldDobjectfieldEobjectdtype:object然后我尝试通过以下操作将pandas数据框my_df转换为spark数据框:spark_my_df=sc.createDataFrame(my_df)但是,我遇到了以下错误:ValueErrorTraceback(mostrecentcalllast)in()---->1spark_my_df=sc.createDataFrame(my_df)2spark_my

python - 如何在不更改 log4j.properties 的情况下关闭 PySpark 中日志的信息?

我在一个集群中工作,在该集群中我无权更改文件log4j.properties以在使用pyspark时停止信息记录(如第一个答案here中所述)。以下解决方案如上述问题中所述spark-shell(scala)的第一个答案工作importorg.apache.log4j.Loggerimportorg.apache.log4j.Level但对于sparkwithpython(即pyspark),它没有工作,也没有以下Logger.getLogger("org").setLevel(Level.OFF)Logger.getLogger("akka").setLevel(Level.OFF)

python - 来自执行程序的 PySpark 日志记录

在执行器上使用pyspark访问Spark的log4j记录器的正确方法是什么?在驱动程序中这样做很容易,但我似乎无法理解如何访问执行程序上的日志记录功能,以便我可以在本地记录并让YARN收集本地日志。有什么方法可以访问本地记录器吗?标准的日志记录程序是不够的,因为我无法从执行程序访问spark上下文。 最佳答案 您不能在执行器上使用本地log4j记录器。由执行器jvms生成的Pythonworker没有与java的“回调”连接,它们只是接收命令。但是有一种方法可以使用标准python日志记录从执行程序记录并通过YARN捕获它们。在您

python - 从 PySpark 中的数据框中删除重复项

我在本地处理pyspark1.4中的数据帧,并且在使dropDuplicates方法起作用时遇到问题。它不断返回错误:"AttributeError:'list'objecthasnoattribute'dropDuplicates'"不太清楚为什么我似乎遵循了latestdocumentation中的语法.#loadingtheCSVfileintoanRDDinordertostartworkingwiththedatardd1=sc.textFile("C:\myfilename.csv").map(lambdaline:(line.split(",")[0],line.spli

python - Pyspark 将标准列表转换为数据框

这个问题在这里已经有了答案:CreateSparkDataFrame.Cannotinferschemafortype(4个答案)关闭4年前。案例非常简单,我需要使用以下代码将python列表转换为数据框frompyspark.sql.typesimportStructTypefrompyspark.sql.typesimportStructFieldfrompyspark.sql.typesimportStringType,IntegerTypeschema=StructType([StructField("value",IntegerType(),True)])my_list=[1

python - 如何计算 pyspark 数据框中每个不同值的计数?

我有一列以字符串形式填充了一堆州的首字母。我的目标是如何计算此类列表中每个州的数量。例如:(("TX":3),("NJ":2))应该是出现两次"TX"时的输出和“NJ”。我是pyspark的新手,所以我被这个问题难住了。任何帮助将不胜感激。 最佳答案 我认为您希望使用groupBy的DataFrame习惯用法和count.例如,给定以下数据框,每行一个状态:df=sqlContext.createDataFrame([('TX',),('NJ',),('TX',),('CA',),('NJ',)],('state',))df.sho

python - 如何将字符串类型的列转换为pyspark数据框中的int形式?

我在pyspark中有数据框。它的一些数字列包含nan,因此当我读取数据并检查数据帧的架构时,这些列将具有string类型。如何将它们更改为int类型。我用0替换了nan值并再次检查了架构,但随后它也显示了这些列的字符串类型。我遵循以下代码:data_df=sqlContext.read.format("csv").load('data.csv',header=True,inferSchema="true")data_df.printSchema()data_df=data_df.fillna(0)data_df.printSchema()我的数据是这样的:这里的列Plays和draf

python - 使用 monotonically_increasing_id() 将行号分配给 pyspark 数据帧

我正在使用monotonically_increasing_id()使用以下语法将行号分配给pyspark数据帧:df1=df1.withColumn("idx",monotonically_increasing_id())现在df1有26,572,528条记录。所以我期望idx值在0-26,572,527之间。但是当我选择max(idx)时,它的值出奇地大:335,008,054,165。这个函数是怎么回事?使用此函数与另一个具有相似记录数的数据集合并是否可靠?我有大约300个数据帧,我想将它们组合成一个数据帧。因此,一个数据框包含ID,而其他数据框包含与它们逐行对应的不同记录

python - 如何在 pySpark 数据框中添加行 ID

这个问题在这里已经有了答案:PrimarykeyswithApacheSpark(4个答案)关闭6年前。我有一个csv文件;我在pyspark中将其转换为DataFrame(df);经过一些改造;我想在df中添加一列;这应该是简单的行ID(从0或1到N)。我将df转换为rdd并使用“zipwithindex”。我将生成的rdd转换回df。这种方法有效,但它生成了250k任务并且需要花费大量时间来执行。我想知道是否有其他方法可以减少运行时间。以下是我的代码片段;我正在处理的csv文件很大;包含数十亿行。debug_csv_rdd=(sc.textFile("debug.csv").fil