草庐IT

scala-cats

全部标签

scala - 我可以从 Spark 程序而不是 RDD 写入纯文本 HDFS(或本地)文件吗?

我有一个Spark程序(在Scala中)和一个SparkContext。我正在使用RDD的saveAsTextFile编写一些文件。在我的本地机器上,我可以使用本地文件路径,它与本地文件系统一起工作。在我的集群上,它适用于HDFS。我还想写入其他任意文件作为处理结果。我在我的本地机器上将它们作为常规文件写入,但希望它们进入集群上的HDFS。SparkContext似乎有一些与文件相关的方法,但它们似乎都是输入而不是输出。我该怎么做? 最佳答案 感谢marios和kostya,但是从Spark将文本文件写入HDFS只需几个步骤。//H

scala - Apache Spark 处理倾斜数据

我有两个表想连接在一起。其中一个有非常严重的数据偏差。这导致我的Spark作业无法并行运行,因为大部分工作都在一个分区上完成。我听过、读过并尝试对我的key进行加盐以增加分发。https://www.youtube.com/watch?v=WyfHUNnMutg在12:45秒正是我想要做的。如有任何帮助或提示,我们将不胜感激。谢谢! 最佳答案 是的,您应该在较大的表上使用盐渍键(通过随机化),然后复制较小的表/笛卡尔将其连接到新的盐渍表:这里有一些建议:TresataskewjoinRDDhttps://github.com/tre

scala - Spark 在大型洗牌作业上失败,出现 java.io.IOException : Filesystem closed

我经常发现spark在处理大型作业时失败,并出现无用的无意义异常。工作日志看起来正常,没有错误,但它们的状态为“KILLED”。这对于大型随机播放非常常见,因此像.distinct这样的操作。问题是,我如何诊断出了什么问题,理想情况下,我该如何修复它?考虑到这些操作中有很多是幺半群的,我一直在通过将数据分成例如10个block,在每个block上运行应用程序,然后在所有结果输出上运行应用程序来解决这个问题。换句话说-元映射减少。14/06/0412:56:09ERRORclient.AppClient$ClientActor:Masterremovedourapplication:FA

scala - 尝试写入 hdfs : Server IPC version 9 cannot communicate with client version 4 时出错

我正在尝试使用scala将文件写入hdfs,但我不断收到以下错误Causedby:org.apache.hadoop.ipc.RemoteException:ServerIPCversion9cannotcommunicatewithclientversion4atorg.apache.hadoop.ipc.Client.call(Client.java:1113)atorg.apache.hadoop.ipc.RPC$Invoker.invoke(RPC.java:229)atcom.sun.proxy.$Proxy1.getProtocolVersion(UnknownSource

scala - 使用scala读取后如何删除hdfs目录中的文件?

我使用fileStream从Spark(流上下文)读取hdfs目录中的文件。如果我的Spark关闭并在一段时间后启动,我想读取目录中的新文件。我不想读取目录中已被Spark读取和处理的旧文件。我在这里尽量避免重复。vallines=ssc.fileStream[LongWritable,Text,TextInputFormat]("/home/File")有什么代码片段可以提供帮助吗? 最佳答案 您可以使用FileSystemAPI:importorg.apache.hadoop.fs.{FileSystem,Path}valfs=

scala - 将 S3(法兰克福)与 Spark 结合使用

有人使用hadoop/spark1.6.0在Frankfurt上使用s3吗?我正在尝试将作业的结果存储在s3上,我的依赖项声明如下:"org.apache.spark"%%"spark-core"%"1.6.0"exclude("org.apache.hadoop","hadoop-client"),"org.apache.spark"%%"spark-sql"%"1.6.0","org.apache.hadoop"%"hadoop-client"%"2.7.2","org.apache.hadoop"%"hadoop-aws"%"2.7.2"我设置了以下配置:System.setPr

scala - Scala 并发模型上下文中的 Clojure future

在接触了scala的Actors和Clojure的Futures之后,感觉这两种语言对多核数据处理的支持都非常好。但是,我仍然无法确定这两种模型的并发特性和优缺点之间真正的工程差异。这些语言在处理并发过程抽象方面是互补的还是相反的?其次,关于大数据问题,不清楚scala社区是否继续明确支持Hadoop(而clojure社区显然支持)。Scala开发人员如何与hadoop生态系统交互? 最佳答案 有些解决方案可以由代理人/Actor很好地解决,有些则不能。这种区别实际上与语言无关,而更多地是关于特定问题如何适应一般类别的解决方案。这是

scala - Spark Yarn 架构

我对我正在学习的教程中的这张图片有疑问。因此,基于yarn架构中的这张图片,spark应用程序的执行看起来像这样:首先,您有一个在客户端节点或某个数据节点上运行的驱动程序。在这个驱动程序中(类似于java中的驱动程序?)包含您提交给Spark上下文的代码(用java、python、scala等编写)。然后该spark上下文表示与HDFS的连接,并将您的请求提交给Hadoop生态系统中的资源管理器。然后资源管理器与名称节点通信,以确定集群中哪些数据节点包含客户端节点请求的信息。spark上下文还将在将运行任务的工作节点上放置一个执行程序。然后节点管理器将启动执行器,该执行器将运行Spar

scala - 如何使用 Avro 文件上的架构在 Spark 中加载 Avros?

我正在从Clouderaparcel运行CDH4.4和Spark0.9.0。我有一堆通过Pig的AvroStorageUDF创建的Avro文件。我想使用通用记录或Avro文件上的模式将这些文件加载​​到Spark中。到目前为止,我已经试过了:importorg.apache.avro.mapred.AvroKeyimportorg.apache.avro.mapreduce.AvroKeyInputFormatimportorg.apache.hadoop.io.NullWritableimportorg.apache.commons.lang.StringEscapeUtils.es

scala - 如何在 Spark 中获取 map task 的 ID?

有没有办法在Spark中获取maptask的ID?例如,如果每个map任务调用一个用户定义函数,我能否从该用户定义函数中获取该map任务的ID? 最佳答案 我不确定您所说的maptaskID是什么意思,但您可以使用TaskContext访问任务信息:importorg.apache.spark.TaskContextsc.parallelize(Seq[Int](),4).mapPartitions(_=>{valctx=TaskContext.getvalstageId=ctx.stageIdvalpartId=ctx.parti