草庐IT

dataframe

全部标签

scala - 在 Spark (HDFS) 中写入 CSV 文件时选择哪个选项?

我必须比较CSV文件,然后我必须删除所有重复的行。所以,我的情况就像我有一个文件夹,我必须将每个过滤结果放在该文件夹中,当一些新文件出现时,我必须将文件夹中的现有文件与新文件进行比较,最后,我必须把将结果返回到同一文件夹。eg:/data/ingestion/file1.csva1b1c1a2b2c2a3b3c3/data/ingestion/file2.csva4b4c4a5b5c5a6b6c6newupcomingfile(upcoming_file.csv):a1b1c1a5b5c5a7b7c7现在我的方法是从/data/ingestion/*中存在的所有文件创建一个数据帧。然后

scala - 无法解决 Spark 作业中的符号拆分

我在我的IntelliJIDE上运行一个spark应用程序作为Maven项目,我正在尝试创建一个rowRDD并将它们转换为数据帧并将其存储在hdfs中。SPARKVERSION:1.5.2SCALAVERSION:2.10.4我的代码:valrowRDD=dataframename.map(_.split("\t")).map(p=>Row(p(0),p(1),p(2),p(3)))它报告值拆分不是我的类包的成员并且报告应用程序不采用任何参数。存在一些依赖性问题,我需要这方面的帮助。注意:我已经完成了rowRDD的模式定义感谢支持 最佳答案

scala - 如何优化 spark 函数以将 double 值舍入为 2 位小数?

下面是我的Spark函数,它很简单defdoubleToRound(df:DataFrame,roundColsList:Array[String]):DataFrame={vary:DataFrame=dffor(colDF这按预期工作,通过使给定DF的多个列的值将小数值四舍五入到2个位置。但是我循环遍历DataFramey直到Array[Sting].length()列。有更好的方法来完成上述操作吗?谢谢大家 最佳答案 您可以简单地使用select和map,如下例所示:importorg.apache.spark.sql.fun

DataFrame转化为json的方法教程

网络上有好多的教程,讲得不太清楚和明白,我用实际的例子说明了一下内容,附档代码,方便理解和使用 DataFrame.to_json(path_or_buf=None, orient=None, date_format=None, double_precision=10, force_ascii=True, date_unit='ms', default_handler=None, lines=False, compression='infer', index=True, indent=None) [source]将对象转换为JSON字符串。注意:NaN和None将被转换为null, datet

scala - 将 RDD[String] 转换为 RDD[Row] 到 Dataframe Spark Scala

我正在读取一个有很多空格的文件,需要过滤掉空格。之后我们需要将其转换为数据框。下面的示例输入。2017123¦¦10¦running¦00000¦111¦-EXAMPLE我的解决方案是使用以下函数来解析所有空格并修剪文件。deftruncateRDD(fileName:String):RDD[String]={valexample=sc.textFile(fileName)example.map(lines=>lines.replaceAll("""[\t\p{Zs}]+""",""))}但是,我不确定如何将它放入数据框中。sc.textFile返回一个RDD[String]。我尝试了

hadoop - 将 Dataframe 存储到 spark 中的配置单元分区表

我正在尝试将从kafka主题传入的数据流存储到配置单元分区表中。我能够将dstream转换为数据帧并创建一个配置单元上下文。我的代码看起来像这样valhiveContext=newHiveContext(sc)hiveContext.setConf("hive.exec.dynamic.partition","true")hiveContext.setConf("hive.exec.dynamic.partition.mode","nonstrict")newdf.registerTempTable("temp")//newdfismydataframenewdf.write.mode

scala - 从数据框中选择时重命名列名

我有2个数据框:df1和df2,我将它们都加入到id列并将其保存到另一个名为df3的数据框。下面是我正在使用的代码,它按预期工作正常。valdf3=df1.alias("tab1").join(df2.alias("tab2"),Seq("id"),"left_outer").select("tab1.*","tab2.name","tab2.dept","tab2.descr");我想在上述语句中将tab2.descr列重命名为dept_full_description。我知道我可以像下面这样创建一个seqval并使用toDF方法valcolumnsRenamed=Seq("id",

hadoop - Apache Spark : Apply existing mllib model on Incoming DStreams/DataFrames

使用ApacheSpark的mllib,我有一个存储在HDFS中的逻辑回归模型。此逻辑回归模型是根据来自某些传感器的历史数据进行训练的。我有另一个spark程序,它使用来自这些传感器的流数据。我希望能够使用预先存在的训练模型对传入的数据流进行预测。注意:我不希望我的模型被这些数据更新。要加载训练模型,我必须在我的代码中使用以下行:vallogisticModel=LogisticRegressionModel.load(sc,)sc:Spark上下文。但是,这个应用程序是一个流应用程序,因此已经有一个“StreamingContext”设置。现在,根据我的阅读,在同一个程序中有两个上下

hadoop - 在 Spark 中保存有序数据框

我正在尝试将有序数据帧保存到HDFS中。我的代码如下所示:dataFrame.orderBy("index").write().mode(SaveMode.Overwrite).parquet(getPath());我在两个不同的集群上运行相同的代码,一个集群使用Spark1.5.0,另一个-1.6.0。当使用Spark1.5.0在集群上运行时,它不会在保存到光盘后保留排序。是否有任何特定的集群设置可以在将数据保存到光盘时保留排序?还是spark版本的已知问题?我搜索了spark文档,但找不到任何相关信息。更新:我检查过parquet中的文件,在这两种情况下文件都已排序。所以在读取时出

hadoop - 如何从 sequenceFile 创建一个 spark DataFrame

我正在使用spark1.5。我想从HDFS中的文件创建一个dataframe。HDFS文件包含json数据,其中包含大量序列输入文件格式的字段。有没有办法在java中优雅地做到这一点?事先不知道json的结构/字段。我能够从序列文件中将输入作为RDD,如下所示:JavaPairRDDinputRDD=jsc.sequenceFile("s3n://key_id:secret_key@file/path",LongWritable.class,BytesWritable.class);JavaRDDevents=inputRDD.map(newFunction,String>(){pub