草庐IT

java - 计算 RDD 中的行数

我在java中使用spark,我有一个500万行的RDD。有没有一种解决方案可以让我计算我的RDD的行数。我试过RDD.count()但这需要很多时间。我已经看到我可以使用函数fold。但是我没有找到这个函数的java文档。您能否告诉我如何使用它或告诉我另一种解决方案来获取我的RDD的行数。这是我的代码:JavaPairRDDlines=getAllCustomers(sc).cache();JavaPairRDDCFIDNotNull=lines.filter(notNull()).cache();JavaPairRDD>join=lines.join(CFIDNotNull).ca

Spark RDD过滤器按元素类

我有一个带有不同类型元素的RDD,我想通过它们的类型来计算它们,例如,下面的代码将正确起作用。scala>valrdd=sc.parallelize(List(1,2.0,"abc"))rdd:org.apache.spark.rdd.RDD[Any]=ParallelCollectionRDD[0]atparallelizeat:24scala>rdd.filter{casez:Int=>true;case_=>false}.countres0:Long=1scala>rdd.filter{casez:String=>true;case_=>false}.countres1:Long=1现在

RDD的创建

有3种方式可以创建RDD。分别如下介绍:1、由集合创建RDDSpark会将集合中的数据拷贝到集群上去,形成一个分布式的数据集合,也就是一个RDD;相当于集合中的一部分数据会到一个节点上,而另一部分数据会到其他节点上;然后就可以用并行的方式来操作这个分布式数据集合。valrdd=sc.parallelize(List(1,2,3,4,5,6)rdd.countvalrdd=sc.parallelize(List(1,2,3,4,5,6),3)rdd.count上面两种写法结果是一样的,只是分区数不一样。通过WebUI可以发现它们的任务数量不一样。从RDD的特性来看,有多少个分区就有多少个任务,它

python - 在 pyspark RDD 上显示分区

pysparkRDD文档http://spark.apache.org/docs/1.2.1/api/python/pyspark.html#pyspark.RDD没有显示任何显示RDD分区信息的方法。有没有办法在不执行额外步骤的情况下获取该信息,例如:myrdd.mapPartitions(lambdax:iter[1]).sum()以上确实有效..但似乎需要额外的努力。 最佳答案 我错过了:很简单:rdd.getNumPartitions()不再使用java风格的getFooMethod();)更新:添加来自@dnlbrky的评

hadoop - Spark : Avro RDD to csv

我能够将arvo文件读入avroRDD并尝试转换为csvRDD,其中包含以逗号分隔的所有值。使用以下代码,我可以将特定字段读入csvRDD。valcsvRDD=avroRDD.map({case(u,_)=>u.datum.get("empname")})如何将所有值读入csvRDD而不是指定字段名称。我的结果csvRDD应该包含如下记录(100,John,25,IN)(101,Ricky,38,AUS)(102,Chris,68,US) 最佳答案 将Spark1.2+与Spark-Avrointegrationlibrary结合使

mongodb - 使用java将rdd保存到mongo数据库中

我正在尝试使用Java在MongoDB中保存推文,这就是我所拥有的;JavaStreamingContextssc=newJavaStreamingContext(sc,newDuration(3000));JavaDStreamtweets=TwitterUtils.createStream(ssc);JavaDStreamstatuses=tweets.map(newFunction(){publicStringcall(Statusstatus){returnstatus.getUser().getName()+":"+status.getText();}});JavaDStre

hadoop - RDD 的内存数据

我一直在使用Spark,我很好奇RDD的工作原理。我知道RDD是指向数据的指针。如果我尝试为HDFS文件创建RDD,我知道RDD将是指向HDFS文件上实际数据的指针。我不明白数据在内存中的存储位置。当任务发送到工作节点时,特定分区的数据是否存储在该工作节点的内存中?如果是这样,当RDD分区存储在workernode1的内存中,但workernode2必须为RDD的同一分区计算任务时会发生什么?workernode2是否与workernode1通信以获取分区的数据并将其存储在自己的内存中? 最佳答案 原则上,任务在执行器之间划分,每个

hadoop - Spark 元组获取每个键的详细信息/rdd

我有这些行:(key1,Illinois|111|67342|...)(key1,Illinois|121|67142|...)(key2,Hawaii|113|67343|...)(key1,Illinois|211|67442|...)(key3,Hawaii|153|66343|...)(key3,Ohio|193|68343|...)(1)如何获得唯一key?(2)如何获取每个键的行数(键1-3行,键2-1行,键3-2行...因此输出为:3,1,2)(3)如何获取每个键的行的字节大小(5MB,2MB,3MB)编辑1.这是我的新代码:valrdd:RDD[(String,Arra

hadoop - 为什么我们创建RDD来保存Hbase中的数据?还有其他方法可以在 Hbase 中保存数据吗?

我对大数据、Hadoop和Spark完全陌生。我来自Java背景。所以我试图理解为什么人们总是创建RDD来将数据集保存在HBASE中。谁能详细告诉我一下。还有其他方法吗? 最佳答案 在Spark中,一切都归结为RDD。即包括数据框。AFAIK,Spark和hbase没有其他选择,如果您使用spark作为分布式框架通过RDD实现其目标,不可变分区容错由粗粒度操作创建延迟评估可以持久化不变性和分区RDD由分区的记录集合组成。分区是RDD中并行的基本单元,每个分区是数据的一个逻辑分区,分区是不可变的,是通过对现有分区进行一些转换而创建的。

scala - 给定核心和执行程序的数量,如何确定 spark 中 rdd 的分区数量?

10个节点集群有20个执行器和代码读取一个包含100个文件的文件夹的分区数是多少? 最佳答案 它在您运行的不同模式下是不同的,您可以使用spark.default.parallelism设置对其进行调整。来自Spark文档:ForoperationslikeparallelizewithnoparentRDDs,itdependsontheclustermanager:Localmode:numberofcoresonthelocalmachineMesosfinegrainedmode:8Others:totalnumberofc