我想从 2 个不同的 Mongo 数据库制作 2 个数据集。我目前正在使用官方 MongoSpark 连接器。 sparkSession 的启动方式如下。
SparkConf sparkConf = new SparkConf().setMaster("yarn").setAppName("test")
.set("spark.mongodb.input.partitioner", "MongoShardedPartitioner")
.set("spark.mongodb.input.uri", "mongodb://192.168.77.62/db1.coll1")
.set("spark.sql.crossJoin.enabled", "true");
SparkSession sparkSession = sparkSession.builder().appName("test1").config(sparkConf).getOrCreate();
如果我想更改 spark.mongodb.input.uri,我该怎么做?我已经尝试更改 sparkSession 的 runtimeConfig 以及将 ReadConfig 与 readOverrides 一起使用,但这些都不起作用。
方法一:
sparkSession.conf().set("spark.mongodb.input.uri", "mongodb://192.168.77.63/db1.coll2");
方法二:
Map<String, String> readOverrides = new HashMap<String, String>();
readoverrides.put("uri","192.168.77.63/db1.coll2");
ReadConfig readConfig = ReadConfig.create(sparkSession).withOptions(readOverrides);
Dataset<Position> ds = MongoSpark.load(sparkSession, readConfig, Position.class);
编辑 1:按照 Karol 的建议,我尝试了以下方法
SparkConf sparkConf = new SparkConf().setMaster("yarn").setAppName("test");
SparkSession sparkSession = SparkSession.builder().appName("test1").config(sparkConf).getOrCreate();
Map<String, String> readOverrides1 = new HashMap<String, String>();
readOverrides1.put("uri", "mongodb://192.168.77.62:27017");
readOverrides1.put("database", "db1");
readOverrides1.put("collection", "coll1");
ReadConfig readConfig = ReadConfig.create(sparkSession).withOptions(readOverrides1);
这在运行时失败说:
Exception in thread "main" java.lang.IllegalArgumentException: Missing database name. Set via the 'spark.mongodb.input.uri' or 'spark.mongodb.input.database' property
编辑 2:
public static void main(String[] args) {
SparkSession sparkSession = SparkSession.builder().appName("test")
.config("spark.worker.cleanup.enabled", "true").config("spark.scheduler.mode", "FAIR").getOrCreate();
String mongoURI2 = "mongodb://192.168.77.63:27017/db1.coll1";
Map<String, String> readOverrides1 = new HashMap<String, String>();
readOverrides1.put("uri", mongoURI2);
ReadConfig readConfig1 = ReadConfig.create(sparkSession).withOptions(readOverrides1);
MongoSpark.load(sparkSession,readConfig1,Position.class).show();
}
这仍然给出了与之前编辑相同的异常(exception)。
最佳答案
内置.sbt:
libraryDependencies += "org.mongodb.spark"% "mongo-spark-connector_2.11"% "2.0.0"
package com.example.app
import com.mongodb.spark.config.{ReadConfig, WriteConfig}
import com.mongodb.spark.sql._
object App {
def main(args: Array[String]): Unit = {
val MongoUri1 = args(0).toString
val MongoUri2 = args(1).toString
val SparkMasterUri= args(2).toString
def makeMongoURI(uri:String,database:String,collection:String) = (s"${uri}/${database}.${collection}")
val mongoURI1 = s"mongodb://${MongoUri1}:27017"
val mongoURI2 = s"mongodb://${MongoUri2}:27017"
val CONFdb1 = makeMongoURI(s"${mongoURI1}","MyColletion1,"df")
val CONFdb2 = makeMongoURI(s"${mongoURI2}","MyColletion2,"df")
val WRITEdb1: WriteConfig = WriteConfig(scala.collection.immutable.Map("uri"->CONFdb1))
val READdb1: ReadConfig = ReadConfig(Map("uri" -> CONFdb1))
val WRITEdb2: WriteConfig = WriteConfig(scala.collection.immutable.Map("uri"->CONFdb2))
val READdb2: ReadConfig = ReadConfig(Map("uri" -> CONFdb2))
val spark = SparkSession
.builder
.appName("AppMongo")
.config("spark.worker.cleanup.enabled", "true")
.config("spark.scheduler.mode", "FAIR")
.getOrCreate()
val df1 = spark.read.mongo(READdb1)
val df2 = spark.read.mongo(READdb2)
df1.write.mode("overwrite").mongo(WRITEdb1)
df2.write.mode("overwrite").mongo(WRITEdb2)
}
}
您现在可以将 uri1 和 uri2 传递到 /usr/local/spark/bin/spark-submit pathToMyjar.app.jar MongoUri1 MongoUri2 sparkMasterUri作为参数,然后为每个 uri
config
spark.read.mongo(READdb)
关于mongodb - 从多个 MongoDB 中读取以形成数据集,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/42273613/
Rails2.3可以选择随时使用RouteSet#add_configuration_file添加更多路由。是否可以在Rails3项目中做同样的事情? 最佳答案 在config/application.rb中:config.paths.config.routes在Rails3.2(也可能是Rails3.1)中,使用:config.paths["config/routes"] 关于ruby-on-rails-Rails3中的多个路由文件,我们在StackOverflow上找到一个类似的问题
我有多个ActiveRecord子类Item的实例数组,我需要根据最早的事件循环打印。在这种情况下,我需要打印付款和维护日期,如下所示:ItemAmaintenancerequiredin5daysItemBpaymentrequiredin6daysItemApaymentrequiredin7daysItemBmaintenancerequiredin8days我目前有两个查询,用于查找maintenance和payment项目(非排他性查询),并输出如下内容:paymentrequiredin...maintenancerequiredin...有什么方法可以改善上述(丑陋的)代
我正在寻找执行以下操作的正确语法(在Perl、Shell或Ruby中):#variabletoaccessthedatalinesappendedasafileEND_OF_SCRIPT_MARKERrawdatastartshereanditcontinues. 最佳答案 Perl用__DATA__做这个:#!/usr/bin/perlusestrict;usewarnings;while(){print;}__DATA__Texttoprintgoeshere 关于ruby-如何将脚
我主要使用Ruby来执行此操作,但到目前为止我的攻击计划如下:使用gemsrdf、rdf-rdfa和rdf-microdata或mida来解析给定任何URI的数据。我认为最好映射到像schema.org这样的统一模式,例如使用这个yaml文件,它试图描述数据词汇表和opengraph到schema.org之间的转换:#SchemaXtoschema.orgconversion#data-vocabularyDV:name:namestreet-address:streetAddressregion:addressRegionlocality:addressLocalityphoto:i
我需要从一个View访问多个模型。以前,我的links_controller仅用于提供以不同方式排序的链接资源。现在我想包括一个部分(我假设)显示按分数排序的顶级用户(@users=User.all.sort_by(&:score))我知道我可以将此代码插入每个链接操作并从View访问它,但这似乎不是“ruby方式”,我将需要在不久的将来访问更多模型。这可能会变得很脏,是否有针对这种情况的任何技术?注意事项:我认为我的应用程序正朝着单一格式和动态页面内容的方向发展,本质上是一个典型的网络应用程序。我知道before_filter但考虑到我希望应用程序进入的方向,这似乎很麻烦。最终从任何
好的,所以我的目标是轻松地将一些数据保存到磁盘以备后用。您如何简单地写入然后读取一个对象?所以如果我有一个简单的类classCattr_accessor:a,:bdefinitialize(a,b)@a,@b=a,bendend所以如果我从中非常快地制作一个objobj=C.new("foo","bar")#justgaveitsomerandomvalues然后我可以把它变成一个kindaidstring=obj.to_s#whichreturns""我终于可以将此字符串打印到文件或其他内容中。我的问题是,我该如何再次将这个id变回一个对象?我知道我可以自己挑选信息并制作一个接受该信
我有一个具有一些属性的模型:attr1、attr2和attr3。我需要在不执行回调和验证的情况下更新此属性。我找到了update_column方法,但我想同时更新三个属性。我需要这样的东西:update_columns({attr1:val1,attr2:val2,attr3:val3})代替update_column(attr1,val1)update_column(attr2,val2)update_column(attr3,val3) 最佳答案 您可以使用update_columns(attr1:val1,attr2:val2
我正在尝试修改当前依赖于定义为activeresource的gem:s.add_dependency"activeresource","~>3.0"为了让gem与Rails4一起工作,我需要扩展依赖关系以与activeresource的版本3或4一起工作。我不想简单地添加以下内容,因为它可能会在以后引起问题:s.add_dependency"activeresource",">=3.0"有没有办法指定可接受版本的列表?~>3.0还是~>4.0? 最佳答案 根据thedocumentation,如果你想要3到4之间的所有版本,你可以这
有时我需要处理键/值数据。我不喜欢使用数组,因为它们在大小上没有限制(很容易不小心添加超过2个项目,而且您最终需要稍后验证大小)。此外,0和1的索引变成了魔数(MagicNumber),并且在传达含义方面做得很差(“当我说0时,我的意思是head...”)。散列也不合适,因为可能会不小心添加额外的条目。我写了下面的类来解决这个问题:classPairattr_accessor:head,:taildefinitialize(h,t)@head,@tail=h,tendend它工作得很好并且解决了问题,但我很想知道:Ruby标准库是否已经带有这样一个类? 最佳
我正在尝试按0-9和a-z的顺序创建数字和字母列表。我有一组值value_array=['0','1','2','3','4','5','6','7','8','9','a','b','光盘','e','f','g','h','i','j','k','l','m','n','o','p','q','r','s','','u','v','w','x','y','z']和一个组合列表的数组,按顺序,这些数字可以产生x个字符,比方说三个list_array=[]和一个当前字母和数字组合的数组(在将它插入列表数组之前我会把它变成一个字符串,]current_combo['0','0','0']