草庐IT

mysql - Sqoop 增量加载 lastmodified 不适用于更新的记录

coder 2023-10-24 原文

我正在做一个 sqoop 增量作业,将数据从 mysql 加载到 hdfs。以下是以下场景。

场景 1: 下面是插入到mysql示例表中的记录。

select * from sample;
+-----+--------+--------+---------------------+
| id  | policy | salary | updated_time        |
+-----+--------+--------+---------------------+
| 100 |      1 |   4567 | 2017-08-02 01:58:28 |
| 200 |      2 |   3456 | 2017-08-02 01:58:29 |
| 300 |      3 |   2345 | 2017-08-02 01:58:29 |
+-----+--------+--------+---------------------+

mysql中示例表的表结构如下:

create table sample (id int not null primary key, policy int, salary int, updated_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP);

我正在尝试通过如下创建 sqoop 作业将其导入 hdfs

sqoop job --create incjob -- import --connect jdbc:mysql://localhost/retail_db --username root -P --table sample --merge-key id --split-by id --target-dir /user/cloudera --append --incremental lastmodified --check-column updated_time -m 1

下面是执行sqoop job后在hdfs中的输出记录

$ hadoop fs -cat /user/cloudera/par*
100,1,4567,2017-08-02 01:58:28.0
200,2,3456,2017-08-02 01:58:29.0
300,3,2345,2017-08-02 01:58:29.0

场景 2:在示例表中插入少量新记录并更新现有记录后。下面是示例表。

select * from sample;
+-----+--------+--------+---------------------+
| id  | policy | salary | updated_time        |
+-----+--------+--------+---------------------+
| 100 |      6 |   5638 | 2017-08-02 02:01:09 |
| 200 |      2 |   7654 | 2017-08-02 02:01:10 |
| 300 |      3 |   2345 | 2017-08-02 01:58:29 |
| 400 |      4 |   1234 | 2017-08-02 02:01:17 |
| 500 |      5 |   6543 | 2017-08-02 02:01:18 |
+-----+--------+--------+---------------------+

运行相同的 sqoop 作业后,下面是 hdfs 中的记录。

hadoop fs -cat /user/cloudera/par*
100,1,4567,2017-08-02 01:58:28.0
200,2,3456,2017-08-02 01:58:29.0
300,3,2345,2017-08-02 01:58:29.0
100,6,5638,2017-08-02 02:01:09.0
200,2,7654,2017-08-02 02:01:10.0
400,4,1234,2017-08-02 02:01:17.0
500,5,6543,2017-08-02 02:01:18.0

这里将mysql中更新的记录作为新记录插入到hdfs中,而不是更新hdfs中已有的记录。我在我的 sqoop 作业 conf 中同时使用了 --merge-key 和 --append。任何人都可以帮助我解决这个问题。

最佳答案

您正在使用 --merge-key --appendlastmodified 一起使用。这也不正确。

  • --incremental append 模式将数据附加到 HDFS 中的现有数据集。在导入不断添加新行并增加行 ID 值的表时,您应该指定追加模式

  • --incremental lastmodified 模式 - 当源表的行可能被更新时,你应该使用它,并且每次这样的更新都会将最后修改的列的值设置为当前时间戳。

  • --merge-key - 合并工具运行一个 MapReduce 作业,该作业将两个目录作为输入:一个较新的数据集和一个较旧的数据集。这些分别用 --new-data 和 --onto 指定。 MapReduce 作业的输出将放置在 --target-dir 指定的 HDFS 目录中。

  • --last-value (value) 指定上一次导入的检查列的最大值。如果你从命令行运行 sqoop,没有 Sqoop 作业,那么你必须添加 --last-value 参数

在您的情况下,有一些新记录并且一些记录也已更新,因此您需要使用 lastmodified 模式。

您的 Sqoop 命令将是:

sqoop 作业 --create incjob -- import --connect jdbc:mysql://localhost/retail_db --username root -P --table sample --merge-key id --target-dir/user/cloudera --incremental lastmodified --check-column updated_time -m 1

因为您只指定了一个映射器,所以不需要 --split-by

关于mysql - Sqoop 增量加载 lastmodified 不适用于更新的记录,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/45457423/

有关mysql - Sqoop 增量加载 lastmodified 不适用于更新的记录的更多相关文章

  1. ruby-on-rails - 如何验证 update_all 是否实际在 Rails 中更新 - 2

    给定这段代码defcreate@upgrades=User.update_all(["role=?","upgraded"],:id=>params[:upgrade])redirect_toadmin_upgrades_path,:notice=>"Successfullyupgradeduser."end我如何在该操作中实际验证它们是否已保存或未重定向到适当的页面和消息? 最佳答案 在Rails3中,update_all不返回任何有意义的信息,除了已更新的记录数(这可能取决于您的DBMS是否返回该信息)。http://ar.ru

  2. ruby-on-rails - Rails 常用字符串(用于通知和错误信息等) - 2

    大约一年前,我决定确保每个包含非唯一文本的Flash通知都将从模块中的方法中获取文本。我这样做的最初原因是为了避免一遍又一遍地输入相同的字符串。如果我想更改措辞,我可以在一个地方轻松完成,而且一遍又一遍地重复同一件事而出现拼写错误的可能性也会降低。我最终得到的是这样的:moduleMessagesdefformat_error_messages(errors)errors.map{|attribute,message|"Error:#{attribute.to_s.titleize}#{message}."}enddeferror_message_could_not_find(obje

  3. ruby - 如何在续集中重新加载表模式? - 2

    鉴于我有以下迁移:Sequel.migrationdoupdoalter_table:usersdoadd_column:is_admin,:default=>falseend#SequelrunsaDESCRIBEtablestatement,whenthemodelisloaded.#Atthispoint,itdoesnotknowthatusershaveais_adminflag.#Soitfails.@user=User.find(:email=>"admin@fancy-startup.example")@user.is_admin=true@user.save!ende

  4. ruby-on-rails - 使用 rails 4 设计而不更新用户 - 2

    我将应用程序升级到Rails4,一切正常。我可以登录并转到我的编辑页面。也更新了观点。使用标准View时,用户会更新。但是当我添加例如字段:name时,它​​不会在表单中更新。使用devise3.1.1和gem'protected_attributes'我需要在设备或数据库上运行某种更新命令吗?我也搜索过这个地方,找到了许多不同的解决方案,但没有一个会更新我的用户字段。我没有添加任何自定义字段。 最佳答案 如果您想允许额外的参数,您可以在ApplicationController中使用beforefilter,因为Rails4将参数

  5. ruby - Sinatra:运行 rspec 测试时记录噪音 - 2

    Sinatra新手;我正在运行一些rspec测试,但在日志中收到了一堆不需要的噪音。如何消除日志中过多的噪音?我仔细检查了环境是否设置为:test,这意味着记录器级别应设置为WARN而不是DEBUG。spec_helper:require"./app"require"sinatra"require"rspec"require"rack/test"require"database_cleaner"require"factory_girl"set:environment,:testFactoryGirl.definition_file_paths=%w{./factories./test/

  6. ruby - RuntimeError(自动加载常量 Apps 多线程时检测到循环依赖 - 2

    我收到这个错误:RuntimeError(自动加载常量Apps时检测到循环依赖当我使用多线程时。下面是我的代码。为什么会这样?我尝试多线程的原因是因为我正在编写一个HTML抓取应用程序。对Nokogiri::HTML(open())的调用是一个同步阻塞调用,需要1秒才能返回,我有100,000多个页面要访问,所以我试图运行多个线程来解决这个问题。有更好的方法吗?classToolsController0)app.website=array.join(',')putsapp.websiteelseapp.website="NONE"endapp.saveapps=Apps.order("

  7. Ruby Sinatra 配置用于生产和开发 - 2

    我已经在Sinatra上创建了应用程序,它代表了一个简单的API。我想在生产和开发上进行部署。我想在部署时选择,是开发还是生产,一些方法的逻辑应该改变,这取决于部署类型。是否有任何想法,如何完成以及解决此问题的一些示例。例子:我有代码get'/api/test'doreturn"Itisdev"end但是在部署到生产环境之后我想在运行/api/test之后看到ItisPROD如何实现? 最佳答案 根据SinatraDocumentation:EnvironmentscanbesetthroughtheRACK_ENVenvironm

  8. ruby-on-rails - Rails 5 Active Record 记录无效错误 - 2

    我有两个Rails模型,即Invoice和Invoice_details。一个Invoice_details属于Invoice,一个Invoice有多个Invoice_details。我无法使用accepts_nested_attributes_forinInvoice通过Invoice模型保存Invoice_details。我收到以下错误:(0.2ms)BEGIN(0.2ms)ROLLBACKCompleted422UnprocessableEntityin25ms(ActiveRecord:4.0ms)ActiveRecord::RecordInvalid(Validationfa

  9. ruby-on-rails - 使用 config.threadsafe 时从 lib/加载模块/类的正确方法是什么!选项? - 2

    我一直致力于让我们的Rails2.3.8应用程序在JRuby下正确运行。一切正常,直到我启用config.threadsafe!以实现JRuby提供的并发性。这导致lib/中的模块和类不再自动加载。使用config.threadsafe!启用:$rubyscript/runner-eproduction'pSim::Sim200Provisioner'/Users/amchale/.rvm/gems/jruby-1.5.1@web-services/gems/activesupport-2.3.8/lib/active_support/dependencies.rb:105:in`co

  10. ruby - inverse_of 是否适用于 has_many? - 2

    当我使用has_one时,它​​工作得很好,但在has_many上却不行。在这里您可以看到object_id不同,因为它运行了另一个SQL来再次获取它。ruby-1.9.2-p290:001>e=Employee.create(name:'rafael',active:false)ruby-1.9.2-p290:002>b=Badge.create(number:1,employee:e)ruby-1.9.2-p290:003>a=Address.create(street:"123MarketSt",city:"SanDiego",employee:e)ruby-1.9.2-p290

随机推荐