草庐IT

hadoop - 如何更改 pyspark 中的 hdfs block 大小?

我使用pySpark编写parquet文件。我想更改该文件的hdfsblock大小。我这样设置block大小,但它不起作用:sc._jsc.hadoopConfiguration().set("dfs.block.size","128m")这是否必须在启动pySpark作业之前设置?如果可以,怎么做。 最佳答案 尝试通过sc._jsc.hadoopConfiguration()和SparkContext设置它frompysparkimportSparkConf,SparkContextconf=(SparkConf().setMas

hadoop - 如何在使用 pyspark 读取 Parquet 文件时指定模式?

在使用scala或pyspark读取存储在hadoop中的parquet文件时发生错误:#scalavardff=spark.read.parquet("/super/important/df")org.apache.spark.sql.AnalysisException:UnabletoinferschemaforParquet.Itmustbespecifiedmanually.;atorg.apache.spark.sql.execution.datasources.DataSource$$anonfun$8.apply(DataSource.scala:189)atorg.ap

python - PySpark 加载 CSV AttributeError : 'RDD' object has no attribute '_get_object_id'

我正在尝试将CSV文件加载到sparkDataFrame中。这是我到目前为止所做的:#scisanSparkContext.appName="testSpark"master="local"conf=SparkConf().setAppName(appName).setMaster(master)sc=SparkContext(conf=conf)sqlContext=sql.SQLContext(sc)#csvpathtext_file=sc.textFile("hdfs:///path/to/sensordata20171008223515.csv")df=sqlContext.l

python - 将 PySpark DF 写入专用格式的文件

我正在使用PySpark2.1,我需要想出一种方法将我的数据帧写入专门格式的.txt文件;所以不是典型的json或csv,而是CTF格式(对于CNTK)。该文件不能有额外的括号或逗号等。它遵循以下形式:|labelval|featuresvalvalval...val|labelval|featuresvalvalval...val显示这一点的一些代码可能如下所示:l=[('Ankit',25),('Jalfaizy',22),('saurabh',20),('Bala',26)]rdd=sc.parallelize(l)people=rdd.map(lambdax:Row(name=

python - 如何将多列(即时间、年、月和日期)转换为 pyspark 数据框中的日期时间格式

Dataframe有4列year,month,date,hhmmhhmm-小时和分钟连接在一起例如:10:30等于1030dd=spark.createDataFrame([(2019,2,13,1030),(2018,2,14,1000),(2029,12,13,0300)],["Year","month","date","hhmm"])dd.collect()pysparkdataframedd中日期时间格式的预期输出dd.collect()2019-02-1310:30:002018-2-1410:00:002019-12-1303:00:00 最佳答

python - Spark 1.5.2 + Hadoop 2.6.2 spark-submit 和 pyspark 不使用独立的所有节点

我在独立模式下运行spark-submit或pyspark时遇到问题,如下所示:spark/bin/pyspark--masterspark://:这通常会使用所有节点(至少在以前的版本中)在UI中创建一个正在运行的Spark应用程序。出于某种原因,这样做只会在主节点上运行它,尽管UI显示所有节点都连接到主节点。从节点上的日志中没有错误。任何人都知道可能出了什么问题?作为引用,我的spark-env.sh具有以下配置:exportHADOOP_CONF_DIR=/mnt/hadoop/etc/hadoopexportSPARK_PUBLIC_DNS=exportSPARK_MASTER

scala - 如何在 PySpark 中压缩两个 RDD?

我一直在尝试合并averagePoints1和kpoints2下面的两个Rdd。一直报错ValueError:CannotdeserializeRDDwithdifferentnumberofitemsinpair:(2,1)而且我尝试了很多东西,但我不能这两个Rdds是相同的,具有相同数量的分区。我的下一步是在两个列表上应用欧几里德距离函数来衡量差异,因此如果有人知道如何解决此错误或有不同的方法我可以遵循,我将非常感激。提前致谢averagePoints1=averagePoints.map(lambdax:x[1])averagePoints1.collect()Out[15]:[

hadoop - Oozie pyspark 工作

我的工作流程非常简单。${jobTracker}${nameNode}mapred.compress.map.outputtruelocal[*]SparkExamplemapping.py--executor-memory1G--num-executors3--executor-cores1argument1argument2"Killedjobduetoerror"Spark脚本几乎什么都不做:iflen(sys.argv)脚本位于hdfs上,与workflow.xml位于同一文件夹中。运行工作流程时出现以下错误LauncherERROR,reason:Mainclass[org.

python - 无法从 cmd 表单 spark 目录运行 pyspark

我已经在我的windows10系统中安装了spark1.6(prebuiltforhadoop2.6)版本并且我已经正确设置了环境变量。当我运行pyspark时,我得到这个errormessage.但是我可以从spark目录运行“python”命令并返回正确的版本。谁能帮我解决这个问题? 最佳答案 当您运行python时,它会直接进入python命令行,但对于pyspark,您必须执行此位置不存在的pyspark可执行文件。您正在尝试进入C:\spark但pyspark文件存在于此位置C:\spark\bin\pyspark因此您需

hadoop - 如何配置 pyspark 默认写入 HDFS?

我正在尝试让spark默认写入HDFS。目前,当我在RDD上调用saveAsTextFile时,它​​会写入我的本地文件系统。具体来说,如果我这样做:rdd=sc.parallelize([1,2,3,4,5])rdd.saveAsTextFile("/tmp/sample")它将写入我本地文件系统上名为/tmp/sample的文件。但是,如果我这样做rdd=sc.parallelize([1,2,3,4,5])rdd.saveAsTextFile("hdfs://localhost:9000/tmp/sample")然后它会保存到我本地hdfs实例上的适当位置。有没有办法配置或初始化