草庐IT

Scala 嵌套映射到 Spark RDD

我正在尝试将映射列表(Seq[Map[String,Map[String,String]])转换为RDD表/元组,其中映射中的每个键->值对平面映射到元组用外面map的key。例如Map(1->Map('k'->'v','k1'->'v1'))成为(1,'k','v')(1,'k1','v1')我试过下面的方法,但它似乎在并发问题上失败了。我有两个工作节点,它复制了键->值两次(我认为这是因为我做错了)假设我将我的map类型保存在案例类“记录”中valrdd=sc.parallelize(1torecords.length)valrecordsIt=records.iteratorva

hadoop - Spark RDD 持久化和分区

例如在Spark中创建某个RDD时:lines=sc.textFile("README.md")然后在这个RDD上调用一个转换:pythonLines=lines.filter(lambdaline:"Python"inline)如果你在这个转换后的过滤器RDD上调用一个Action(例如pythonlines.first)当他们说每次你运行一个Action时RDD将被重新计算是什么意思在他们身上?我认为在您对该原始RDD调用filter转换后,您使用textFile方法创建的原始RDD不会保留。那么它是否会重新计算最近转换的RDD,在这种情况下,它是我使用过滤器转换创建的RDD?如果

hadoop - Spark RDD 沿袭和存储

inputRDD=sc.textFile("log.txt")errorsRDD=inputRDD.filter(lambdax:"error"inx)warningsRDD=inputRDD.filter(lambdax:"warning"inx)badLinesRDD=errorsRDD.union(warningsRDD)badLinesCount=badLinesRDD.count()warningCount=warningsRDD.count()在上面的代码中,在倒数第二行代码执行之前,不会评估任何转换,您计算了badLinesRDD中的对象数量。因此,当运行此badLine

hadoop - 线程 "main"java.io.IOException : Incomplete HDFS URI, 中的异常没有主机:hdfs Spark RDD

我正在尝试使用Spark将文本文件的内容保存在hdfs中:importorg.apache.spark.{SparkContext,SparkConf}objectFormatTlfHdfs{defmain(args:Array[String]){valconf=newSparkConf().setAppName("Cleandata").setMaster("local").setSparkHome("/usr/lib/spark")valsc=newSparkContext(conf)varvertices=sc.textFile("hdfs:///user/cloudera/ds

scala - 在 scala 中将 Spark Dataframe 转换为 RDD

我正在寻找更好的方法将Dataframe转换为RDD。现在我正在将数据帧转换为集合和循环集合以准备RDD。但我们知道循环不是好的做法。valrandomProduct=scala.collection.mutable.MutableList[Product]()valresults=hiveContext.sql("selectid,valuefromdetails");valcollection=results.collect();vari=0;results.collect.foreach(t=>{valproduct=newProduct(collection(i)(0).asI

hadoop - RDD 存储在哪里?

如果我有一个Hadoop集群,比如说,3个数据节点和1个名称节点,并且在spark代码中我使用类似dataframe.persist(MEMORY_AND_DISK)的东西,这些数据在哪里持久化?它是在Namenode的(驱动程序)内存中还是在Datanode的(执行程序)内存中,还是两者都有?此外,缓存数据的存储是否取决于堆大小?如果是这样,我该如何增加所有节点的堆大小? 最佳答案 NameNode不是driver,Datanode不是executor。YARN框架中的所有Spark进程都发生在ResourceManagers中(

regex - 在 Spark RDD 中使用正则表达式从字符串中提取时间戳

我有一个像这样的日志:[Pipeline]timestamps[Pipeline]{[Pipeline]echo20:33:050[Pipeline]echo我试图只在此处提取时间信息(20:33:05)。我已尝试执行以下操作:vallines=sc.textFile("/logs/log7.txt")valindividualLines=lines.flatMap(_.split("\n"))//Splittingfilecontentintoindividuallinesvaldates=individualLines.filter(value=>value.startsWith(

apache-spark - 如何根据数据大小重新分区rdd

我正在开发SparkStreaming项目,该项目从Kafka获取数据并应用一些规则并将数据保存在Hive中。我的问题是数据摄取率不固定。60秒可能是100万条消息到来,也可能是1条。我想在Dstream上添加重新分区。因为Dstream只有3个分区,无法在一分钟内处理百万条记录。重新分区在少于20条记录时会出现问题。它在Hive中创建多个小文件。dataStream.map(_._2).repartition(20)我的问题是如何根据rdd大小对rdd进行重新分区。这样它就可以处理一条消息或100万条消息。 最佳答案 你无法以任何

scala - 将 n 个元素的 RDD 转换为单个元素的 RDD

我有一个如下所示的字符串RDDvalrdd1:RDD[String]=RDD("a","b","c","d")我想连接上面RDD的元素并将其转换为如下单个元素的RDDRDD("a,b,c,d")这样做的最佳方法是什么? 最佳答案 使用glom函数:valres:RDD[Array[String]]=RDD("a","b","c","d").glom//>res=RDD(Array("a","b","c","d"))然后您可以使用mkString函数将数组转换为字符串。 关于scala-将

hadoop - Apache 星火RDD

工具:带有spark0.9.0的hadoop集群(没有YARN)。理想情况是在HDFS上的名称节点上运行一个spark程序,而数据节点之间没有通信。该程序将执行此操作:举个例子:在HDFS上我有两种类型的数据:A和B,我的集群由3个数据节点组成。我的目标是运行一个可以处理A和1/3B的所有数据的程序。Datanode1与A和B1(第一个三分之一)交互,Datanode2与A和B2(第二个三分之一)交互,Datanode3与A和B3交互...所以为了尊重“机器之间直到最后才通信”的条件,我将不得不在datanode1的内存中有A和B1,在...的内存中有A和B2。程序在每个数据节点上的结