草庐IT

mongodb - 从多个 MongoDB 中读取以形成数据集

coder 2023-10-30 原文

我想从 2 个不同的 Mongo 数据库制作 2 个数据集。我目前正在使用官方 MongoSpark 连接器。 sparkSession 的启动方式如下。

SparkConf sparkConf = new SparkConf().setMaster("yarn").setAppName("test")
                        .set("spark.mongodb.input.partitioner", "MongoShardedPartitioner")
                        .set("spark.mongodb.input.uri", "mongodb://192.168.77.62/db1.coll1")
                        .set("spark.sql.crossJoin.enabled", "true");
SparkSession sparkSession = sparkSession.builder().appName("test1").config(sparkConf).getOrCreate();

如果我想更改 spark.mongodb.input.uri,我该怎么做?我已经尝试更改 sparkSession 的 runtimeConfig 以及将 ReadConfig 与 readOverrides 一起使用,但这些都不起作用。

方法一:

sparkSession.conf().set("spark.mongodb.input.uri", "mongodb://192.168.77.63/db1.coll2");

方法二:

Map<String, String> readOverrides = new HashMap<String, String>();
readoverrides.put("uri","192.168.77.63/db1.coll2");
ReadConfig readConfig = ReadConfig.create(sparkSession).withOptions(readOverrides);
Dataset<Position> ds = MongoSpark.load(sparkSession, readConfig, Position.class);

编辑 1:按照 Karol 的建议,我尝试了以下方法

  SparkConf sparkConf = new SparkConf().setMaster("yarn").setAppName("test");
SparkSession sparkSession = SparkSession.builder().appName("test1").config(sparkConf).getOrCreate();
    Map<String, String> readOverrides1 = new HashMap<String, String>();
            readOverrides1.put("uri", "mongodb://192.168.77.62:27017");
            readOverrides1.put("database", "db1");
            readOverrides1.put("collection", "coll1");
            ReadConfig readConfig = ReadConfig.create(sparkSession).withOptions(readOverrides1);

这在运行时失败说:

Exception in thread "main" java.lang.IllegalArgumentException: Missing database name. Set via the 'spark.mongodb.input.uri' or 'spark.mongodb.input.database' property

编辑 2:

public static void main(String[] args) {
    SparkSession sparkSession = SparkSession.builder().appName("test")
            .config("spark.worker.cleanup.enabled", "true").config("spark.scheduler.mode", "FAIR").getOrCreate();
    String mongoURI2 = "mongodb://192.168.77.63:27017/db1.coll1";
    Map<String, String> readOverrides1 = new HashMap<String, String>();
    readOverrides1.put("uri", mongoURI2);
    ReadConfig readConfig1 = ReadConfig.create(sparkSession).withOptions(readOverrides1);
    MongoSpark.load(sparkSession,readConfig1,Position.class).show();
}

这仍然给出了与之前编辑相同的异常(exception)。

最佳答案

内置.sbt: libraryDependencies += "org.mongodb.spark"% "mongo-spark-connector_2.11"% "2.0.0"

 package com.example.app

 import com.mongodb.spark.config.{ReadConfig, WriteConfig}
 import com.mongodb.spark.sql._

object App {


 def main(args: Array[String]): Unit = {

    val MongoUri1 = args(0).toString
    val MongoUri2 = args(1).toString
    val SparkMasterUri= args(2).toString

     def makeMongoURI(uri:String,database:String,collection:String) = (s"${uri}/${database}.${collection}")

   val mongoURI1 = s"mongodb://${MongoUri1}:27017"
   val mongoURI2 = s"mongodb://${MongoUri2}:27017"

   val CONFdb1 = makeMongoURI(s"${mongoURI1}","MyColletion1,"df")
   val CONFdb2 = makeMongoURI(s"${mongoURI2}","MyColletion2,"df")

   val WRITEdb1: WriteConfig =  WriteConfig(scala.collection.immutable.Map("uri"->CONFdb1))
   val READdb1: ReadConfig = ReadConfig(Map("uri" -> CONFdb1))

   val WRITEdb2: WriteConfig =  WriteConfig(scala.collection.immutable.Map("uri"->CONFdb2))
   val READdb2: ReadConfig = ReadConfig(Map("uri" -> CONFdb2))

   val spark = SparkSession
  .builder
  .appName("AppMongo")
  .config("spark.worker.cleanup.enabled", "true")
  .config("spark.scheduler.mode", "FAIR")
  .getOrCreate()

   val df1 = spark.read.mongo(READdb1)
   val df2 = spark.read.mongo(READdb2)
   df1.write.mode("overwrite").mongo(WRITEdb1)
   df2.write.mode("overwrite").mongo(WRITEdb2)

 }

}

您现在可以将 uri1uri2 传递到 /usr/local/spark/bin/spark-submit pathToMyjar.app.jar MongoUri1 MongoUri2 sparkMasterUri作为参数,然后为每个 uri

创建 config
spark.read.mongo(READdb)

关于mongodb - 从多个 MongoDB 中读取以形成数据集,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/42273613/

有关mongodb - 从多个 MongoDB 中读取以形成数据集的更多相关文章

  1. ruby-on-rails - Rails 3 中的多个路由文件 - 2

    Rails2.3可以选择随时使用RouteSet#add_configuration_file添加更多路由。是否可以在Rails3项目中做同样的事情? 最佳答案 在config/application.rb中:config.paths.config.routes在Rails3.2(也可能是Rails3.1)中,使用:config.paths["config/routes"] 关于ruby-on-rails-Rails3中的多个路由文件,我们在StackOverflow上找到一个类似的问题

  2. ruby-on-rails - 在 Ruby 中循环遍历多个数组 - 2

    我有多个ActiveRecord子类Item的实例数组,我需要根据最早的事件循环打印。在这种情况下,我需要打印付款和维护日期,如下所示:ItemAmaintenancerequiredin5daysItemBpaymentrequiredin6daysItemApaymentrequiredin7daysItemBmaintenancerequiredin8days我目前有两个查询,用于查找maintenance和payment项目(非排他性查询),并输出如下内容:paymentrequiredin...maintenancerequiredin...有什么方法可以改善上述(丑陋的)代

  3. ruby - 如何将脚本文件的末尾读取为数据文件(Perl 或任何其他语言) - 2

    我正在寻找执行以下操作的正确语法(在Perl、Shell或Ruby中):#variabletoaccessthedatalinesappendedasafileEND_OF_SCRIPT_MARKERrawdatastartshereanditcontinues. 最佳答案 Perl用__DATA__做这个:#!/usr/bin/perlusestrict;usewarnings;while(){print;}__DATA__Texttoprintgoeshere 关于ruby-如何将脚

  4. ruby - 解析 RDFa、微数据等的最佳方式是什么,使用统一的模式/词汇(例如 schema.org)存储和显示信息 - 2

    我主要使用Ruby来执行此操作,但到目前为止我的攻击计划如下:使用gemsrdf、rdf-rdfa和rdf-microdata或mida来解析给定任何URI的数据。我认为最好映射到像schema.org这样的统一模式,例如使用这个yaml文件,它试图描述数据词汇表和opengraph到schema.org之间的转换:#SchemaXtoschema.orgconversion#data-vocabularyDV:name:namestreet-address:streetAddressregion:addressRegionlocality:addressLocalityphoto:i

  5. ruby-on-rails - Rails - 一个 View 中的多个模型 - 2

    我需要从一个View访问多个模型。以前,我的links_controller仅用于提供以不同方式排序的链接资源。现在我想包括一个部分(我假设)显示按分数排序的顶级用户(@users=User.all.sort_by(&:score))我知道我可以将此代码插入每个链接操作并从View访问它,但这似乎不是“ruby方式”,我将需要在不久的将来访问更多模型。这可能会变得很脏,是否有针对这种情况的任何技术?注意事项:我认为我的应用程序正朝着单一格式和动态页面内容的方向发展,本质上是一个典型的网络应用程序。我知道before_filter但考虑到我希望应用程序进入的方向,这似乎很麻烦。最终从任何

  6. Ruby 写入和读取对象到文件 - 2

    好的,所以我的目标是轻松地将一些数据保存到磁盘以备后用。您如何简单地写入然后读取一个对象?所以如果我有一个简单的类classCattr_accessor:a,:bdefinitialize(a,b)@a,@b=a,bendend所以如果我从中非常快地制作一个objobj=C.new("foo","bar")#justgaveitsomerandomvalues然后我可以把它变成一个kindaidstring=obj.to_s#whichreturns""我终于可以将此字符串打印到文件或其他内容中。我的问题是,我该如何再次将这个id变回一个对象?我知道我可以自己挑选信息并制作一个接受该信

  7. ruby - 多个属性的 update_column 方法 - 2

    我有一个具有一些属性的模型:attr1、attr2和attr3。我需要在不执行回调和验证的情况下更新此属性。我找到了update_column方法,但我想同时更新三个属性。我需要这样的东西:update_columns({attr1:val1,attr2:val2,attr3:val3})代替update_column(attr1,val1)update_column(attr2,val2)update_column(attr3,val3) 最佳答案 您可以使用update_columns(attr1:val1,attr2:val2

  8. ruby-on-rails - 在 ruby​​ .gemspec 文件中,如何指定依赖项的多个版本? - 2

    我正在尝试修改当前依赖于定义为activeresource的gem:s.add_dependency"activeresource","~>3.0"为了让gem与Rails4一起工作,我需要扩展依赖关系以与activeresource的版本3或4一起工作。我不想简单地添加以下内容,因为它可能会在以后引起问题:s.add_dependency"activeresource",">=3.0"有没有办法指定可接受版本的列表?~>3.0还是~>4.0? 最佳答案 根据thedocumentation,如果你想要3到4之间的所有版本,你可以这

  9. ruby - Ruby 有 `Pair` 数据类型吗? - 2

    有时我需要处理键/值数据。我不喜欢使用数组,因为它们在大小上没有限制(很容易不小心添加超过2个项目,而且您最终需要稍后验证大小)。此外,0和1的索引变成了魔数(MagicNumber),并且在传达含义方面做得很差(“当我说0时,我的意思是head...”)。散列也不合适,因为可能会不小心添加额外的条目。我写了下面的类来解决这个问题:classPairattr_accessor:head,:taildefinitialize(h,t)@head,@tail=h,tendend它工作得很好并且解决了问题,但我很想知道:Ruby标准库是否已经带有这样一个类? 最佳

  10. ruby - 使用多个数组创建计数 - 2

    我正在尝试按0-9和a-z的顺序创建数字和字母列表。我有一组值value_array=['0','1','2','3','4','5','6','7','8','9','a','b','光盘','e','f','g','h','i','j','k','l','m','n','o','p','q','r','s','','u','v','w','x','y','z']和一个组合列表的数组,按顺序,这些数字可以产生x个字符,比方说三个list_array=[]和一个当前字母和数字组合的数组(在将它插入列表数组之前我会把它变成一个字符串,]current_combo['0','0','0']

随机推荐