草庐IT

spark-ml

全部标签

apache-spark - 如何在转换期间测试数据类型转换

我们有一个将数据映射到数据框的脚本(我们使用的是pyspark)。数据以字符串形式传入,并且对它进行了一些其他有时代价高昂的操作,但作为操作的一部分(调用withColumn),我们对其最终数据类型进行了强制转换。我需要判断是否发生了截断,但我们不想在截断发生时失败。我们只想要一个数字来知道每个翻译列(大约有300列)中有多少行失败。我的第一个想法是让每一列通过一个UDF来进行测试,输出将是一个包含值的数组,以及一个关于它是否通过数据类型检查的值。然后我会做2个选择。一个从数组中选择原始值,另一个聚合未命中的值。但这似乎是一个草率的解决方案。我是pyspark/hadoop世界的新手.

java - 如何使用 spark Java API 将科学格式的 double 转换为 String?

我是sparkJavaAPI的新手。我想用科学格式示例转换double:1.7E7---->17000000,00。我的数据集是:+---------+------------+|account|amount|+---------+------------+|c1|1.7E7||c2|1.5E8||c3|142.0|我想将我的数据集转换成这样的东西。+---------+----------------------+|account|amount|+---------+----------------------+|c1|17000000,00||c2|1500000000,00||c

apache-spark - 如何根据数据大小重新分区rdd

我正在开发SparkStreaming项目,该项目从Kafka获取数据并应用一些规则并将数据保存在Hive中。我的问题是数据摄取率不固定。60秒可能是100万条消息到来,也可能是1条。我想在Dstream上添加重新分区。因为Dstream只有3个分区,无法在一分钟内处理百万条记录。重新分区在少于20条记录时会出现问题。它在Hive中创建多个小文件。dataStream.map(_._2).repartition(20)我的问题是如何根据rdd大小对rdd进行重新分区。这样它就可以处理一条消息或100万条消息。 最佳答案 你无法以任何

apache-spark - Spark thrift 服务器仅使用 2 个内核

Googledataproc一个节点集群,VCoresTotal=8。我已经从用户spark尝试过:/usr/lib/spark/sbin/start-thriftserver.sh--num-executors2--executor-cores4试图改变/usr/lib/spark/conf/spark-defaults.conf试图执行exportSPARK_WORKER_INSTANCES=6exportSPARK_WORKER_CORES=8在启动thriftserver.sh之前没有成功。在yarnUI中,我可以看到thrift应用程序仅使用2个内核和6个可用内核。更新1:s

scala - spark - select 中的条件语句

我正在从Dataframecol1和col2中选择两个Column。df.select((col("a")+col("b")).as("sum_col")现在用户希望此sum_col的空格固定为4。所以a和b的长度是2因此最大值可以小于100(二)或大于100(三)所以需要有条件地添加1或2个空格。任何人都可以告诉我如何在selectblock中使用条件逻辑将Column转换为concat并决定一个或两个空格被添加 最佳答案 只需使用format_string函数importorg.apache.spark.sql.function

apache-spark - 如何将托管在 HDFS 中的配置文件传递给 Spark 应用程序?

我正在使用SparkStructuredStreaming。另外,我正在使用Scala。我想将配置文件传递给我的spark应用程序。此配置文件托管在HDFS中。例如;spark_job.conf(HOCON)spark{appName:"",master:"",shuffle.size:4etc..}kafkaSource{servers:"",topic:"",etc..}redisSink{host:"",port:999,timeout:2000,checkpointLocation:"hdfslocation",etc..}如何将它传递给Spark应用程序?我如何在Spark中

scala - 内存不足异常或工作节点在 spark scala 作业期间丢失

我正在使用spark-shell执行一个spark-scala作业,我面临的问题是,在最后阶段和最终映射器结束时,就像在第5阶段,它分配50并很快完成49,在第50个它需要5分钟,并说内存不足并失败。我正在使用SPARK_MAJOR_VERSION=2我正在使用下面的命令spark-shell--masteryarn--confspark.driver.memory=30G--confspark.executor.memory=40G--confspark.shuffle.service.enabled=true--confspark.dynamicAllocation.enabled

apache-spark - 如何在 Spark 中使用 ASCII 函数

我需要帮助使用scala来使用Spark函数ASCII(sparkSQl字符串函数)importorg.apache.spark.sql.SparkSessionimportspark.implicits._importorg.apache.spark.sql.functionsvala=sc.parallelize(Array("Santosh","Adithya"))selectascii('Santosh')我需要santosh的ascii值和rdda的ascii值 最佳答案 ascii是spark-sqlapi的一部分,只能

apache-spark - 将 JSON 字符串列拆分为多列

我正在寻找一种通用解决方案,以将所有json字段提取为JSON字符串列中的列。df=spark.read.load(path)df.show()'path'中文件的文件格式为parquet示例数据|id|json_data|1|{"name":"abc","depts":["dep01","dep02"]}|2|{"name":"xyz","depts":["dep03"],"sal":100}|3|{"name":"pqr","depts":["dep02"],"address":{"city":"SF","state":"CA"}}预期输出|id|name|depts|sal|ad

hadoop - 使用 globStatus 和 Google Cloud Storage 存储桶作为输入时无法运行 Spark 作业

我正在使用Spark1.1。我有一个Spark作业,它只在存储桶下寻找特定模式的文件夹(即以...开头的文件夹),并且应该只处理那些。我通过执行以下操作实现了这一点:FileSystemfs=FileSystem.get(newConfiguration(true));FileStatus[]statusArr=fs.globStatus(newPath(inputPath));ListstatusList=Arrays.asList(statusArr);ListpathsStr=convertFileStatusToPath(statusList);JavaRDDpaths=sc.