草庐IT

scala-cats

全部标签

scala - 使用 Spark/Scala 根据列值减少组中的行

我想根据以下条件在减少每个组中的行的意义上实现Netting:-如果UNITS列在每组中的行中具有负值和正值,则进行算术求和。最后一行将有来自具有的行的Amt量越大。-如果Units在一个组中只有正值或负值,那么我们将按原样传递所有行在下面的数据集中,我想做Netty计算但无法弄清楚,因为这不是聚合:+-----+------+----+-----+|store|prod|amt|units|+-----+------+----+-----+|West|Apple|2.0|-10||West|Apple|3.0|10||West|Orange|5.0|-15||West|Orange|

java - 如何在 Scala 中忽略 "NullPointerException"并继续完成 MapReduce 任务的下一个工作?

我正在我的UbuntuVM中运行Hadoop-MapReduce作业。在中间阶段,我需要检查Hbase数据库是否存在某个字符串数组,如果没有找到则什么都不做并“继续”到下一个作业,而不抛出“NullPointerException”。下面的Scala代码(版本2.11.7)片段检查Hbase数据库中是否存在先前从MapReduce作业创建的行(即字符串数组)。为此,它首先使用.readColocationStoreRow()根据rowName输入读取hbase中的该行,然后如果存在则将其拆分,然后使用.contains()检查该数组的部分是否(由分隔符#分隔)存在于那里。整个作业迭代运

scala - 如何在 Scala/Spark 中将文件从 Hadoop (hdfs) 复制到远程 SFTP 服务器?

在Hadoop的文件系统中,我有Excel文件。我的任务是将该文件从Hadoop复制到我的Scala/Spark应用程序中的远程SFTP服务器。我认为直接这样做是行不通的。如果我的担心是正确的,我需要采取后续步骤:1)从Hadoop中删除excel文件到本地目录。例如,我可以使用ScalaDSL来实现:importscala.sys.process._s"hdfsdfs-copyToLocal/hadoop_path/file_name.xlsx/local_path/"!2)从本地目录发送文件到远程SFTP服务器。您可以为这项任务推荐哪种图书馆?我的推理正确吗?解决我的问题的最佳方法

scala - 检查数据框中列的计数并添加列并计数为 Map

我是一个Scala初学者。我试图在表的列中查找空值的计数,并在Map中添加列名和计数作为键值对。下面的代码没有按预期工作。请指导我如何修改此代码以使其工作defnullCheck(databaseName:String,tableName:String)={varmap=scala.collection.mutable.Map[String,Int]()validationColumn=Array(col1,col2)for(i函数应该返回((col1,count),(col2,count))作为Map 最佳答案 这可以通过创建一个

scala - 使用自定义 Hadoop 输入格式在 Spark 中处理二进制文件

我开发了一个基于hadoop的解决方案来处理二进制文件。这使用经典的hadoopMR技术。二进制文件大约10GB,分为73个HDFSblock,写成map进程的业务逻辑分别在这73个block上运行。我们在Hadoop中开发了一个customInputFormat和CustomRecordReader,将key(intWritable)和value(BytesWritable)返回给map函数。该值只是HDFSblock的内容(二进制数据)。业务逻辑知道如何读取这些数据。现在,我想将这段代码移植到spark中。我是spark的初学者,可以在spark中运行简单的示例(wordcount

Scala mapreduce WordCount 程序

我为字数统计编写了这个Scala程序。下面给出了主类objectaaa{defmain(args:Array[String]):Int={valconf=newConfiguration()valotherArgs=newGenericOptionsParser(conf,args).getRemainingArgsif(otherArgs.length!=2){println("Usage:wordcount")return2}valjob=newJob(conf,"wordcount")job.setJarByClass(classOf[TokenizerMapper])job.s

scala - Spark 中的 RDD 持久化

这个问题在这里已经有了答案:(Why)doweneedtocallcacheorpersistonaRDD(5个答案)关闭7年前。我有一个关于RDD何时存储在内存中的问题。假设我有这段代码:valdataset=originalDataset.flatMap(data=>modifyDatasetFormat(data,mappingsInMap)).persist(StorageLevel.MEMORY_AND_DISK)到目前为止,我有一个RDD存储在每个工作节点的内存中。问题:如果我对这个RDD进行另一个转换或操作,这个持久性是否会停止存在并且我应该创建另一个或者它与它没有任何关

scala - Spark - SQL : value implicits is not a member of org. apache.spark.sql.SQLContext

您好,请找到下面的代码和相应的错误:即使我使用了导入语句,但仍然出现错误importorg.apache.spark.sql._valsparkConf=newSparkConf().setAppName("new_proj")implicitvalsc=newSparkContext(sparkConf)valsqlContext=neworg.apache.spark.sql.SQLContext(sc)importsqlContext._importsqlContext.implicits._valprojects=sqlContext.read.json("/part-m-00

scala - 由于类型不匹配(单元和字符串),不能像这样在 reduce 中打印?

我想打印一个文件中的内容,下面的代码是我如何做到的。importorg.apache.spark.SparkContextimportorg.apache.spark.SparkContext._importorg.apache.spark.SparkConfimportorg.apache.spark.rdd.RDDobjectSimpleSpark{defmain(arg:Array[String])={valdistFile=sc.textFile("/a/path/to/a/file")valaClass:MyClass=newMyClass()valmappedRDD=aCl

java - 如何通过 Yarn、Hadoop 提交 Spark scala 作业

我是Spark的新手,我正在尝试在伪分布式Hadoop系统上运行Scala作业。Hadoop2.6+Yarn+Spark1.6.1+scala2.10.6+JVM8,一切从头开始安装。我的Scala应用程序是简单的WordCount示例,我不知道错误是什么。/usr/local/sparkapps/WordCount/src/main/scala/com/mydomain/spark/wordcount/WordCount.scalapackagecom.mydomain.spark.wordcountimportorg.apache.spark.{SparkConf,SparkCon