我正在使用YARN在Hadoop集群上运行以下代码。它解析一些电子邮件并执行情感注释,最后将结果DataFrame写入HDFS上的Parquet表。不幸的是,它在HDFS上最后一个数据帧写入的#66行一直失败,错误显示在底部。无论如何,我无法解释为什么每当我使用数据集的一小部分样本时它都会成功终止。objectETLDriver{valappName="ENRON-etl"valconf=newSparkConf().setAppName(appName)valsc=newSparkContext(conf)defmain(args:Array[String]):Unit={valal
我有一个用Scala编写的Spark程序,它从HDFS读取CSV文件,计算一个新列并将其保存为Parquet文件。我在YARN集群中运行该程序。但每次我尝试启动它时,执行程序都会在某个时候因此错误而失败。您能帮我找出可能导致此错误的原因吗?从执行器上登录16/10/2715:58:10WARNstorage.BlockManager:Puttingblockrdd_12_225failedduetoanexception16/10/2715:58:10WARNstorage.BlockManager:Blockrdd_12_225couldnotberemovedasitwasnotf
我有一个spark作业(在spark1.3.1中运行)必须迭代几个键(大约42个)并处理该作业。这是程序的结构从map中获取key从与key匹配的配置单元(下面的hadoop-yarn)中获取数据作为数据框处理数据将结果写入配置单元当我为一个键运行它时,一切正常。当我使用42个键运行时,我在第12次迭代时遇到内存不足异常。有没有办法在每次迭代之间清理内存?帮助表示赞赏。这是我正在使用的高级代码。publicabstractclassSparkRunnable{publicstaticSparkContextsc=null;publicstaticJavaSparkContextjsc=
在我的spark应用程序中,我想在循环中对数据帧执行操作并将结果写入hdfs。伪代码:vardf=emptyDataframeforn=1to200000{someDf=read(n)df=df.mergeWith(somedf)}df.writetohdfs在上面的示例中,当“mergeWith”执行unionAll时,我得到了很好的结果。但是,当我在“mergeWith”中进行(简单的)连接时,工作变得非常慢(>1h,有2个执行器,每个执行器有4个内核)并且永远不会完成(工作自行中止)。在我的场景中,我对仅包含~1mb文本数据的文件进行了约50次迭代。因为合并顺序对我来说很重要,我
有没有办法在Oozie中捕获spark作业的控制台输出?我想在spark作业之后的下一个操作节点中使用特定的打印值。我在想我可以使用${wf:actionData("action-id")["Variable"]}但是oozie似乎没有能力从sparkAction节点捕获输出,这与您可以使用echo"var=12345"的ShellAction不同。然后调用oozie中的wf:actionData以用作整个工作流中的Oozie变量。我想实现这一点,因为我想打印可能处理的记录数并将其存储为oozie变量,并将其用于工作流中的下一个操作节点,而无需执行任何需要您将该数据存储在外部的功能工作
我正在尝试使用scalaFileSystemApi将文件写入HDFS,在客户端和hadoop日志上出现以下错误:File/user/testuser/test.txtcouldonlybereplicatedto0nodesinsteadofminReplication(=1).Thereare1datanode(s)runningand1node(s)areexcludedinthisoperation.testuser有读、写、执行权限。我检查了ambari上的hdfs是否已启动并正在运行,不确定为什么会出现此错误在谷歌搜索错误后,我已经尝试停止所有服务,格式化名称节点并启动所有服
我有一个连接的数据帧,其中包含一个where子句,表明dataStampe在一个范围内:valstartTime=newTimestamp(NewDate.atStartOfDay.toEpochSecond(ZoneOffset.UTC)*1000)valendTime=newTimestamp(NewDate.plusDays(1).atStartOfDay.toEpochSecond(ZoneOffset.UTC)*1000)valjoinedTable=table1.join(table2,table1("date")===table1("key")&&....).where(
我正在尝试使用org.apache.hadoop.fs的文件系统库将文件写入本地FileSystem。下面是我在大scala代码中的一个线性代码,它应该这样做,但事实并非如此。fs.copyToLocalFile(false,hdfsSourcePath,newPath(newFile.getAbsolutePath),true)newFile的值为:valnewFile=newFile(s"${localPath}/fileName.dat")localPath只是一个包含本地磁盘上完整路径的变量。hdfsSourcePath是HDFS位置的完整路径。作业正常执行,但我没有看到在本地
我有一个在Yarn集群上运行的spark应用程序,它需要从S3兼容对象存储上的多个存储桶中读取文件,每个存储桶都有自己的一组凭据。根据hadoopdocumentation应该可以通过设置spark.hadoop.fs.s3a.bucket..access.key=形式的配置来为多个存储桶指定凭证在事件SparkSession但这在实践中对我不起作用。根据文档,我认为应该可行的示例:importorg.apache.spark.sql.{SaveMode,SparkSession}caseclassBucketCredential(bucketName:String,accessKey
我正在编写map缩减应用程序Scala。直到map功能一切正常。但是在编写reducer时我遇到了问题。overridedefreduce(key:Text,values:java.lang.Iterable[Text],context:ReducerContext){}ReducerContext被定义为引用上下文内部类,所以我在这里没问题。问题出在Iterable(Java)组件上。我无法遍历它。我知道首先我将它转换成scalaIterable然后迭代它,我也这样做了但仍然没有得到结果。我已经尝试了scala.collection.JavaConverters._和JavaConv