草庐IT

scala-cats

全部标签

scala - 停止 Spark Streaming : exception in the cleaner thread but it will continue to run

我正在开发一个Spark-Streaming应用程序,我只是想获得一个KafkaDirectStream工作的简单示例:packagecom.usernameimport_root_.kafka.serializer.StringDecoderimportorg.apache.spark.sql.SparkSessionimportorg.apache.spark.streaming.kafka._importorg.apache.spark.streaming.{Seconds,StreamingContext}objectMyAppextendsApp{valtopic=args(

scala - Spark Streaming dropDuplicates

Spark2.1.1(scalaapi)从s3位置流式传输json文件。我想根据在json中为每条记录找到的ID列(“event_id”)对所有传入记录进行重复数据删除。我不在乎保留了哪份记录,即使记录只是部分重复。我正在使用追加模式,因为数据只是通过spark.sql()方法被丰富/过滤,没有分组依据/窗口聚合。然后我使用追加模式将Parquet文件写入s3。根据文档,我应该能够使用不加水印的dropDuplicates来进行重复数据删除(显然这在长时间运行的生产中无效)。但是,这失败并出现错误:用户类抛出异常:org.apache.spark.sql.AnalysisExcepti

scala - 由 : java. lang.ClassNotFoundException : org. apache.hadoop.hbase.HBaseConfiguration 引起

我正在尝试在Hbase上(在指定的集群上)创建一个表,我尝试了以下代码:importorg.apache.hadoop.hbase.client.{HTable,Put,HBaseAdmin}importorg.apache.hadoop.hbase.util.Bytesimportorg.apache.hadoop.hbase.mapreduce.TableInputFormatimportorg.apache.hadoop.hbase.{HBaseConfiguration,HTableDescriptor,HColumnDescriptor}objectImportData{va

scala - 星火图和节目一起播出

我正在尝试加入两个数据集。下面是两个数据集。1/2/20096:17,iphone,800,Mastercard,carolina1/2/20094:53,cloth,200,Visa,Betina1/2/200913:08,cloth,100,Mastercard,FedericaeAndrea1/3/200914:44,blender,160,Visa,Gouya1/4/200912:56,samsung,3600,Visa,GerdW1/4/200913:19,htc,1200,Visa,LAURENCE1/4/200920:11,iphone,999,Mastercard,Fl

scala - 星火 SQL : access file in current worker node directory

我需要使用spark-sql读取一个文件,该文件在当前目录中。我使用此命令解压缩存储在HDFS上的文件列表。valdecompressCommand=Seq(laszippath,"-i",inputFileName,"-o","out.las").!!该文件在当前工作节点目录中输出,我知道这一点是因为通过scala执行"ls-a"!!我可以看到该文件在那里。然后我尝试使用以下命令访问它:valdataFrame=sqlContext.read.las("out.las")我假设sql上下文会尝试在当前目录中查找文件,但事实并非如此。此外,它不会抛出错误,而是会发出一条警告,指出找不到

scala - 循环遍历文件行并通过 Spark 中的每次迭代执行函数

我在HDFS中有一个名为file1的文件,其中包含以下几行:(每一行都是一个目录路径)this/is/path1this/is/path2this/is/path3...this/is/path1000ormore我有一个ScalaSpark函数如下:valresultset=sc.hadoopFile(inputpath,classOf[TextInputFormat],classOf[LongWritable],classOf[Text]).flatMap{case(k,v)=>if(k.get==0)Seq(v.toString)elseSeq.empty[String]}我想传

scala - Scala Spark 作业需要多长时间才能处理一个文件中的一百万行?

我在HDFS中有一个名为file1的文件,其中包含多个文件的路径:this/is/path1this/is/path2this/is/path3...this/is/path1000000如果我通过在Scala中执行以下行从该文件中获取所有行作为列表,vallines=Source.fromFile("/my/path/file1.txt").getLines.toList如果我如下使用“for”循环,在一个单独的函数中处理file1的每一行,该函数涉及每一行的一些映射功能,for(iif(k.get==0)Seq(v.toString)elseSeq.empty[String]}}假

scala - 使用 Spark sc.textFile 读取文件时如何捕获 BlockMissingException?

当读取存储在HDFS上的文本文件时,如果我在使用sc.textFile读取这些文件时遇到BlockMissingException(或其他一些异常),我该如何捕获错误并继续执行emptyRDD?我可能遇到BlockMissingException的原因是,例如,如果文件存储在复制因子为1的HDFS上并且数据节点出现故障。考虑以下最小示例代码:valmyRDD:RDD[String]=try{sc.textFile("hdfs:///path/to/fileWithMissingBlock")}catch{casee:BlockMissingException=>println("mis

scala - 在配置单元中发布插入数据创建小零件文件

我正在处理超过1000000条json文件记录我正在逐行读取文件并提取所需的键值(json是混合结构未修复。所以我正在解析并生成所需的json元素)并生成类似于json_string变量的json字符串并推送到配置单元表数据正确存储但在hadoopapps/hive/warehouse/jsondb.myjson_table文件夹中包含小零件文件。每次插入查询都会创建新的(.1到.20kb)零件文件。因为如果我在配置单元上运行简单查询,因为它需要超过30分钟。显示我的逻辑示例代码,此迭代多次将新记录插入到配置单元中。importorg.apache.spark.sql.SparkSes

scala - 小于 spark scala rdd 中日期的比较

我想打印1991年以前入职的员工数据,下面是我的示例数据:69062,FRANK,ANALYST,5646,1991-12-03,3100.00,,200163679,SANDRINE,CLERK,69062,1990-12-18,900.00,,2001用于加载数据的初始RDD:valrdd=sc.textFile("file:////home/hduser/Desktop/Employees/employees.txt").filter(p=>{p!=null&&p.trim.length>0})用于将字符串列转换为日期列的UDF:defconvertStringToDate(s: