草庐IT

json - Apache Spark 和 MongoDB Hadoop-Connector 创建的 BSON 结构

coder 2023-10-31 原文

我正在尝试使用 MongoDB Hadoop-Connector 将一些 JSON 从 Spark (Scala) 保存到 MongoDB。我遇到的问题是这个 API 似乎总是将您的数据保存为“{_id: ..., value: {your JSON document}}”。

在下面的代码示例中,我的文档是这样保存的:

{
    "_id" : ObjectId("55e80cfea9fbee30aa703261"),
    "value" : {
        "_id" : "55e6c65da9fbee285f2f9175",
        "year" : 2014,
        "month" : 5,
        "day" : 6,
        "hour" : 18,
        "user_id" : 246
    }
}

有什么方法可以说服 MongoDB Hadoop 连接器以您指定的结构编写 JSON/BSON,而不是将其嵌套在这些 _id/value 字段下?

这是我的 Scala Spark 代码:

  val jsonstr = List("""{
        "_id" : "55e6c65da9fbee285f2f9175",
        "year" : 2014,
        "month" : 5,
        "day" : 6,
        "hour" : 18,
        "user_id" : 246}""")

  val conf = new SparkConf().setAppName("Mongo Dummy").setMaster("local[*]")
  val sc = new SparkContext(conf)

  // DB params
  val host = "127.0.0.1"
  val port = "27017"
  val database = "dummy"
  val collection = "fubar"

  // input is collection we want to read (not doing so here)
  val mongo_input = s"mongodb://$host/$database.$collection"
  // output is collection we want to write
  val mongo_output = s"mongodb://$host/$database.$collection"

  // Set up extra config for Hadoop connector
  val hadoopConfig = new Configuration()

  //hadoopConfig.set("mongo.input.uri", mongo_input)
  hadoopConfig.set("mongo.output.uri", mongo_output)

  // convert JSON to RDD
  val rdd = sc.parallelize(jsonstr)

  // write JSON data to DB
  val saveRDD = rdd.map { json =>
    (null, Document.parse(json))
  }

  saveRDD.saveAsNewAPIHadoopFile("file:///bogus",
    classOf[Object],
    classOf[BSONObject],
    classOf[MongoOutputFormat[Object, BSONObject]],
    hadoopConfig)
  // Finished  
  sc.stop

这是我的 SBT:

name := "my-mongo-test"

version := "1.0"

scalaVersion := "2.10.4"

// Spark needs to appear in SBT BEFORE Mongodb connector!
libraryDependencies += "org.apache.spark" %% "spark-core" % "1.4.0"

// MongoDB-Hadoop connector
libraryDependencies += "org.mongodb.mongo-hadoop" % "mongo-hadoop-core" % "1.4.0"

老实说,我对从 Spark 中保存 JSON --> BSON --> MongoDB 似乎有多难感到困惑。因此,欢迎就如何更灵活地保存我的 JSON 数据提出任何建议。

最佳答案

好吧,我刚刚找到了解决方案。结果是 MongoOutputFormat 使用的 MongoRecordWriter 插入了任何继承自 BSONWritable 的值>MongoOutputvalue 字段下的 BSONObject

因此,最简单的解决方案是创建包含 BSONObject 作为值的 RDD,而不是 Document

我在 Java 中尝试过这个解决方案,但我确信它在 Scala 中也能正常工作。这是一个示例代码:

JavaPairRDD<Object, BSONObject> bsons = values.mapToPair(lineValues -> {
    BSONObject doc = new BasicBSONObject();
    doc.put("field1", lineValues.get(0));
    doc.put("field2", lineValues.get(1));
    return new Tuple2<Object, BSONObject>(UUID.randomUUID().toString(), doc);
});

Configuration outputConfig = new Configuration();
outputConfig.set("mongo.output.uri",
                 "mongodb://localhost:27017/my_db.lines");
bsons.saveAsNewAPIHadoopFile("file:///this-is-completely-unused"
        , Object.class
        , BSONObject.class
        , MongoOutputFormat.class
        , outputConfig);

关于json - Apache Spark 和 MongoDB Hadoop-Connector 创建的 BSON 结构,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/32371620/

有关json - Apache Spark 和 MongoDB Hadoop-Connector 创建的 BSON 结构的更多相关文章

  1. ruby - 如何在 Ruby 中顺序创建 PI - 2

    出于纯粹的兴趣,我很好奇如何按顺序创建PI,而不是在过程结果之后生成数字,而是让数字在过程本身生成时显示。如果是这种情况,那么数字可以自行产生,我可以对以前看到的数字实现垃圾收集,从而创建一个无限系列。结果只是在Pi系列之后每秒生成一个数字。这是我通过互联网筛选的结果:这是流行的计算机友好算法,类机器算法:defarccot(x,unity)xpow=unity/xn=1sign=1sum=0loopdoterm=xpow/nbreakifterm==0sum+=sign*(xpow/n)xpow/=x*xn+=2sign=-signendsumenddefcalc_pi(digits

  2. python - 如何使用 Ruby 或 Python 创建一系列高音调和低音调的蜂鸣声? - 2

    关闭。这个问题是opinion-based.它目前不接受答案。想要改进这个问题?更新问题,以便editingthispost可以用事实和引用来回答它.关闭4年前。Improvethisquestion我想在固定时间创建一系列低音和高音调的哔哔声。例如:在150毫秒时发出高音调的蜂鸣声在151毫秒时发出低音调的蜂鸣声200毫秒时发出低音调的蜂鸣声250毫秒的高音调蜂鸣声有没有办法在Ruby或Python中做到这一点?我真的不在乎输出编码是什么(.wav、.mp3、.ogg等等),但我确实想创建一个输出文件。

  3. ruby - 使用 ruby​​ 将 HTML 转换为纯文本并维护结构/格式 - 2

    我想将html转换为纯文本。不过,我不想只删除标签,我想智能地保留尽可能多的格式。为插入换行符标签,检测段落并格式化它们等。输入非常简单,通常是格式良好的html(不是整个文档,只是一堆内容,通常没有anchor或图像)。我可以将几个正则表达式放在一起,让我达到80%,但我认为可能有一些现有的解决方案更智能。 最佳答案 首先,不要尝试为此使用正则表达式。很有可能你会想出一个脆弱/脆弱的解决方案,它会随着HTML的变化而崩溃,或者很难管理和维护。您可以使用Nokogiri快速解析HTML并提取文本:require'nokogiri'h

  4. ruby - 使用 Vim Rails,您可以创建一个新的迁移文件并一次性打开它吗? - 2

    使用带有Rails插件的vim,您可以创建一个迁移文件,然后一次性打开该文件吗?textmate也可以这样吗? 最佳答案 你可以使用rails.vim然后做类似的事情::Rgeneratemigratonadd_foo_to_bar插件将打开迁移生成的文件,这正是您想要的。我不能代表textmate。 关于ruby-使用VimRails,您可以创建一个新的迁移文件并一次性打开它吗?,我们在StackOverflow上找到一个类似的问题: https://sta

  5. ruby-on-rails - 无法使用 Rails 3.2 创建插件? - 2

    我对最新版本的Rails有疑问。我创建了一个新应用程序(railsnewMyProject),但我没有脚本/生成,只有脚本/rails,当我输入ruby./script/railsgeneratepluginmy_plugin"Couldnotfindgeneratorplugin.".你知道如何生成插件模板吗?没有这个命令可以创建插件吗?PS:我正在使用Rails3.2.1和ruby​​1.8.7[universal-darwin11.0] 最佳答案 随着Rails3.2.0的发布,插件生成器已经被移除。查看变更日志here.现在

  6. ruby - 如何使用 RSpec::Core::RakeTask 创建 RSpec Rake 任务? - 2

    如何使用RSpec::Core::RakeTask初始化RSpecRake任务?require'rspec/core/rake_task'RSpec::Core::RakeTask.newdo|t|#whatdoIputinhere?endInitialize函数记录在http://rubydoc.info/github/rspec/rspec-core/RSpec/Core/RakeTask#initialize-instance_method没有很好的记录;它只是说:-(RakeTask)initialize(*args,&task_block)AnewinstanceofRake

  7. ruby-on-rails - Rails HTML 请求渲染 JSON - 2

    在我的Controller中,我通过以下方式在我的index方法中支持HTML和JSON:respond_todo|format|format.htmlformat.json{renderjson:@user}end在浏览器中拉起它时,它会自然地以HTML呈现。但是,当我对/user资源进行内容类型为application/json的curl调用时(因为它是索引方法),我仍然将HTML作为响应。如何获取JSON作为响应?我还需要说明什么? 最佳答案 您应该将.json附加到请求的url,提供的格式在routes.rb的路径中定义。这

  8. ruby - 为什么 SecureRandom.uuid 创建一个唯一的字符串? - 2

    关闭。这个问题需要detailsorclarity.它目前不接受答案。想改进这个问题吗?通过editingthispost添加细节并澄清问题.关闭8年前。Improvethisquestion为什么SecureRandom.uuid创建一个唯一的字符串?SecureRandom.uuid#=>"35cb4e30-54e1-49f9-b5ce-4134799eb2c0"SecureRandom.uuid方法创建的字符串从不重复?

  9. ruby - 有人可以帮助解释类创建的 post_initialize 回调吗 (Sandi Metz) - 2

    我正在阅读SandiMetz的POODR,并且遇到了一个我不太了解的编码原则。这是代码:classBicycleattr_reader:size,:chain,:tire_sizedefinitialize(args={})@size=args[:size]||1@chain=args[:chain]||2@tire_size=args[:tire_size]||3post_initialize(args)endendclassMountainBike此代码将为其各自的属性输出1,2,3,4,5。我不明白的是查找方法。当一辆山地自行车被实例化时,因为它没有自己的initialize方法

  10. ruby - 使用多个数组创建计数 - 2

    我正在尝试按0-9和a-z的顺序创建数字和字母列表。我有一组值value_array=['0','1','2','3','4','5','6','7','8','9','a','b','光盘','e','f','g','h','i','j','k','l','m','n','o','p','q','r','s','','u','v','w','x','y','z']和一个组合列表的数组,按顺序,这些数字可以产生x个字符,比方说三个list_array=[]和一个当前字母和数字组合的数组(在将它插入列表数组之前我会把它变成一个字符串,]current_combo['0','0','0']

随机推荐