草庐IT

mongodb - 将 mongodb oplog.rs 加载到 spark dataframe

coder 2023-11-07 原文

我正在尝试将 MongoDB 中的 oplog.rs 加载到 spark DataFrame 中,它加载了元数据并通过 printSchema 函数对其进行了验证,但是当我尝试执行诸如 show 或 count 之类的操作时它给了我这个错误 scala.MatchError: ((BsonMinKey,null),0) (of class scala.Tuple2)。我也尝试将其注册为 temptable,但仍然出现相同的错误。

val customReadConfig = ReadConfig(Map(
  "uri" -> 
    "mongodb://username:password@host_name:port/local.oplog.rs?authSource=xxxxx"
))

val dataframe = sqlContext.read.format("com.mongodb.spark.sql").
  options(customReadConfig.asOptions).load

最佳答案

为了后代:

Mongo >= 3.2 版本的默认分区器是 MongoSamplePartitioner,它使用(像所有其他分区器一样)partitionKey 并且在创建分区时使用BsonMinKeyBsonMaxKey 来定义每个分区的边界。您遇到的匹配错误可能发生在此处:

  def createPartitions(partitionKey: String, splitKeys: Seq[BsonValue], 
      locations: Seq[String] = Nil, addMinMax: Boolean = true): 
      Array[MongoPartition] = {
        val minKeyMaxKeys = (new BsonMinKey(), new BsonMaxKey())
        val minToMaxSplitKeys: Seq[BsonValue] = if (addMinMax) minKeyMaxKeys._1 +: splitKeys :+ minKeyMaxKeys._2 else splitKeys
        val minToMaxKeysToPartition = if (minToMaxSplitKeys.length == 1) minToMaxSplitKeys else minToMaxSplitKeys.tail
        val partitionPairs: Seq[(BsonValue, BsonValue)] = minToMaxSplitKeys zip minToMaxKeysToPartition
        partitionPairs.zipWithIndex.map({
           case ((min: BsonValue, max: BsonValue), i: Int) => MongoPartition(i, createBoundaryQuery(partitionKey, min, max), locations)
      }).toArray
}

该错误告诉您的是您的 max 被设置为 null,正如您在代码中看到的那样,只处理了一种情况。如果您没有设置要使用的partitionKey,分区程序将默认使用_id,您可以阅读它here

默认情况下,oplog.rs 集合没有_id 键,oplog 记录的唯一id 是惊人的h ,它是一个数字。因此,为了让分区程序做正确的事情,您需要在 SparkConfReadConfig 中设置 spark.mongodb.input.partitionerOptions.partitionKeyh

 new SparkConf()
   //all of your other settings
   .set("spark.mongodb.input.partitionerOptions.partitionKey", "h")

关于mongodb - 将 mongodb oplog.rs 加载到 spark dataframe,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/42584984/

有关mongodb - 将 mongodb oplog.rs 加载到 spark dataframe的更多相关文章

  1. ruby - 如何在续集中重新加载表模式? - 2

    鉴于我有以下迁移:Sequel.migrationdoupdoalter_table:usersdoadd_column:is_admin,:default=>falseend#SequelrunsaDESCRIBEtablestatement,whenthemodelisloaded.#Atthispoint,itdoesnotknowthatusershaveais_adminflag.#Soitfails.@user=User.find(:email=>"admin@fancy-startup.example")@user.is_admin=true@user.save!ende

  2. ruby - RuntimeError(自动加载常量 Apps 多线程时检测到循环依赖 - 2

    我收到这个错误:RuntimeError(自动加载常量Apps时检测到循环依赖当我使用多线程时。下面是我的代码。为什么会这样?我尝试多线程的原因是因为我正在编写一个HTML抓取应用程序。对Nokogiri::HTML(open())的调用是一个同步阻塞调用,需要1秒才能返回,我有100,000多个页面要访问,所以我试图运行多个线程来解决这个问题。有更好的方法吗?classToolsController0)app.website=array.join(',')putsapp.websiteelseapp.website="NONE"endapp.saveapps=Apps.order("

  3. ruby-on-rails - 使用 config.threadsafe 时从 lib/加载模块/类的正确方法是什么!选项? - 2

    我一直致力于让我们的Rails2.3.8应用程序在JRuby下正确运行。一切正常,直到我启用config.threadsafe!以实现JRuby提供的并发性。这导致lib/中的模块和类不再自动加载。使用config.threadsafe!启用:$rubyscript/runner-eproduction'pSim::Sim200Provisioner'/Users/amchale/.rvm/gems/jruby-1.5.1@web-services/gems/activesupport-2.3.8/lib/active_support/dependencies.rb:105:in`co

  4. ruby-on-rails - 从应用程序中自定义文件夹内的命名空间自动加载 - 2

    我们目前正在为ROR3.2开发自定义cms引擎。在这个过程中,我们希望成为我们的rails应用程序中的一等公民的几个类类型起源,这意味着它们应该驻留在应用程序的app文件夹下,它是插件。目前我们有以下类型:数据源数据类型查看我在app文件夹下创建了多个目录来保存这些:应用/数据源应用/数据类型应用/View更多类型将随之而来,我有点担心应用程序文件夹被这么多目录污染。因此,我想将它们移动到一个子目录/模块中,该子目录/模块包含cms定义的所有类型。所有类都应位于MyCms命名空间内,目录布局应如下所示:应用程序/my_cms/data_source应用程序/my_cms/data_ty

  5. ruby-on-rails - 使用 gmaps4rails 动态加载谷歌地图标记 - 2

    如何只加载map边界内的标记gmaps4rails?当然,在平移和/或缩放后加载新的。与此直接相关的是,如何获取map的当前边界和缩放级别? 最佳答案 我是这样做的,我只在用户完成平移或缩放后替换标记,如果您需要不同的行为,请使用不同的事件监听器:在你看来(index.html.erb):{"zoom"=>15,"auto_adjust"=>false,"detect_location"=>true,"center_on_user"=>true}},false,true)%>在View的底部添加:functiongmaps4rail

  6. ruby-on-rails - 是否可以让 ActiveRecord 为使用 :joins option? 加载的行创建对象 - 2

    我需要做这样的事情classUser'User',:foreign_key=>'abuser_id'belongs_to:gameendclassGame['JOINabuse_reportsONusers.id=abuse_reports.abuser_id','JOINgamesONgames.id=abuse_reports.game_id'],:group=>'users.id',:select=>'users.*,count(distinctgames.id)ASgame_count,count(abuse_reports.id)asabuse_report_count',:

  7. ruby - 运行 rackup private_pub.ru -s thin -E production 命令时无法加载此类文件 -- thin (LoadError) - 2

    我指的是pubrailscasttutorial并已正确执行所有步骤,但在运行最后一个命令时,即rackupprivate_pub.ru-sthin-Eproduction为了架设faye服务器,我收到以下错误:/usr/lib/ruby/1.9.1/rubygems/custom_require.rb:36:in`require':cannotloadsuchfile--thin(LoadError)from/usr/lib/ruby/1.9.1/rubygems/custom_require.rb:36:in`require'from/var/lib/gems/1.9.1/gems

  8. ruby - libxml-ruby 无法在 x86_64 上加载 - 2

    我们在服务器端遇到libxml-rubygem的问题可能是因为它使用x86_64架构:$uname-aLinuxip-10-228-171-642.6.21.7-2.fc8xen-ec2-v1.0#1SMPTueSep110:25:30EDT2009x86_64GNU/Linuxrequire'libxml'LoadError:/usr/local/ruby-enterprise/lib/ruby/gems/1.8/gems/libxml-ruby-1.1.4/lib/libxml_ruby.so:invalidELFheader-/usr/local/ruby-enterprise/

  9. Ruby 不从 stdlib 加载 CSV - 2

    我不太确定为什么这不起作用,我一直在寻找解决方案。很简单,我正在运行一个执行require'CSV'的小脚本。,它在我的Mac1.9.3-p327上运行良好,但在p374上的服务器上无法运行。我得到的错误是/home/deployer/.rbenv/versions/1.9.3-p374/lib/ruby/1.9.1/rubygems/custom_require.rb:36:inrequire':cannotloadsuchfile--CSV(LoadError)from/home/deployer/.rbenv/versions/1.9.3-p374/lib/ruby/1.9.1/

  10. ruby - 无法加载此类文件——脚本/rails : Getting this error while remote debugging through RubyMine - 2

    我在通过RubyMineIDE进行远程调试时遇到以下错误。$bundleexecrdebug-ide--port1234--script/railsserverFastDebugger(ruby-debug-ide0.4.9)listenson:1234/home/amit/.rvm/gems/ruby-1.9.3-p125/gems/ruby-debug-ide19-0.4.12/lib/ruby-debug-ide.rb:123:in`debug_load'/home/amit/.rvm/gems/ruby-1.9.3-p125/gems/ruby-debug-ide19-0.4.

随机推荐