草庐IT

language-scala

全部标签

scala - 学习mapreduce,如何在map reduce数据流中翻译SQL命令。字数统计示例不能满足我的理解。

在网上,我看到了很多关于规范字数统计图减少遍历的示例。我了解k,v的映射器输入=>以减少k,list(v)的输入。mapreduce带来了一些神奇的效果。我不太明白如何将mapreduce应用于更实际的示例。例如:假设我有一个文件,其中包含美国所有员工的薪水以及一些其他详细信息,例如州和城市等......mapreduce如何工作以提供包含以下列汇总的输出报告?州,城市,平均(工资)在SQL中,我可以通过这样的查询得到它:Selectstate,city,avg(salaries)Fromemployee_tblGroupbystate,citymapreduce将如何为我提供上述结果

scala - 如何枚举HDFS目录中的文件

如何枚举HDFS目录中的文件?这是为了使用Scala枚举ApacheSpark集群中的文件。我看到有sc.textfile()选项,但它也会读取内容。我只想读取文件名。我实际上尝试了listStatus。但是没有用。得到以下错误。我正在使用AzureHDInsightSpark,Blob存储文件夹“testContainer@testhdi.blob.core.windows.net/example/”包含.json文件。valfs=FileSystem.get(newConfiguration())valstatus=fs.listStatus(newPath("wasb://tes

scala - 使用嵌套字段更新数据框 - Spark

这个问题在这里已经有了答案:AddinganestedcolumntoSparkDataFrame(1个回答)关闭3年前。我有如下两个数据框Df1+----------------------+---------+|products|visitorId|+----------------------+---------+|[[i1,0.68],[i2,0.42]]|v1||[[i1,0.78],[i3,0.11]]|v2|+----------------------+---------+Df2+---+----------+|id|name|+---+----------+|i1|N

scala - 如何从 Scala 代码读取 HDFS 文件

我是Scala和HDFS的新手:我只是想知道我能够从Scala代码读取本地文件,但如何从HDFS读取:importscala.io.sourceobjectReadLine{defmain(args:Array[String]){if(args.length>0){for(line在参数中我已经传递了hdfs://localhost:9000/usr/local/log_data/file1..但它给出了FileNotFoundException错误我肯定错过了一些东西..任何人都可以帮我吗? 最佳答案 scala.io.sourc

scala - ZooKeeper 返回 HBase 服务器地址的垃圾字符

这个问题不太可能帮助任何future的访问者;它只与一个小的地理区域、一个特定的时间点或一个非常狭窄的情况有关,这些情况并不普遍适用于互联网的全局受众。为了帮助使这个问题更广泛地适用,visitthehelpcenter.关闭10年前。最近怎么样?对不起,如果我在这里听起来很愚蠢。我正在尝试创建一个基本的play2.0-HBase应用程序。当我尝试从游戏连接HBase时,ZooKeeper向我返回HBase服务器地址的垃圾字符。defmain(args:Array[String]):Unit={valconf=HBaseConfigurationcreatevaladmin=newHB

java - Scala MapReduce 框架提供类型不匹配

我在Scala中有一个基于多个org.apache.hadoop库的MapReduce框架。它适用于一个简单的字数统计程序。但是,我想将它应用到有用的东西上,但遇到了障碍。我想获取一个csv文件(或任何分隔符)并将第一列中的任何内容作为键传递,然后计算键的发生率。映射器代码如下所示classWordCountMapperextendsMapper[LongWritable,Text,Text,LongWritable]withHImplicits{protectedoverridedefmap(lnNumber:LongWritable,line:Text,context:Mapper

scala - Spark集群提交无法绑定(bind)slave地址

ERRORnetty.NettyTransport:failedtobindtospark.master/172.28.128.3:0,shuttingdownNettytransport15/03/1604:08:50WARNutil.Utils:Service'Driver'couldnotbindonport0.Attemptingport1.^^^是我从我的从属日志中得到的错误。我正在使用spark-submit提交我的工作。这是没有意义的,因为从站能够连接到主站,如web-ui所示。我认为我已经配置了正确的端口,如下所示是我在所有机器上的配置。Spark-Env.shexpo

scala - 何时坚持以及何时取消坚持 Spark 中的 RDD

假设我有以下内容:valdataset2=dataset1.persist(StorageLevel.MEMORY_AND_DISK)valdataset3=dataset2.map(.....)如果您对dataset2进行转换,那么您必须持久化它并将其传递给dataset3并取消持久化之前的数据?我正在尝试确定何时持久化和取消持久化RDD。对于每一个新创建的rdd,我都必须坚持它吗?谢谢 最佳答案 Spark自动监控每个节点上的缓存使用情况,并以最近最少使用(LRU)的方式删除旧数据分区。如果您想手动删除RDD而不是等待它从缓存中

scala - 读取 s3 存储桶时出错

我在尝试使用spark从s3读取文件时遇到异常。错误和代码如下。该文件夹由许多名为part-00000part-00001等的文件组成,这些文件来自hadoop。它们的文件大小范围从0kb到几gb16/04/0715:38:58INFONativeS3FileSystem:Openingkey'titlematching214/1.0/bypublicdemand/part-00000'forreadingatposition'0'16/04/0715:38:58ERRORExecutor:Exceptionintask0.0instage0.0(TID0)org.apache.had

Scala 和 Hive : best way to write a generic method that works with all types of Writable

我正在玩在Scala中为Hive编写通用UDF。我的第一个测试是编写一个函数来对数组(复杂数据类型)求和。我的代码stub如下所示(因为这是stub,请忽略asInstanceOf的用法:D):...classSumElementsextendsGenericUDF{protectedvalexpectedCategories:Array[Category]=Array(ObjectInspector.Category.LIST)protectedvarlistInspector:ListObjectInspector=_@throws(classOf[UDFNullArgumentE