草庐IT

scala-compiler

全部标签

scala - Spark Hadoop 广播失败

运行spark-submit作业并收到“无法获取broadcast_58_piece0...”错误。我真的不确定我做错了什么。我是否过度使用了UDF?功能太复杂?作为我的目标的总结,我正在解析pdf中的文本,这些文本作为base64编码的字符串存储在JSON对象中。我正在使用ApacheTika获取文本,并尝试大量使用数据帧来简化操作。我写了一段代码,通过tika将文本提取作为“主”之外的一个函数在数据上作为RDD运行,并且运行完美。但是,当我尝试将提取作为数据帧上的UDF引入main时,它会以各种不同的方式出现问题。在我到达这里之前,我实际上是在尝试将最终数据框编写为:valid.t

scala - Spark(Scala)从驱动程序写入(和读取)本地文件系统

第一个问题:我有一个带有hadoop的2节点虚拟集群。我有一个运行Spark作业的jar。此jar接受作为cli参数:commands.txt文件的路径,该文件告诉jar运行哪些命令。我使用spark-submit运行作业,我注意到我的从节点没有运行,因为它找不到主节点本地的commands.txt文件。这是我用来运行它的命令:./spark-1.6.1-bin-hadoop2.6/bin/spark-submit--classuniv.bigdata.course.MainRunner--masteryarn\--deploy-modecluster--executor-memory

scala - 将 hdfs 文件加载到 spark 上下文中

我是spark/scala的新手,需要从hdfs加载一个文件到spark。我在hdfs(/newhdfs/abc.txt)中有一个文件,我可以使用hdfsdfs-cat/newhdfs/abc.txt/查看我的文件内容p>我按照以下顺序将文件加载到spark上下文中spark-shell#Itenteredintoscalaconsolewindowscala>importorg.apache.spark._;//Line1scala>valconf=newSparkConf().setMaster("local[*]");scala>valsc=newSparkContext(con

scala - 从 HDFS 读取文件并将内容分配给字符串

在Scala中,如何读取HDFS中的文件并将内容分配给变量。我知道如何读取文件并且能够打印它。但是如果我尝试将内容分配给一个字符串,它将输出作为Unit()。以下是我尝试过的代码。valdfs=org.apache.hadoop.fs.FileSystem.get(config);valsnapshot_file="/path/to/file/test.txt"valstream=dfs.open(newPath(snapshot_file))defreadLines=Stream.cons(stream.readLine,Stream.continually(stream.readL

scala - 从数据框中选择时重命名列名

我有2个数据框:df1和df2,我将它们都加入到id列并将其保存到另一个名为df3的数据框。下面是我正在使用的代码,它按预期工作正常。valdf3=df1.alias("tab1").join(df2.alias("tab2"),Seq("id"),"left_outer").select("tab1.*","tab2.name","tab2.dept","tab2.descr");我想在上述语句中将tab2.descr列重命名为dept_full_description。我知道我可以像下面这样创建一个seqval并使用toDF方法valcolumnsRenamed=Seq("id",

scala - 如何使用配置单元上下文有效地查询 spark 中的配置单元表?

我有一个包含时间序列数据的1.6THive表。我正在使用Hive1.2.1和scala中的Spark1.6.1。以下是我在代码中的查询。但我总是遇到Java内存不足错误。valsid_data_df=hiveContext.sql(s"SELECTtime,total_field,sid,year,dateFROMtablenameWHEREsid='$stationId'ORDERBYtimeLIMIT4320000")通过一次从配置单元表中迭代选择几条记录,我试图在结果dataframe上做一个滑动窗口我有一个由4个节点组成的集群,具有122GB内存和44个vCore。我正在使用4

scala - 使用 spark 在 hive 中流式传输数据存储

我正在创建一个应用程序,在其中获取流式数据,这些数据进入kafka,然后在spark上。使用数据,应用一些登录,然后将处理后的数据保存到配置单元中。数据速度非常快。我在1分钟内获得50K条记录。Spark流中有1分钟的窗口,它处理数据并将数据保存在配置单元中。我的问题是生产前瞻性架构可以吗?如果是,我如何将流数据保存到配置单元中。我正在做的是,创建1分钟窗口数据的数据框,并使用将其保存在配置单元中results.write.mode(org.apache.spark.sql.SaveMode.Append).insertInto("stocks")我还没有创建管道。可以吗,还是我必须修

scala - 在 Apache Spark 中按列分区到 S3

有我们想要从具有JSON的S3读取文件的用例。然后,基于特定的JSON节点值,我们希望对数据进行分组并将其写入S3。我能够读取数据,但找不到关于如何根据JSONkey对数据进行分区然后上传到S3的好例子。任何人都可以提供任何示例或指向可以帮助我处理此用例的教程吗?创建数据框后我得到了我的数据模式:root|--customer:struct(nullable=true)||--customerId:string(nullable=true)|--experiment:string(nullable=true)|--expiryTime:long(nullable=true)|--par

scala - 与文件名中的冒号 ':' 作斗争

我有以下代码用于加载大量“csv.gz”并将它们转储到其他文件夹中,源文件名作为一列。objectDailyMergerextendsApp{defallFiles(path:File):List[File]={valparts=path.listFiles.toList.partition(_.isDirectory)parts._2:::parts._1.flatMap(allFiles)}valsqlContext=SparkSession.builder().appName("DailyMerger").master("local").getOrCreate()valfiles

scala - 从 spark 中列出/检索 HDFS 分区作为 Map(String,List(String))

我想知道是否有某种方法可以利用spark.hdfs文件夹结构中已经存在的元数据信息。例如,我正在使用以下代码将数据写入hdfs,valcolumns=Seq("country","state")dataframe1.write.partitionBy(columns:_*).mode("overwrite").save(path)这会生成类似的目录结构,path/country=xyz/state=1path/country=xyz/state=2path/country=xyz/state=3path/country=abc/state=4我想知道的是使用spark,有没有办法将所有