草庐IT

sparking

全部标签

scala - 停止 Spark Streaming : exception in the cleaner thread but it will continue to run

我正在开发一个Spark-Streaming应用程序,我只是想获得一个KafkaDirectStream工作的简单示例:packagecom.usernameimport_root_.kafka.serializer.StringDecoderimportorg.apache.spark.sql.SparkSessionimportorg.apache.spark.streaming.kafka._importorg.apache.spark.streaming.{Seconds,StreamingContext}objectMyAppextendsApp{valtopic=args(

scala - Spark Streaming dropDuplicates

Spark2.1.1(scalaapi)从s3位置流式传输json文件。我想根据在json中为每条记录找到的ID列(“event_id”)对所有传入记录进行重复数据删除。我不在乎保留了哪份记录,即使记录只是部分重复。我正在使用追加模式,因为数据只是通过spark.sql()方法被丰富/过滤,没有分组依据/窗口聚合。然后我使用追加模式将Parquet文件写入s3。根据文档,我应该能够使用不加水印的dropDuplicates来进行重复数据删除(显然这在长时间运行的生产中无效)。但是,这失败并出现错误:用户类抛出异常:org.apache.spark.sql.AnalysisExcepti

hadoop - 为什么预分区会因为减少洗牌而有利于 Spark 工作?

很多教程都提到RDD的pre-partition会优化sparkjobs的datashuffling。令我困惑的是,据我了解,预分区也会导致洗牌,为什么在这里提前洗牌会有利于某些操作?特别是sparkitself会针对一组转换做优化。例如:如果我想加入两个数据集国家(id,国家)和收入(id,(收入,月,年)),这两种操作有什么区别?(我使用PySpark架构)按id预分区country=country.partitionBy(10).persist()income=income.partitionBy(10).persist()income.join(country)不预分区直接加入

hadoop - Spark 不会在 yarn-cluster 模式下运行 final `saveAsNewAPIHadoopFile` 方法

我编写了一个Spark应用程序,它读取一些CSV文件(~5-10GB),转换数据并将数据转换为HFiles。数据从HDFS读取并保存到HDFS。当我在yarn-client中运行应用程序时,一切似乎都工作正常模式。但是当我尝试以yarn-cluster运行它时应用程序,进程似乎没有运行最终saveAsNewAPIHadoopFile对我已转换并准备好保存的RDD采取行动!这是我的SparkUI的快照,您可以在其中看到所有其他作业都已处理:以及相应的阶段:这是我应用程序的最后一步,其中saveAsNewAPIHadoopFile方法被调用:JavaPairRDDcells=...try{

java - 从 Java Spark 读取时未读 block 数据

我试图从HDFS和/或文件系统中读取一些文件,但我得到了这个异常Driverstacktrace:][unreadblockdata]]org.apache.spark.SparkException:Jobabortedduetostagefailure:Task0instage0.0failed4times,mostrecentfailure:Losttask0.3instage0.0(TID6,C-4073.CM.ES,executor1):java.lang.IllegalStateException:unreadblockdataatjava.io.ObjectInputStr

scala - 循环遍历文件行并通过 Spark 中的每次迭代执行函数

我在HDFS中有一个名为file1的文件,其中包含以下几行:(每一行都是一个目录路径)this/is/path1this/is/path2this/is/path3...this/is/path1000ormore我有一个ScalaSpark函数如下:valresultset=sc.hadoopFile(inputpath,classOf[TextInputFormat],classOf[LongWritable],classOf[Text]).flatMap{case(k,v)=>if(k.get==0)Seq(v.toString)elseSeq.empty[String]}我想传

apache - Spark 选择查询在配置单元表中的大型数据集上失败

我下面的代码是使用spark从配置单元表中读取数据。该表中有1亿条记录。当我在我的Rdd中选择这么多记录并尝试执行result.show()时,它给出了严重的问题异常。我基本上想通过从该表中为1亿条记录集选择几列来在其他表中插入记录。这是我的代码:importorg.apache.spark.sql.functions._importorg.apache.spark.sql._valsqlContext=neworg.apache.spark.sql.SQLContext(sc)valhiveContext=neworg.apache.spark.sql.hive.HiveContex

scala - Scala Spark 作业需要多长时间才能处理一个文件中的一百万行?

我在HDFS中有一个名为file1的文件,其中包含多个文件的路径:this/is/path1this/is/path2this/is/path3...this/is/path1000000如果我通过在Scala中执行以下行从该文件中获取所有行作为列表,vallines=Source.fromFile("/my/path/file1.txt").getLines.toList如果我如下使用“for”循环,在一个单独的函数中处理file1的每一行,该函数涉及每一行的一些映射功能,for(iif(k.get==0)Seq(v.toString)elseSeq.empty[String]}}假

scala - 使用 Spark sc.textFile 读取文件时如何捕获 BlockMissingException?

当读取存储在HDFS上的文本文件时,如果我在使用sc.textFile读取这些文件时遇到BlockMissingException(或其他一些异常),我该如何捕获错误并继续执行emptyRDD?我可能遇到BlockMissingException的原因是,例如,如果文件存储在复制因子为1的HDFS上并且数据节点出现故障。考虑以下最小示例代码:valmyRDD:RDD[String]=try{sc.textFile("hdfs:///path/to/fileWithMissingBlock")}catch{casee:BlockMissingException=>println("mis

hadoop - 在 Yarn 集群上运行时 Spark 批处理未完成

设置场景我正在努力使Spark流应用程序(Spark2.2.1withScala)在Yarn集群(Hadoop2.7.4)上运行。到目前为止,我已经成功地使用spark-submit将应用程序提交到Yarn集群。我可以看到接收器任务正确启动并从数据库(CouchbaseServer5.0)中获取大量记录,我还可以看到记录被分成批处理。问题当我查看SparkWebUI上的流统计信息时,我可以看到我的批处理从未被处理过。我看到有0条记录的批处理处理并完成,但是当有记录的批处理开始处理时,它永远不会完成。有一次它甚至卡在了0条记录的批处理上。我什至尝试尽可能简化SteamingContext