草庐IT

hadoop - Spark Streaming - HBase 批量加载

coder 2024-01-07 原文

我目前正在使用 Python 将 CSV 数据批量加载到 HBase 表中,目前我在使用 saveAsNewAPIHadoopFile 编写适当的 HFile 时遇到了问题

我的代码目前如下所示:

def csv_to_key_value(row):
    cols = row.split(",")
    result = ((cols[0], [cols[0], "f1", "c1", cols[1]]),
              (cols[0], [cols[0], "f2", "c2", cols[2]]),
              (cols[0], [cols[0], "f3", "c3", cols[3]]))
    return result

def bulk_load(rdd):
    conf = {#Ommitted to simplify}

    keyConv = "org.apache.spark.examples.pythonconverters.StringToImmutableBytesWritableConverter"
    valueConv = "org.apache.spark.examples.pythonconverters.StringListToPutConverter"

    load_rdd = rdd.flatMap(lambda line: line.split("\n"))\
                  .flatMap(csv_to_key_value)
    if not load_rdd.isEmpty():
        load_rdd.saveAsNewAPIHadoopFile("file:///tmp/hfiles" + startTime,
                                        "org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2",
                                        conf=conf,
                                        keyConverter=keyConv,
                                        valueConverter=valueConv)
    else:
        print("Nothing to process")

当我运行这段代码时,出现以下错误:

java.io.IOException:添加了一个词法上不大于以前的键。当前单元格 = 10/f1:c1/1453891407213/Minimum/vlen=1/seqid=0,lastCell =/f1:c1/1453891407212/Minimum/vlen=1/seqid=0 在 org.apache.hadoop.hbase.io。 hfile.AbstractHFileWriter.checkKey(AbstractHFileWriter.java:204)

由于错误表明是键的问题,所以我从我的 RDD 中抓取了元素,它们如下(为了便于阅读而格式化)

[(u'1', [u'1', 'f1', 'c1', u'A']),
 (u'1', [u'1', 'f2', 'c2', u'1A']),
 (u'1', [u'1', 'f3', 'c3', u'10']),
 (u'2', [u'2', 'f1', 'c1', u'B']),
 (u'2', [u'2', 'f2', 'c2', u'2B']),
 (u'2', [u'2', 'f3', 'c3', u'9']),

. . .

 (u'9', [u'9', 'f1', 'c1', u'I']),
 (u'9', [u'9', 'f2', 'c2', u'3C']),
 (u'9', [u'9', 'f3', 'c3', u'2']),
 (u'10', [u'10', 'f1', 'c1', u'J']),
 (u'10', [u'10', 'f2', 'c2', u'1A']),
 (u'10', [u'10', 'f3', 'c3', u'1'])]

这与我的 CSV 完全匹配,顺序正确。据我了解,在 HBase 中,一个键由 {row, family, timestamp} 定义。行和族的组合对于我数据中的所有条目来说都是唯一且单调递增的,而且我无法控制时间戳(这是我能想象到的唯一问题)

谁能告诉我如何避免/预防此类问题?

最佳答案

嗯,这只是我的一个愚蠢错误,我觉得有点愚蠢。按字典顺序,顺序应为 1、10、2、3 ... 8、9。在加载前保证正确排序的最简单方法是:

rdd.sortByKey(true);

我希望我至少可以让一个人摆脱我的头痛。

关于hadoop - Spark Streaming - HBase 批量加载,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/35035493/

有关hadoop - Spark Streaming - HBase 批量加载的更多相关文章

  1. ruby - 如何在续集中重新加载表模式? - 2

    鉴于我有以下迁移:Sequel.migrationdoupdoalter_table:usersdoadd_column:is_admin,:default=>falseend#SequelrunsaDESCRIBEtablestatement,whenthemodelisloaded.#Atthispoint,itdoesnotknowthatusershaveais_adminflag.#Soitfails.@user=User.find(:email=>"admin@fancy-startup.example")@user.is_admin=true@user.save!ende

  2. ruby - RuntimeError(自动加载常量 Apps 多线程时检测到循环依赖 - 2

    我收到这个错误:RuntimeError(自动加载常量Apps时检测到循环依赖当我使用多线程时。下面是我的代码。为什么会这样?我尝试多线程的原因是因为我正在编写一个HTML抓取应用程序。对Nokogiri::HTML(open())的调用是一个同步阻塞调用,需要1秒才能返回,我有100,000多个页面要访问,所以我试图运行多个线程来解决这个问题。有更好的方法吗?classToolsController0)app.website=array.join(',')putsapp.websiteelseapp.website="NONE"endapp.saveapps=Apps.order("

  3. ruby-on-rails - 使用 config.threadsafe 时从 lib/加载模块/类的正确方法是什么!选项? - 2

    我一直致力于让我们的Rails2.3.8应用程序在JRuby下正确运行。一切正常,直到我启用config.threadsafe!以实现JRuby提供的并发性。这导致lib/中的模块和类不再自动加载。使用config.threadsafe!启用:$rubyscript/runner-eproduction'pSim::Sim200Provisioner'/Users/amchale/.rvm/gems/jruby-1.5.1@web-services/gems/activesupport-2.3.8/lib/active_support/dependencies.rb:105:in`co

  4. ruby-on-rails - 从应用程序中自定义文件夹内的命名空间自动加载 - 2

    我们目前正在为ROR3.2开发自定义cms引擎。在这个过程中,我们希望成为我们的rails应用程序中的一等公民的几个类类型起源,这意味着它们应该驻留在应用程序的app文件夹下,它是插件。目前我们有以下类型:数据源数据类型查看我在app文件夹下创建了多个目录来保存这些:应用/数据源应用/数据类型应用/View更多类型将随之而来,我有点担心应用程序文件夹被这么多目录污染。因此,我想将它们移动到一个子目录/模块中,该子目录/模块包含cms定义的所有类型。所有类都应位于MyCms命名空间内,目录布局应如下所示:应用程序/my_cms/data_source应用程序/my_cms/data_ty

  5. hadoop安装之保姆级教程(二)之YARN的配置 - 2

    1.1.1 YARN的介绍 为克服Hadoop1.0中HDFS和MapReduce存在的各种问题⽽提出的,针对Hadoop1.0中的MapReduce在扩展性和多框架⽀持⽅⾯的不⾜,提出了全新的资源管理框架YARN. ApacheYARN(YetanotherResourceNegotiator的缩写)是Hadoop集群的资源管理系统,负责为计算程序提供服务器计算资源,相当于⼀个分布式的操作系统平台,⽽MapReduce等计算程序则相当于运⾏于操作系统之上的应⽤程序。 YARN被引⼊Hadoop2,最初是为了改善MapReduce的实现,但是因为具有⾜够的通⽤性,同样可以⽀持其他的分布式计算模

  6. HBase Region 简介和建议数量&大小 - 2

    Region是HBase数据管理的基本单位,region有一点像关系型数据的分区。region中存储这用户的真实数据,而为了管理这些数据,HBase使用了RegionSever来管理region。Region的结构hbaseregion的大小设置默认情况下,每个Table起初只有一个Region,随着数据的不断写入,Region会自动进行拆分。刚拆分时,两个子Region都位于当前的RegionServer,但处于负载均衡的考虑,HMaster有可能会将某个Region转移给其他的RegionServer。RegionSplit时机:当1个region中的某个Store下所有StoreFile

  7. ruby-on-rails - 使用 gmaps4rails 动态加载谷歌地图标记 - 2

    如何只加载map边界内的标记gmaps4rails?当然,在平移和/或缩放后加载新的。与此直接相关的是,如何获取map的当前边界和缩放级别? 最佳答案 我是这样做的,我只在用户完成平移或缩放后替换标记,如果您需要不同的行为,请使用不同的事件监听器:在你看来(index.html.erb):{"zoom"=>15,"auto_adjust"=>false,"detect_location"=>true,"center_on_user"=>true}},false,true)%>在View的底部添加:functiongmaps4rail

  8. ruby-on-rails - 是否可以让 ActiveRecord 为使用 :joins option? 加载的行创建对象 - 2

    我需要做这样的事情classUser'User',:foreign_key=>'abuser_id'belongs_to:gameendclassGame['JOINabuse_reportsONusers.id=abuse_reports.abuser_id','JOINgamesONgames.id=abuse_reports.game_id'],:group=>'users.id',:select=>'users.*,count(distinctgames.id)ASgame_count,count(abuse_reports.id)asabuse_report_count',:

  9. ruby - 运行 rackup private_pub.ru -s thin -E production 命令时无法加载此类文件 -- thin (LoadError) - 2

    我指的是pubrailscasttutorial并已正确执行所有步骤,但在运行最后一个命令时,即rackupprivate_pub.ru-sthin-Eproduction为了架设faye服务器,我收到以下错误:/usr/lib/ruby/1.9.1/rubygems/custom_require.rb:36:in`require':cannotloadsuchfile--thin(LoadError)from/usr/lib/ruby/1.9.1/rubygems/custom_require.rb:36:in`require'from/var/lib/gems/1.9.1/gems

  10. ruby - libxml-ruby 无法在 x86_64 上加载 - 2

    我们在服务器端遇到libxml-rubygem的问题可能是因为它使用x86_64架构:$uname-aLinuxip-10-228-171-642.6.21.7-2.fc8xen-ec2-v1.0#1SMPTueSep110:25:30EDT2009x86_64GNU/Linuxrequire'libxml'LoadError:/usr/local/ruby-enterprise/lib/ruby/gems/1.8/gems/libxml-ruby-1.1.4/lib/libxml_ruby.so:invalidELFheader-/usr/local/ruby-enterprise/

随机推荐