
Real-time sync of MySQL data to Elasticsearch with canal (deployment, configuration, testing) (Part 1)

周全全 2023-07-18


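Introduction

canal parses MySQL's incremental log (the binlog) and provides incremental data subscription and consumption. It is an open-source CDC (change data capture) tool from Alibaba: it reads and parses the MySQL binlog, then forwards the data changes downstream. With canal, MySQL can be synchronized to other data stores in real time.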

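How it works

MySQL master-slave replication

The MySQL master writes data changes to its binary log (the records are called binary log events and can be viewed with show binlog events).
The MySQL slave copies the master's binary log events to its relay log.
The MySQL slave replays the events in the relay log, applying the changes to its own data.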

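How canal works

canal emulates the MySQL slave protocol: it presents itself as a MySQL slave and sends a dump request to the MySQL master.
The MySQL master receives the dump request and starts pushing the binary log to the slave (that is, to canal).
canal parses the binary log objects (originally a byte stream).

The above is taken from the canal introduction on GitHub: https://github.com/alibaba/canal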

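canal workflow

  1. Deploy the Deployer service. It pulls binlog data from the upstream MySQL and records the binlog position.
  2. Deploy the Client-Adapter service. It takes the data parsed by the Deployer and writes it to the target store.
  3. Once both are deployed, canal synchronizes MySQL incremental data automatically by default.
  4. To synchronize the full MySQL data set, trigger the sync task manually through the Client-Adapter service.
    After the full sync finishes, canal resumes incremental sync automatically.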

Versions used

Make sure the versions match: canal 1.1.6 requires JDK 11, while canal 1.1.5 supports JDK 8.

Application      Version
MySQL            8.0.28
Elasticsearch    7.9.2
canal            1.1.5
JDK              8

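MySQL environment setup

1. Edit the MySQL configuration file

Edit the database's my.cnf file (my.ini on Windows) and set the following under the [mysqld] section:

# Enable the binary log
log-bin=mall-mysql-bin
# Binary log format to use (mixed, statement, row)
binlog_format=row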

After changing the configuration, restart the MySQL instance:

# Restart MySQL
systemctl restart mysqld

Log in to MySQL and verify that the binlog is enabled:

# Log in to MySQL (prompts for the password)
mysql -uroot -p
# Check whether the binary log is enabled
mysql> show variables like '%log_bin%';
+---------------------------------+-------------------------------------+
| Variable_name                   | Value                               |
+---------------------------------+-------------------------------------+
| log_bin                         | ON                                  |
| log_bin_basename                | /var/lib/mysql/mall-mysql-bin       |
| log_bin_index                   | /var/lib/mysql/mall-mysql-bin.index |
| log_bin_trust_function_creators | OFF                                 |
| log_bin_use_v1_row_events       | OFF                                 |
| sql_log_bin                     | ON                                  |
+---------------------------------+-------------------------------------+
6 rows in set (0.12 sec)

Check that the binlog format is ROW:

mysql> show variables like '%binlog_format%';
+---------------+-------+
| Variable_name | Value |
+---------------+-------+
| binlog_format | ROW   |
+---------------+-------+
1 row in set (0.02 sec)
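
The same two settings can also be checked in the configuration file itself before restarting. The following is a minimal Python sketch and not part of the original setup: the function names binlog_settings and ready_for_canal are made up for illustration, and the check only looks at options written explicitly in the file (it knows nothing about server defaults).

```python
def binlog_settings(cnf_text):
    """Collect key/value options from my.cnf-style text."""
    settings = {}
    for raw in cnf_text.splitlines():
        line = raw.strip()
        # Skip blank lines, comments and section headers such as [mysqld]
        if not line or line.startswith(("#", ";", "[")):
            continue
        key, _, value = line.partition("=")
        # MySQL accepts '-' and '_' interchangeably in option names
        settings[key.strip().replace("-", "_")] = value.strip()
    return settings


def ready_for_canal(cnf_text):
    """True if the file enables the binlog and sets the ROW format."""
    s = binlog_settings(cnf_text)
    return "log_bin" in s and s.get("binlog_format", "").lower() == "row"


sample = """
[mysqld]
# enable the binary log
log-bin=mall-mysql-bin
binlog_format=row
"""
print(ready_for_canal(sample))
```

The authoritative check remains the show variables output above, since it reflects what the running server actually uses.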

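2. Create the replication account and grant privileges

Create an account named canal with replication privileges; it is used to subscribe to the binlog.

# Create the user (choose your own password). In MySQL 8.0 new users default to the caching_sha2_password authentication plugin, so specify mysql_native_password explicitly; otherwise the canal server may fail to start.
create user 'canal'@'%' identified with mysql_native_password by 'Password@123';

# Grant replication privileges to the new account
grant select, replication slave, replication client on *.* to 'canal'@'%';

# Reload privileges
flush privileges;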

3. Create the test database

CREATE DATABASE IF NOT EXISTS canal default charset utf8 COLLATE utf8_general_ci;

Create the test table (in the canal database):

CREATE TABLE `test_book` (
  `id` bigint unsigned NOT NULL AUTO_INCREMENT,
  `title` varchar(200) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT NULL COMMENT 'title',
  `isbn` varchar(20) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT NULL COMMENT 'isbn',
  `author` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT NULL COMMENT 'author',
  `publisher_name` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT NULL COMMENT 'publisher name',
  PRIMARY KEY (`id`) USING BTREE
) ENGINE=InnoDB  DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci ROW_FORMAT=DYNAMIC;

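Elasticsearch environment setup

1. Create the index (test_book, the name used as _index in the adapter mapping file)

2. Define the mapping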

{
  "properties": {
    "id": {
      "type": "long"
    },
    "title": {
      "type": "text"
    },
    "isbn": {
      "type": "text"
    },
    "author": {
      "type": "text"
    },
    "publisherName": {
      "type": "text"
    }
  }
}
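
The original does not show the request used to create the index. Below is a minimal sketch, assuming the standard Elasticsearch 7 create-index call (PUT /<index> with a "mappings" body) and the ES address used in application.yml in this article. The helper name create_index_request is hypothetical, and the request is only built here, not sent.

```python
import json
import urllib.request

# Same field mapping as shown above
MAPPING = {
    "properties": {
        "id": {"type": "long"},
        "title": {"type": "text"},
        "isbn": {"type": "text"},
        "author": {"type": "text"},
        "publisherName": {"type": "text"},
    }
}


def create_index_request(es_host, index, mapping):
    """Build (but do not send) the PUT request that creates the index."""
    body = json.dumps({"mappings": mapping}).encode("utf-8")
    return urllib.request.Request(
        url=f"http://{es_host}/{index}",
        data=body,
        method="PUT",
        headers={"Content-Type": "application/json"},
    )


req = create_index_request("192.168.0.182:9200", "test_book", MAPPING)
print(req.get_method(), req.full_url)
# To actually create the index: urllib.request.urlopen(req)
```

Creating the index and its mapping in a single request keeps the two steps above together; sending the mapping separately afterwards works as well.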

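Downloading and deploying canal

Download canal

Download page: https://github.com/alibaba/canal/releases

Download the release, extract it to a directory on the server, and set permissions on the folder and everything under it:

chmod -R 755 /home/canal-1.1.5/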

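Configuring the server: canal-deployer

canal-deployer presents itself as a MySQL slave and listens to the binlog to receive data.

1. Edit /conf/canal.properties

# canal server address: 127.0.0.1. Apart from ip and port, the other settings can be left unchanged
canal.ip =127.0.0.1
# canal port that clients connect to
canal.port = 11111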

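2. Edit /conf/example/instance.properties

# Address of the MySQL instance to synchronize
canal.instance.master.address=127.0.0.1:3306
# Account with replication privileges
canal.instance.dbUsername=canal
# Password of that account
canal.instance.dbPassword=Password@123
# Connection character set
canal.instance.connectionCharset = UTF-8 
# Regex filter for the tables whose binlog events are subscribed to
#canal.instance.filter.regex=.*\\..*
# Subscribe only to the table being synchronized. This value is from the author's own environment; for the test table in this article the equivalent would be canal\\.test_book
canal.instance.filter.regex=cxstar_oa.data_sync_es
# Keep this the same as the instance folder name (example); it is used later
canal.mq.topic=example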
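
To see which tables a given filter value would select, the matching can be approximated in a few lines. This is only a sketch under stated assumptions: patterns are comma-separated, each must match the whole "schema.table" name (the startup log later shows the filter anchored as ^...$), and Python's re module stands in for the regex engine canal actually uses. The function name matches_filter is made up for illustration, and splitting on commas is a simplification that breaks patterns containing a comma.

```python
import re


def matches_filter(filter_value, schema, table):
    """Check "schema.table" against a comma-separated list of patterns."""
    name = f"{schema}.{table}"
    for pattern in filter_value.split(","):
        # Undo the properties-file escaping: a doubled backslash in the
        # file stands for a single backslash in the regex
        pattern = pattern.strip().replace("\\\\", "\\")
        if pattern and re.fullmatch(pattern, name):
            return True
    return False


print(matches_filter(r".*\\..*", "canal", "test_book"))               # True
print(matches_filter(r"canal\\.test_book", "canal", "test_book"))     # True
print(matches_filter("cxstar_oa.data_sync_es", "canal", "test_book")) # False
```

The last call shows why the filter has to name the table you actually want: a filter written for another schema silently drops every event for canal.test_book.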

3. Start canal-deployer

Go to the bin directory and run the startup script:

./startup.sh

Check the server log: /logs/canal/canal.log

2023-02-02 15:28:16.016 [main] INFO  com.alibaba.otter.canal.deployer.CanalLauncher - ## set default uncaught exception handler
2023-02-02 15:28:16.043 [main] INFO  com.alibaba.otter.canal.deployer.CanalLauncher - ## load canal configurations
2023-02-02 15:28:16.054 [main] INFO  com.alibaba.otter.canal.deployer.CanalStarter - ## start the canal server.
2023-02-02 15:28:16.112 [main] INFO  com.alibaba.otter.canal.deployer.CanalController - ## start the canal server[127.0.0.1(127.0.0.1):11111]
2023-02-02 15:28:17.824 [main] INFO  com.alibaba.otter.canal.deployer.CanalStarter - ## the canal server is running now ......

Check the instance log: /logs/example/example.log

2023-02-02 15:28:17.590 [main] INFO  c.a.otter.canal.instance.spring.CanalInstanceWithSpring - start CannalInstance for 1-example 
2023-02-02 15:28:17.619 [main] WARN  c.a.o.canal.parse.inbound.mysql.dbsync.LogEventConvert - --> init table filter : ^.*\..*$
2023-02-02 15:28:17.619 [main] WARN  c.a.o.canal.parse.inbound.mysql.dbsync.LogEventConvert - --> init table black filter : ^mysql\.slave_.*$
2023-02-02 15:28:17.757 [destination = example , address = /127.0.0.1:3306 , EventParser] WARN  c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - ---> begin to find start position, it will be long time for reset or first position
2023-02-02 15:28:17.776 [main] INFO  c.a.otter.canal.instance.core.AbstractCanalInstance - start successful....
2023-02-02 15:28:17.776 [destination = example , address = /127.0.0.1:3306 , EventParser] WARN  c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - prepare to find start position just show master status
2023-02-02 15:28:18.382 [destination = example , address = /127.0.0.1:3306 , EventParser] WARN  c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - ---> find start position successfully, EntryPosition[included=false,journalName=mall-mysql-bin.000008,position=12380,serverId=101,gtid=,timestamp=1675309792000] cost : 610ms , the next step is binlog dump

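If the log looks like the above, the server has started successfully.

Possible problem: caching_sha2_password Auth failed
Cause:
With MySQL 8.0, users are created with the caching_sha2_password authentication plugin by default, so switch the account to mysql_native_password:

ALTER USER 'canal'@'%' IDENTIFIED WITH mysql_native_password BY 'your_password'; # reset the user's password with the native plugin
FLUSH PRIVILEGES; # reload privileges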

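Configuring the client: canal-adapter

canal-adapter is canal's client: it fetches data from the canal server and writes it to stores such as MySQL or Elasticsearch.

1. Replace the client-adapter.es7x jar

Download the v1.1.5-alpha-2 release, extract it, and take this file from its plugin directory:
client-adapter.es7x-1.1.5-SNAPSHOT-jar-with-dependencies.jar

Upload it to canal.adapter-1.1.5/plugin on the server, delete the original client-adapter.es7x-1.1.5-jar-with-dependencies.jar, then set permissions:

chmod -R 755 /home/canal-1.1.5/canal.adapter-1.1.5/plugin

Why this is needed: without the replacement, startup fails with:
java.lang.ClassCastException: com.alibaba.druid.pool.DruidDataSource cannot be cast to com.alibaba.druid.pool.DruidDataSource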

2. Edit /conf/application.yml

server:
  port: 8081
spring:
  jackson:
    date-format: yyyy-MM-dd HH:mm:ss
    time-zone: GMT+8
    default-property-inclusion: non_null

canal.conf:
  mode: tcp # client mode: tcp, kafka or rocketMQ
  flatMessage: true # flat-message switch: deliver data as JSON strings; only effective in kafka/rocketMQ mode
  zookeeperHosts:    # ZooKeeper address, for cluster mode
  syncBatchSize: 1000 # batch size per sync
  retries: 0 # retry count; -1 retries forever
  timeout: # sync timeout in milliseconds
  accessKey:
  secretKey:
  consumerProperties:
    # canal tcp consumer
    canal.tcp.server.host: 127.0.0.1:11111 # address of the canal server
    canal.tcp.zookeeper.hosts:
    canal.tcp.batch.size: 500
    canal.tcp.username:
    canal.tcp.password:

  srcDataSources: # source database configuration
    defaultDS:
      url: jdbc:mysql://127.0.0.1:3306/canal?useUnicode=true&useSSL=true # connection URL of the test database
      username: root # database account
      password: Cxstar@2014 # database password
  canalAdapters: # adapter list
  - instance: example # canal instance name or MQ topic name
    groups: # group list
    - groupId: g1 # group id; used in MQ mode
      outerAdapters:
      - name: logger # adapter that prints events to the log
      - name: es7 # ES sync adapter
        hosts: 192.168.0.182:9200 # ES address
        properties:
          mode: rest # transport (port 9300) or rest (port 9200)
          #security.auth: elastic:123456 # ES user and password, rest mode only
          cluster.name: elasticsearch # ES cluster name; matches cluster.name in elasticsearch.yml

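3. Add the MySQL-to-ES mapping file

Go to the /conf/es7 directory, copy mytest_user.yml to test_book.yml, and edit it as follows:

dataSourceKey: defaultDS # key of the source data source; matches an entry under srcDataSources above
destination: example  # canal instance or MQ topic
groupId: g1 # groupId in MQ mode; only data for this groupId is synchronized
esMapping:
  _index: test_book # ES index name
  _id: _id  # ES _id; if this is not set, the pk option must be set instead and ES assigns _id automatically
  sql: "SELECT
         tb.id AS _id,
         tb.title,
         tb.isbn,
         tb.author,
         tb.publisher_name as publisherName
        FROM
         test_book tb"        # SQL mapping
  etlCondition: "where tb.id>={}"   # ETL condition parameter; the alias must match the one used in sql
  commitBatch: 3000   # commit batch size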

4. Start canal-adapter

Go to the bin directory and run the startup script:

./startup.sh

A log like the following means the adapter has started successfully:

2023-04-02 13:22:52.337 [main] INFO  c.a.o.canal.adapter.launcher.loader.CanalAdapterService - ## syncSwitch refreshed.
2023-04-02 13:22:52.337 [main] INFO  c.a.o.canal.adapter.launcher.loader.CanalAdapterService - ## start the canal client adapters.
2023-04-02 13:22:52.338 [main] INFO  c.a.otter.canal.client.adapter.support.ExtensionLoader - extension classpath dir: /home/canal-1.1.5/canal.adapter-1.1.5/plugin
2023-04-02 13:22:52.372 [main] INFO  c.a.o.canal.adapter.launcher.loader.CanalAdapterLoader - Load canal adapter: logger succeed
2023-04-02 13:22:52.679 [main] INFO  c.a.o.c.client.adapter.es.core.config.ESSyncConfigLoader - ## Start loading es mapping config ... 
2023-04-02 13:22:52.751 [main] INFO  c.a.o.c.client.adapter.es.core.config.ESSyncConfigLoader - ## ES mapping config loaded
2023-04-02 13:22:52.951 [main] INFO  c.a.o.canal.adapter.launcher.loader.CanalAdapterLoader - Load canal adapter: es7 succeed
2023-04-02 13:22:52.960 [main] INFO  c.alibaba.otter.canal.connector.core.spi.ExtensionLoader - extension classpath dir: /home/canal-1.1.5/canal.adapter-1.1.5/plugin
2023-04-02 13:22:52.982 [main] INFO  c.a.o.canal.adapter.launcher.loader.CanalAdapterLoader - Start adapter for canal-client mq topic: cxstar_oa-g1 succeed
2023-04-02 13:22:52.983 [main] INFO  c.a.o.canal.adapter.launcher.loader.CanalAdapterService - ## the canal client adapters are running now ......
2023-04-02 13:22:52.983 [Thread-4] INFO  c.a.otter.canal.adapter.launcher.loader.AdapterProcessor - =============> Start to connect destination: cxstar_oa <=============
2023-04-02 13:22:52.990 [main] INFO  org.apache.coyote.http11.Http11NioProtocol - Starting ProtocolHandler ["http-nio-8081"]
2023-04-02 13:22:52.991 [main] INFO  org.apache.tomcat.util.net.NioSelectorPool - Using a shared selector for servlet write/read
2023-04-02 13:22:53.013 [main] INFO  o.s.boot.web.embedded.tomcat.TomcatWebServer - Tomcat started on port(s): 8081 (http) with context path ''
2023-04-02 13:22:53.028 [main] INFO  c.a.otter.canal.adapter.launcher.CanalAdapterApplication - Started CanalAdapterApplication in 5.259 seconds (JVM running for 6.378)
2023-04-02 13:22:53.081 [Thread-4] INFO  c.a.otter.canal.adapter.launcher.loader.AdapterProcessor - =============> Subscribe destination: cxstar_oa succeed <=============


5. Errors you may hit when starting canal-adapter

1. com.mysql.jdbc.exceptions.jdbc4.CommunicationsException: Communications link failure

[main] ERROR com.alibaba.druid.pool.DruidDataSource - init datasource error, url: jdbc:mysql://127.0.0.1:3306/canal?useUnicode=true&useSSL=true
com.mysql.jdbc.exceptions.jdbc4.CommunicationsException: Communications link failure
The last packet successfully received from the server was 216 milliseconds ago.  The last packet sent successfully to the server was 210 milliseconds ago.

Fix: remove &useSSL=true from the MySQL connection URL in /conf/application.yml.

2. com.alibaba.druid.pool.DruidDataSource cannot be cast to com.alibaba.druid.pool.DruidDataSource

ERROR c.a.o.canal.adapter.launcher.loader.CanalAdapterLoader - Load canal adapter: es7 failed
java.lang.RuntimeException: java.lang.RuntimeException: java.lang.ClassCastException: com.alibaba.druid.pool.DruidDataSource cannot be cast to com.alibaba.druid.pool.DruidDataSource
	at com.alibaba.otter.canal.client.adapter.es7x.ES7xAdapter.init(ES7xAdapter.java:54) ~[client-adapter.es7x-1.1.5-jar-with-dependencies.jar:na]

Cause: a druid package conflict.
Fix:
Option 1. Download the source package and edit client-adapter/escore/pom.xml, marking the druid dependency as provided:

<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>druid</artifactId>
    <scope>provided</scope>
</dependency>

After building, upload client-adapter/es7x/target/client-adapter.es7x-1.1.5-jar-with-dependencies.jar to the server and replace the jar of the same name under the adapter's plugin directory.

Option 2. Download v1.1.5-alpha-2.

Find client-adapter.es7x-1.1.5-SNAPSHOT-jar-with-dependencies.jar in its plugin directory,
upload it to canal.adapter-1.1.5/plugin on the server, and delete client-adapter.es7x-1.1.5-jar-with-dependencies.jar.

3. Load canal adapter: es7 failed, Name or service not known

 ERROR c.a.o.canal.adapter.launcher.loader.CanalAdapterLoader - Load canal adapter: es7 failed
java.lang.RuntimeException: java.net.UnknownHostException: http: Name or service not known
	at com.alibaba.otter.canal.client.adapter.es7x.ES7xAdapter.init(ES7xAdapter.java:54) ~[client-adapter.es7x-1.1.5-SNAPSHOT-jar-with-dependencies.jar:na]

Fix: in /conf/application.yml, do not prefix hosts with http://

4. java.lang.NullPointerException: esMapping._type

ERROR c.a.o.c.client.adapter.es.core.monitor.ESConfigMonitor - esMapping._type
java.lang.NullPointerException: esMapping._type
        at com.alibaba.otter.canal.client.adapter.es.core.config.ESSyncConfig.validate(ESSyncConfig.java:35) ~[client-adapter.es7x-1.1.5-SNAPSHOT-jar-with-dependencies.jar:na]
        at com.alibaba.otter.canal.client.adapter.es.core.monitor.ESConfigMonitor$FileListener.onFileChange(ESConfigMonitor.java:102) ~[client-adapter.es7x-1.1.5-SNAPSHOT-jar-with-dependencies.jar:na]
        at org.apache.commons.io.monitor.FileAlterationObserver.doMatch(FileAlterationObserver.java:400) [commons-io-2.4.jar:2.4]
        at org.apache.commons.io.monitor.FileAlterationObserver.checkAndNotify(FileAlterationObserver.java:334) [commons-io-2.4.jar:2.4]
        at org.apache.commons.io.monitor.FileAlterationObserver.checkAndNotify(FileAlterationObserver.java:304) [commons-io-2.4.jar:2.4]
        at org.apache.commons.io.monitor.FileAlterationMonitor.run(FileAlterationMonitor.java:182) [commons-io-2.4.jar:2.4]
        at java.lang.Thread.run(Thread.java:748) [na:1.8.0_221]

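Fix: add a property from the official configuration to the yml file under canal.adapter-1.1.5/conf/es7 (the test_book.yml used in the sync test in this article also sets esVersion: es7):

hosts: 192.168.0.182:9200 # ES address

Verify that canal-adapter started successfully
Check the log canal.adapter-1.1.5/logs/adapter/adapter.log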

[org.springframework.cloud.context.properties:name=configurationPropertiesRebinder,context=2b76ff4e,type=ConfigurationPropertiesRebinder]
2023-02-03 09:34:13.373 [main] INFO  c.a.o.canal.adapter.launcher.loader.CanalAdapterService - ## syncSwitch refreshed.
2023-02-03 09:34:13.374 [main] INFO  c.a.o.canal.adapter.launcher.loader.CanalAdapterService - ## start the canal client adapters.
2023-02-03 09:34:13.375 [main] INFO  c.a.otter.canal.client.adapter.support.ExtensionLoader - extension classpath dir: /home/canal/canal-test/canal.adapter-1.1.5/plugin
2023-02-03 09:34:13.418 [main] INFO  c.a.o.canal.adapter.launcher.loader.CanalAdapterLoader - Load canal adapter: logger succeed
2023-02-03 09:34:13.643 [main] INFO  c.a.o.c.client.adapter.es.core.config.ESSyncConfigLoader - ## Start loading es mapping config ... 
2023-02-03 09:34:13.726 [main] INFO  c.a.o.c.client.adapter.es.core.config.ESSyncConfigLoader - ## ES mapping config loaded
2023-02-03 09:34:13.995 [main] INFO  c.a.o.canal.adapter.launcher.loader.CanalAdapterLoader - Load canal adapter: es7 succeed
2023-02-03 09:34:14.005 [main] INFO  c.alibaba.otter.canal.connector.core.spi.ExtensionLoader - extension classpath dir: /home/canal/canal-test/canal.adapter-1.1.5/plugin
2023-02-03 09:34:14.029 [main] INFO  c.a.o.canal.adapter.launcher.loader.CanalAdapterLoader - Start adapter for canal-client mq topic: example-g1 succeed
2023-02-03 09:34:14.029 [Thread-4] INFO  c.a.otter.canal.adapter.launcher.loader.AdapterProcessor - =============> Start to connect destination: example <=============
2023-02-03 09:34:14.029 [main] INFO  c.a.o.canal.adapter.launcher.loader.CanalAdapterService - ## the canal client adapters are running now ......
2023-02-03 09:34:14.037 [main] INFO  org.apache.coyote.http11.Http11NioProtocol - Starting ProtocolHandler ["http-nio-8081"]
2023-02-03 09:34:14.039 [main] INFO  org.apache.tomcat.util.net.NioSelectorPool - Using a shared selector for servlet write/read
2023-02-03 09:34:14.067 [main] INFO  o.s.boot.web.embedded.tomcat.TomcatWebServer - Tomcat started on port(s): 8081 (http) with context path ''
2023-02-03 09:34:14.080 [main] INFO  c.a.otter.canal.adapter.launcher.CanalAdapterApplication - Started CanalAdapterApplication in 5.221 seconds (JVM running for 5.807)
2023-02-03 09:34:14.169 [Thread-4] INFO  c.a.otter.canal.adapter.launcher.loader.AdapterProcessor - =============> Subscribe destination: example succeed <=============

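Sync test

Map the ES index to the MySQL table

Field mappings are configured in the client directory canal.adapter-1.1.5/conf/es7; by default the adapter loads every yml file in that directory. One configuration file describes the mapping for one table.
Create the ES/MySQL mapping file test_book.yml: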

dataSourceKey: defaultDS # key of the source data source; matches an entry under srcDataSources above
destination: example  # canal instance or MQ topic
groupId: g1 # groupId in MQ mode; only data for this groupId is synchronized
esVersion: es7
esMapping:
  _index: test_book # ES index name
  _id: _id  # ES _id; if this is not set, the pk option must be set instead and ES assigns _id automatically
  sql: "SELECT
         b.id AS _id, 
         b.title,
         b.author,
         b.isbn,
         b.publisher_name as publisherName
        FROM
         test_book b"        # SQL mapping
  etlCondition: "where b.id>={}"   # ETL condition parameter; the alias must match the one used in sql
  commitBatch: 5000   # commit batch size

Insert a row into MySQL to verify the sync

INSERT INTO `canal`.`test_book`( `title`, `isbn`, `author`, `publisher_name`) VALUES (  '三体', '98741254125', '刘慈欣', '工业出版社');

Check the log canal.adapter-1.1.5/logs/adapter/adapter.log

2023-02-03 10:18:21.988 [pool-2-thread-1] INFO  c.a.o.canal.client.adapter.logger.LoggerAdapterExample - DML: {"data":[{"id":3,"title":"三体","isbn":"98741254125","author":"刘慈欣","publisher_name":"工业出版社"}],"database":"canal","destination":"example","es":1675390701000,"groupId":"g1","isDdl":false,"old":null,"pkNames":["id"],"sql":"","table":"test_book","ts":1675390701977,"type":"INSERT"}
2023-02-03 10:18:22.225 [pool-2-thread-1] DEBUG c.a.o.canal.client.adapter.es.core.service.ESSyncService - DML: {"data":[{"id":3,"title":"三体","isbn":"98741254125","author":"刘慈欣","publisher_name":"工业出版社"}],"database":"canal","destination":"example","es":1675390701000,"groupId":"g1","isDdl":false,"old":null,"pkNames":["id"],"sql":"","table":"test_book","ts":1675390701977,"type":"INSERT"} 
Affected indexes: test_book 

Query the data in ES: the row has been synchronized.
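
To make the relationship between the logged DML message and the resulting ES document concrete, here is a small sketch. It only illustrates the effect of the column aliases in test_book.yml; it is not how canal-adapter is implemented, and the names FIELD_ALIASES and to_es_docs are made up. The message is a shortened copy of the one in the log above.

```python
import json

# Column -> ES field aliases, taken from the SELECT in test_book.yml
FIELD_ALIASES = {
    "id": "_id",
    "title": "title",
    "isbn": "isbn",
    "author": "author",
    "publisher_name": "publisherName",
}


def to_es_docs(dml_json, aliases):
    """Turn one logged DML message into (doc_id, document) pairs."""
    message = json.loads(dml_json)
    docs = []
    for row in message["data"]:
        # Rename each column to its ES field name, dropping unmapped columns
        doc = {aliases[col]: val for col, val in row.items() if col in aliases}
        doc_id = doc.pop("_id")
        docs.append((doc_id, doc))
    return docs


dml = ('{"data":[{"id":3,"title":"三体","isbn":"98741254125","author":"刘慈欣",'
       '"publisher_name":"工业出版社"}],"database":"canal","table":"test_book",'
       '"type":"INSERT"}')
for doc_id, doc in to_es_docs(dml, FIELD_ALIASES):
    print(doc_id, doc)
```

Note that the MySQL column publisher_name arrives in ES as publisherName, which is why the index mapping uses the camel-case name.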

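Data synchronization

How to synchronize data that existed before canal was started

canal-adapter exposes a REST endpoint that imports the full data set into ES; calling it triggers the sync task. canal first pauses incremental delivery, then imports the full data. When the full import finishes, canal resumes incremental sync automatically.
Note: this method cannot be used for data that existed before the binlog was enabled.

curl http://127.0.0.1:8081/etl/es7/test_book.yml -X POST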

Sync log:

2023-02-03 10:41:35.043 [http-nio-8081-exec-1] INFO  c.a.otter.canal.client.adapter.es7x.etl.ESEtlService - start etl to import data to index: test_book
2023-02-03 10:41:35.130 [http-nio-8081-exec-1] INFO  c.a.otter.canal.client.adapter.es7x.etl.ESEtlService - 数据全量导入完成, 一共导入 3 条数据, 耗时: 85
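
The same call can be made from a script. The sketch below builds the POST request for the ETL endpoint shown in the curl command. The helper name etl_request is hypothetical. The optional params form field, whose value fills the {} placeholder of etlCondition, is an assumption based on the canal adapter documentation rather than something shown in this article, so verify it against your canal version.

```python
import urllib.parse
import urllib.request


def etl_request(adapter_host, adapter_name, mapping_file, params=None):
    """Build (but do not send) the POST that triggers a full import."""
    url = f"http://{adapter_host}/etl/{adapter_name}/{mapping_file}"
    data = b""
    if params is not None:
        # Assumed form field: the value fills the {} in etlCondition
        data = urllib.parse.urlencode({"params": params}).encode("utf-8")
    return urllib.request.Request(url, data=data, method="POST")


req = etl_request("127.0.0.1:8081", "es7", "test_book.yml")
print(req.get_method(), req.full_url)
# To run the import: urllib.request.urlopen(req).read()
```

Without params the request is equivalent to the curl command above and imports every row selected by the mapping's sql.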

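How to synchronize historical data written before the binlog was enabled?

canal's incremental sync is driven by the binlog, so historical data written before the binlog was enabled produces no events and is not synchronized. One way around this is to export the data from the database and import it again, which generates binlog entries.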

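Synchronizing ES array fields

Add the following to the adapter mapping file:

  objFields:
    author: array:, # split the field value on ","

The adapter detects the configuration change, so no restart is needed: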

2023-02-03 11:33:24.098 [Thread-3] INFO  c.a.o.c.client.adapter.es.core.monitor.ESConfigMonitor - Change a es mapping config: test_book.yml of canal adapter

Update the author field of a row:

UPDATE `canal`.`test_book` SET `title` = '三体', `isbn` = '98741254125', `author` = '刘慈欣,刘电工', `publisher_name` = '工业出版社' WHERE `id` = 1;

The data in ES has changed accordingly.
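
The effect of the array:, rule on the updated value can be pictured as a plain string split. This is a sketch of the observable result only, not of canal's internal code; apply_obj_field is a made-up name.

```python
def apply_obj_field(value, rule):
    """Apply an objFields rule such as "array:," to a column value."""
    kind, _, separator = rule.partition(":")
    if kind == "array" and value is not None:
        # Everything after "array:" is the separator
        return value.split(separator)
    return value


print(apply_obj_field("刘慈欣,刘电工", "array:,"))  # ['刘慈欣', '刘电工']
```

A different separator works the same way, for example array:; for semicolon-separated values.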

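Synchronizing several tables into one index

In the SQL of the yml mapping file, the main table must be the leftmost table of the join; changes to the joined tables are then synchronized to ES automatically as well.
Example: changes to rows in journal_volume are also synchronized to the ES index that corresponds to journal_paper.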

SELECT  
    jp.id AS _id, 
    jp.sid AS sid, 
    jp.import_id AS importId, 
    jp.journal_id AS journalId, 
    jp.journal_volume_id AS journalVolumeId, 
    jv.`year` as year,
    jv.volume as volume,
    jv.issue as issue,
    j.publisher_name as publisherName
FROM journal_paper jp 
left join journal_volume jv on jp.journal_volume_id=jv.id
left join journal j on j.id=jp.journal_id
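
Why a change in a joined table must reach the index can be seen with a toy version of the query. The sketch below uses an in-memory SQLite database instead of MySQL, with hypothetical, cut-down table definitions and sample rows invented for illustration. It shows that updating one journal_volume row changes the result for every journal_paper row that joins to it, and those are the documents that have to be refreshed in ES.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
# Hypothetical, reduced schemas with made-up sample rows
conn.executescript("""
CREATE TABLE journal_volume (id INTEGER PRIMARY KEY, year INTEGER, volume TEXT);
CREATE TABLE journal_paper (id INTEGER PRIMARY KEY, journal_volume_id INTEGER);
INSERT INTO journal_volume VALUES (10, 2022, '5');
INSERT INTO journal_paper VALUES (1, 10), (2, 10);
""")

# Main table (journal_paper) on the left, as the mapping requires
QUERY = """
SELECT jp.id AS _id, jv.year AS year, jv.volume AS volume
FROM journal_paper jp
LEFT JOIN journal_volume jv ON jp.journal_volume_id = jv.id
ORDER BY jp.id
"""


def documents():
    """One result row per main-table row, i.e. per ES document."""
    return conn.execute(QUERY).fetchall()


before = documents()
conn.execute("UPDATE journal_volume SET year = 2023 WHERE id = 10")
after = documents()
print(before)
print(after)
```

Because the main table is on the left of the join, there is exactly one result row per paper, which matches one document per _id.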

For reference (the author has not yet tried this personally):
https://blog.csdn.net/qq_24950043/article/details/122643889
