草庐IT

ES增量同步方案

涂有 2023-04-20 原文

1 基于业务代码嵌入式的增量同步方式

在Java业务代码要修改业务数据的地方,增加调用写入ES数据的方法

优点:1、实现方式简单,可控粒度高;2、不依赖第三方数据同步框架;3、数据库不用做特殊配置和部署;

缺点:1:所有修改数据的地方都要添加同步ES逻辑,新增接口或者修改代码要同时关注ES的同步逻辑,否则可能导致数据库和ES的数据不一致;

2:如果直接提sql修数据,会造成数据库和ES数据不一致;

3:要自己处理同步失败的重试问题

2 基于数据库离线日志的增量同步方式

通过监控oracle的redo log日志和在线日志的方式,把日志文件还原成sql,把sql发送到MQ中间件,Java消费MQ数据把数据的变更同步到ES,同步流程参考下图。

优化:1、对数据库性能损耗最小;2、对业务代码侵入性最低;3、不容易造成数据库和ES数据不一致情况

缺点:1、解析日志方式较为复杂,只能采用第三方开源框架,引入第三方开源框架会增加技术学习成本和运维成本,并且其稳定性不敢保证;

2、需要公司DBA团队配合,需要Oracle开启日志记录等配置,需要开DBA系统账号,另外可能还需要开启Oracle的ASM实例

基于数据日志的增量同步开源框架O2K:https://hub.docker.com/r/woqutech/o2k

3 基于数据库触发器的增量同步方式

通过数据库的触发器监控数据的增加、修改、删除,然后把新增、修改、删除的数据备份一条数据到变更记录表里,通过Java定时器定时同步方式的把变更的数据同步到ES中,具体同步流程参数下图。

优点:1、不会造成数据库和ES数据不一致情况;2、业务代码侵入性低;3、技术实现简单、便捷,就算不依赖第三方框架也能轻松实现(当然也有基于触发器实现的数据同步开源框架,比如:SymmetricDS)

缺点:触发器对insert、update、delete有一定性能损耗

基于触发器实现的开源框架,SymmetricDS文档参考:https://www.symmetricds.org/doc/3.14/html/user-guide.html#_kafka

4 基于数据库通知的增量同步方式

启动应用程序,通过OracleConnection.registerDatabaseChangeNotification(Properties prop).addListener(DatabaseChangeListener listener)的方式接收oracle数据变更通知

优点:1、同步延时低;2、代码侵入性低;

缺点:1、通知特性是Oracle的实验特性,并不稳定,有些版本并不支持;

2、要自己处理数据同步失败的缓冲问题,前面的方案中的MQ和变更记录表就是中间缓冲

基于Oracle通知实现的增量同步开源框架DBSyncer,参考文档:https://gitee.com/ghi/dbsyncer?_from=gitee_search#%E4%BB%8B%E7%BB%8D

import oracle.jdbc.OracleConnection;

import oracle.jdbc.OracleStatement;

import oracle.jdbc.dcn.*;

import oracle.jdbc.pool.OracleDataSource;

import java.sql.ResultSet;

import java.sql.SQLException;

import java.util.Arrays;

import java.util.Properties;

/**

* @author tuyou

* @date 2023/1/11 17:08

*/

public class MyTest {

public static void main(String[] args) throws SQLException {

String tableName = "EXPENSE_GENERAL_ORDER";

OracleDataSource dataSource = new OracleDataSource();

dataSource.setUser("xxx");

dataSource.setPassword("xxx");

dataSource.setURL("jdbc:oracle:thin:@ip:1521/xxx");

final OracleConnection conn = (OracleConnection) dataSource.getConnection();

Properties prop = new Properties();

// prop.setProperty(OracleConnection.DCN_QUERY_CHANGE_NOTIFICATION, "true");

// prop.setProperty(OracleConnection.DCN_NOTIFY_CHANGELAG, "1");

prop.setProperty(OracleConnection.DCN_NOTIFY_ROWIDS, "true");

prop.setProperty(OracleConnection.NTF_QOS_PURGE_ON_NTFN, "true");

prop.setProperty(OracleConnection.NTF_TIMEOUT, "0");

final DatabaseChangeRegistration databaseChangeRegistration = conn.registerDatabaseChangeNotification(prop);

databaseChangeRegistration.addListener(new DatabaseChangeListener() {

@Override

public void onDatabaseChangeNotification(DatabaseChangeEvent databaseChangeEvent) {

long regId = databaseChangeEvent.getRegId();

System.out.println("change notify: " + Arrays.toString(databaseChangeEvent.getTableChangeDescription()));

if (regId == databaseChangeRegistration.getRegId()) {

TableChangeDescription[] tds = databaseChangeEvent.getTableChangeDescription();

System.out.println("'TableChangeDescription change count:" + tds.length);

for (TableChangeDescription td : tds) {

System.out.println("table id: " + td.getObjectNumber());

System.out.println("table name: " + td.getTableName());

RowChangeDescription[] rds = td.getRowChangeDescription();

for (RowChangeDescription rd : rds) {

System.out.println("row id: " + rd.getRowid().stringValue());

System.out.println("row change count: " + rd.getRowOperation().toString());

}

}

}

});

OracleStatement statement = (OracleStatement) conn.createStatement();

statement.setDatabaseChangeRegistration(databaseChangeRegistration);

ResultSet resultSet = statement.executeQuery("select * from " + tableName + " where 1=2");

statement.close();

System.err.println("started");

}

}

5 基于扫描表更新时间的增量同步方式

在数据库表更新时自动更新表的更新时间,然后通过定时任务扫描更新时间大于最后同步时间的数据,拿到更新的数据

优点:1、实现相对简单;2、可以在从库上做扫描,不会影响主库压力;

缺点:1、oracle还是要建立触发器才能自动更新时间,也会存在触发器的缺点;2、对于删除的数据无法根据更新时间发现,只能用ES的数据和数据库的数据做全量对比才能发现删除数据

6 基于Oracle的闪回查询方式的增量同步方式

oracle的闪回查询可以查询最近一段时间的数据的增删改记录,利用这种特性也可以做增量同步

优点:1、实现原理相对简单;2、不用采用触发器

缺点:1、闪回查询有时间保留限制,如果应用由于停机原因可能造成部分数据变动没有同步;2、闪回查询数据量和实际表数据量相关,如果表数据量较大,查询会非常慢

闪回查询sql参考:

(SELECT F_DJBH FROM BF_BIZ_INFO AS OF TIMESTAMP to_timestamp('2023-02-02 10:30:00', 'yyyy-mm-dd hh24:mi:ss')

minus

SELECT F_DJBH FROM BF_BIZ_INFO)

union

(SELECT F_DJBH FROM BF_BIZ_INFO

minus

SELECT F_DJBH FROM BF_BIZ_INFO AS OF TIMESTAMP to_timestamp('2023-02-02 10:30:00', 'yyyy-mm-dd hh24:mi:ss'))

7 基于Oracle的SCN机制方式的增量同步方式

同步思想5类似,但是更新时间是基于oracle的SCN机制,利用oracle的隐藏的ora_rowscn列来发现更新的数据,参考sql:select ora_rowscn, F_DJBH from BF_BIZ_INFO where ora_rowscn >= '169639743457'

优点:1、实现相对简单;2、可以在从库上做扫描,不会影响主库压力;

缺点:1、对于删除的数据无法根据更新时间发现,只能用ES的数据和数据库的数据做全量对比才能发现删除数据或者建立一个delete触发器

8 其他ETL开源平台

DataX:采用写sql的扫描数据,不适合做线上数据同步,只适合离线数据同步。官方文档:https://github.com/alibaba/DataX/blob/master/introduction.md

同步案例: https://blog.csdn.net/weixin_42418589/article/details/126019261

Tapdata:开源版本并不支持Oracle,可以支持Mysql

最终选择

基于对我们系统现状的分析,最终确定的方案选择是基于Oracle的SCN机制 + 触发器来实现增量同步数据到ES,不依赖开源框架,基于Oracle的SCN机制实现insert/update数据的增量同步,基于delete触发器实现delete数据的增量同步。具体同步逻辑如下图。

有关ES增量同步方案的更多相关文章

  1. ruby - 在 jRuby 中使用 'fork' 生成进程的替代方案? - 2

    在MRIRuby中我可以这样做:deftransferinternal_server=self.init_serverpid=forkdointernal_server.runend#Maketheserverprocessrunindependently.Process.detach(pid)internal_client=self.init_client#Dootherstuffwithconnectingtointernal_server...internal_client.post('somedata')ensure#KillserverProcess.kill('KILL',

  2. 使用canal同步MySQL数据到ES - 2

    文章目录一、概述简介原理模块二、配置Mysql使用版本环境要求1.操作系统2.mysql要求三、配置canal-server离线下载在线下载上传解压修改配置单机配置集群配置分库分表配置1.修改全局配置2.实例配置垂直分库水平分库3.修改group-instance.xml4.启动监听四、配置canal-adapter1修改启动配置2配置映射文件3启动ES数据同步查询所有订阅同步数据同步开关启动4.验证五、配置canal-admin一、概述简介canal是Alibaba旗下的一款开源项目,Java开发。基于数据库增量日志解析,提供增量数据订阅&消费。Git地址:https://github.co

  3. ES基础入门 - 2

    ES一、简介1、ElasticStackES技术栈:ElasticSearch:存数据+搜索;QL;Kibana:Web可视化平台,分析。LogStash:日志收集,Log4j:产生日志;log.info(xxx)。。。。使用场景:metrics:指标监控…2、基本概念Index(索引)动词:保存(插入)名词:类似MySQL数据库,给数据Type(类型)已废弃,以前类似MySQL的表现在用索引对数据分类Document(文档)真正要保存的一个JSON数据{name:"tcx"}二、入门实战{"name":"DESKTOP-1TSVGKG","cluster_name":"elasticsear

  4. ruby-on-rails - 有没有一种工具可以在编码时自动保存对文件的增量更改? - 2

    我最喜欢的Google文档功能之一是它会在我工作时不断自动保存我的文档版本。这意味着即使我在进行关键更改之前忘记在某个点进行保存,也很有可能会自动创建一个保存点。至少,我可以将文档恢复到错误更改之前的状态,并从该点继续工作。对于在MacOS(或UNIX)上运行的Ruby编码器,是否有具有等效功能的工具?例如,一个工具会每隔几分钟自动将Gitcheckin我的本地存储库以获取我正在处理的文件。也许我有点偏执,但这点小保险可以让我在日常工作中安心。 最佳答案 虚拟机有些人可能讨厌我对此的回应,但我在编码时经常使用VIM,它具有自动保存功

  5. Ruby 守护进程和 JRuby - 备选方案 - 2

    我有一个应用程序正在从Ruby迁移到JRuby(由于需要通过Java提供更好的Web服务安全支持)。我使用的gem之一是daemons创建后台作业。问题在于它使用fork+exec来创建后台进程,但这对JRuby来说是禁忌。那么-是否有用于创建后台作业的替代gem/wrapper?我目前的想法是只从shell脚本调用rake并让rake任务永远运行......提前致谢,克里斯。更新我们目前正在使用几个与Java线程相关的包装器,即https://github.com/jmettraux/rufus-scheduler和https://github.com/philostler/acts

  6. ruby-on-rails - 用一系列时间增量填充选择,加上其他选项 - 2

    使用RubyonRails,我使用给定的增量(例如每30分钟)用时间填充“选择”。目前我正在YAML文件中写出所有的可能性,但我觉得有一种更巧妙的方法。我想我想提供一个开始时间、一个结束时间、一个增量,并且目前只提供一个名为“关闭”的选项(想想“business_hours”)。所以,我的选择可能会显示:'Closed'5:00am5:30am6:00am...[allthewayto]...11:30pm谁能想出更好的方法,或者只是将它们全部“拼写”出来的最佳方法? 最佳答案 此答案基于@emh的答案。defcreate_hour

  7. ruby-on-rails - 能够处理 rar/tar/zip/7z 的 Ruby/rubyzip 替代方案? - 2

    关闭。这个问题不符合StackOverflowguidelines.它目前不接受答案。要求我们推荐或查找工具、库或最喜欢的场外资源的问题对于StackOverflow来说是偏离主题的,因为它们往往会吸引自以为是的答案和垃圾邮件。相反,describetheproblem以及迄今为止为解决该问题所做的工作。关闭9年前。Improvethisquestion我想知道是否有人知道Ruby的ruby​​zip替代品,它可以处理各种格式,特别是zip/rar/7z?我知道libarchive,但它对我的目的来说并不完整(它是一个很好的gem)。(澄清一下,libarchive-对我不起作用-因为

  8. ruby-on-rails - 对于 Ruby 应用程序,是否有比 Sanitize 更好的替代方案? - 2

    我爱Sanitize.这是一个了不起的实用程序。我遇到的唯一问题是,它需要永远准备一个开发环境,因为它使用Nokogiri,这对编译时间来说是一种痛苦。是否有任何程序可以在不使用Nokogiri的情况下执行Sanitize的操作(如果没有别的,只是温和地执行它的操作)?这将以指数方式提供帮助! 最佳答案 Rails有自己的SanitizeHelper。根据http://api.rubyonrails.org/classes/ActionView/Helpers/SanitizeHelper.html,它将Thissanitizehe

  9. ruby-on-rails - 本地 yaml key 的 i18n 同步 - 2

    类似的问题,但对于java,Keepingi18nresourcessynced如何保持i18nyamllocals的key同步?即,当将key添加到en.yml时,如何将它们添加到nb.yml或ru.yml?如果我在my_title:"atitle"旁边添加键my_label:"sometextinenglish"我想把它给我的其他本地人我指定,因为我不能做所有的翻译,它应该回到其他语言的英语例如en.ymlsomegroup:my_tile:"atitleinenglish"my_label:"sometextinenglish"othergroup:...我想发出命令,将整个键和

  10. ruby-on-rails - rails3 中 cron 作业的解决方案 - 2

    我尝试每天在我的Rails应用程序中自动记录一些数据。我想知道是否有人知道一个好的解决方案?我找到了https://github.com/javan/whenever,但我想确保在选择之前了解所有选项。谢谢!艾略特 最佳答案 我真的很喜欢whenever-这是一个很棒的Gem,我已经在生产中使用了它。关于它还有一个很好的Railscasts插曲:http://railscasts.com/episodes/164-cron-in-ruby 关于ruby-on-rails-rails3中c

随机推荐