我正在尝试将数据框存储到外部配置单元表中。当我执行以下操作时:recordDF.write.option("path","hdfs://quickstart.cloudera:8020/user/cloudera/hadoop/hive/warehouse/VerizonProduct").saveAsTable("productstoreHTable")在本应存在表的hdfs位置,我得到了这个:-rw-r--r--3clouderacloudera02016-12-2518:58hadoop/hive/warehouse/VerizonProduct/_SUCCESS-rw-r--r
我想在sparkdataframe中转换下面的query:sqlContext.sql("SELECTd.dep_name,count(*)FROMemployeese,departmentdWHEREe.dep_id=d.dep_idGROUPBYd.dep_nameHAVINGcount(*)>=2").show输出:+---------+---+|dep_name|_c1|+---------+---+|FINANCE|3||AUDIT|5||MARKETING|6|我尝试使用以下查询:scala>finalEmployeesDf.as("df1").join(depDf.as(
这是我的函数应用规则,colmdp_codcat,mdp_idregl,usedRefchangechangesaccordingtothedatainarraybRef.defwithMdpCodcat(bRef:Broadcast[Array[RefRglSDC]])(dataFrame:DataFrame):DataFrame={varmatchRule=falsevari=0while(i示例-我的数据框:valDF=Seq(("tt","aa","bb"),("tt1","aa1","bb2"),("tt1","aa1","bb2")).toDF("t","a","b)+--
我有一个spark数据框,我需要如下所示的键值对。我特别需要键中的列名。我想使用单个映射器传递来执行此操作。原始数据集:预期键值对:(Attribute_Name,Attribute_Value,Class),1单次映射后的预期结果:预期数据集 最佳答案 这应该有帮助:importorg.apache.spark.{SparkConf,SparkContext}importorg.apache.spark.sql.functions.{explode,udf,typedLit}importorg.apache.spark.sql.S
我有以下代码:publicclassIPCCodes{publicstaticclassIPCCountimplementsSerializable{publicIPCCount(longpermid,intyear,intcount,Stringipc){this.permid=permid;this.year=year;this.count=count;this.ipc=ipc;}publiclongpermid;publicintyear;publicintcount;publicStringipc;}publicstaticvoidmain(String[]args){Spar
我正在寻找更好的方法将Dataframe转换为RDD。现在我正在将数据帧转换为集合和循环集合以准备RDD。但我们知道循环不是好的做法。valrandomProduct=scala.collection.mutable.MutableList[Product]()valresults=hiveContext.sql("selectid,valuefromdetails");valcollection=results.collect();vari=0;results.collect.foreach(t=>{valproduct=newProduct(collection(i)(0).asI
我有一个像这样的json文件:{"employeeDetails":{"name":"xxxx","num":"415"},"work":[{"monthYear":"01/2007","workdate":"1|2|3|....|31","workhours":"8|8|8....|8"},{"monthYear":"02/2007","workdate":"1|2|3|....|31","workhours":"8|8|8....|8"}]}我必须从这个json数据中获取工作日期和工作时间。我正在使用Spark2.1.1我试过这样的:valspark=SparkSession.bu
有两个json,第一个json有更多的列,并且总是超集。valdf1=spark.read.json(sqoopJson)valdf2=spark.read.json(kafkaJson)除了操作:我喜欢在df1和df2上应用except操作,但是df1有10列,而df2只有8列。如果手动从df1中删除2列,则except将起作用。但是我有50多个表/json,需要对所有50组表/json执行EXCEPT。问题:如何从DF1中仅选择DF2(8)列中可用的列并创建新的df3?所以df3将拥有来自df1的有限列的数据,并且它将与df2列匹配。 最佳答案
我有一个四节点hadoop集群(mapr),每个集群有40GB内存。我需要在大数据集(5亿行)的其中一个字段上“应用”一个函数。我的代码流程是,我从配置单元表中读取数据作为spark数据帧,并在其中一列上应用所需的函数,如下所示:schema=StructType([StructField("field1",IntegerType(),False),StructField("field2",StringType(),False),StructField("field3",FloatType(),False)])udfCos=udf(lambdarow:function_call(row
我在配置单元上运行sparksql。我需要在创建新的配置单元表时添加auto.purge表属性。我尝试使用以下代码在调用saveAsTable方法时添加选项:inputDF.write.option("auto.purge"->"true").saveAsTable(hiveTableName)上面的代码行在表的WITHSERDEPROPERTIES下添加了一个属性。我需要在配置单元DDL的TBLPROPERTIES部分下添加此属性。 最佳答案 最后我找到了一个解决方案,我不确定这是否是最好的解决方案。不幸的是,Spark1.5sq