草庐IT

pyspark-dataframes

全部标签

java - 线程 "main"java.lang.NoClassDefFoundError : when launching pyspark 中的异常

我只是在Mac上下载了spark,我下载的版本是最新的为Hadoop2.6预先构建当我尝试安装它时,我在终端上输入bin/pyspark我得到的是以下异常Exceptioninthread"main"java.lang.NoClassDefFoundError:org/apache/spark/launcher/MainCausedby:java.lang.ClassNotFoundException:org.apache.spark.launcher.Mainatjava.net.URLClassLoader$1.run(URLClassLoader.java:202)atjava.

python - pyspark tfidf 转换是否保持索引位置?

我正在运行一系列函数:hashingTF=HashingTF()documents=sc.parallelize(df.content_list.values)tf=hashingTF.transform(documents)tf.cache()idf=IDF(minDocFreq=2).fit(tf)tfidf=idf.transform(tf)但是我不确定它们是否仍保留在插入时的相同位置。有没有办法将它们映射回它们的原始值或索引? 最佳答案 我假设你想用IDF.transform的结果zipdocuments:combined=

hadoop - map 转换性能 spark dataframe 与 RDD

我有一个四节点hadoop集群(mapr),每个集群有40GB内存。我需要在大数据集(5亿行)的其中一个字段上“应用”一个函数。我的代码流程是,我从配置单元表中读取数据作为spark数据帧,并在其中一列上应用所需的函数,如下所示:schema=StructType([StructField("field1",IntegerType(),False),StructField("field2",StringType(),False),StructField("field3",FloatType(),False)])udfCos=udf(lambdarow:function_call(row

hadoop - 配置单元 pyspark 日期比较

我正在尝试将hiveQL查询转换为pyspark。我正在过滤日期并获得不同的结果,我想知道如何让pySpark中的行为与Hive的行为相匹配。配置单元查询是:SELECTCOUNT(zip_cd)FROMtableWHEREdt>='2012-01-01';在pySpark中,我正在进入解释器:importpyspark.sql.functionsaspsfimportdatetimeasdthc=HiveContext(sc)table_df=hc.table('table')DateFrom=dt.datetime.strptime('2012-01-01','%Y-%m-%d')

hadoop - Spark Sql 1.5 dataframe saveAsTable 如何添加配置单元表属性

我在配置单元上运行sparksql。我需要在创建新的配置单元表时添加auto.purge表属性。我尝试使用以下代码在调用saveAsTable方法时添加选项:inputDF.write.option("auto.purge"->"true").saveAsTable(hiveTableName)上面的代码行在表的WITHSERDEPROPERTIES下添加了一个属性。我需要在配置单元DDL的TBLPROPERTIES部分下添加此属性。 最佳答案 最后我找到了一个解决方案,我不确定这是否是最好的解决方案。不幸的是,Spark1.5sq

python - pyspark 的分析器缺少输出

我正在尝试分析提交到我的集群的pyspark作业。这个pysparkPR(https://github.com/apache/spark/pull/2556)表示sc.dump_profiles(path)是命令。我已经尝试将配置文件输出转储到hadoophdfs:///user/username/filename和本地file:///home/username/filename,以及/home/用户名/文件名。作业完成但配置文件从未出现。代码基于这个SO问题(Howtoprofilepysparkjobs),当包含--confspark.python.profile=true时,sc

python - 如何让 PySpark 在内存不足之前将中间结果写入磁盘?

背景:在HadoopStreaming中,每个reduce作业在完成时都会写入hdfs,从而为Hadoop集群执行下一个reduce扫清道路。我无法将此范例映射到(Py)Spark。举个例子,df=spark.read.load('path')df.rdd.reduceByKey(my_func).toDF().write.save('output_path')当我运行它时,集群会在将任何内容写入磁盘之前收集数据框中的所有数据。至少这就是我观察工作进展时正在发生的事情。我的问题是我的数据比我的集群内存大得多,所以我在写入任何数据之前就用完了内存。在HadoopStreaming中,我们

hadoop - 在 PySpark 中显示 Hive 查询的状态

我正在从sparksession(spark)运行Hive查询spark.sql('SELECT*FROMSOME_TABLE').show()在sql函数中是否有一个参数,或者一个配置来打印类似于Hivecli中显示的状态?HadoopjobinformationforStage-1:numberofmappers:1193;numberofreducers:10992017-05-1614:54:38,165Stage-1map=0%,reduce=0%2017-05-1614:54:49,625Stage-1map=1%,reduce=0%,CumulativeCPU213.84

python - 在 Pyspark rdd 中更改 saveAsTextFile 选项中的分隔符

我的数据集在HDFS中可用。我正在阅读它并执行过滤操作。dir=sc.textFile('/datasets/DelayedFlights.csv').filter(lambdax:int(x.split(',')[24])==1).map(lambday:y.split(','))Theoutputofaboveoperationis[u'1763',u'2008',u'1',u'3',u'4',u'922.0',u'915',u'',u'1050',u'WN',u'1069',u'N630WN',u'',u'95.0',u'',u'',u'7.0',u'SAN',u'SMF',u'

json - pyspark 将新的嵌套数组添加到现有的 json 文件中

我是Spark的新手,有一个大问题,我无法处理,即使经过数小时的搜索......我有一个看起来像这样的jsonFile:root|--dialogueData:struct(nullable=true)||--dialogueID:string(nullable=true)||--dialogueLength:double(nullable=true)||--speakerChanges:long(nullable=true)|--snippetlist:array(nullable=true)||--element:struct(containsNull=true)|||--conf