草庐IT

scala - 使用 Spark 中的动态列将 RDD 数据写入 CSV - Scala

我正在从HDFS目录读取多个文件,并且对于每个文件,生成的数据使用以下方式打印:frequencies.foreach(x=>println(x._1+":"+x._2))打印的数据是(对于File1.txt):'text':45'data':100'push':150其他文件的key可能不同,例如(File2.txt):'data':45'lea':100'jmp':150key不一定在所有文件中都相同。我希望将所有文件数据写入以下格式的.csv文件:FilenametextdatapushleajmpFile1.txt4510015000File2.txt0450100150...

hadoop - 为缓存的 RDD 分配了多少内存?

我有一个5个工作节点集群,每个集群有6GB内存(Spark执行程序内存设置为4608GB)。我一直在用尽内存,Spark告诉我我的一位执行者试图使用超过5.0GB的内存。如果每个执行程序获得5GB的内存,那么我的整个集群之间应该有25GB的内存。ExecutorLostFailure(executor4exitedcausedbyoneoftherunningtasks)Reason:ContainerkilledbyYARNforexceedingmemorylimits.5.0GBof5.0GBphysicalmemoryused.Considerboostingspark.yar

apache-spark - PySpark:使用具有 1000 个字段但具有可变列数的行的模式创建 RDD->DF->Parquet

我正在尝试读取一个ElasticSearch索引,它有数百万个文档,每个文档都有可变数量的字段。我有一个模式,其中有1000个字段,每个字段都有自己的名称和类型。现在,当我通过ES-Hadoop连接器创建一个RDD并稍后通过指定模式转换为一个DataFrame时,它没有说-Inputrowdoesn'thaveexpectednumberofvaluesrequiredbytheschema我有几个问题。1.是否有可能有一个RDD/DF的行包含可变数量的字段?如果不是,除了为每列中缺失的字段添加空值外,还有什么替代方法?我看到默认情况下Spark将所有内容转换为StringType,因

scala - 如何在Spark中找到RDD的长度

这个问题在这里已经有了答案:HowtofindsparkRDD/Dataframesize?(3个答案)关闭4年前。如何找到下面RDD的长度?varmark=sc.parallelize(List(1,2,3,4,5,6))scala>mark.map(l=>l.length).collect:27:error:valuelengthisnotamemberofIntmark.map(l=>l.length).collect

scala - 如何对 RDD 进行分区

我有一个文本文件,其中包含大量由空格分隔的随机浮点值。我正在将此文件加载到Scala中的RDD中。这个RDD是如何分区的?此外,是否有任何方法可以生成自定义分区,以便所有分区具有相同数量的元素以及每个分区的索引?valdRDD=sc.textFile("hdfs://master:54310/Data/input*")keyval=dRDD.map(x=>process(x.trim().split('').map(_.toDouble),query_norm,m,r))我在这里从HDFS加载多个文本文件,进程是我调用的函数。我可以使用mapPartitonsWithIndex解决方案

python - Spark 和 Python 使用自定义文件格式/生成器作为 RDD 的输入

我想问一下Spark中的输入可能性。我可以从http://spark.apache.org/docs/latest/programming-guide.html看到,我可以使用sc.textFile()将文本文件读取到RDD,但我想做一些预处理,然后再分发到RDD,例如我的文件可能是JSON格式例如。{id:123,text:"...",value:6}我只想使用JSON的某些字段进行进一步处理。我的想法是,是否有可能以某种方式使用Python生成器作为SparkContext的输入?或者如果Spark中有一些更自然的方式来处理自定义文件,而不是由Spark处理纯文本文件?编辑:似乎接

scala - 转换 RDD 中的字符串集合

我正在尝试解析HL7值,因为我使用了几个jar文件,这些jar文件从字符串中解析HL7值,我执行了代码并且它在scala中运行良好,但现在我想要运行相同的东西scala为此,我在示例中使用了以下代码,但出现了以下错误。所以为了解决这个问题,我想将org.apache.spark.rdd.RDD[String]转换为String。代码:objectExampleUseTerser{defmain(args:Array[String]):Unit={valtextfile=sc.textFile("/user/cloudera/xxxx/File")valcontext:HapiConte

hadoop - 将 Spark RDD 保存到 Hive 表

在spark中,我想将RDD对象保存到配置单元表中。我正在尝试使用createDataFrame但这是抛出Exceptioninthread"main"java.lang.NullPointerExceptionvalproducts=sc.parallelize(evaluatedProducts.toList);//hereproductsareRDD[Product]valproductdf=hiveContext.createDataFrame(products,classOf[Product])我使用的是Spark1.5版本。 最佳答案

python - RDD 只有第一列值 : Hbase, PySpark

我们正在使用以下命令使用Pyspark读取Hbase表。frompyspark.sql.typesimport*host=port=keyConv="org.apache.spark.examples.pythonconverters.ImmutableBytesWritableToStringConverter"valueConv="org.apache.spark.examples.pythonconverters.HBaseResultToStringConverter"cmdata_conf={"hbase.zookeeper.property.clientPort":port

scala - 何时坚持以及何时取消坚持 Spark 中的 RDD

假设我有以下内容:valdataset2=dataset1.persist(StorageLevel.MEMORY_AND_DISK)valdataset3=dataset2.map(.....)如果您对dataset2进行转换,那么您必须持久化它并将其传递给dataset3并取消持久化之前的数据?我正在尝试确定何时持久化和取消持久化RDD。对于每一个新创建的rdd,我都必须坚持它吗?谢谢 最佳答案 Spark自动监控每个节点上的缓存使用情况,并以最近最少使用(LRU)的方式删除旧数据分区。如果您想手动删除RDD而不是等待它从缓存中