草庐IT

使用canal实现mysql数据同步到elasticsearch的功能

HUWD 2023-09-04 原文

使用canal实现mysql数据同步到elasticsearch的功能.

需要实现一个场景, 优化搜索体验, 我们使用elasticsearch, 那该如何监听mysql数据修改时,也将elasticsearch的数据进行修改呢?

这里介绍一个组件 canal. 功能如下图:

正如图所示, canl伪装成一个mysql的从节点,去订阅主节点的binlog日志. canal可以将数据同步给mysql,kafka,elasticsearch,hbase,rocketmq,pulsar.

所以我们使用canal

工作原理

  • canal 模拟 MySQL slave 的交互协议,伪装自己为 MySQL slave ,向 MySQL master 发送 dump 协议
  • MySQL master 收到 dump 请求,开始推送 binary log 给 slave (即 canal )
  • canal 解析 binary log 对象(原始为 byte 流)

github地址 https://github.com/alibaba/canal/wiki

准备开始

我们所有中间件全部使用docker进行部署

  1. 安装mysql, 我这里使用的是mysql8.0

    docker run -p 3306:3306 --restart=always --name mysql-8.0 \
    -v /usr/local/docker/mysql/log:/var/log/mysql \
    -v /usr/local/docker/mysql/data:/var/lib/mysql \
    -v /usr/local/docker/mysql/conf/conf.d/my.cnf:/etc/my.cnf \
    -e MYSQL_ROOT_PASSWORD=root \
    -d mysql:8.0
    

    my.cnf文件

    [mysqld]
    user=mysql
    character-set-server=utf8
    default_authentication_plugin=mysql_native_password
    secure_file_priv=/var/lib/mysql
    expire_logs_days=7
    sql_mode=STRICT_TRANS_TABLES,NO_ZERO_IN_DATE,NO_ZERO_DATE,ERROR_FOR_DIVISION_BY_ZERO,NO_ENGINE_SUBSTITUTION
    max_connections=1000
    # 注意设置主节点的ID ,不能和从节点重复,8.0 默认是开启binlog的, 5.7版本的需要手动开启
    server-id=1
    [client]
    default-character-set=utf8
    [mysql]
    default-character-set=utf8
    
  2. 安装elasticsearch, 使用的是elasticsearch:7.2.0

    docker  run \
    --name elasticsearch -d \
    --privileged=true \
    --restart=always \
    -p 9200:9200 \
    -p 9300:9300 \
    -e discovery.type=single-node \
    -e TZ=Asia/Shanghai \
    -e LANG=en_US.UTF-8 \
    -v /usr/local/docker/elasticsearch/data:/usr/share/elasticsearch/data \
    -v /usr/local/docker/elasticsearch/plugins:/usr/share/elasticsearch/plugins \
    -v /usr/local/docker/elasticsearch/config:/usr/share/elasticsearch/config \
    -v /usr/local/docker/elasticsearch/logs:/usr/share/elasticsearch/logs \
    elasticsearch:7.2.0
    
    • /usr/local/docker/elasticsearch/config目录下文件

      可以先随便创建一个elasticsearch容器, 使用命令 docker cp [容器索引]:[内部路径] [外部路径]将上述脚本中的几个目录拷贝出来.

    • 修改jvm.options文件. 可以指定jvm的运行内存大小, 服务器太小可以调整.

    • 修改elasticsearch.yml

      cluster.name: "wdhcr-es" #节点名称
      network.host: 0.0.0.0
      
      node.name: node-1
      http.port: 9200
      http.cors.enabled: true
      http.cors.allow-origin: "*"
      node.master: true
      node.data: true
      
    • 安装ik分词器

      # 方式一
      docker exec -it 759e1521e66e /bin/bash # 进入容器
      bin/elasticsearch-plugin install https://github.com/medcl/elasticsearch-analysis-ik/releases/download/v7.2.0/elasticsearch-analysis-ik-7.2.0.zip  #安装对应版本的分词器
      
      
      # 方式二
      # 1. 下载分词器
      wget https://github.com/medcl/elasticsearch-analysis-ik/releases/download/v7.2.0/elasticsearch-analysis-ik-7.2.0.zip #在docker安装es的设置的plugins目录中下载
      # 2. 创建ik文件夹
      mkdir ik
      # 3.解压到ik文件夹中
      unzip elasticsearch-analysis-ik-7.2.0.zip -d ik
      # 4. 重启docker
      # 5. docker logs es容器id ,查询关键字analysis-ik
      
  3. 安装canal

    • 首先拉取canal

      docker pull canal/canal-server
      
    • 启动canal

      docker run --name canal -d canal/canal-server
      
    • 将容器内部配置文件拷贝到外部 docker cp [容器索引]:[内部路径] [外部路径]

      docker cp canal:/home/admin/canal-server/conf/canal.properties /usr/local/docker/canal/conf/canal.properties
      
      docker cp canal:/home/admin/canal-server/conf/example/instance.properties /usr/local/docker/canal/conf/instance.properties
      # 拷贝启动脚本
      docker cp canal:/home/admin/canal-server/bin /usr/local/docker/canal/bin
      
    • 拷贝/home/admin/canal-server/bin目录是为了调整启动的jvm内存大小. startup.sh

      #!/bin/bash 
      
      current_path=`pwd`
      case "`uname`" in
          Linux)
      		bin_abs_path=$(readlink -f $(dirname $0))
      		;;
      	*)
      		bin_abs_path=`cd $(dirname $0); pwd`
      		;;
      esac
      base=${bin_abs_path}/..
      canal_conf=$base/conf/canal.properties
      canal_local_conf=$base/conf/canal_local.properties
      logback_configurationFile=$base/conf/logback.xml
      export LANG=en_US.UTF-8
      export BASE=$base
      
      if [ -f $base/bin/canal.pid ] ; then
      	echo "found canal.pid , Please run stop.sh first ,then startup.sh" 2>&2
          exit 1
      fi
      
      if [ ! -d $base/logs/canal ] ; then 
      	mkdir -p $base/logs/canal
      fi
      
      ## set java path
      if [ -z "$JAVA" ] ; then
        JAVA=$(which java)
      fi
      
      ALIBABA_JAVA="/usr/alibaba/java/bin/java"
      TAOBAO_JAVA="/opt/taobao/java/bin/java"
      if [ -z "$JAVA" ]; then
        if [ -f $ALIBABA_JAVA ] ; then
        	JAVA=$ALIBABA_JAVA
        elif [ -f $TAOBAO_JAVA ] ; then
        	JAVA=$TAOBAO_JAVA
        else
        	echo "Cannot find a Java JDK. Please set either set JAVA or put java (>=1.5) in your PATH." 2>&2
          exit 1
        fi
      fi
      
      case "$#" 
      in
      0 ) 
      	;;
      1 )	
      	var=$*
      	if [ "$var" = "local" ]; then
      		canal_conf=$canal_local_conf
      	else
      		if [ -f $var ] ; then 
      			canal_conf=$var
      		else
      			echo "THE PARAMETER IS NOT CORRECT.PLEASE CHECK AGAIN."
      			exit
      		fi
      	fi;;
      2 )	
      	var=$1
      	if [ "$var" = "local" ]; then
      		canal_conf=$canal_local_conf
      	else
      		if [ -f $var ] ; then
      			canal_conf=$var
      		else 
      			if [ "$1" = "debug" ]; then
      				DEBUG_PORT=$2
      				DEBUG_SUSPEND="n"
      				JAVA_DEBUG_OPT="-Xdebug -Xnoagent -Djava.compiler=NONE -Xrunjdwp:transport=dt_socket,address=$DEBUG_PORT,server=y,suspend=$DEBUG_SUSPEND"
      			fi
      		fi
           fi;;
      * )
      	echo "THE PARAMETERS MUST BE TWO OR LESS.PLEASE CHECK AGAIN."
      	exit;;
      esac
      
      JavaVersion=`$JAVA -version 2>&1 |awk 'NR==1{ gsub(/"/,""); print $3 }' | awk  -F '.' '{print $1}'`
      str=`file -L $JAVA | grep 64-bit`
      JAVA_OPTS="$JAVA_OPTS -Xss256k -XX:+AggressiveOpts -XX:-UseBiasedLocking -XX:-OmitStackTraceInFastThrow -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=$base/logs"
      
      if [ $JavaVersion -ge 11 ] ; then
        #JAVA_OPTS="$JAVA_OPTS -Xlog:gc*:$base_log/gc.log:time "
        JAVA_OPTS="$JAVA_OPTS"
      else
        #JAVA_OPTS="$JAVA_OPTS -Xloggc:$base/logs/canal/gc.log -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCApplicationStoppedTime"
        JAVA_OPTS="$JAVA_OPTS -XX:+UseFastAccessorMethods -XX:+PrintAdaptiveSizePolicy -XX:+PrintTenuringDistribution"
      fi
      
      if [ -n "$str" ]; then
        if [ $JavaVersion -ge 11 ] ; then
          # For G1
          JAVA_OPTS="-server -Xms256m -Xmx256m -XX:+UseG1GC -XX:MaxGCPauseMillis=250 -XX:+UseGCOverheadLimit -XX:+ExplicitGCInvokesConcurrent $JAVA_OPTS"
        else
      	  JAVA_OPTS="-server -Xms256m -Xmx256m -Xmn1g -XX:SurvivorRatio=2 -XX:PermSize=96m -XX:MaxPermSize=256m -XX:MaxTenuringThreshold=15 -XX:+DisableExplicitGC $JAVA_OPTS"
      	fi
      else
      	JAVA_OPTS="-server -Xms256m -Xmx256m -XX:NewSize=256m -XX:MaxNewSize=256m -XX:MaxPermSize=128m $JAVA_OPTS"
      fi
      
      JAVA_OPTS=" $JAVA_OPTS -Djava.awt.headless=true -Djava.net.preferIPv4Stack=true -Dfile.encoding=UTF-8"
      CANAL_OPTS="-DappName=otter-canal -Dlogback.configurationFile=$logback_configurationFile -Dcanal.conf=$canal_conf"
      
      if [ -e $canal_conf -a -e $logback_configurationFile ]
      then 
      	
      	for i in $base/lib/*;
      		do CLASSPATH=$i:"$CLASSPATH";
      	done
       	CLASSPATH="$base/conf:$CLASSPATH";
       	
       	echo "cd to $bin_abs_path for workaround relative path"
        	cd $bin_abs_path
       	
      	echo LOG CONFIGURATION : $logback_configurationFile
      	echo canal conf : $canal_conf 
      	echo CLASSPATH :$CLASSPATH
      	$JAVA $JAVA_OPTS $JAVA_DEBUG_OPT $CANAL_OPTS -classpath .:$CLASSPATH com.alibaba.otter.canal.deployer.CanalLauncher 1>>$base/logs/canal/canal_stdout.log 2>&1 &
      	echo $! > $base/bin/canal.pid 
      	
      	echo "cd to $current_path for continue"
        	cd $current_path
      else 
      	echo "canal conf("$canal_conf") OR log configration file($logback_configurationFile) is not exist,please create then first!"
      fi
      
    • 关闭容器

      docker stop canal
      
    • 移除容器

      docker rm canal
      
    • 修改 instance.properties文件

      #################################################
      ## mysql serverId , v1.0.26+ will autoGen
      # 修改从节点id
      canal.instance.mysql.slaveId=10
      
      # enable gtid use true/false
      canal.instance.gtidon=false
      
      # position info
      # 修改主节点连接地址
      canal.instance.master.address=mysql:3306 # --link mysql:mysql
      canal.instance.master.journal.name=
      canal.instance.master.position=
      canal.instance.master.timestamp=
      canal.instance.master.gtid=
      
      # rds oss binlog
      canal.instance.rds.accesskey=
      canal.instance.rds.secretkey=
      canal.instance.rds.instanceId=
      
      # table meta tsdb info
      canal.instance.tsdb.enable=true
      #canal.instance.tsdb.url=jdbc:mysql://127.0.0.1:3306/canal_tsdb
      #canal.instance.tsdb.dbUsername=canal
      #canal.instance.tsdb.dbPassword=canal
      
      #canal.instance.standby.address =
      #canal.instance.standby.journal.name =
      #canal.instance.standby.position =
      #canal.instance.standby.timestamp =
      #canal.instance.standby.gtid=
      
      
      # 修改主节点的数据库账号密码
      canal.instance.dbUsername=root # username/password
      canal.instance.dbPassword=root
      canal.instance.connectionCharset = UTF-8
      # enable druid Decrypt database password
      canal.instance.enableDruid=false
      #canal.instance.pwdPublicKey=MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBALK4BUxdDltRRE5/zXpVEVPUgunvscYFtEip3pmLlhrWpacX7y7GCMo2/JM6LeHmiiNdH1FWgGCpUfircSwlWKUCAwEAAQ==
      
      # table regex
      canal.instance.filter.regex=.*\\..*
      # table black regex
      canal.instance.filter.black.regex=mysql\\.slave_.*
      # table field filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
      #canal.instance.filter.field=test1.t_product:id/subject/keywords,test2.t_company:id/name/contact/ch
      # table field black filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
      #canal.instance.filter.black.field=test1.t_product:subject/product_image,test2.t_company:id/name/contact/ch
      
      # mq config
      canal.mq.topic=example
      # dynamic topic route by schema or table regex
      #canal.mq.dynamicTopic=mytest1.user,topic2:mytest2\\..*,.*\\..*
      canal.mq.partition=0
      # hash partition config
      #canal.mq.enableDynamicQueuePartition=false
      #canal.mq.partitionsNum=3
      #canal.mq.dynamicTopicPartitionNum=test.*:4,mycanal:6
      #canal.mq.partitionHash=test.table:id^name,.*\\..*
      #################################################
      
    • 启动命令

      docker run --name canal -p 11111:11111 -d \
      --link mysql-8.0:mysql \
      -v /usr/local/docker/canal/conf/instance.properties:/home/admin/canal-server/conf/example/instance.properties \
      -v /usr/local/docker/canal/conf/canal.properties:/home/admin/canal-server/conf/canal.properties \
      -v /usr/local/docker/canal/bin:/home/admin/canal-server/bin \
      canal/canal-server
      
  4. 安装canal-adapter实现自动化接受canal推送,并将数据同步进elasticsearch中

    • 首先拉取canal-adapter

      docker pull slpcat/canal-adapter:v1.1.5
      
    • 启动canal

      docker run --name canal-adapter -p 8081:8081 -d slpcat/canal-adapter:v1.1.5
      
    • 将容器内部配置文件拷贝到外部 docker cp [容器索引]:[内部路径] [外部路径]

      docker cp canal-adapter:/opt/canal-adapter/bin /usr/local/docker/canal-adapter/bin
      docker cp canal-adapter:/opt/canal-adapter/conf /usr/local/docker/canal-adapter/conf
      
    • 拷贝/opt/canal-adapter/bin目录是为了调整启动的jvm内存大小. startup.sh, 否则他默认的jvm启动内存是2GB.可能导致无法启动.

      #!/bin/bash
      
      current_path=`pwd`
      case "`uname`" in
          Linux)
      		bin_abs_path=$(readlink -f $(dirname $0))
      		;;
      	*)
      		bin_abs_path=`cd $(dirname $0); pwd`
      		;;
      esac
      base=${bin_abs_path}/..
      export LANG=en_US.UTF-8
      export BASE=$base
      
      if [ -f $base/bin/adapter.pid ] ; then
      	echo "found adapter.pid , Please run stop.sh first ,then startup.sh" 2>&2
          exit 1
      fi
      
      if [ ! -d $base/logs ] ; then
      	mkdir -p $base/logs
      fi
      
      ## set java path
      if [ -z "$JAVA" ] ; then
        JAVA=$(which java)
      fi
      
      ALIBABA_JAVA="/usr/alibaba/java/bin/java"
      TAOBAO_JAVA="/opt/taobao/java/bin/java"
      if [ -z "$JAVA" ]; then
        if [ -f $ALIBABA_JAVA ] ; then
        	JAVA=$ALIBABA_JAVA
        elif [ -f $TAOBAO_JAVA ] ; then
        	JAVA=$TAOBAO_JAVA
        else
        	echo "Cannot find a Java JDK. Please set either set JAVA or put java (>=1.5) in your PATH." 2>&2
          exit 1
        fi
      fi
      
      case "$#"
      in
      0 )
        ;;
      2 )
        if [ "$1" = "debug" ]; then
          DEBUG_PORT=$2
          DEBUG_SUSPEND="n"
          JAVA_DEBUG_OPT="-Xdebug -Xnoagent -Djava.compiler=NONE -Xrunjdwp:transport=dt_socket,address=$DEBUG_PORT,server=y,suspend=$DEBUG_SUSPEND"
        fi
        ;;
      * )
        echo "THE PARAMETERS MUST BE TWO OR LESS.PLEASE CHECK AGAIN."
        exit;;
      esac
      
      str=`file -L $JAVA | grep 64-bit`
      if [ -n "$str" ]; then
      	JAVA_OPTS="-server -Xms128m -Xmx128m -Xmn128m -XX:SurvivorRatio=2 -Xss256k -XX:+DisableExplicitGC -XX:+HeapDumpOnOutOfMemoryError"
      else
      	JAVA_OPTS="-server -Xms128m -Xmx128m -XX:NewSize=128m -XX:MaxNewSize=128m -XX:MaxPermSize=128m "
      fi
      
      JAVA_OPTS=" $JAVA_OPTS -Djava.awt.headless=true -Djava.net.preferIPv4Stack=true -Dfile.encoding=UTF-8"
      ADAPTER_OPTS="-DappName=canal-adapter"
      
      for i in $base/lib/*;
          do CLASSPATH=$i:"$CLASSPATH";
      done
      
      CLASSPATH="$base/conf:$CLASSPATH";
      
      echo "cd to $bin_abs_path for workaround relative path"
      cd $bin_abs_path
      
      echo CLASSPATH :$CLASSPATH
      exec $JAVA $JAVA_OPTS $JAVA_DEBUG_OPT $ADAPTER_OPTS -classpath .:$CLASSPATH com.alibaba.otter.canal.adapter.launcher.CanalAdapterApplication
      
    • 关闭容器

      docker stop slpcat/canal-adapter:v1.1.5
      
    • 移除容器

      docker rm slpcat/canal-adapter:v1.1.5
      
    • 修改conf目录下的application.yml配置文件

      server:
        port: 8081
      spring:
        jackson:
          date-format: yyyy-MM-dd HH:mm:ss
          time-zone: GMT+8
          default-property-inclusion: non_null
      
      canal.conf:
        mode: tcp #tcp kafka rocketMQ rabbitMQ
        flatMessage: true
        zookeeperHosts:
        syncBatchSize: 1000
        retries: 0
        timeout:
        accessKey:
        secretKey:
        consumerProperties:
          # canal tcp consumer
          canal.tcp.server.host: canal:11111   # --link canal:canal
          canal.tcp.zookeeper.hosts:
          canal.tcp.batch.size: 500
          canal.tcp.username:
          canal.tcp.password:
          # kafka consumer
          kafka.bootstrap.servers: 127.0.0.1:9092
          kafka.enable.auto.commit: false
          kafka.auto.commit.interval.ms: 1000
          kafka.auto.offset.reset: latest
          kafka.request.timeout.ms: 40000
          kafka.session.timeout.ms: 30000
          kafka.isolation.level: read_committed
          kafka.max.poll.records: 1000
          # rocketMQ consumer
          rocketmq.namespace:
          rocketmq.namesrv.addr: 127.0.0.1:9876
          rocketmq.batch.size: 1000
          rocketmq.enable.message.trace: false
          rocketmq.customized.trace.topic:
          rocketmq.access.channel:
          rocketmq.subscribe.filter:
          # rabbitMQ consumer
          rabbitmq.host:
          rabbitmq.virtual.host:
          rabbitmq.username:
          rabbitmq.password:
          rabbitmq.resource.ownerId:
      
      # 修改数据库连接信息
        srcDataSources:
          defaultDS:
            url: jdbc:mysql://mysql:7280/canal-test?useUnicode=true # --link mysql-8.0:mysql
            username: root
            password: root
        canalAdapters:
        - instance: example # canal instance Name or mq topic name
          groups:
          - groupId: g1
            outerAdapters:
            - name: logger
      #      - name: rdb
      #        key: mysql1
      #        properties:
      #          jdbc.driverClassName: com.mysql.jdbc.Driver
      #          jdbc.url: jdbc:mysql://127.0.0.1:3306/mytest2?useUnicode=true
      #          jdbc.username: root
      #          jdbc.password: 121212
      #      - name: rdb
      #        key: oracle1
      #        properties:
      #          jdbc.driverClassName: oracle.jdbc.OracleDriver
      #          jdbc.url: jdbc:oracle:thin:@localhost:49161:XE
      #          jdbc.username: mytest
      #          jdbc.password: m121212
      #      - name: rdb
      #        key: postgres1
      #        properties:
      #          jdbc.driverClassName: org.postgresql.Driver
      #          jdbc.url: jdbc:postgresql://localhost:5432/postgres
      #          jdbc.username: postgres
      #          jdbc.password: 121212
      #          threads: 1
      #          commitSize: 3000
      #      - name: hbase
      #        properties:
      #          hbase.zookeeper.quorum: 127.0.0.1
      #          hbase.zookeeper.property.clientPort: 2181
      #          zookeeper.znode.parent: /hbase
      # 修改es的配置信息
            - name: es7 # 我们安装的是es7
              hosts: elasticsearch:9200 # --link elasticsearch:elasticsearch 并且开放是9200端口
              properties:
                mode: rest
                # security.auth: test:123456 #  only used for rest mode
                cluster.name: wdhcr-es # 这是对应的是上方es的配置文件elasticsearch.yml中的节点名称
      #        - name: kudu
      #          key: kudu
      #          properties:
      #            kudu.master.address: 127.0.0.1 # ',' split multi address
      
      
    • 修改conf/es7文件夹, 新建一个wdhr-collect.yml. 名字随意

      dataSourceKey: defaultDS
      destination: example
      groupId: g1
      esMapping:
        _index: wdhcr_collect #es索引名称
        _id: _id #es的id
        upsert: true 
      # _type:_doc 
      #  pk: id
        sql: "select
              w.id as _id,
              w.title as title,
              w.note as note,
              w.url as url,
              w.create_time as createTime,
              w.is_public as isPublic,
              wu.user_name as userName,
              wu.avatar_url as userAvatar,
              wu.id as userId 
      FROM
              wdhcr_note w
              LEFT JOIN wdhcr_user wu ON w.user_id = wu.id"  ## 一定要注意如果涉及到关联表,则on左右的字段都必须在查询字段中,否则Load canal adapter: es7 failed java.lang.RuntimeException: com.alibaba.fastsql.sql.parser.ParserException
        commitBatch: 3000
      
    • 开始安装slpcat/canal-adapter:v1.1.5, 命令

      docker run --name canal-adapter -p 8081:8081 -d \
      --link mysql-8.0:mysql \
      --link canal:canal \
      --link elasticsearch:elasticsearch \
      -v /usr/local/docker/canal-adapter/bin:/opt/canal-adapter/bin \
      -v /usr/local/docker/canal-adapter/conf:/opt/canal-adapter/conf \
      slpcat/canal-adapter:v1.1.5
      
  5. 这里注意, 由于我们使用了**–link容器互联** ,并且我们没有使用docker-compose. 所以我们对于容器的启动顺序是有要求的.

  6. 使用到的sql文件

    SET NAMES utf8mb4;
    SET FOREIGN_KEY_CHECKS = 0;
    
    -- ----------------------------
    -- Table structure for wdhcr_note
    -- ----------------------------
    DROP TABLE IF EXISTS `wdhcr_note`;
    CREATE TABLE `wdhcr_note` (
      `id` bigint NOT NULL AUTO_INCREMENT COMMENT '主键',
      `create_time` datetime DEFAULT NULL COMMENT '创建时间',
      `note` varchar(255) DEFAULT NULL COMMENT '记录',
      `is_public` char(2) DEFAULT NULL COMMENT '是否公共(0:私有; 1:公共)',
      `user_id` bigint DEFAULT NULL COMMENT '用户ID',
      `title` varchar(255) DEFAULT NULL COMMENT '标题',
      `url` varchar(255) DEFAULT NULL COMMENT '笔记url',
      PRIMARY KEY (`id`)
    ) ENGINE=InnoDB AUTO_INCREMENT=10 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci COMMENT='笔记记录表';
    
    -- ----------------------------
    -- Table structure for wdhcr_user
    -- ----------------------------
    DROP TABLE IF EXISTS `wdhcr_user`;
    CREATE TABLE `wdhcr_user` (
      `id` bigint NOT NULL AUTO_INCREMENT COMMENT '主键',
      `create_time` datetime DEFAULT NULL COMMENT '创建时间',
      `open_id` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci DEFAULT NULL COMMENT 'open_id',
      `avatar_url` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci DEFAULT NULL COMMENT '用户头像',
      `user_name` varchar(255) DEFAULT NULL COMMENT '用户名称',
      PRIMARY KEY (`id`)
    ) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci COMMENT='笔记记录表';
    
    SET FOREIGN_KEY_CHECKS = 1;
    
    

演示环节

有关使用canal实现mysql数据同步到elasticsearch的功能的更多相关文章

  1. ruby - 如何使用 Nokogiri 的 xpath 和 at_xpath 方法 - 2

    我正在学习如何使用Nokogiri,根据这段代码我遇到了一些问题:require'rubygems'require'mechanize'post_agent=WWW::Mechanize.newpost_page=post_agent.get('http://www.vbulletin.org/forum/showthread.php?t=230708')puts"\nabsolutepathwithtbodygivesnil"putspost_page.parser.xpath('/html/body/div/div/div/div/div/table/tbody/tr/td/div

  2. ruby - 使用 RubyZip 生成 ZIP 文件时设置压缩级别 - 2

    我有一个Ruby程序,它使用rubyzip压缩XML文件的目录树。gem。我的问题是文件开始变得很重,我想提高压缩级别,因为压缩时间不是问题。我在rubyzipdocumentation中找不到一种为创建的ZIP文件指定压缩级别的方法。有人知道如何更改此设置吗?是否有另一个允许指定压缩级别的Ruby库? 最佳答案 这是我通过查看ruby​​zip内部创建的代码。level=Zlib::BEST_COMPRESSIONZip::ZipOutputStream.open(zip_file)do|zip|Dir.glob("**/*")d

  3. ruby - 为什么我可以在 Ruby 中使用 Object#send 访问私有(private)/ protected 方法? - 2

    类classAprivatedeffooputs:fooendpublicdefbarputs:barendprivatedefzimputs:zimendprotecteddefdibputs:dibendendA的实例a=A.new测试a.foorescueputs:faila.barrescueputs:faila.zimrescueputs:faila.dibrescueputs:faila.gazrescueputs:fail测试输出failbarfailfailfail.发送测试[:foo,:bar,:zim,:dib,:gaz].each{|m|a.send(m)resc

  4. ruby-on-rails - 使用 Ruby on Rails 进行自动化测试 - 最佳实践 - 2

    很好奇,就使用ruby​​onrails自动化单元测试而言,你们正在做什么?您是否创建了一个脚本来在cron中运行rake作业并将结果邮寄给您?git中的预提交Hook?只是手动调用?我完全理解测试,但想知道在错误发生之前捕获错误的最佳实践是什么。让我们理所当然地认为测试本身是完美无缺的,并且可以正常工作。下一步是什么以确保他们在正确的时间将可能有害的结果传达给您? 最佳答案 不确定您到底想听什么,但是有几个级别的自动代码库控制:在处理某项功能时,您可以使用类似autotest的内容获得关于哪些有效,哪些无效的即时反馈。要确保您的提

  5. ruby - 在 Ruby 中使用匿名模块 - 2

    假设我做了一个模块如下:m=Module.newdoclassCendend三个问题:除了对m的引用之外,还有什么方法可以访问C和m中的其他内容?我可以在创建匿名模块后为其命名吗(就像我输入“module...”一样)?如何在使用完匿名模块后将其删除,使其定义的常量不再存在? 最佳答案 三个答案:是的,使用ObjectSpace.此代码使c引用你的类(class)C不引用m:c=nilObjectSpace.each_object{|obj|c=objif(Class===objandobj.name=~/::C$/)}当然这取决于

  6. ruby - 使用 ruby​​ 和 savon 的 SOAP 服务 - 2

    我正在尝试使用ruby​​和Savon来使用网络服务。测试服务为http://www.webservicex.net/WS/WSDetails.aspx?WSID=9&CATID=2require'rubygems'require'savon'client=Savon::Client.new"http://www.webservicex.net/stockquote.asmx?WSDL"client.get_quotedo|soap|soap.body={:symbol=>"AAPL"}end返回SOAP异常。检查soap信封,在我看来soap请求没有正确的命名空间。任何人都可以建议我

  7. python - 如何使用 Ruby 或 Python 创建一系列高音调和低音调的蜂鸣声? - 2

    关闭。这个问题是opinion-based.它目前不接受答案。想要改进这个问题?更新问题,以便editingthispost可以用事实和引用来回答它.关闭4年前。Improvethisquestion我想在固定时间创建一系列低音和高音调的哔哔声。例如:在150毫秒时发出高音调的蜂鸣声在151毫秒时发出低音调的蜂鸣声200毫秒时发出低音调的蜂鸣声250毫秒的高音调蜂鸣声有没有办法在Ruby或Python中做到这一点?我真的不在乎输出编码是什么(.wav、.mp3、.ogg等等),但我确实想创建一个输出文件。

  8. ruby-on-rails - 'compass watch' 是如何工作的/它是如何与 rails 一起使用的 - 2

    我在我的项目目录中完成了compasscreate.和compassinitrails。几个问题:我已将我的.sass文件放在public/stylesheets中。这是放置它们的正确位置吗?当我运行compasswatch时,它不会自动编译这些.sass文件。我必须手动指定文件:compasswatchpublic/stylesheets/myfile.sass等。如何让它自动运行?文件ie.css、print.css和screen.css已放在stylesheets/compiled。如何在编译后不让它们重新出现的情况下删除它们?我自己编译的.sass文件编译成compiled/t

  9. ruby - 使用 ruby​​ 将 HTML 转换为纯文本并维护结构/格式 - 2

    我想将html转换为纯文本。不过,我不想只删除标签,我想智能地保留尽可能多的格式。为插入换行符标签,检测段落并格式化它们等。输入非常简单,通常是格式良好的html(不是整个文档,只是一堆内容,通常没有anchor或图像)。我可以将几个正则表达式放在一起,让我达到80%,但我认为可能有一些现有的解决方案更智能。 最佳答案 首先,不要尝试为此使用正则表达式。很有可能你会想出一个脆弱/脆弱的解决方案,它会随着HTML的变化而崩溃,或者很难管理和维护。您可以使用Nokogiri快速解析HTML并提取文本:require'nokogiri'h

  10. ruby - 在 64 位 Snow Leopard 上使用 rvm、postgres 9.0、ruby 1.9.2-p136 安装 pg gem 时出现问题 - 2

    我想为Heroku构建一个Rails3应用程序。他们使用Postgres作为他们的数据库,所以我通过MacPorts安装了postgres9.0。现在我需要一个postgresgem并且共识是出于性能原因你想要pggem。但是我对我得到的错误感到非常困惑当我尝试在rvm下通过geminstall安装pg时。我已经非常明确地指定了所有postgres目录的位置可以找到但仍然无法完成安装:$envARCHFLAGS='-archx86_64'geminstallpg--\--with-pg-config=/opt/local/var/db/postgresql90/defaultdb/po

随机推荐