草庐IT

spark-csv

全部标签

mongodb - Spark scala 使用 spark-mongo connector 进行 upsert

有什么方法可以根据数据框中的某个字段使用spark-mongo连接器更新MongoCollection吗? 最佳答案 要根据唯一键约束替换文档,请使用replaceDocument和shardKey选项。默认的shardKey是{_id:1}。https://docs.mongodb.com/spark-connector/master/configuration/df.write.format('com.mongodb.spark.sql')\.option('collection','target_collection')\.o

mongodb - 使用 Apache Spark 更新/替换 Mongo 文档

这是我们使用MongoSpark连接器处理Spark和MongoDB时的一个常见问题。此连接器旨在以批处理方式将文档插入/更新到MongoDB中。使用Spark可以通过三种方式插入/更新文档。RDD[文档]DataFrame[CaseClass]数据集[CaseClass]数据集和数据框都支持使用MangoSpark.save()方法插入/更新文档,而RDD[Document]仅支持插入。所以我们在使用MongoSpark更新RDD[Document]时遇到了问题。是否有使用Spark将RDD[Document]更新/替换为MongoDB的解决方案? 最佳答

mongodb - 解析 spark 中几乎没有模式的 mongo 集合时出现问题

我正在使用Spark将数据从一个集合移动到另一个集群中的另一个集合。数据的架构不一致(我的意思是在具有不同数据类型的单个集合中几乎没有架构,变化很小)。当我尝试从spark读取数据时,采样无法获取数据的所有模式并抛出以下错误。(我有一个复杂的模式,我无法明确提及,而不是通过采样获取spark。)com.mongodb.spark.exceptions.MongoTypeConversionException:无法将ARRAY转换为NullType(值:BsonArray{values=[{“type”:“GUEST_FEE”,“appliesPer”:“GUEST_PER_NIGHT”

mongodb - 如何将MongoDb查询转换成csv(excel格式)

这是我的查询。我想将此查询的输出导出为csv(excel格式),这样我就可以获得一个数据表。我怎样才能做到这一点?我正在使用Robo3t。(db.getCollection('sentimentOpinions').aggregate([{$match:{objectType:"Security"}},{$lookup:{from:"securities",localField:"objectId",foreignField:"id",as:"StockID"}},{$unwind:"$StockID"},{$lookup:{from:"users",localField:"userI

mongodb - Mongo从CSV中的独立纬度和经度列导入位置

我有一个包含3个字段的CSV:名称、纬度、经度。一排看起来像这样:Place1,73.992964,40.739037mongoimport纬度和经度到loc字段的正确方法是什么?我知道位置索引字段需要是经度、纬度并且是单个数组而不是纬度和经度的2个离散字段但是如果有一种方法可以通过mongoimport处理从离散值到数组的处理,我就不知道了我是否需要先转换为包含经度和纬度的单列loc的CSV?Place1,[-73.992964,40.739037]我经常要处理将纬度和经度存储在独立列中的CSV,因此我希望找到一种使用mongoimport执行此操作的方法。

Linux安装 spark 教程详解

目录一准备安装包二安装scala三修改配置文件1)修改workers文件2)修改spark-env.sh文件四进入spark交互式平台一准备安装包    可以自行去spark官网下载想要的版本    这里准备了spark3.1.2的网盘资源链接:https://pan.baidu.com/s/1Brm6XqaqYQnXQwOd8mUt7A?pwd=2bye提取码:2bye    下载后上传至linux服务器上    这里放在了/opt/install目录    解压至/opt/soft目录tar-zxf/opt/install/spark-3.1.2-bin-hadoop3.2.tgz-C/o

python - 将 CSV 文件导入 MongoDB 时,文件大小要求是否会发生变化?

背景:我正在尝试关注tutorial我正在其中导入一个大约324MB的CSV文件MongoLab的沙箱计划(上限为500MB),通过Python3.4中的pymongo。该文件包含约770,000条记录,插入后约164,000条我达到了配额并收到:raiseOperationFailure(error.get("errmsg"),error.get("code"),error)OperationFailure:quotaexceeded问题:NoSQL的类JSON结构需要更多空间来保存与CSV文件相同的数据是否准确?或者我在这里做了什么古怪的事?更多信息:以下是数据库指标:这是我使用的

结构化数据处理与分析:Spark SQL 教程

作者:禅与计算机程序设计艺术1.简介1.1概述ApacheSpark是由Apache基金会开发的开源分布式计算框架,最初用于对大规模数据进行快速的处理,在大数据计算领域占据重要地位。其独特的高性能处理能力及丰富的数据处理功能使得Spark在各个行业应用广泛。SparkSQL是Spark提供的用于结构化数据的查询语言,具有灵活的数据处理能力、易用性、可移植性等优点。本教程将带领读者了解SparkSQL的基础知识、语法、使用方法和实践经验。1.2目标受众本教程面向对ApacheSpark有一定了解但对SparkSQL并不熟悉的读者,包括Spark用户、程序员和数据科学家。希望通过本教程能够帮助读者

mongodb - Spark - 如何在 map() 中创建新的 RDD? (执行者的 SparkContext 为空)

我有以下应用程序,它通过MongoDBSpark连接器使用到MongoDB的连接。我的代码崩溃是因为执行程序的SparkContext为空。基本上我从MongoDB读取数据,处理这些数据,这会导致需要发送到MongoDB的额外查询。最后一步是保存这些额外查询的数据。我使用的代码:JavaMongoRDDrdd=MongoSpark.load(sc);JavaMongoRDDaggregatedRdd=rdd.withPipeline(...);JavaPairRDDpairRdd=aggregatedRdd.mapToPair((document)->newTuple2(documen

Spark系列之Spark的RDD详解

title:Spark系列第五章Spark的RDD详解5.1RDD概述​RDD是Spark的基石,是实现Spark数据处理的核心抽象。那么RDD为什么会产生呢?​Hadoop的MapReduce是一种基于数据集的工作模式,面向数据,这种工作模式一般是从存储上加载数据集,然后操作数据集,最后写入物理存储设备。数据更多面临的是一次性处理。​MapReduce的这种方式对数据领域两种常见的操作不是很高效。第一种是迭代式的算法。比如机器学习中ALS、凸优化梯度下降等。这些都需要基于数据集或者数据集的衍生数据反复查询反复操作。MapReduce这种模式不太合适,即使多MapReduce串行处理,性能和时