草庐IT

scala-cats

全部标签

scala - 了解 Spark 中的闭包和并行性

我正在尝试了解某些​​事情在Spark中是如何工作的。在示例中如http://spark.apache.org/docs/latest/programming-guide.html#understanding-closures-a-nameclosureslinka表示代码将对RDD中的值求和并将其存储在计数器中,但这里不是这种情况,因为它不起作用。只有当您删除paralelize时,它​​才会起作用。有人可以向我解释一下这是如何工作的吗?还是例子有误?谢谢valdata=Array(1,2,3,4,5)varcounter=0varrdd=sc.parallelize(data)//

scala - Spark yarn 返回退出代码未更新,因为 webUI 中失败 - spark 提交

我正在使用Spark提交通过YARN运行spark作业,在我的spark作业失败后,作业仍显示状态为SUCCEED而不是FAILED。如何将退出代码作为失败状态从代码返回到YARN?我们如何从代码中发送yarn不同的应用程序代码状态? 最佳答案 我认为你做不到。我在spark-1.6.2上遇到过同样的行为,但在分析失败后,我没有看到任何明显的方法可以从我的应用程序发送“错误”退出代码。 关于scala-Sparkyarn返回退出代码未更新,因为webUI中失败-spark提交,我们在St

scala - 通过 JDBC 在 spark 上从远程配置单元读取数据返回空结果

我需要从spark在远程配置单元服务器上执行配置单元查询,但由于某些原因我只收到列名(没有数据)。表中有可用数据,我通过HUE和javajdbc连接检查了它。这是我的代码示例:valtest=spark.read.option("url","jdbc:hive2://remote.hive.server:10000/work_base").option("user","user").option("password","password").option("dbtable","some_table_with_data").option("driver","org.apache.hive

scala - 通过键写入多个输出 Scalding Hadoop,一个 MapReduce 作业

如何在单个MapReduce作业中使用Scalding(/级联)写入依赖于键的多个输出。我当然可以对所有可能的键使用.filter,但这是一个可怕的hack,它会激发许多工作。 最佳答案 有TemplatedTsv在Scalding中(从版本0.9.0rc16及更高版本),与CascadingTemplateTsv完全相同。Tsv(args("input"),('COUNTRY,'GDP)).read.write(TemplatedTsv(args("output"),"%s",'COUNTRY))//itwillcreateadi

scala - 在 Spark/Scala 中将 RDD 转换为数据帧

RDD以Array[Array[String]]格式创建并具有以下值:valrdd:Array[Array[String]]=Array(Array("4580056797","0","2015-07-2910:38:42","0","1","1"),Array("4580056797","0","2015-07-2910:38:43","0","1","1"))我想用模式创建一个数据框:valschemaString="callIdoCallIdcallTimedurationcalltypeswId"后续步骤:scala>valrowRDD=rdd.map(p=>Array(p(0

scala - 如何在连接中选择数据框的所有列 - Spark-scala

我正在连接2个数据框并选择左框的所有列,例如:valjoin_df=first_df.join(second_df,first_df("id")===second_df("id"),"left_outer")在上面我想做selectfirst_df.*。如何在join中选择一帧的所有列? 最佳答案 别名:first_df.alias("fst").join(second_df,Seq("id"),"left_outer").select("fst.*") 关于scala-如何在连接中选择

scala - 如何在 Scala 2.9.0 中实现 Hadoop 映射器?

当我从2.8.1迁移到Scala2.9.0时,除了Hadoop映射器之外,所有代码都可以正常运行。因为我有一些包装器对象,所以我提炼成以下示例:importorg.apache.hadoop.mapreduce.{Mapper,Job}objectMyJob{defmain(args:Array[String]){valjob=newJob(newConfiguration())job.setMapperClass(classOf[MyMapper])}}classMyMapperextendsMapper[LongWritable,Text,Text,Text]{overridede

scala - Spark 和 HBase 快照

假设如果直接从HDFS中提取而不是使用HBaseAPI,我们可以更快地访问数据,我们正在尝试基于HBase的表快照构建RDD。所以,我有一个名为“dm_test_snap”的快照。我似乎能够使大部分配置工作正常,但我的RDD为空(尽管快照本身中有数据)。我很难找到任何人使用Spark对HBase快照进行离线分析的示例,但我不敢相信只有我一个人在尝试让这项工作正常进行。非常感谢任何帮助或建议。这是我的代码片段:objectTestSnap{defmain(args:Array[String]){valconfig=ConfigFactory.load()valhbaseRootDir=c

scala - 如何使用 Spark 在 S3 中捆绑多个文件

我在S3中有2000万个文件,跨越大约8000天。文件按UTC时间戳组织,如下所示:s3://mybucket/path/txt/YYYY/MM/DD/filename.txt.gz。每个文件都是UTF-8文本,包含0(空)到100KB的文本(第95个百分位数,尽管有一些文件高达数MB)。使用Spark和Scala(我对两者都不熟悉,想学习),我想保存“每日包”(其中8000个),每个包包含当天找到的任意数量的文件。理想情况下,我想存储原始文件名及其内容。输出也应驻留在S3中并以某种适合在进一步的Spark步骤和实验中输入的格式进行压缩。一个想法是将包存储为一堆JSON对象(每行一个,

scala - 将多个小文件合并到 Spark 中的几个大文件中

我通过Spark使用配置单元。我的spark代码中有一个Insertintopartitionedtable查询。输入数据为200+gb。当Spark写入分区表时,它会吐出非常小的文件(kb的文件)。所以现在输出分区表文件夹有5000多个小kb文件。我想将这些合并到几个大MB文件中,可能是几个200mb文件。我厌倦了使用配置单元合并设置,但它们似乎不起作用。'valresult7A=hiveContext.sql("sethive.exec.dynamic.partition=true")valresult7B=hiveContext.sql("sethive.exec.dynamic