草庐IT

spring boot +springboot集成es7.9.1+canal同步到es

GideonYeung 2023-05-01 原文

spring boot +springboot集成es7.9.1+canal同步到es


未经许可,请勿转载。

前言

  1. 其实大部分的代码是来源于参考资料来源主要代码实现,我只是在他的基础上增加自定义注解,自定义分词器等。需要看详细源码的可以去看主要代码实现,结合我的来使用。
  2. 有人会问为什么需要自定义注解,因为elasticsearch7.6 索引将去除type 没有类型的概念了。所以我自己自定义数据类型,有需要的可以自己拓展自己需要的类型。
  3. 我这里主要写的是代码实现,没有涉及到中间件的搭建,因为真的没有时间,哈哈。

参考资料来源

主要实现代码:https://gitee.com/gz-yami/mall4cloud?_from=gitee_search
自定义注解:https://cloud.tencent.com/developer/article/1911164
自定义分词器:https://blog.csdn.net/m0_57302315/article/details/121103241
Canal胶水层:https://gitee.com/throwableDoge/canal-glue

rocketmq

  1. mq maven依赖
            <dependency>
                <groupId>org.apache.rocketmq</groupId>
                <artifactId>rocketmq-spring-boot-starter</artifactId>
                <version>2.2.0</version>
            </dependency>
  1. mq 适配器
import lombok.RequiredArgsConstructor;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.apache.rocketmq.spring.support.RocketMQMessageConverter;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;

/**
 * @author gideon
 */
@Configuration
@RequiredArgsConstructor
public class RocketMqAdapter {

    private final RocketMQMessageConverter rocketMqMessageConverter;

    @Value("${rocketmq.name-server:}")
    private String nameServer;

    public RocketMQTemplate getTemplateByTopicName(String topic){
        RocketMQTemplate mqTemplate = new RocketMQTemplate();
        DefaultMQProducer producer = new DefaultMQProducer(topic);
        producer.setNamesrvAddr(nameServer);
        producer.setRetryTimesWhenSendFailed(2);
        producer.setSendMsgTimeout((int) RocketMqConstant.TIMEOUT);
        mqTemplate.setProducer(producer);
        mqTemplate.setMessageConverter(rocketMqMessageConverter.getMessageConverter());
        return mqTemplate;
    }

}
  1. mq的一些常量信息RocketMqConstant
/**
 * nameserver用;分割
 * 同步消息,如果两次
 */
public class RocketMqConstant {

    // 延迟消息 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h (1-18)

    /**
     * 自动收货时间,实际上7天
     */
    public static final int ORDER_AUTO_RECEIPT_DELAY_LEVEL = 60 * 24 * 7;

    /**
     * 默认发送消息超时时间
     */
    public static final long TIMEOUT = 3000;


    /**
     * 订单取消退款
     */
    public static final String ORDER_REFUND_TOPIC = "order-refund-topic";

    /**
     * 订单自动收货
     */
    public static final String AUTO_RECEIPT_TOPIC = "auto-receipt-topic";

    /**
     * 服务订单订单支付成功
     */
    public static final String ORDER_NOTIFY_TOPIC = "order-notify-topic";

    /**
     * canal-topic
     */
    public static final String CANAL_TOPIC = "canal-topic";

}

  1. mq的配置类
import com.onecode.dtg.basic.RocketMqAdapter;
import com.onecode.dtg.basic.RocketMqConstant;
import lombok.RequiredArgsConstructor;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Lazy;

/**
 * @author gideon
 */
@Configuration
@RequiredArgsConstructor
public class RocketMqConfig {

    private final RocketMqAdapter rocketMqAdapter;

    @Lazy
    @Bean(destroyMethod = "destroy")
    public RocketMQTemplate autoReceiptTemplate() {
        return rocketMqAdapter.getTemplateByTopicName(RocketMqConstant.AUTO_RECEIPT_TOPIC);
    }
}

  1. mq 的配置文件信息
rocketmq:
  name-server: 127.0.0.1:9876

elasticsearch

elasticsearch的搭建我在这里就不不多bb了,你们自行百度,下面是资料。

  1. maven所需依赖
 	</properties>
        <elasticsearch.version>7.9.1</elasticsearch.version>
    </properties>
      <dependencies>
            <dependency>
                <groupId>org.elasticsearch.client</groupId>
                <artifactId>elasticsearch-rest-high-level-client</artifactId>
                <version>${elasticsearch.version}</version>
            </dependency>
            <dependency>
                <groupId>org.elasticsearch</groupId>
                <artifactId>elasticsearch</artifactId>
                <version>${elasticsearch.version}</version>
            </dependency>
            <dependency>
                <groupId>org.elasticsearch.client</groupId>
                <artifactId>elasticsearch-rest-client</artifactId>
                <version>${elasticsearch.version}</version>
                <exclusions>
                    <exclusion>
                        <artifactId>commons-logging</artifactId>
                        <groupId>commons-logging</groupId>
                    </exclusion>
                </exclusions>
            </dependency>
          </dependencies>
  1. 分词所需资料 elasticsearch搭建的资料,点击这里。
  2. elasticsearch的yml
# elastic的地址
elastic:
  hostname: 127.0.0.1
  port: 9200
  1. elasticsearch 启动配置类
import lombok.RequiredArgsConstructor;
import org.apache.http.HttpHost;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * @author gideon
 */
@Configuration
@RequiredArgsConstructor
public class ElasticConfig {


    @Value("${elastic.hostname}")
    private String hostname;

    @Value("${elastic.port}")
    private int port;

    @Bean
    public RestHighLevelClient restHighLevelClient() {
        return new RestHighLevelClient(
                RestClient.builder(
                        new HttpHost(hostname, port)));
    }
}
  1. elasticsearch 自定义注解,AnalyzerType在下面。
import java.lang.annotation.*;

/**
 * @author gideon
 * @date 2022/9/8
 */
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.FIELD)
@Documented
@Inherited
public @interface EsField {

    FieldType type() default FieldType.TEXT;

    /**
     * 指定分词器
     *
     * @return AnalyzerType
     */
    AnalyzerType analyzer() default AnalyzerType.STANDARD;
}
  1. elasticsearch 自定义AnalyzerType,因为我自己的业务需要所以我加了一个自定义分词器comma
import lombok.Getter;

/**
 * @author gideon
 * @date 2022/9/8
 */
@Getter
public enum AnalyzerType {

    /**
     * 不使用分词
     */
    NO("不使用分词"),
    /**
     * 标准分词,默认分词器
     */
    STANDARD("standard"),

    /**
     * ik_smart:会做最粗粒度的拆分;已被分出的词语将不会再次被其它词语占有
     */
    IK_SMART("ik_smart"),

    /**
     * ik_max_word :会将文本做最细粒度的拆分;尽可能多的拆分出词语
     */
    IK_MAX_WORD("ik_max_word"),

    /**
     * ik_max_word :会将文本做逗号分词
     */
    COMMA("comma"),
    ;

    private final String type;

    AnalyzerType(String type) {
        this.type = type;
    }

}
  1. elasticsearch 自定义FieldType
import lombok.Getter;

/**
 * @author gideon
 * @date 2022/9/8
 */
@Getter
public enum FieldType {
    /**
     *
     */
    TEXT("text"),

    KEYWORD("keyword"),

    INTEGER("integer"),

    DOUBLE("double"),

    DATE("date"),

    LONG("long"),

    /**
     * 单条数据
     */
    OBJECT("object"),

    /**
     * 嵌套数组
     */
    NESTED("nested"),
    ;

    FieldType(String type){
        this.type = type;
    }

    private final String type;
}
  1. elasticsearch索引名称枚举
/**
 * es当中的index
 *
 * @author gideon
 */
public enum EsIndexEnum {

    /**
     * 护理员
     */
    SERVER("server"),

    ;

    private final String value;

    public String value() {
        return value;
    }

    EsIndexEnum(String value) {
        this.value = value;
    }
}
  1. elasticsearch 创建索引代码EsIndexCreateService,CommonBizException是我自定义的异常,你们可以使用自己自定义的异常类。
import com.onecode.dtg.basic.common.enums.ResultCode;
import com.onecode.dtg.basic.common.es.annotation.EsField;
import com.onecode.dtg.basic.common.es.enums.FieldType;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.indices.CreateIndexRequest;
import org.elasticsearch.client.indices.CreateIndexResponse;
import org.elasticsearch.client.indices.GetIndexRequest;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.springframework.stereotype.Service;

import java.io.IOException;
import java.lang.reflect.Field;

/**
 * @author gideon
 * @date 2022/9/8
 */
@Slf4j
@Service
@RequiredArgsConstructor
public class EsIndexCreateService {

    private final RestHighLevelClient restHighLevelClient;

    /**
     * 不需要逗号分词器索引
     *
     * @param indexName 索引名称
     * @param clazz     同步到es的实体类
     * @return boolean
     */
    public boolean createIndex(String indexName, Class<?> clazz) {
        return createIndex(indexName, clazz, false);
    }

    /**
     * 建立索引
     *
     * @param indexName 索引名称
     * @param comma     是否需要逗号分词器
     * @return boolean
     */
    public boolean createIndex(String indexName, Class<?> clazz, Boolean comma) {
        try {
//            判断索引是否存在
            GetIndexRequest getIndexRequest = new GetIndexRequest(indexName);
            boolean exists = restHighLevelClient.indices().exists(getIndexRequest, RequestOptions.DEFAULT);
            if (exists) {
                return true;
            }
            CreateIndexRequest request = new CreateIndexRequest(indexName);

            if (comma) {
                XContentBuilder settingsBuilder = XContentFactory.jsonBuilder()
                        .startObject()
                        .startObject("analysis")
                        .startObject("analyzer")
                        .startObject("comma")
                        .field("type", "pattern")
//                        将分词器规则定义为按照","进行分词
                        .field("pattern", ",")
                        .endObject()
                        .endObject()
                        .endObject()
                        .endObject();
                request.settings(settingsBuilder);
            }

//            这里创建索引结构
            request.mapping(generateBuilder(clazz));
            CreateIndexResponse response = restHighLevelClient.indices().create(request, RequestOptions.DEFAULT);
//            指示是否所有节点都已确认请求
            boolean acknowledged = response.isAcknowledged();
//            指示是否在超时之前为索引中的每个分片启动了必需的分片副本数
            boolean shardsAcknowledged = response.isShardsAcknowledged();
            if (acknowledged || shardsAcknowledged) {
                log.info("创建索引成功!索引名称为{}", indexName);
                return true;
            }
            return false;
        } catch (IOException e) {
            throw new CommonBizException(ResultCode.FAIL.getModel(), "创建索引:" + indexName + "失败。");
        }
    }


    /**
     * 生成es索引
     *
     * @param clazz 对于的es实体
     * @return XContentBuilder
     */
    public static XContentBuilder generateBuilder(Class<?> clazz) {
        try {
//        获取索引名称及类型
            XContentBuilder builder = XContentFactory.jsonBuilder();
            builder.startObject();
            builder.startObject("properties");
            Field[] declaredFields = clazz.getDeclaredFields();
            for (Field declaredField : declaredFields) {
                if (declaredField.isAnnotationPresent(EsField.class)) {
//                获取注解
                    EsField declaredAnnotation = declaredField.getDeclaredAnnotation(EsField.class);
//                        如果嵌套对象
                    /**
                     * {
                     *   "mappings": {
                     *     "properties": {
                     *       "region": {
                     *         "type": "keyword"
                     *       },
                     *       "manager": {
                     *         "properties": {
                     *           "age":  { "type": "integer" },
                     *           "name": {
                     *             "properties": {
                     *               "first": { "type": "text" },
                     *               "last":  { "type": "text" }
                     *             }
                     *           }
                     *         }
                     *       }
                     *     }
                     *   }
                     * }
                     */
                    if (declaredAnnotation.type() == FieldType.OBJECT) {
//                    获取当前类的对象-- Action
                        Class<?> type = declaredField.getType();
                        Field[] typeDeclaredFields = type.getDeclaredFields();
                        builder.startObject(declaredField.getName());
                        builder.startObject("properties");
//                    遍历该对象中的所有属性
                        for (Field field : typeDeclaredFields) {
                            if (field.isAnnotationPresent(EsField.class)) {
//                            获取注解
                                EsField fieldDeclaredAnnotation = field.getDeclaredAnnotation(EsField.class);
                                builder.startObject(field.getName());
                                builder.field("type", fieldDeclaredAnnotation.type().getType());
//                            keyword不需要分词
                                if (fieldDeclaredAnnotation.type() == FieldType.TEXT) {
                                    builder.field("analyzer", fieldDeclaredAnnotation.analyzer().getType());
                                }
                                builder.endObject();
                            }
                        }
                        builder.endObject();
                        builder.endObject();

                    } else {
                        builder.startObject(declaredField.getName());
                        builder.field("type", declaredAnnotation.type().getType());
//                        keyword不需要分词
                        if (declaredAnnotation.type() == FieldType.TEXT) {
                            builder.field("analyzer", declaredAnnotation.analyzer().getType());
                        }
                        builder.endObject();
                    }
                }
            }
//            对应property
            builder.endObject();
            builder.endObject();
            return builder;
        } catch (IOException e) {
            throw new CommonBizException(ResultCode.FAIL.getModel(), "创建索引失败。");
        }
    }
}

canal

因为我这里使用的是第三方的canal jar包,也就是上面说到的Canal胶水层
获取地址:https://gitee.com/gz-yami/mall4cloud/tree/master/mall4cloud-common/mall4cloud-common-core/lib
引入的maven

		<dependency>
			<groupId>cn.throwx</groupId>
			<artifactId>canal-glue-core</artifactId>
			<version>1.0</version>
			<scope>system</scope>
			<systemPath>${pom.basedir}/lib/canal-glue-core.jar</systemPath>
		</dependency>

类似于下面这样放

  1. canal的canal.properties配置文件信息,这里主要看你使用什么信息队列就配置什么。我使用的是RocketMQ,然后需要创建一个topic去监听数据库的操作日志,配置topic在rocketmq.producer.group = canal-topic
#################################################
######### 		common argument		#############
#################################################
# tcp bind ip
canal.ip =
# register ip to zookeeper
canal.register.ip =
canal.port = 11111
canal.metrics.pull.port = 11112
# canal instance user/passwd
# canal.user = canal
# canal.passwd = E3619321C1A937C46A0D8BD1DAC39F93B27D4458

# canal admin config
#canal.admin.manager = 127.0.0.1:8089
canal.admin.port = 11110
canal.admin.user = admin
canal.admin.passwd = 4ACFE3202A5FF5CF467898FC58AAB1D615029441
# admin auto register
#canal.admin.register.auto = true
#canal.admin.register.cluster =
#canal.admin.register.name =

canal.zkServers =
# flush data to zk
canal.zookeeper.flush.period = 1000
canal.withoutNetty = false
# tcp, kafka, rocketMQ, rabbitMQ
canal.serverMode = rocketMQ
# flush meta cursor/parse position to file
canal.file.data.dir = ${canal.conf.dir}
canal.file.flush.period = 1000
## memory store RingBuffer size, should be Math.pow(2,n)
canal.instance.memory.buffer.size = 16384
## memory store RingBuffer used memory unit size , default 1kb
canal.instance.memory.buffer.memunit = 1024 
## meory store gets mode used MEMSIZE or ITEMSIZE
canal.instance.memory.batch.mode = MEMSIZE
canal.instance.memory.rawEntry = true

## detecing config
canal.instance.detecting.enable = false
#canal.instance.detecting.sql = insert into retl.xdual values(1,now()) on duplicate key update x=now()
canal.instance.detecting.sql = select 1
canal.instance.detecting.interval.time = 3
canal.instance.detecting.retry.threshold = 3
canal.instance.detecting.heartbeatHaEnable = false

# support maximum transaction size, more than the size of the transaction will be cut into multiple transactions delivery
canal.instance.transaction.size =  1024
# mysql fallback connected to new master should fallback times
canal.instance.fallbackIntervalInSeconds = 60

# network config
canal.instance.network.receiveBufferSize = 16384
canal.instance.network.sendBufferSize = 16384
canal.instance.network.soTimeout = 30

# binlog filter config
canal.instance.filter.druid.ddl = true
canal.instance.filter.query.dcl = false
canal.instance.filter.query.dml = false
canal.instance.filter.query.ddl = false
canal.instance.filter.table.error = false
canal.instance.filter.rows = false
canal.instance.filter.transaction.entry = false
canal.instance.filter.dml.insert = false
canal.instance.filter.dml.update = false
canal.instance.filter.dml.delete = false

# binlog format/image check
canal.instance.binlog.format = ROW,STATEMENT,MIXED 
canal.instance.binlog.image = FULL,MINIMAL,NOBLOB

# binlog ddl isolation
canal.instance.get.ddl.isolation = false

# parallel parser config
canal.instance.parser.parallel = true
## concurrent thread number, default 60% available processors, suggest not to exceed Runtime.getRuntime().availableProcessors()
#canal.instance.parser.parallelThreadSize = 16
## disruptor ringbuffer size, must be power of 2
canal.instance.parser.parallelBufferSize = 256

# table meta tsdb info
canal.instance.tsdb.enable = true
canal.instance.tsdb.dir = ${canal.file.data.dir:../conf}/${canal.instance.destination:}
canal.instance.tsdb.url = jdbc:h2:${canal.instance.tsdb.dir}/h2;CACHE_SIZE=1000;MODE=MYSQL;
canal.instance.tsdb.dbUsername = canal
canal.instance.tsdb.dbPassword = canal
# dump snapshot interval, default 24 hour
canal.instance.tsdb.snapshot.interval = 24
# purge snapshot expire , default 360 hour(15 days)
canal.instance.tsdb.snapshot.expire = 360

#################################################
######### 		destinations		#############
#################################################
canal.destinations = example
# conf root dir
canal.conf.dir = ../conf
# auto scan instance dir add/remove and start/stop instance
canal.auto.scan = true
canal.auto.scan.interval = 5
# set this value to 'true' means that when binlog pos not found, skip to latest.
# WARN: pls keep 'false' in production env, or if you know what you want.
canal.auto.reset.latest.pos.mode = false

canal.instance.tsdb.spring.xml = classpath:spring/tsdb/h2-tsdb.xml
#canal.instance.tsdb.spring.xml = classpath:spring/tsdb/mysql-tsdb.xml

canal.instance.global.mode = spring
canal.instance.global.lazy = false
canal.instance.global.manager.address = ${canal.admin.manager}
#canal.instance.global.spring.xml = classpath:spring/memory-instance.xml
canal.instance.global.spring.xml = classpath:spring/file-instance.xml
#canal.instance.global.spring.xml = classpath:spring/default-instance.xml

##################################################
######### 	      MQ Properties      #############
##################################################
# aliyun ak/sk , support rds/mq
canal.aliyun.accessKey =
canal.aliyun.secretKey =
canal.aliyun.uid=

canal.mq.flatMessage = true
canal.mq.canalBatchSize = 50
canal.mq.canalGetTimeout = 100
# Set this value to "cloud", if you want open message trace feature in aliyun.
canal.mq.accessChannel = local

canal.mq.database.hash = true
canal.mq.send.thread.size = 30
canal.mq.build.thread.size = 8

##################################################
######### 		     Kafka 		     #############
##################################################
kafka.bootstrap.servers = 127.0.0.1:9092
kafka.acks = all
kafka.compression.type = none
kafka.batch.size = 16384
kafka.linger.ms = 1
kafka.max.request.size = 1048576
kafka.buffer.memory = 33554432
kafka.max.in.flight.requests.per.connection = 1
kafka.retries = 0

kafka.kerberos.enable = false
kafka.kerberos.krb5.file = "../conf/kerberos/krb5.conf"
kafka.kerberos.jaas.file = "../conf/kerberos/jaas.conf"

##################################################
######### 		    RocketMQ	     #############
##################################################
rocketmq.producer.group = canal-topic
rocketmq.enable.message.trace = false
rocketmq.customized.trace.topic =
rocketmq.namespace =
rocketmq.namesrv.addr = 192.168.1.46:9876
rocketmq.retry.times.when.send.failed = 3
rocketmq.vip.channel.enabled = false
rocketmq.tag =

##################################################
######### 		    RabbitMQ	     #############
##################################################
rabbitmq.host =
rabbitmq.virtual.host =
rabbitmq.exchange =
rabbitmq.username =
rabbitmq.password =
rabbitmq.deliveryMode =

  1. canal的instance.properties配置文件信息,canal.instance.filter.regex这个参数可以指定监听的数据库->表
#################################################
## mysql serverId , v1.0.26+ will autoGen
# canal.instance.mysql.slaveId=0

# enable gtid use true/false
canal.instance.gtidon=false

# position info
canal.instance.master.address=192.168.1.46:3306
canal.instance.master.journal.name=mysql-binlog.000001
canal.instance.master.position=0
canal.instance.master.timestamp=
canal.instance.master.gtid=

# rds oss binlog
canal.instance.rds.accesskey=
canal.instance.rds.secretkey=
canal.instance.rds.instanceId=

# table meta tsdb info
canal.instance.tsdb.enable=false
#canal.instance.tsdb.url=jdbc:mysql://127.0.0.1:3306/canal_tsdb
#canal.instance.tsdb.dbUsername=canal
#canal.instance.tsdb.dbPassword=canal

#canal.instance.standby.address =
#canal.instance.standby.journal.name =
#canal.instance.standby.position =
#canal.instance.standby.timestamp =
#canal.instance.standby.gtid=

# username/password
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
canal.instance.connectionCharset = UTF-8
# enable druid Decrypt database password
canal.instance.enableDruid=false
#canal.instance.pwdPublicKey=MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBALK4BUxdDltRRE5/zXpVEVPUgunvscYFtEip3pmLlhrWpacX7y7GCMo2/JM6LeHmiiNdH1FWgGCpUfircSwlWKUCAwEAAQ==

# table regex
canal.instance.filter.regex=mp_biz_service.server:*,mp_biz_service.shop_service_server:*
# table black regex
canal.instance.filter.black.regex=mysql\\.slave_.*
# table field filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.field=test1.t_product:id/subject/keywords,test2.t_company:id/name/contact/ch
# table field black filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.black.field=test1.t_product:subject/product_image,test2.t_company:id/name/contact/ch

# mq config
canal.mq.topic=canal-topic
# dynamic topic route by schema or table regex
#canal.mq.dynamicTopic=mytest1.user,mytest2\\..*,.*\\..*
canal.mq.partition=0
# hash partition config
#canal.mq.partitionsNum=3
#canal.mq.partitionHash=test.table:id^name,.*\\..*
#canal.mq.dynamicTopicPartitionNum=test.*:4,mycanal:6
#################################################

  1. canal自定义处理器NdCanalBinLogEventParser
import cn.throwx.canal.gule.common.BinLogEventType;
import cn.throwx.canal.gule.common.OperationType;
import cn.throwx.canal.gule.model.CanalBinLogEvent;
import cn.throwx.canal.gule.model.CanalBinLogResult;
import cn.throwx.canal.gule.support.parser.BaseCommonEntryFunction;
import cn.throwx.canal.gule.support.parser.BasePrimaryKeyTupleFunction;
import cn.throwx.canal.gule.support.parser.CanalBinLogEventParser;
import com.alibaba.fastjson.JSON;
import lombok.extern.slf4j.Slf4j;

import java.util.*;

/** 
 * @author gideon
 */
@Slf4j
public class NdCanalBinLogEventParser implements CanalBinLogEventParser {

    @Override
    public <T> List<CanalBinLogResult<T>> parse(CanalBinLogEvent event, Class<T> klass, BasePrimaryKeyTupleFunction primaryKeyFunction, BaseCommonEntryFunction<T> commonEntryFunction) {
        BinLogEventType eventType = BinLogEventType.fromType(event.getType());
        if (Objects.equals(BinLogEventType.CREATE, eventType) || Objects.equals(BinLogEventType.ALTER, eventType)) {
            if (log.isDebugEnabled()) {
                log.debug("监听到不需要处理或者未知的binlog事件类型[{}],将忽略解析过程返回空列表,binlog事件:{}", eventType, JSON.toJSONString(event));
            }
            return Collections.emptyList();
        }

        if (BinLogEventType.UNKNOWN != eventType && BinLogEventType.QUERY != eventType) {
            if (Boolean.TRUE.equals(event.getIsDdl())) {
                CanalBinLogResult<T> entry = new CanalBinLogResult<>();
                entry.setOperationType(OperationType.DDL);
                entry.setBinLogEventType(eventType);
                entry.setDatabaseName(event.getDatabase());
                entry.setTableName(event.getTable());
                entry.setSql(event.getSql());
                return Collections.singletonList(entry);
            } else {
                Optional.ofNullable(event.getPkNames()).filter((x) -> x.size() == 1).orElseThrow(() -> new IllegalArgumentException("DML类型binlog事件主键列数量不为1"));
                String primaryKeyName = event.getPkNames().get(0);
                List<CanalBinLogResult<T>> entryList = new LinkedList<>();
                List<Map<String, String>> data = event.getData();
                List<Map<String, String>> old = event.getOld();
                int dataSize = null != data ? data.size() : 0;
                int oldSize = null != old ? old.size() : 0;
                if (dataSize > 0) {
                    for(int index = 0; index < dataSize; ++index) {
                        CanalBinLogResult<T> entry = new CanalBinLogResult<>();
                        entryList.add(entry);
                        entry.setSql(event.getSql());
                        entry.setOperationType(OperationType.DML);
                        entry.setBinLogEventType(eventType);
                        entry.setTableName(event.getTable());
                        entry.setDatabaseName(event.getDatabase());
                        Map<String, String> item = data.get(index);
                        entry.setAfterData(commonEntryFunction.apply(item));
                        Map<String, String> oldItem = null;
                        if (oldSize > 0 && index <= oldSize) {
                            oldItem = old.get(index);
                            entry.setBeforeData(commonEntryFunction.apply(oldItem));
                        }

                        entry.setPrimaryKey(primaryKeyFunction.apply(oldItem, item, primaryKeyName));
                    }
                }

                return entryList;
            }
        } else {
            if (log.isDebugEnabled()) {
                log.debug("监听到不需要处理或者未知的binlog事件类型[{}],将忽略解析过程返回空列表,binlog事件:{}", eventType, JSON.toJSONString(event));
            }

            return Collections.emptyList();
        }
    }

    private NdCanalBinLogEventParser() {
    }

    public static NdCanalBinLogEventParser of() {
        return new NdCanalBinLogEventParser();
    }
}
  1. canal自定义处理器NdCanalBinlogEventProcessorFactory
import cn.throwx.canal.gule.model.ModelTable;
import cn.throwx.canal.gule.support.processor.BaseCanalBinlogEventProcessor;
import cn.throwx.canal.gule.support.processor.CanalBinlogEventProcessorFactory;

import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/**
 * @author gideon
 */
public class NdCanalBinlogEventProcessorFactory implements CanalBinlogEventProcessorFactory {

    private final ConcurrentMap<ModelTable, List<BaseCanalBinlogEventProcessor<?>>> cache = new ConcurrentHashMap<>(16);

    @Override
    public void register(ModelTable modelTable, BaseCanalBinlogEventProcessor<?> processor) {
        synchronized(this.cache) {
            this.cache.putIfAbsent(modelTable, new LinkedList<>());
            this.cache.get(modelTable).add(processor);
        }
    }

    @Override
    public List<BaseCanalBinlogEventProcessor<?>> get(ModelTable modelTable) {
        return this.cache.get(modelTable);
    }

    private NdCanalBinlogEventProcessorFactory() {
    }

    public static NdCanalBinlogEventProcessorFactory of() {
        return new NdCanalBinlogEventProcessorFactory();
    }
}

  1. canal自定义处理器NdCanalGlue
import cn.throwx.canal.gule.CanalGlue;
import cn.throwx.canal.gule.model.CanalBinLogEvent;
import cn.throwx.canal.gule.model.ModelTable;
import cn.throwx.canal.gule.support.adapter.SourceAdapterFacade;
import cn.throwx.canal.gule.support.processor.BaseCanalBinlogEventProcessor;
import cn.throwx.canal.gule.support.processor.CanalBinlogEventProcessorFactory;

import java.util.List;

/**
 * @author gideon
 */
public class NdCanalGlue implements CanalGlue {

    private final CanalBinlogEventProcessorFactory canalBinlogEventProcessorFactory;

    @Override
    public void process(String content) {
        CanalBinLogEvent event = SourceAdapterFacade.X.adapt(CanalBinLogEvent.class, content);
        ModelTable modelTable = ModelTable.of(event.getDatabase(), event.getTable());
        List<BaseCanalBinlogEventProcessor<?>> baseCanalBinlogEventProcessors = this.canalBinlogEventProcessorFactory.get(modelTable);
        if (baseCanalBinlogEventProcessors.isEmpty()) {
            return;
        }
        baseCanalBinlogEventProcessors.forEach((processor) -> processor.process(event));
    }


    private NdCanalGlue(CanalBinlogEventProcessorFactory canalBinlogEventProcessorFactory) {
        this.canalBinlogEventProcessorFactory = canalBinlogEventProcessorFactory;
    }

    public static NdCanalGlue of(CanalBinlogEventProcessorFactory canalBinlogEventProcessorFactory) {
        return new NdCanalGlue(canalBinlogEventProcessorFactory);
    }
}
  1. canal配置类
import cn.throwx.canal.gule.CanalGlue;
import cn.throwx.canal.gule.support.parser.*;
import cn.throwx.canal.gule.support.parser.converter.CanalFieldConverterFactory;
import cn.throwx.canal.gule.support.parser.converter.InMemoryCanalFieldConverterFactory;
import cn.throwx.canal.gule.support.processor.BaseCanalBinlogEventProcessor;
import cn.throwx.canal.gule.support.processor.CanalBinlogEventProcessorFactory;
import com.onecode.middle.search.service.canal.NdCanalBinLogEventParser;
import com.onecode.middle.search.service.canal.NdCanalBinlogEventProcessorFactory;
import com.onecode.middle.search.service.canal.NdCanalGlue;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.beans.factory.BeanFactoryAware;
import org.springframework.beans.factory.SmartInitializingSingleton;
import org.springframework.beans.factory.config.ConfigurableListableBeanFactory;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;

import java.util.Map;

/**
 * @author gideon
 */
@Configuration
public class CanalGlueAutoConfiguration implements SmartInitializingSingleton, BeanFactoryAware {

    private ConfigurableListableBeanFactory configurableListableBeanFactory;

    @Bean
    @ConditionalOnMissingBean
    public CanalBinlogEventProcessorFactory canalBinlogEventProcessorFactory() {
        return NdCanalBinlogEventProcessorFactory.of();
    }

    @Bean
    @ConditionalOnMissingBean
    public ModelTableMetadataManager modelTableMetadataManager(CanalFieldConverterFactory canalFieldConverterFactory) {
        return InMemoryModelTableMetadataManager.of(canalFieldConverterFactory);
    }

    @Bean
    @ConditionalOnMissingBean
    public CanalFieldConverterFactory canalFieldConverterFactory() {
        return InMemoryCanalFieldConverterFactory.of();
    }

    @Bean
    @ConditionalOnMissingBean
    public CanalBinLogEventParser canalBinLogEventParser() {
        return NdCanalBinLogEventParser.of();
    }

    @Bean
    @ConditionalOnMissingBean
    public ParseResultInterceptorManager parseResultInterceptorManager(ModelTableMetadataManager modelTableMetadataManager) {
        return InMemoryParseResultInterceptorManager.of(modelTableMetadataManager);
    }

    @Bean
    @Primary
    public CanalGlue canalGlue(CanalBinlogEventProcessorFactory canalBinlogEventProcessorFactory) {
        return NdCanalGlue.of(canalBinlogEventProcessorFactory);
    }

    @Override
    public void setBeanFactory(BeanFactory beanFactory) throws BeansException {
        this.configurableListableBeanFactory = (ConfigurableListableBeanFactory) beanFactory;
    }

    @SuppressWarnings({"rawtypes", "unchecked"})
    @Override
    public void afterSingletonsInstantiated() {
        ParseResultInterceptorManager parseResultInterceptorManager
                = configurableListableBeanFactory.getBean(ParseResultInterceptorManager.class);
        ModelTableMetadataManager modelTableMetadataManager
                = configurableListableBeanFactory.getBean(ModelTableMetadataManager.class);
        CanalBinlogEventProcessorFactory canalBinlogEventProcessorFactory
                = configurableListableBeanFactory.getBean(CanalBinlogEventProcessorFactory.class);
        CanalBinLogEventParser canalBinLogEventParser
                = configurableListableBeanFactory.getBean(CanalBinLogEventParser.class);
        Map<String, BaseParseResultInterceptor> interceptors
                = configurableListableBeanFactory.getBeansOfType(BaseParseResultInterceptor.class);
        interceptors.forEach((k, interceptor) -> parseResultInterceptorManager.registerParseResultInterceptor(interceptor));
        Map<String, BaseCanalBinlogEventProcessor> processors
                = configurableListableBeanFactory.getBeansOfType(BaseCanalBinlogEventProcessor.class);
        processors.forEach((k, processor) -> processor.init(canalBinLogEventParser, modelTableMetadataManager,
                canalBinlogEventProcessorFactory, parseResultInterceptorManager));
    }
}

  1. ServerBO canal转换的实体类,@CanalModel 的参数database是对应的数据库,table是对应数据库下的数据表,因为这个数据是需要同步到es的所以设置了类型。
import cn.throwx.canal.gule.annotation.CanalModel;
import cn.throwx.canal.gule.common.FieldNamingPolicy;
import com.fasterxml.jackson.annotation.JsonFormat;
import com.onecode.dtg.basic.common.es.annotation.EsField;
import com.onecode.dtg.basic.common.es.enums.AnalyzerType;
import com.onecode.dtg.basic.common.es.enums.FieldType;
import io.swagger.annotations.ApiModelProperty;
import lombok.Data;

import java.time.LocalDate;
import java.time.LocalDateTime;

/**
 * @author gideon
 */
@Data
@CanalModel(database = "mp_biz_service", table = "server", fieldNamingPolicy = FieldNamingPolicy.LOWER_UNDERSCORE)
public class ServerBO {

    @ApiModelProperty("id")
    @EsField(type = FieldType.LONG)
    private Long id;

    @ApiModelProperty("用户标识")
    @EsField(type = FieldType.LONG)
    private Long userId;

    @ApiModelProperty("类型")
    @EsField(type = FieldType.KEYWORD)
    private String type;

    @ApiModelProperty("姓名")
    @EsField(type = FieldType.TEXT, analyzer = AnalyzerType.IK_SMART)
    private String name;

    @ApiModelProperty("值为1时是男性,值为2时是女性,值为0时是未知")
    @EsField(type = FieldType.KEYWORD)
    private String gender;

    @ApiModelProperty("出生年月日")
    @JsonFormat(pattern = "yyyy-MM-dd")
    @EsField(type = FieldType.DATE)
    private LocalDate birthday;

    @ApiModelProperty("学历")
    @EsField(type = FieldType.KEYWORD)
    private String education;

    @ApiModelProperty("从业时间")
    @JsonFormat(pattern = "yyyy-MM-dd")
    @EsField(type = FieldType.DATE)
    private LocalDate practiceDate;

    @ApiModelProperty("评级")
    @EsField(type = FieldType.KEYWORD)
    private String level;

    @ApiModelProperty("认证标签")
    @EsField(type = FieldType.TEXT)
    private String authLabel;

    @ApiModelProperty("勋章(逗号隔开)")
    @EsField(type = FieldType.TEXT)
    private String medal;

    @ApiModelProperty("服务评分")
    @EsField(type = FieldType.INTEGER)
    private Integer serviceScore;

    @ApiModelProperty("已实名认证")
    @EsField(type = FieldType.INTEGER)
    private Integer realNameAuth;

    @ApiModelProperty("身份证号")
    @EsField(type = FieldType.TEXT)
    private String idCardNo;

    @ApiModelProperty("户籍-省")
    @EsField(type = FieldType.TEXT, analyzer = AnalyzerType.IK_SMART)
    private String idCardProvince;

    @ApiModelProperty("户籍-市")
    @EsField(type = FieldType.TEXT, analyzer = AnalyzerType.IK_SMART)
    private String idCardCity;

    @ApiModelProperty("户籍-区")
    @EsField(type = FieldType.TEXT, analyzer = AnalyzerType.IK_SMART)
    private String idCardRegion;

    @ApiModelProperty("手机号")
    @EsField(type = FieldType.TEXT)
    private String phone;

    @ApiModelProperty("现住-省")
    @EsField(type = FieldType.TEXT, analyzer = AnalyzerType.IK_SMART)
    private String presentProvince;

    @ApiModelProperty("现住-市")
    @EsField(type = FieldType.TEXT, analyzer = AnalyzerType.IK_SMART)
    private String presentCity;

    @ApiModelProperty("现住-区")
    @EsField(type = FieldType.TEXT, analyzer = AnalyzerType.IK_SMART)
    private String presentRegion;

    @ApiModelProperty("现住-地址")
    @EsField(type = FieldType.TEXT, analyzer = AnalyzerType.IK_SMART)
    private String presentAddress;

    @ApiModelProperty("头像")
    @EsField(type = FieldType.TEXT)
    private String head;

    @ApiModelProperty("使用状态(初始化:init;正常:normal,禁用:ban)")
    @EsField(type = FieldType.KEYWORD)
    private String useStatus;

    @ApiModelProperty("审核状态 (待审核:await;通过:pass,驳回:reject)")
    @EsField(type = FieldType.KEYWORD)
    private String auditStatus;

    @ApiModelProperty("驳回理由")
    @EsField(type = FieldType.TEXT)
    private String rejectReason;

    @ApiModelProperty("注册时间")
    @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")
    @EsField(type = FieldType.DATE)
    private LocalDateTime regDate;

    @ApiModelProperty("介绍-内容")
    @EsField(type = FieldType.TEXT, analyzer = AnalyzerType.IK_SMART)
    private String introContent;

    @ApiModelProperty("介绍-视频")
    @EsField(type = FieldType.TEXT)
    private String introVideo;

    @ApiModelProperty("介绍-标签")
    @EsField(type = FieldType.TEXT)
    private String introLabel;

    @ApiModelProperty("商家标识")
    @EsField(type = FieldType.LONG)
    private Long merchantId;

    @ApiModelProperty("组织标识")
    @EsField(type = FieldType.LONG)
    private Long orgId;

    @ApiModelProperty("护理员申请sku,多个逗号隔开")
    @EsField(type = FieldType.TEXT, analyzer = AnalyzerType.COMMA)
    private String skuId;

    @ApiModelProperty("护理员排班数据,多个逗号隔开")
    @EsField(type = FieldType.TEXT, analyzer = AnalyzerType.COMMA)
    private String schedule;

    /**
     * 逻辑删除
     */
    @EsField(type = FieldType.INTEGER)
    private Integer del;

    /**
     * 创建人
     */
    @EsField(type = FieldType.TEXT, analyzer = AnalyzerType.IK_SMART)
    private String createBy;

    /**
     * 创建时间
     */
    @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")
    @EsField(type = FieldType.DATE)
    private LocalDateTime createTime;

    /**
     * 更新者
     */
    @EsField(type = FieldType.TEXT, analyzer = AnalyzerType.IK_SMART)
    private String updateBy;

    /**
     * 更新时间
     */
    @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")
    @EsField(type = FieldType.DATE)
    private LocalDateTime updateTime;

}

  1. ShopServiceServer canal转换的实体类,@CanalModel 的参数database是对应的数据库,table是对应数据库下的数据表
import cn.throwx.canal.gule.annotation.CanalModel;
import cn.throwx.canal.gule.common.FieldNamingPolicy;
import io.swagger.annotations.ApiModelProperty;
import lombok.Data;

import java.time.LocalDateTime;

/**
 * @author gideon
 */
@Data
@CanalModel(database = "mp_biz_service", table = "shop_service_server", fieldNamingPolicy = FieldNamingPolicy.LOWER_UNDERSCORE)
public class ShopServiceServerBO {
    @ApiModelProperty("ID")
    private Long id;

    @ApiModelProperty("产品标识")
    private Long productId;

    @ApiModelProperty("服务者用户标识")
    private Long serverUserId;

    @ApiModelProperty("产品sku标识")
    private Long productSkuId;

    @ApiModelProperty(value = "盈利")
    private Integer profit;

    @ApiModelProperty("商家标识")
    private Long merchantId;

    @ApiModelProperty("组织标识")
    private Long orgId;

    /**
     * 逻辑删除
     */
    private Integer del;

    /**
     * 创建人
     */
    private String createBy;

    /**
     * 创建时间
     */
    private LocalDateTime createTime;

    /**
     * 更新者
     */
    private String updateBy;

    /**
     * 更新时间
     */
    private LocalDateTime updateTime;
}

  1. 至此canal的基础代码就完成。

消费MQ订阅的canal信息,进行elasticsearch的同步以及搜索

  1. 监听我们上面canal-topic订阅的消息然后进行同步数据CanalListener
import cn.throwx.canal.gule.CanalGlue;
import com.onecode.dtg.basic.RocketMqConstant;
import lombok.RequiredArgsConstructor;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;

/**
 * canal消费数据库操作日志mq
 *
 * @author gideon
 */
@Component
@RequiredArgsConstructor
@RocketMQMessageListener(topic = RocketMqConstant.CANAL_TOPIC, consumerGroup = RocketMqConstant.CANAL_TOPIC)
public class CanalListener implements RocketMQListener<String> {

    private final CanalGlue canalGlue;

    @Override
    public void onMessage(String message) {
        canalGlue.process(message);
    }
}
  1. 对我们需要监听的表进行处理ServerCanalListener,这里面的hutool是一个工具类有需要的可以自行引入
<dependency>
    <groupId>cn.hutool</groupId>
    <artifactId>hutool-all</artifactId>
    <version>5.8.6</version>
</dependency>
import cn.hutool.json.JSONUtil;
import cn.throwx.canal.gule.model.CanalBinLogEvent;
import cn.throwx.canal.gule.model.CanalBinLogResult;
import cn.throwx.canal.gule.support.processor.BaseCanalBinlogEventProcessor;
import cn.throwx.canal.gule.support.processor.ExceptionHandler;
import com.onecode.dtg.basic.common.core.exception.CommonBizException;
import com.onecode.dtg.basic.common.enums.ResultCode;
import com.onecode.middle.search.service.bo.ServerBO;
import com.onecode.middle.search.service.constant.EsIndexEnum;
import com.onecode.middle.search.service.util.EsIndexCreateService;
import com.onecode.service.feign.ServerFeignClient;
import com.onecode.service.feign.bo.EsServerBO;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.xcontent.XContentType;
import org.springframework.stereotype.Component;

import java.io.IOException;

/**
 * @author gideon
 */
@Slf4j
@Component
@RequiredArgsConstructor
public class ServerCanalListener extends BaseCanalBinlogEventProcessor<ServerBO> {

    private final EsIndexCreateService esIndexCreateService;

    private final ServerFeignClient serverFeignClient;

    private final RestHighLevelClient restHighLevelClient;

    /**
     * 插入护理员,此时插入es
     */
    @Override
    protected void processInsertInternal(CanalBinLogResult<ServerBO> result) {
        Long serverId = result.getPrimaryKey();
        EsServerBO esServerBO = serverFeignClient.loadEsServerBO(serverId);
        if (esServerBO == null) {
            throw new CommonBizException(ResultCode.FAIL.getModel(), "创建索引异常");
        }

//        创建索引
        boolean index = esIndexCreateService.createIndex(EsIndexEnum.SERVER.value(), ServerBO.class, true);
        if (!index) {
            throw new CommonBizException(ResultCode.FAIL.getModel(), "创建索引:" + EsIndexEnum.SERVER.value() + "失败。");
        }

        IndexRequest request = new IndexRequest(EsIndexEnum.SERVER.value());
        request.id(String.valueOf(serverId));
        request.source(JSONUtil.toJsonStr(esServerBO), XContentType.JSON);
        try {
            IndexResponse indexResponse = restHighLevelClient.index(request, RequestOptions.DEFAULT);
            log.info(indexResponse.toString());

        } catch (IOException e) {
            log.error(e.toString());
            throw new CommonBizException(ResultCode.FAIL.getModel(), "保存es信息异常:" + e);
        }
    }

    /**
     * 更新护理员,删除护理员索引,再重新构建一个
     */
    @Override
    protected void processUpdateInternal(CanalBinLogResult<ServerBO> result) {
        Long spuId = result.getPrimaryKey();
        EsServerBO esServerBO = serverFeignClient.loadEsServerBO(spuId);
        String source = JSONUtil.toJsonStr(esServerBO);

//        创建索引
        boolean index = esIndexCreateService.createIndex(EsIndexEnum.SERVER.value(), ServerBO.class, true);
        if (!index) {
            throw new CommonBizException(ResultCode.FAIL.getModel(), "创建索引:" + EsIndexEnum.SERVER.value() + "失败。");
        }
        UpdateRequest request = new UpdateRequest(EsIndexEnum.SERVER.value(), String.valueOf(spuId));
        request.doc(source, XContentType.JSON);
        request.docAsUpsert(true);
        try {
            UpdateResponse updateResponse = restHighLevelClient.update(request, RequestOptions.DEFAULT);
            log.info(updateResponse.toString());
        } catch (IOException e) {
            log.error(e.toString());
            throw new CommonBizException(ResultCode.FAIL.getModel(), "删除es信息异常:" + e);
        }
    }

    @Override
    protected ExceptionHandler exceptionHandler() {
        return (CanalBinLogEvent event, Throwable throwable) -> {
            throw new CommonBizException(ResultCode.FAIL.getModel(), "创建索引异常:" + throwable);
        };
    }

}

  1. 这个表的监听,是因为我的业务需求,shop_service_server表增加或者删除的时候需要将skuId加到server表的skuId字段里面去,所以需要监听修改。ShopServiceServerCanalListener
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.util.StrUtil;
import cn.throwx.canal.gule.model.CanalBinLogResult;
import cn.throwx.canal.gule.support.processor.BaseCanalBinlogEventProcessor;
import com.onecode.dtg.basic.common.core.exception.CommonBizException;
import com.onecode.dtg.basic.common.enums.ResultCode;
import com.onecode.middle.search.service.bo.ShopServiceServerBO;
import com.onecode.middle.search.service.manager.ServerUpdateManager;
import com.onecode.service.feign.ServerFeignClient;
import com.onecode.service.feign.bo.EsServerBO;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

import java.util.List;

/**
 * @author gideon
 */
@Slf4j
@Component
@RequiredArgsConstructor
public class ShopServiceServerCanalListener extends BaseCanalBinlogEventProcessor<ShopServiceServerBO> {

    private final ServerFeignClient serverFeignClient;
    private final ServerUpdateManager serverUpdateManager;

    /**
     * 新增商品服务者数据
     *
     * @param result result
     */
    @Override
    protected void processInsertInternal(CanalBinLogResult<ShopServiceServerBO> result) {
    //数据库操作后的数据
        ShopServiceServerBO afterData = result.getAfterData();
        EsServerBO loadServerBO = loadServerBO(afterData.getServerUserId());
        List<String> skuIdList = StrUtil.split(loadServerBO.getSkuId(), ",");
        skuIdList.add(afterData.getProductSkuId().toString());
        EsServerBO esServerBO = new EsServerBO();
        esServerBO.setSkuId(StrUtil.join(",", skuIdList));
        serverUpdateManager.esUpdateServerByServerId(loadServerBO.getId(), esServerBO);
    }

    /**
     * 更新商品服务者数据
     *
     * @param result result
     */
    @Override
    protected void processUpdateInternal(CanalBinLogResult<ShopServiceServerBO> result) {
    	//数据库执行操作后的数据
        ShopServiceServerBO afterData = result.getAfterData();
        //del字段是我的表是否逻辑删除的判断,大家根据自己需要去掉
        if ("1".equals(afterData.getDel())) {
            return;
        }
        //微服务项目调用接口查询数据
        EsServerBO loadEsServerBO = loadServerBO(afterData.getServerUserId());
        //处理修改后的数据
        EsServerBO esServerBO = dealWithData(afterData, loadEsServerBO);
        serverUpdateManager.esUpdateServerByServerId(loadEsServerBO.getId(), esServerBO);
    }

    /**
     * 删除商品服务者数据
     *
     * @param result result
     */
    @Override
    protected void processDeleteInternal(CanalBinLogResult<ShopServiceServerBO> result) {
    //数据库操作前的数据
        ShopServiceServerBO beforeData = result.getBeforeData();
        EsServerBO loadServerBO = loadServerBO(beforeData.getServerUserId());
        EsServerBO esServerBO = dealWithData(beforeData, loadServerBO);
        serverUpdateManager.esUpdateServerByServerId(loadServerBO.getId(), esServerBO);
    }

    /**
     * 处理数据
     *
     * @param data 数据库操作数据
     * @return EsServerBO
     */
    private EsServerBO dealWithData(ShopServiceServerBO data, EsServerBO loadEsServerBO) {

        List<String> skuIdList = StrUtil.split(loadEsServerBO.getSkuId(), ",");
        CollUtil.removeAny(skuIdList, data.getProductSkuId().toString());
        EsServerBO esServerBO = new EsServerBO();
        esServerBO.setSkuId(StrUtil.join(",", skuIdList));
        return esServerBO;

    }

    /**
     * 获取护理员书信息
     *
     * @param serverUserId 护理员用户标识
     * @return EsServerBO
     */
    private EsServerBO loadServerBO(Long serverUserId) {
        EsServerBO loadEsServerBO = serverFeignClient.loadEsServerBoByServerUserId(serverUserId);
        if (loadEsServerBO == null) {
            throw new CommonBizException(ResultCode.FAIL.getModel(),
                    "es数据同步失败:无法通过护工用户标识:" + serverUserId + "找到护理员信息。");
        }
        return loadEsServerBO;
    }
}

  1. ServerUpdateManager,这个是ShopServiceServerListener的处理实现类
import cn.hutool.json.JSONUtil;
import com.onecode.dtg.basic.common.core.exception.CommonBizException;
import com.onecode.dtg.basic.common.enums.ResultCode;
import com.onecode.middle.search.service.constant.EsIndexEnum;
import com.onecode.service.feign.bo.EsServerBO;
import lombok.RequiredArgsConstructor;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.xcontent.XContentType;
import org.springframework.stereotype.Component;

/**
 * @author gideon
 */
@Component
@RequiredArgsConstructor
public class ServerUpdateManager {

    private final RestHighLevelClient restHighLevelClient;

    /**
     * 批量更新es中的商品信息
     *
     * @param serverId      护理员标识
     * @param esServerBO 更新的数据
     */
    public void esUpdateServerByServerId(Long serverId, EsServerBO esServerBO) {
        String source = JSONUtil.toJsonStr(esServerBO);
        try {
            BulkRequest request = new BulkRequest();
            // 准备更新的数据
            request.add(new UpdateRequest(EsIndexEnum.SERVER.value(), String.valueOf(serverId)).doc(source, XContentType.JSON));
            //更新
            BulkResponse bulkResponse = restHighLevelClient.bulk(request, RequestOptions.DEFAULT);
            if (bulkResponse.hasFailures()) {
                throw new CommonBizException(ResultCode.FAIL.getModel(), bulkResponse.buildFailureMessage());
            }
        } catch (Exception e) {
            throw new CommonBizException(ResultCode.FAIL.getModel(), e.getMessage());
        }
    }
}
  1. ServerSearchManager是搜索接口实现
import cn.hutool.json.JSONUtil;
import com.onecode.dtg.basic.common.core.exception.CommonBizException;
import com.onecode.dtg.basic.common.enums.ResultCode;
import com.onecode.dtg.basic.common.util.ColumnUtil;
import com.onecode.dtg.basic.common.util.LocalDateUtil;
import com.onecode.middle.search.service.bo.ServerBO;
import com.onecode.middle.search.service.constant.EsIndexEnum;
import com.onecode.middle.search.service.dto.ServerSearchDTO;
import com.onecode.middle.search.service.vo.EsPageVO;
import com.onecode.middle.search.service.vo.search.EsServerVO;
import com.onecode.service.feign.constant.AuditStatus;
import com.onecode.service.feign.constant.UseStatus;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.Operator;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.sort.SortOrder;
import org.springframework.stereotype.Component;

import java.io.IOException;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

/**
 * @author gideon
 */
@Slf4j
@Component
@RequiredArgsConstructor
public class ServerSearchManager {

    private final RestHighLevelClient restHighLevelClient;

    /**
     * 通过搜索信息分页搜索es数据的信息
     *
     * @param serverSearchDTO 护理员搜索条件
     * @return 搜索结果
     */
    public EsPageVO<EsServerVO> pageSearchResult(ServerSearchDTO serverSearchDTO) {
        //1、动态构建出查询需要的DSL语句
        EsPageVO<EsServerVO> result;

        //1、准备检索请求
        SearchRequest searchRequest = buildSearchRequest(serverSearchDTO);

        try {
            //2、执行检索请求
            SearchResponse response = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);

            log.info("搜索返回结果:" + response.toString());

            //3、分析响应数据,封装成我们需要的格式
            result = buildSearchResult(serverSearchDTO, response);
        } catch (IOException e) {
            log.error(e.toString());
            throw new CommonBizException(ResultCode.FAIL.getModel(), "搜索服务出了点小差,请稍后再试:" + e);
        }
        return result;
    }

    /**
     * 构建结果数据
     */
    private EsPageVO<EsServerVO> buildSearchResult(ServerSearchDTO dto, SearchResponse response) {
        EsPageVO<EsServerVO> esPageVO = new EsPageVO<>();

        //1、返回的所有查询到的商品
        SearchHits hits = response.getHits();
        List<EsServerVO> productSearchs = getEsOrderBOList(response);
        esPageVO.setList(productSearchs);


        //===============分页信息====================//
        //总记录数
        long total = hits.getTotalHits().value;
        esPageVO.setTotal(total);
        // 总页码
        int totalPages = (int) total % dto.getPageSize() == 0 ?
                (int) total / dto.getPageSize() : ((int) total / dto.getPageSize() + 1);
        esPageVO.setPages(totalPages);
        return esPageVO;
    }

    private List<EsServerVO> getEsOrderBOList(SearchResponse response) {

        return getOrderListByResponse(response.getHits().getHits());
    }

    /**
     * 从es返回的数据中获取spu列表
     *
     * @param hits es返回的数据
     * @return
     */
    private List<EsServerVO> getOrderListByResponse(SearchHit[] hits) {
        List<EsServerVO> esOrders = new ArrayList<>();
        for (SearchHit hit : hits) {
            EsServerVO esOrder = JSONUtil.toBean(hit.getSourceAsString(), EsServerVO.class);
            esOrders.add(esOrder);
        }
        return esOrders;
    }


    /**
     * 准备检索请求
     *
     * @param param 搜索参数
     * @return
     */
    private SearchRequest buildSearchRequest(ServerSearchDTO param) {

        SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();

        // 构建bool-query
        BoolQueryBuilder boolQueryBuilder = new BoolQueryBuilder();

        // 过滤
        filterQueryIfNecessary(param, boolQueryBuilder);

        // 关键字搜索
        keywordSearch(param, boolQueryBuilder);

        // 排序
        sort(searchSourceBuilder, boolQueryBuilder);

        //分页
        searchSourceBuilder.from((param.getPageNum() - 1) * param.getPageSize());
        searchSourceBuilder.size(param.getPageSize());

        log.info("构建的DSL语句 {}", searchSourceBuilder);

        return new SearchRequest(new String[]{EsIndexEnum.SERVER.value()}, searchSourceBuilder);
    }


    /**
     * 关键字搜索
     */
    private void keywordSearch(ServerSearchDTO param, BoolQueryBuilder boolQueryBuilder) {

        BoolQueryBuilder keywordShouldQuery = QueryBuilders.boolQuery();
//        现住-省
        if (Objects.nonNull(param.getPresentProvince())) {
            keywordShouldQuery.should(QueryBuilders.matchQuery(ColumnUtil.getName(ServerBO::getPresentProvince), param.getPresentProvince()).operator(Operator.AND));
        }
//        现住-市
        if (Objects.nonNull(param.getPresentCity())) {
            keywordShouldQuery.should(QueryBuilders.matchQuery(ColumnUtil.getName(ServerBO::getPresentCity), param.getPresentCity()).operator(Operator.AND));
        }
//        现住-区
        if (Objects.nonNull(param.getPresentRegion())) {
            keywordShouldQuery.should(QueryBuilders.matchQuery(ColumnUtil.getName(ServerBO::getPresentRegion), param.getPresentRegion()).operator(Operator.AND));
        }
//        户籍-省
        if (Objects.nonNull(param.getIdCardProvince())) {
            keywordShouldQuery.should(QueryBuilders.matchQuery(ColumnUtil.getName(ServerBO::getIdCardProvince), param.getIdCardProvince()).operator(Operator.AND));
        }
//        户籍-市
        if (Objects.nonNull(param.getIdCardCity())) {
            keywordShouldQuery.should(QueryBuilders.matchQuery(ColumnUtil.getName(ServerBO::getIdCardCity), param.getIdCardCity()).operator(Operator.AND));
        }
//        户籍-区
        if (Objects.nonNull(param.getIdCardRegion())) {
            keywordShouldQuery.should(QueryBuilders.matchQuery(ColumnUtil.getName(ServerBO::getIdCardRegion), param.getIdCardRegion()).operator(Operator.AND));
        }

//        标签
        if (Objects.nonNull(param.getIntroLabels())) {
            keywordShouldQuery.should(QueryBuilders.matchQuery(ColumnUtil.getName(ServerBO::getIntroLabel), param.getIntroLabels()).operator(Operator.AND));
        }
//        排班,使用了我的自定义逗号分词器,所以是根据逗号分隔后进行匹配的,但是需要多个匹配,我就使用了for循环,应该是有优化的地方,暂时没处理
        if (param.getServiceStartDate() != null && param.getServiceEndDate() != null) {
            List<String> scheduleList = LocalDateUtil.getContinuousTime(param.getServiceStartDate(), param.getServiceEndDate(), DateTimeFormatter.ofPattern("yyyyMMdd"));
            for (String schedule : scheduleList) {
                keywordShouldQuery.should(QueryBuilders.matchQuery(ColumnUtil.getName(ServerBO::getSchedule), schedule).operator(Operator.AND));
            }
        }
//        skuId,使用了我的自定义逗号分词器,所以是根据逗号分隔后进行匹配的
        if (Objects.nonNull(param.getSkuId())) {
            keywordShouldQuery.should(QueryBuilders.matchQuery(ColumnUtil.getName(ServerBO::getSkuId), param.getSkuId()).operator(Operator.AND));
        }
        boolQueryBuilder.must(keywordShouldQuery);
    }

    /**
     * 进行排序
     */
    private void sort(SearchSourceBuilder searchSourceBuilder, BoolQueryBuilder boolQueryBuilder) {
        searchSourceBuilder.sort(ColumnUtil.getName(ServerBO::getCreateTime), SortOrder.DESC);
        searchSourceBuilder.query(boolQueryBuilder);
    }

    /**
     * 过滤查询条件,如果有必要的话
     *
     * @param param            查询条件
     * @param boolQueryBuilder 组合进boolQueryBuilder
     */
    private void filterQueryIfNecessary(ServerSearchDTO param, BoolQueryBuilder boolQueryBuilder) {
//        类型
        if (Objects.nonNull(param.getType())) {
            boolQueryBuilder.filter(QueryBuilders.termQuery(ColumnUtil.getName(ServerBO::getType), param.getType()));
        }
//        性别
        if (Objects.nonNull(param.getGender())) {
            boolQueryBuilder.filter(QueryBuilders.termQuery(ColumnUtil.getName(ServerBO::getGender), param.getGender()));
        }
//        学历
        if (Objects.nonNull(param.getEducation())) {
            boolQueryBuilder.filter(QueryBuilders.termQuery(ColumnUtil.getName(ServerBO::getEducation), param.getEducation()));
        }
        boolQueryBuilder.filter(QueryBuilders.termQuery(ColumnUtil.getName(ServerBO::getAuditStatus), AuditStatus.PASS.getStatus()));
        boolQueryBuilder.filter(QueryBuilders.termQuery(ColumnUtil.getName(ServerBO::getUseStatus), UseStatus.NORMAL.getStatus()));
        boolQueryBuilder.filter(QueryBuilders.termQuery(ColumnUtil.getName(ServerBO::getDel), 0));
    }
}
  1. ServerSearchController
import com.onecode.dtg.basic.common.model.ResultBean;
import com.onecode.middle.search.service.dto.ServerSearchDTO;
import com.onecode.middle.search.service.manager.ServerSearchManager;
import com.onecode.middle.search.service.vo.EsPageVO;
import com.onecode.middle.search.service.vo.search.EsServerVO;
import io.swagger.annotations.Api;
import lombok.AllArgsConstructor;
import org.springframework.validation.annotation.Validated;
import org.springframework.web.bind.annotation.*;

/**
 * @author gideon
 * @date 2022/9/6
 */
@Validated
@AllArgsConstructor
@RestController
@RequestMapping("/search/server/")
@Api(tags = "api-服务者搜索接口")
public class ServerSearchController {

    private final ServerSearchManager serverSearchManager;

    @PostMapping("/page")
    public ResultBean<EsPageVO<EsServerVO>> page(@RequestBody ServerSearchDTO dto) {
        return new ResultBean<>(serverSearchManager.pageSearchResult(dto));
    }

}

  1. 分页参数EsPageDTO实体类
import com.onecode.dtg.basic.common.util.PrincipalUtil;
import io.swagger.annotations.ApiModelProperty;
import lombok.Data;

import javax.validation.constraints.NotNull;
import java.util.Arrays;

/**
 * @author gideon
 */
@Data
public class EsPageDTO {

    public static final String ASC = "ASC";

    public static final String DESC = "DESC";

    /**
     * 最大分页大小,如果分页大小大于500,则用500作为分页的大小。防止有人直接传入一个较大的数,导致服务器内存溢出宕机
     */
    public static final Integer MAX_PAGE_SIZE = 500;

    /**
     * 当前页
     */
    @NotNull(message = "pageNum 不能为空")
    @ApiModelProperty(value = "当前页", required = true)
    private Integer pageNum;

    @NotNull(message = "pageSize 不能为空")
    @ApiModelProperty(value = "每页大小", required = true)
    private Integer pageSize;

    @ApiModelProperty(value = "排序字段数组,用逗号分割")
    private String[] columns;

    @ApiModelProperty(value = "排序字段方式,用逗号分割,ASC正序,DESC倒序")
    private String[] orders;

    public Integer getPageNum() {
        return pageNum;
    }

    public void setPageNum(Integer pageNum) {
        this.pageNum = pageNum;
    }

    public Integer getPageSize() {
        return pageSize;
    }

    public void setPageSize(Integer pageSize) {
        if (pageSize > MAX_PAGE_SIZE) {
            this.pageSize = MAX_PAGE_SIZE;
            return;
        }
        this.pageSize = pageSize;
    }

    public String getOrderBy() {
        return order(this.columns, this.orders);
    }

    public String[] getColumns() {
        return columns;
    }

    public void setColumns(String[] columns) {
        this.columns = columns;
    }

    public String[] getOrders() {
        return orders;
    }

    public void setOrders(String[] orders) {
        this.orders = orders;
    }

    public static String order(String[] columns, String[] orders) {

        if (columns == null || columns.length == 0) {
            return "";
        }

        StringBuilder stringBuilder = new StringBuilder();

        for (int x = 0; x < columns.length; x++) {

            String column = columns[x];
            String order;

            if (orders != null && orders.length > x) {
                order = orders[x].toUpperCase();
                if (!(order.equals(ASC) || order.equals(DESC))) {
                    throw new IllegalArgumentException("非法的排序策略:" + column);
                }
            } else {
                order = ASC;
            }

            // 判断列名称的合法性,防止SQL注入。只能是【字母,数字,下划线】
            if (PrincipalUtil.isField(column)) {
                throw new IllegalArgumentException("非法的排序字段名称:" + column);
            }

            // 驼峰转换为下划线
            column = humpConversionUnderscore(column);

            if (x != 0) {
                stringBuilder.append(", ");
            }
            stringBuilder.append("`").append(column).append("` ").append(order);
        }
        return stringBuilder.toString();
    }

    public static String humpConversionUnderscore(String value) {
        StringBuilder stringBuilder = new StringBuilder();
        char[] chars = value.toCharArray();
        for (char character : chars) {
            if (Character.isUpperCase(character)) {
                stringBuilder.append("_");
                character = Character.toLowerCase(character);
            }
            stringBuilder.append(character);
        }
        return stringBuilder.toString();
    }


    @Override
    public String toString() {
        return "EsPageDTO{" +
                "pageNum=" + pageNum +
                ", pageSize=" + pageSize +
                ", columns=" + Arrays.toString(columns) +
                ", orders=" + Arrays.toString(orders) +
                '}';
    }
}

  1. 查询参数实体类ServerSearchDTO
import io.swagger.annotations.ApiModelProperty;
import lombok.Data;
import lombok.EqualsAndHashCode;

import javax.validation.constraints.NotNull;
import java.time.LocalDate;

/**
 * @author gideon
 */
@EqualsAndHashCode(callSuper = true)
@Data
public class ServerSearchDTO extends EsPageDTO{

    @ApiModelProperty("类型")
    private String type;

    @ApiModelProperty("值为1时是男性,值为2时是女性,值为0时是未知")
    private String gender;

    @ApiModelProperty("学历")
    private String education;

    @ApiModelProperty("户籍-省")
    private String idCardProvince;

    @ApiModelProperty("户籍-市")
    private String idCardCity;

    @ApiModelProperty("户籍-区")
    private String idCardRegion;

    @ApiModelProperty("现住-省")
    private String presentProvince;

    @ApiModelProperty("现住-市")
    private String presentCity;

    @ApiModelProperty("现住-区")
    private String presentRegion;

    @ApiModelProperty("介绍-标签(多个值需要使用逗号分割)")
    private String introLabels;

    @ApiModelProperty("服务开始时间")
    @NotNull(message = "服务开始时间不能为空")
    private LocalDate serviceStartDate;

    @ApiModelProperty("服务结束时间")
    @NotNull(message = "服务结束时间不能为空")
    private LocalDate serviceEndDate;

    @ApiModelProperty("skuId")
    @NotNull(message = "skuId不能为空。")
    private Long skuId;

}

  1. 返回值EsServerVO参数
/**
 * @author gideon
 * @date 2022/9/5
 */
@Data
public class EsServerVO {

    @ApiModelProperty("id")
    private Long id;

    @ApiModelProperty("用户标识")
    private Long userId;

    @ApiModelProperty("类型")
    private String type;

    @ApiModelProperty("姓名")
    private String name;

    @ApiModelProperty("值为1时是男性,值为2时是女性,值为0时是未知")
    private String gender;

    @ApiModelProperty("出生年月日")
    @JsonFormat(pattern = "yyyy-MM-dd")
    private LocalDate birthday;

    @ApiModelProperty("学历")
    private String education;

    @ApiModelProperty("从业时间")
    @JsonFormat(pattern = "yyyy-MM-dd")
    private LocalDate practiceDate;

    @ApiModelProperty("评级")
    private String level;

    @ApiModelProperty("认证标签")
    private String authLabel;

    @ApiModelProperty("勋章(逗号隔开)")
    private String medal;

    @ApiModelProperty("服务评分")
    private Integer serviceScore;

    @ApiModelProperty("已实名认证")
    private Integer realNameAuth;

    @ApiModelProperty("身份证号")
    private String idCardNo;

    @ApiModelProperty("户籍-省")
    private String idCardProvince;

    @ApiModelProperty("户籍-市")
    private String idCardCity;

    @ApiModelProperty("户籍-区")
    private String idCardRegion;

    @ApiModelProperty("手机号")
    private String phone;

    @ApiModelProperty("现住-省")
    private String presentProvince;

    @ApiModelProperty("现住-市")
    private String presentCity;

    @ApiModelProperty("现住-区")
    private String presentRegion;

    @ApiModelProperty("现住-地址")
    private String presentAddress;

    @ApiModelProperty("头像")
    private String head;

    @ApiModelProperty("使用状态(初始化:init;正常:normal,禁用:ban)")
    private String useStatus;

    @ApiModelProperty("审核状态 (待审核:await;通过:pass,驳回:reject)")
    private String auditStatus;

    @ApiModelProperty("驳回理由")
    private String rejectReason;

    @ApiModelProperty("注册时间")
    @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")
    private LocalDateTime regDate;

    @ApiModelProperty("介绍-内容")
    private String introContent;

    @ApiModelProperty("介绍-视频")
    private String introVideo;

    @ApiModelProperty("介绍-标签")
    private String introLabel;

    @ApiModelProperty("商家标识")
    private Long merchantId;

    @ApiModelProperty("组织标识")
    private Long orgId;

    @ApiModelProperty("护理员申请sku,多个逗号隔开")
    private String skuId;

    @ApiModelProperty("护理员排班数据,多个逗号隔开")
    private String schedule;

    /**
     * 逻辑删除
     */
    private Integer del;

    /**
     * 创建人
     */
    private String createBy;

    /**
     * 创建时间
     */
    @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")
    private LocalDateTime createTime;

    /**
     * 更新者
     */
    private String updateBy;

    /**
     * 更新时间
     */
    @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")
    private LocalDateTime updateTime;
}

  1. 分页返回值EsPageVO
import io.swagger.annotations.ApiModelProperty;
import lombok.Data;

import java.util.List;

/**
 * @author gideon
 * @date 2022/9/5
 */
@Data
public class EsPageVO<T> {

    @ApiModelProperty("总页数")
    private Integer pages;

    @ApiModelProperty("总条目数")
    private Long total;

    @ApiModelProperty("结果集")
    private List<T> list;
}

有关spring boot +springboot集成es7.9.1+canal同步到es的更多相关文章

  1. ruby-on-rails - 如何使辅助方法在 Rails 集成测试中可用? - 2

    我在app/helpers/sessions_helper.rb中有一个帮助程序文件,其中包含一个方法my_preference,它返回当前登录用户的首选项。我想在集成测试中访问该方法。例如,这样我就可以在测试中使用getuser_path(my_preference)。在其他帖子中,我读到这可以通过在测试文件中包含requiresessions_helper来实现,但我仍然收到错误NameError:undefinedlocalvariableormethod'my_preference'.我做错了什么?require'test_helper'require'sessions_hel

  2. ruby-on-rails - 我如何将 Hoptoad 与 DelayedJob 和 DaemonSpawn 集成? - 2

    我一直很高兴地使用DelayedJob习惯用法:foo.send_later(:bar)这会调用DelayedJob进程中对象foo的方法bar。我一直在使用DaemonSpawn在我的服务器上启动DelayedJob进程。但是...如果foo抛出异常,Hoptoad不会捕获它。这是任何这些包中的错误...还是我需要更改某些配置...或者我是否需要在DS或DJ中插入一些异常处理来调用Hoptoad通知程序?回应下面的第一条评论。classDelayedJobWorker 最佳答案 尝试monkeypatchingDelayed::W

  3. 使用canal同步MySQL数据到ES - 2

    文章目录一、概述简介原理模块二、配置Mysql使用版本环境要求1.操作系统2.mysql要求三、配置canal-server离线下载在线下载上传解压修改配置单机配置集群配置分库分表配置1.修改全局配置2.实例配置垂直分库水平分库3.修改group-instance.xml4.启动监听四、配置canal-adapter1修改启动配置2配置映射文件3启动ES数据同步查询所有订阅同步数据同步开关启动4.验证五、配置canal-admin一、概述简介canal是Alibaba旗下的一款开源项目,Java开发。基于数据库增量日志解析,提供增量数据订阅&消费。Git地址:https://github.co

  4. jenkins部署1--jenkins+gitee持续集成 - 2

    前置步骤我们都操作完了,这篇开始介绍jenkins的集成。话不多说,看操作1、登录进入jenkins后会让你选择安装插件,选择第一个默认的就行。安装完成后设置账号密码,重新登录。2、配置JDK和Git都需要执行路径,所以需要先把执行路径找到,先进入服务器的docker容器,2.1JDK的路径root@69eef9ee86cf:/usr/bin#echo$JAVA_HOME/usr/local/openjdk-82.2Git的路径root@69eef9ee86cf:/#whichgit/usr/bin/git3、先配置JDK和Git。点击:ManageJenkins>>GlobalToolCon

  5. ES基础入门 - 2

    ES一、简介1、ElasticStackES技术栈:ElasticSearch:存数据+搜索;QL;Kibana:Web可视化平台,分析。LogStash:日志收集,Log4j:产生日志;log.info(xxx)。。。。使用场景:metrics:指标监控…2、基本概念Index(索引)动词:保存(插入)名词:类似MySQL数据库,给数据Type(类型)已废弃,以前类似MySQL的表现在用索引对数据分类Document(文档)真正要保存的一个JSON数据{name:"tcx"}二、入门实战{"name":"DESKTOP-1TSVGKG","cluster_name":"elasticsear

  6. 三分钟集成 TapTap 防沉迷 SDK(Unity 版) - 2

    三分钟集成Tap防沉迷SDK(Unity版)一、SDK介绍基于国家对上线所有游戏必须增加防沉迷功能的政策下,TapTap推出防沉迷SDK,供游戏开发者进行接入;允许未成年用户在周五、六、日以及法定节假日晚上8:00-9:00进行游戏,防沉谜时间段进入游戏会弹窗进行提示!开发环境要求:Unity2019.4或更高版本iOS10或更高版本Android5.0(APIlevel21)或更高版本🔗Unity集成Demo参考链接🔗UnityTapSDK功能体验APK下载链接二、集成前准备1.创建应用进入开发者后台,按照提示开始创建应用;2.开通服务在使用TDS实名认证和防沉迷服务之前,需要在上面创建的应

  7. ruby-on-rails - RailsTutorial - 第 8.4.3 章 - 在集成测试中添加用户后未清除测试数据库 - 2

    我被这个难住了。到目前为止教程中的一切都进行得很顺利,但是当我将这段代码添加到我的/spec/requests/users_spec.rb文件中时,事情开始变得糟糕:describe"success"doit"shouldmakeanewuser"dolambdadovisitsignup_pathfill_in"Name",:with=>"ExampleUser"fill_in"Email",:with=>"ryan@example.com"fill_in"Password",:with=>"foobar"fill_in"Confirmation",:with=>"foobar"cl

  8. ruby-on-rails - 本地 yaml key 的 i18n 同步 - 2

    类似的问题,但对于java,Keepingi18nresourcessynced如何保持i18nyamllocals的key同步?即,当将key添加到en.yml时,如何将它们添加到nb.yml或ru.yml?如果我在my_title:"atitle"旁边添加键my_label:"sometextinenglish"我想把它给我的其他本地人我指定,因为我不能做所有的翻译,它应该回到其他语言的英语例如en.ymlsomegroup:my_tile:"atitleinenglish"my_label:"sometextinenglish"othergroup:...我想发出命令,将整个键和

  9. ruby-on-rails - 将 Angular JS 与 Rails 集成 - 2

    我需要一些指导来了解如何将Angular整合到rails中。选择Rails的原因:我喜欢他们偏执的做事方式。还有迁移,gem真的很酷。使用angular的原因:我正在研究和寻找最适合SPA的框架。Backbone似乎太抽象了。我不得不在Angular和Ember之间做出选择。我首先开始阅读Angular,它对我来说很有意义。所以我从来没有去读过关于ember的文章。使用Angular和Rails的原因:我研究并尝试使用小型框架,例如grape、slim(是的,我也使用php)。但我觉得需要坚持项目的长期范围。我个人喜欢用Rails的方式做事。这就是我需要帮助的地方,我在Rails4中有

  10. ruby - 在 Maven 集成中运行 Ruby 单元测试 - 2

    有没有人有在Maven中运行用Ruby编写的单元测试的经验。任何输入,如要使用的库/maven插件,将不胜感激!我们已经在使用Maven+hudson+Junit。但是我们正在引入Ruby单元测试,找不到任何同样好的组合。 最佳答案 我建议让Maven使用ExecMavenPlugin启动rake测试(exec:exec目标)并使用ci_reportergem生成单元测试结果的XML文件,Hudson、Bamboo等可以读取该文件,以与JUnit测试相同的格式显示测试结果。如果您不需要使用mvntest运行Ruby测试,您也可以只使

随机推荐