草庐IT

spark-ml

全部标签

scala - 使用临时凭证从 AWS 外部通过 spark 从 s3 读取

我正在尝试通过IntelliJ从我的笔记本电脑读取s3中的文件,这样我就可以更轻松地开发我的spark作业。textFileRDD代码在EMR集群内的Zeppelin中工作,但当我在本地尝试时却不行。在Zeppelin中,我不需要设置任何spark上下文,大概是因为Zeppelin实例在AWS环境中,它为我做了这件事。我编写了代码来创建临时AWS凭证(使用我的IAM用户key),以便我可以向spark上下文提供sessiontoken。访问key和secretkey也来自临时凭证。valsqlContext=sparkSession.sqlContextsqlContext.spark

apache-spark - 如何在 NiFi 中从 GetFilesProcessor 读取文件

下面是我的流程:GetFile>ExecuteSparkInteractive>PutFile我想从ExecuteSparkInteractive处理器中的GetFile处理器读取文件,应用一些转换并将其放在某个位置。下面是我的流程我在spark处理器的code部分写了sparkscala代码:valsc1=sc.textFile("local_path")sc1.foreach(println)流程中没有任何事情发生。那么如何使用GetFile处理器读取spark处理器中的文件。第二部分:我尝试了以下流程只是为了练习:ExecuteScript>PutFile>LogMessage我

apache-spark - 从 Spark 访问 Openstack Swift - SwiftAuthenticationFailedException

我正尝试从Spark2.4访问OpenstackSwift,但出现错误。org.apache.hadoop.fs.swift.exceptions.SwiftAuthenticationFailedException:Authenticateastenant'78axxxxxxxxxxxxxxxxxxxxxxxxxxxx'PasswordCredentials{username='xxxxxxxxxxxx'}sc.hadoopConfiguration.set(s"fs.swift.service.ovh.auth.url","https://auth.cloud.ovh.net/v3

apache-spark - Pyspark - 按组添加行

在Pyspark2.2中,我实际上是在尝试按用户添加行。如果我的主Dataframe如下所示:main_list=[["a","bb",5],["d","cc",10],["d","bb",11]]main_pd=pd.DataFrame(main_list,columns=['user',"group",'value'])main_df=spark.createDataFrame(main_pd)main_df.show()+----+-----+-----+|user|group|value|+----+-----+-----+|a|bb|5||d|cc|10||d|bb|11|+

scala - Spark : split only one column in dataframe and keep remaining columns as it is

我正在读取spark数据框中的文件。在第一列中,我将得到两个用“_”连接的值。我需要将第一列拆分为两列,并保持其余列不变。我将Scala与Spark结合使用例如:col1col2col3a_1xyzabcb_1lmnopq我需要有新的DF作为:col1_1col1_2col2col3a1xyzabcb1lmnopq只有一列需要拆分成两列。我尝试使用带有df.select的拆分函数,但我需要为剩余的列编写选择并考虑具有100列的不同文件,我想对所有文件使用可重用代码。 最佳答案 你可以这样做:importspark.implicits

scala - 异常线程 "main"scala.MatchError :Map() (of class org. apache.spark.sql.catalyst.util.CaseInsensitiveMap)

我正在尝试将数据从Excel工作表加载到Hive表。它在下面抛出错误.Map(treatemptyvaluesasnulls->true,location->"input",useheader->true,inferschema->true,addcolorcolumns->false,sheetname->"INPUT")(ofclassorg.apache.spark.sql.catalyst.util.CaseInsensitiveMap)使用的代码:valdf=spark.read.format("com.crealytics.spark.excel").option("loc

python - 如何找到当前 spark 上下文中加载的所有文本文件?

例如,当我在SparkShell中使用PySpark时,我可能会使用以下命令将文件加载到spark上下文中:readme=sc.textFile("/home/data/README.md")然后我可以像下面这样对这个RDD(?)执行操作来计算文件中的行数:readme.count()但是我想知道的是,我如何才能获得已加载到sc中的所有sc.textFile(s)的列表(spark上下文)?例如,下面有一些命令可以获取所有配置,但它没有列出我加载的所有文本文件。sc._conf.getAll()有什么方法可以找到所有已加载到spark上下文中的文本文件吗?列表?

apache-spark - 如何增加在 Yarn UI 上显示的 "memory total"?

我在EMR(emr-5.20.0)上有一个集群,其中一个m5.2xlarge作为NodeMaster,两个m4.large作为core,三个m4.large作为nodeworker。该集群的内存内存总和为62GB,但在YARNUI中显示的总内存为30GB。有人可以帮助我了解这个值是如何计算的吗?我已经检查了Yarn-site.xml和spark-default.conf中的配置,它们是根据AWS推荐配置的:https://docs.aws.amazon.com/pt_br/emr/latest/ReleaseGuide/emr-hadoop-task-config.html#emr-h

unit-testing - 测试 Spark : how to create a clean environment for each test

在测试我的ApacheSpark应用程序时,我想进行一些集成测试。出于这个原因,我创建了一个本地spark应用程序(启用了配置单元支持),在其中执行测试。如何在每次测试后清除derbyMetastore,以便下一次测试再次拥有干净的环境。我不想做的是在每次测试后重新启动spark应用程序。是否有任何最佳实践可以实现我想要的? 最佳答案 我认为为集成测试引入一些应用程序级逻辑打破了集成测试的概念。从我的角度来看,正确的方法是为每个测试重新启动应用程序。无论如何,我相信另一种选择是为每个测试启动/停止SparkContext。它应该清除

apache-spark - 如何从 spark2.3 访问 us-east-2 区域上的 Parquet 文件(使用 hadoop aws 2.7)

我们可以从当前代码访问us-east-1,但无法访问us-east-2上的parquet文件。请注意“us-east-2”连接,创建datafream在intellij上工作正常,但当我们从spark-shell尝试时它会给出400错误。我试图在sparkshell上工作/Users/test/Downloads/spark-2.3.3-bin-hadoop2.7/bin/spark-shell--jars/Users/test/Downloads/hadoop-aws-2.7.3.jar,/Users/测试/下载/aws-java-sdk-1.7.4.jarval配置=sc.hado