草庐IT

spark-csv

全部标签

python - Spark 流作业性能改进

有一个sparkstreaming作业一直在运行,计算流中的单词,并且只应计算并返回给定词汇表中的单词。但是,这个词汇表不是固定的,而是存储在redis中,并且可以随时间变化。这是这项工作的简单实现:sc=SparkContext(appName="WordCount")ssc=StreamingContext(sc,10)#batchintervalis10sdefcheck_if_in_vocab(word):vocab=redis_client.smembers()#getallvocabularyfromredisreturnwordinvocablines=ssc.socke

java - 使用 CSV 文件更新 mongoDB 集合

我有一个具有以下值(value)的mongoDB集合latlongband123.45456.45467我想更新那个集合以便我得到后续集合latlongband1band223.45456.4546773我想用于更新的csv包含34.656,87.7565,78此csv的大小为5gb我正在使用以下命令来更新集合mongoimport--dbtest--collectionnct_test--upsert--upsertFieldsband2--filegdalexportedb8.csv但是我遇到了以下错误..Failed:errorprocessingdocument#2:inval

mongodb - 将 mongodb oplog.rs 加载到 spark dataframe

我正在尝试将MongoDB中的oplog.rs加载到sparkDataFrame中,它加载了元数据并通过printSchema函数对其进行了验证,但是当我尝试执行诸如show或count之类的操作时它给了我这个错误scala.MatchError:((BsonMinKey,null),0)(ofclassscala.Tuple2)。我也尝试将其注册为temptable,但仍然出现相同的错误。valcustomReadConfig=ReadConfig(Map("uri"->"mongodb://username:password@host_name:port/local.oplog.r

python-3.x - 使用 pymongo 从 csv 插入数组

我有一个csv文件,其中包含如下字符串格式的数组:date,name,criteria2018-05-16,John,"[{'age':35},{'birthyear':1983}]"2018-05-16,Jane,"[{'age':36},{'birthyear':1982}]"我正在使用带有pandas和numpy的Python来处理这个问题我需要按以下格式将此文件导入MongoDB集合:{"date":'2018-05-16',"name":"John","criteria":[{"age":35},{"birthyear":1983}]},{"date":'2018-05-16

java - 将 Spark 流数据帧写入 MongoDB

我在Spark中有一个具有特定模式的流式数据集。当我想计算一个查询时,我调用:StreamingQueryquery=querydf.writeStream().outputMode(OutputMode.Update()).format("console").start();query.awaitTermination();通过这种方式,我可以在控制台中看到每个触发器的查询结果。如何在Mongo中写入结果DataFrame?对于StramingDataset是不可能的。我是否应该在每次触发时将流式Dataset转换为静态Dataset然后保存?我该怎么做?

MongoDB-Spark 连接器在启动后立即关闭连接

我使用的是MongoDB3.4.10版、Spark2.2.1版和MongoDB-Spark连接器2.2.1版。我有一个scalaspark程序,它通过MongoDB-Spark连接器不断轮询MongoDB以获取新数据。我注意到,只有在我们的实时环境中,在第一次或第二次轮询之后程序才会挂起(不会崩溃只是停止,不再有日志输出)。查看日志输出这是最后一行是:18/08/0211:13:59INFOMongoClientCache:ClosingMongoClient:[localhost:27017]18/08/0211:13:59INFOconnection:Closedconnectio

node.js - 在 node-csv-parser : RangeError: Maximum call stack size exceeded 中调用 node-mongodb-native

我正在使用node-csv-parser读取csv数据并使用mongoose将其存储在mongodb中。但是,我正在尝试加快导入速度,并且我想使用node-mongodb-native公开的native保存方法进行评估,使用Model.collection在mongoose中访问.(这是我在Mongo总部的办公时间与一位mongo工程师交谈的建议)。node-csv-parser每次读取csv的新行时都会触发data事件。在这个事件中,我读入了数据行,从中创建了一个新的数据点,并将其保存在mongo中。我可以使用Mongoose模型TestDataPoint将数据点保存在data事件中

jquery - 使用 Node.js 和 async.queue 将大型 CSV 插入 MongoDB

我正在尝试将大型csv文件(100K行;10-100M+)上传并插入到mongo中。下面的代码是我用来接受来自表单的输入并首先将记录插入到我所有csv的元数据集合中,然后将csv的记录插入到它自己的集合中的路径。它适用于较小的文件(数千行),但当它达到50K+的顺序时会花费很长时间。下一个片段是将csv流用于较大的文件(见下文),但在尝试使用该流时出现错误。问题:有人可以帮助将第一个示例修改为流,以便它可以处理大型csv而不会挂起。exports.addCSV=function(req,res){varbody=req.body;fileSystem.renameSync(req.fi

14 | Spark SQL 的 DataFrame API 读取CSV 操作

sales.csv内容date,category,product,full_name,sales2023-01-01,Electronics,Laptop,JohnSmith,1200.02023-01-02,Electronics,Smartphone,JaneDoe,800.02023-01-03,Books,Novel,MichaelJohnson,15.02023-01-04,Electronics,Tablet,EmilyWilson,450.02023-01-05,Books,Textbook,JamesBrown,40.0当使用SparkSQL的DataFrameAPI读取CSV

javascript - 使用 angularjs , nodejs , expressjs 单击按钮后在前端下载 Csv 文件

我想在前端下载.csv文件。这是我的代码:$http.get('/entity/consultations/_/registerationReport').success(function(data){myWindow=window.open('../entity/consultations/_/registerationReport','_parent');myWindow.close();});我使用json2csv转换器写入csv文件。json2csv({data:report,fields:fields},function(err,csv){if(err)throwerr;re