草庐IT

scala-cats

全部标签

scala - 如何使用 Spark 读取不断更新的 HDFS 目录并根据字符串(行)将输出拆分为多个 HDFS 文件?

详细场景->HDFS目录,其中“提供”了多种类型的银行账户事件的新日志数据。每行代表一个随机事件类型,每行(String)包含文本“ActivityType=”。在Spark-Scala中,读取HDFS目录中的输入文件并输出多个HDFS文件的最佳方法是什么,其中每个ActivityType都写入其自己的新文件? 最佳答案 改编为声明的第一个答案:Thelocationofthe"key"stringisrandomwithintheparentString,theonlythingthatisguaranteedisthatitco

apache-spark - 使用 Spark/Scala 读取序列文件时,无法序列化结果 : org. apache.hadoop.io.IntWritable

从逻辑上读取带有Int和String的序列文件,然后如果我这样做:valsequence_data=sc.sequenceFile("/seq_01/seq-directory/*",classOf[IntWritable],classOf[Text]).map{case(x,y)=>(x.toString(),y.toString().split("/")(0),y.toString().split("/")(1))}.collect这没问题,因为IntWritable已转换为String。如果我这样做:valsequence_data=sc.sequenceFile("/seq_0

sql - 如何将架构添加到 spark Scala 中另一个文件的文件

我正在使用Spark并使用Scala我有两个csv文件,一个有列名,另一个有数据,我如何整合它们,以便我可以制作一个包含架构和数据的结果文件,然后我必须对该文件应用操作,例如groupby、cout等,因为我需要计算这些列中的不同值。所以任何人都可以在这里提供帮助,这将非常有帮助我写了下面的代码,在读取它们之后从两个文件中创建了两个DF,而不是我现在使用union加入两个DF如何将第一行作为模式,或者任何其他方式来继续这个。任何人都可以提出建议。valsparkConf=newSparkConf().setMaster("local[4]").setAppName("hbasesql"

scala - 在 Spark 中将数据帧写入 Teradata

我在dataframe中有值,我在Teradata中创建了一个表结构。我的要求是将数据框加载到Teradata。但是我收到错误:我试过下面的代码:df.write.format("jdbc").option("driver","com.teradata.jdbc.TeraDriver").option("url","organization.td.intranet").option("dbtable",s"select*fromtd_s_zm_brainsdb.emp").option("user","userid").option("password","password").mod

scala - 带 3 个参数的 zip 函数

我想转置SparkSQL表中的多个列我发现这个解决方案只有两列,我想知道如何使用三列varA、varB和varC的zip函数。importorg.apache.spark.sql.functions.{udf,explode}valzip=udf((xs:Seq[Long],ys:Seq[Long])=>xs.zip(ys))df.withColumn("vars",explode(zip($"varA",$"varB"))).select($"userId",$"someString",$"vars._1".alias("varA"),$"vars._2".alias("varB")

scala - CreatePairedStream 不是 MQTTUtils 的成员

当我声明MQTTUTils.createPairedStream()时,我得到了类似这样的错误valuecreatePairedStreamisnotamemberofobjectorg.apache.spark.streaming.mqtt.MQTTUtils我的spark和Scala版本是SCALA版本-2.11.8Spark版本-2.3.0 最佳答案 您可以在apacheBahir中看到以下拉取请求:BahirPullRequest您可以在哪里看到正在添加MQTTUtils.createPairedStream。您使用以下工件

scala - 使用临时凭证从 AWS 外部通过 spark 从 s3 读取

我正在尝试通过IntelliJ从我的笔记本电脑读取s3中的文件,这样我就可以更轻松地开发我的spark作业。textFileRDD代码在EMR集群内的Zeppelin中工作,但当我在本地尝试时却不行。在Zeppelin中,我不需要设置任何spark上下文,大概是因为Zeppelin实例在AWS环境中,它为我做了这件事。我编写了代码来创建临时AWS凭证(使用我的IAM用户key),以便我可以向spark上下文提供sessiontoken。访问key和secretkey也来自临时凭证。valsqlContext=sparkSession.sqlContextsqlContext.spark

scala - Spark : split only one column in dataframe and keep remaining columns as it is

我正在读取spark数据框中的文件。在第一列中,我将得到两个用“_”连接的值。我需要将第一列拆分为两列,并保持其余列不变。我将Scala与Spark结合使用例如:col1col2col3a_1xyzabcb_1lmnopq我需要有新的DF作为:col1_1col1_2col2col3a1xyzabcb1lmnopq只有一列需要拆分成两列。我尝试使用带有df.select的拆分函数,但我需要为剩余的列编写选择并考虑具有100列的不同文件,我想对所有文件使用可重用代码。 最佳答案 你可以这样做:importspark.implicits

scala - 异常线程 "main"scala.MatchError :Map() (of class org. apache.spark.sql.catalyst.util.CaseInsensitiveMap)

我正在尝试将数据从Excel工作表加载到Hive表。它在下面抛出错误.Map(treatemptyvaluesasnulls->true,location->"input",useheader->true,inferschema->true,addcolorcolumns->false,sheetname->"INPUT")(ofclassorg.apache.spark.sql.catalyst.util.CaseInsensitiveMap)使用的代码:valdf=spark.read.format("com.crealytics.spark.excel").option("loc

scala - `saveAsTable` 之后无法从 Hive 查询 Spark DF - Spark SQL 特定格式,与 Hive 不兼容

我正在尝试将数据框另存为外部表,它将使用spark和可能使用hive进行查询,但不知何故,我无法使用hive查询或查看任何数据。它适用于spark。重现问题的方法如下:scala>println(spark.conf.get("spark.sql.catalogImplementation"))hivescala>spark.conf.set("hive.exec.dynamic.partition","true")scala>spark.conf.set("hive.exec.dynamic.partition.mode","nonstrict")scala>spark.conf.s