草庐IT

SpringBoot整合RocketMQ,老鸟们都是这么玩的!

飘渺Jam 2023-04-22 原文

今天我们来讨论如何在项目开发中优雅地使用RocketMQ。本文分为三部分,第一部分实现SpringBoot与RocketMQ的整合,第二部分解决在使用RocketMQ过程中可能遇到的一些问题并解决他们,第三部分介绍如何封装RocketMQ以便更好地使用。

1. SpringBoot整合RocketMQ

在SpringBoot中集成RocketMQ,只需要简单四步:

  1. 引入相关依赖
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
</dependency>
  1. 添加RocketMQ的相关配置
rocketmq:
consumer:
group: springboot_consumer_group
# 一次拉取消息最大值,注意是拉取消息的最大值而非消费最大值
pull-batch-size: 10
name-server: 10.5.103.6:9876
producer:
# 发送同一类消息的设置为同一个group,保证唯一
group: springboot_producer_group
# 发送消息超时时间,默认3000
sendMessageTimeout: 10000
# 发送消息失败重试次数,默认2
retryTimesWhenSendFailed: 2
# 异步消息重试此处,默认2
retryTimesWhenSendAsyncFailed: 2
# 消息最大长度,默认1024 * 1024 * 4(默认4M)
maxMessageSize: 4096
# 压缩消息阈值,默认4k(1024 * 4)
compressMessageBodyThreshold: 4096
# 是否在内部发送失败时重试另一个broker,默认false
retryNextServer: false
  1. 使用提供的模板工具类RocketMQTemplate发送消息
@RestController
public class NormalProduceController {
@Setter(onMethod_ = @Autowired)
private RocketMQTemplate rocketmqTemplate;

@GetMapping("/test")
public SendResult test() {
Message<String> msg = MessageBuilder.withPayload("Hello,RocketMQ").build();
SendResult sendResult = rocketmqTemplate.send(topic, msg);
}
}
  1. 实现RocketMQListener接口消费消息
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;

@Component
@RocketMQMessageListener(topic = "your_topic_name", consumerGroup = "your_consumer_group_name")
public class MyConsumer implements RocketMQListener<String> {

@Override
public void onMessage(String message) {
// 处理消息的逻辑
System.out.println("Received message: " + message);
}
}

以上4步即可实现SpringBoot与RocketMQ的整合,这部分属于基础知识,不做过多说明。

2 使用RocketMQ会遇到的问题

以下是一些在SpringBoot中使用RocketMQ时常遇到的问题,现在为您逐一解决。

2.1 WARN No appenders could be found for logger

启动项目时会在日志中看到如下告警

RocketMQLog:WARN No appenders could be found for logger (io.netty.util.internal.InternalThreadLocalMap).
RocketMQLog:WARN Please initialize the logger system properly.

此时我们只需要在启动类中设置环境变量 rocketmq.client.logUseSlf4j 为 true 明确指定RocketMQ的日志框架

@SpringBootApplication
public class RocketDemoApplication {

public static void main(String[] args) {
/*
* 指定使用的日志框架,否则将会告警
* RocketMQLog:WARN No appenders could be found for logger (io.netty.util.internal.InternalThreadLocalMap).
* RocketMQLog:WARN Please initialize the logger system properly.
*/
System.setProperty("rocketmq.client.logUseSlf4j", "true");

SpringApplication.run(RocketDemoApplication.class, args);
}
}

同时还得在配置文件中调整日志级别,不然在控制台会一直看到broker的日志信息

logging:
level:
RocketmqClient: ERROR
io:
netty: ERROR

2.2 不支持LocalDate 和 LocalDateTime

在使用Java8后经常会使用LocalDate/LocalDateTime这两个时间类型字段,然而RocketMQ原始配置并不支持Java时间类型,当我们发送的实体消息中包含上述两个字段时,消费端在消费时会出现如下所示的错误。

比如生产者的代码如下:

@GetMapping("/test")
public void test(){
//普通消息无返回值,只负责发送消息⽽不等待服务器回应且没有回调函数触发。
RocketMessage rocketMessage = RocketMessage.builder().
id(1111L).
message("hello,world")
.localDate(LocalDate.now())
.localDateTime(LocalDateTime.now())
.build();
rocketmqTemplate.convertAndSend(destination,rocketMessage);
}

消费者的代码如下:

@Component
@RocketMQMessageListener(consumerGroup = "springboot_consumer_group",topic = "consumer_topic")
public class RocketMQConsumer implements RocketMQListener<RocketMessage> {
@Override
public void onMessage(RocketMessage message) {
System.out.println("消费消息-" + message);
}
}

消费者开始消费时会出现类型转换异常错误Cannot construct instance of java.time.LocalDate,错误详情如下:

原因:RocketMQ内置使用的转换器是RocketMQMessageConverter,转换Json时使用的是MappingJackson2MessageConverter,但是这个转换器不支持时间类型。

解决办法:需要自定义消息转换器,将MappingJackson2MessageConverter进行替换,并添加支持时间模块

@Configuration
public class RocketMQEnhanceConfig {

/**
* 解决RocketMQ Jackson不支持Java时间类型配置
* 源码参考:{@link org.apache.rocketmq.spring.autoconfigure.MessageConverterConfiguration}
*/
@Bean
@Primary
public RocketMQMessageConverter enhanceRocketMQMessageConverter(){
RocketMQMessageConverter converter = new RocketMQMessageConverter();
CompositeMessageConverter compositeMessageConverter = (CompositeMessageConverter) converter.getMessageConverter();
List<MessageConverter> messageConverterList = compositeMessageConverter.getConverters();
for (MessageConverter messageConverter : messageConverterList) {
if(messageConverter instanceof MappingJackson2MessageConverter){
MappingJackson2MessageConverter jackson2MessageConverter = (MappingJackson2MessageConverter) messageConverter;
ObjectMapper objectMapper = jackson2MessageConverter.getObjectMapper();
objectMapper.registerModules(new JavaTimeModule());
}
}
return converter;
}
}

2.3 RockeMQ环境隔离

在使用RocketMQ时,通常会在代码中直接指定消息主题(topic),而且开发环境和测试环境可能共用一个RocketMQ环境。如果没有进行处理,在开发环境发送的消息就可能被测试环境的消费者消费,测试环境发送的消息也可能被开发环境的消费者消费,从而导致数据混乱的问题。

为了解决这个问题,我们可以根据不同的环境实现自动隔离。通过简单配置一个选项,如dev、test、prod等不同环境,所有的消息都会被自动隔离。例如,当发送的消息主题为consumer_topic​时,可以自动在topic后面加上环境后缀,如consumer_topic_dev。

那么,我们该如何实现呢?

可以编写一个配置类实现BeanPostProcessor,并重写postProcessBeforeInitialization方法,在监听器实例初始化前修改对应的topic。

BeanPostProcessor是Spring框架中的一个接口,它的作用是在Spring容器实例化、配置完bean之后,在bean初始化前后进行一些额外的处理工作。

具体来说,BeanPostProcessor接口定义了两个方法:

postProcessBeforeInitialization(Object bean, String beanName): 在bean初始化之前进行处理,可以对bean做一些修改等操作。

postProcessAfterInitialization(Object bean, String beanName): 在bean初始化之后进行处理,可以进行一些清理或者其他操作。BeanPostProcessor可以在应用程序中对Bean的创建和初始化过程进行拦截和修改,对Bean的生命周期进行干预和操作。

它可以对所有的Bean类实例进行增强处理,使得开发人员可以在Bean初始化前后自定义一些操作,从而实现自己的业务需求。比如,可以通过BeanPostProcessor来实现注入某些必要的属性值、加入某一个对象等等。

实现方案如下:

  1. 在配置文件中增加相关配置
rocketmq:
enhance:
# 启动隔离,用于激活配置类EnvironmentIsolationConfig
# 启动后会自动在topic上拼接激活的配置文件,达到自动隔离的效果
enabledIsolation: true
# 隔离环境名称,拼接到topic后,topic_dev,默认空字符串
environment: dev
  1. 新增配置类,在实例化消息监听者之前把topic修改掉
@Configuration
public class EnvironmentIsolationConfig implements BeanPostProcessor {
@Value("${rocketmq.enhance.enabledIsolation:true}")
private boolean enabledIsolation;
@Value("${rocketmq.enhance.environment:''}")
private String environmentName;

/**
* 在装载Bean之前实现参数修改
*/
@Override
public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {
if(bean instanceof DefaultRocketMQListenerContainer){

DefaultRocketMQListenerContainer container = (DefaultRocketMQListenerContainer) bean;
//拼接Topic
if(enabledIsolation && StringUtils.hasText(environmentName)){
container.setTopic(String.join("_", container.getTopic(),environmentName));
}
return container;
}
return bean;
}
}
  1. 启动项目可以看到日志中消息监听的队列已经被修改了
2023-03-23 17:04:59.726 [main] INFO  o.a.r.s.support.DefaultRocketMQListenerContainer:290 - running container: DefaultRocketMQListenerContainer{cnotallow='springboot_consumer_group', nameServer='10.5.103.6:9876', topic='consumer_topic_dev', cnotallow=CONCURRENTLY, selectorType=TAG, selectorExpressinotallow='*', messageModel=CLUSTERING}

3. RocketMQ二次封装

在解释为什么要二次封装之前先来看看RocketMQ官方文档中推荐的最佳实践

  1. 消息发送成功或者失败要打印消息日志,用于业务排查问题。
  2. 如果消息量较少,建议在消费入口方法打印消息,消费耗时等,方便后续排查问题。
  3. RocketMQ 无法避免消息重复(Exactly-Once),所以如果业务对消费重复非常敏感,务必要在业务层面进行去重处理。可以借助关系数据库进行去重。首先需要确定消息的唯一键,可以是msgId,也可以是消息内容中的唯一标识字段,例如订单Id等。

上面三个步骤基本每次发送消息或者消费消息都要实现,属于重复动作。

接下来讨论的是在RocketMQ中发送消息时选择何种消息类型最为合适。

在RocketMQ中有四种可选格式:

  1. 发送Json对象
  2. 发送转Json后的String对象
  3. 根据业务封装对应实体类
  4. 直接使用原生MessageExt接收。

对于如何选择消息类型,需要考虑到消费者在不查看消息发送者的情况下,如何获取消息的含义。因此,在这种情况下,使用第三种方式即根据业务封装对应实体类的方式最为合适,也是大多数开发者在发送消息时的常用方式。

有了上面两点结论以后我们来看看为什么要对RocketMQ二次封装。

3.1 为什么要二次封装

按照上述最佳实践,一个完整的消息传递链路从生产到消费应包括 准备消息、发送消息、记录消息日志、处理发送失败、记录接收消息日志、处理业务逻辑、异常处理和异常重试 等步骤。

虽然使用原生RocketMQ可以完成这些动作,但每个生产者和消费者都需要编写大量重复的代码来完成相同的任务,这就是需要进行二次封装的原因。我们希望通过二次封装,生产者只需准备好消息实体并调用封装后的工具类发送,而消费者只需处理核心业务逻辑,其他公共逻辑会得到统一处理。 

在二次封装中,关键是找出框架在日常使用中所涵盖的许多操作,以及区分哪些操作是可变的,哪些是不变的。以上述例子为例,实际上只有生产者的消息准备和消费者的业务处理是可变的操作,需要根据需求进行处理,而其他步骤可以固定下来形成一个模板。

当然,本文提到的二次封装不是指对源代码进行封装,而是针对工具的原始使用方式进行的封装。可以将其与Mybatis和Mybatis-plus区分开来。这两者都能完成任务,只不过Mybatis-plus更为简单便捷。

3.2 实现二次封装

实现二次封装需要创建一个自定义的starter,这样其他项目只需要依赖此starter即可使用封装功能。同时,在自定义starter中还需要解决文章第二部分中提到的一些问题。

代码结构如下所示:

image-20230403160031944

3.2.1 消息实体类的封装

/**
* 消息实体,所有消息都需要继承此类
* 公众号:JAVA日知录
*/
@Data
public abstract class BaseMessage {
/**
* 业务键,用于RocketMQ控制台查看消费情况
*/
protected String key;
/**
* 发送消息来源,用于排查问题
*/
protected String source = "";

/**
* 发送时间
*/
protected LocalDateTime sendTime = LocalDateTime.now();

/**
* 重试次数,用于判断重试次数,超过重试次数发送异常警告
*/
protected Integer retryTimes = 0;
}

后面所有发送的消息实体都需要继承此实体类。

3.2.2 消息发送工具类的封装

@Slf4j
@RequiredArgsConstructor(onConstructor = @__(@Autowired))
public class RocketMQEnhanceTemplate {
private final RocketMQTemplate template;

@Resource
private RocketEnhanceProperties rocketEnhanceProperties;

public RocketMQTemplate getTemplate() {
return template;
}

/**
* 根据系统上下文自动构建隔离后的topic
* 构建目的地
*/
public String buildDestination(String topic, String tag) {
topic = reBuildTopic(topic);
return topic + ":" + tag;
}

/**
* 根据环境重新隔离topic
* @param topic 原始topic
*/
private String reBuildTopic(String topic) {
if(rocketEnhanceProperties.isEnabledIsolation() && StringUtils.hasText(rocketEnhanceProperties.getEnvironment())){
return topic +"_" + rocketEnhanceProperties.getEnvironment();
}
return topic;
}

/**
* 发送同步消息
*/
public <T extends BaseMessage> SendResult send(String topic, String tag, T message) {
// 注意分隔符
return send(buildDestination(topic,tag), message);
}


public <T extends BaseMessage> SendResult send(String destination, T message) {
// 设置业务键,此处根据公共的参数进行处理
// 更多的其它基础业务处理...
Message<T> sendMessage = MessageBuilder.withPayload(message).setHeader(RocketMQHeaders.KEYS, message.getKey()).build();
SendResult sendResult = template.syncSend(destination, sendMessage);
// 此处为了方便查看给日志转了json,根据选择选择日志记录方式,例如ELK采集
log.info("[{}]同步消息[{}]发送结果[{}]", destination, JSONObject.toJSON(message), JSONObject.toJSON(sendResult));
return sendResult;
}

/**
* 发送延迟消息
*/
public <T extends BaseMessage> SendResult send(String topic, String tag, T message, int delayLevel) {
return send(buildDestination(topic,tag), message, delayLevel);
}

public <T extends BaseMessage> SendResult send(String destination, T message, int delayLevel) {
Message<T> sendMessage = MessageBuilder.withPayload(message).setHeader(RocketMQHeaders.KEYS, message.getKey()).build();
SendResult sendResult = template.syncSend(destination, sendMessage, 3000, delayLevel);
log.info("[{}]延迟等级[{}]消息[{}]发送结果[{}]", destination, delayLevel, JSONObject.toJSON(message), JSONObject.toJSON(sendResult));
return sendResult;
}
}

这里封装了一个消息发送类,实现了日志记录以及自动重建topic的功能(即生产者实现环境隔离),后面项目中只需要注入RocketMQEnhanceTemplate来实现消息的发送。

3.2.3 消费者的封装

@Slf4j
public abstract class EnhanceMessageHandler<T extends BaseMessage> {
/**
* 默认重试次数
*/
private static final int MAX_RETRY_TIMES = 3;

/**
* 延时等级
*/
private static final int DELAY_LEVEL = EnhanceMessageConstant.FIVE_SECOND;


@Resource
private RocketMQEnhanceTemplate rocketMQEnhanceTemplate;

/**
* 消息处理
*
* @param message 待处理消息
* @throws Exception 消费异常
*/
protected abstract void handleMessage(T message) throws Exception;

/**
* 超过重试次数消息,需要启用isRetry
*
* @param message 待处理消息
*/
protected abstract void handleMaxRetriesExceeded(T message);


/**
* 是否需要根据业务规则过滤消息,去重逻辑可以在此处处理
* @param message 待处理消息
* @return true: 本次消息被过滤,false:不过滤
*/
protected boolean filter(T message) {
return false;
}

/**
* 是否异常时重复发送
*
* @return true: 消息重试,false:不重试
*/
protected abstract boolean isRetry();

/**
* 消费异常时是否抛出异常
* 返回true,则由rocketmq机制自动重试
* false:消费异常(如果没有开启重试则消息会被自动ack)
*/
protected abstract boolean throwException();

/**
* 最大重试次数
*
* @return 最大重试次数,默认5次
*/
protected int getMaxRetryTimes() {
return MAX_RETRY_TIMES;
}

/**
* isRetry开启时,重新入队延迟时间
* @return -1:立即入队重试
*/
protected int getDelayLevel() {
return DELAY_LEVEL;
}

/**
* 使用模板模式构建消息消费框架,可自由扩展或删减
*/
public void dispatchMessage(T message) {
// 基础日志记录被父类处理了
log.info("消费者收到消息[{}]", JSONObject.toJSON(message));

if (filter(message)) {
log.info("消息id{}不满足消费条件,已过滤。",message.getKey());
return;
}
// 超过最大重试次数时调用子类方法处理
if (message.getRetryTimes() > getMaxRetryTimes()) {
handleMaxRetriesExceeded(message);
return;
}
try {
long now = System.currentTimeMillis();
handleMessage(message);
long costTime = System.currentTimeMillis() - now;
log.info("消息{}消费成功,耗时[{}ms]", message.getKey(),costTime);
} catch (Exception e) {
log.error("消息{}消费异常", message.getKey(),e);
// 是捕获异常还是抛出,由子类决定
if (throwException()) {
//抛出异常,由DefaultMessageListenerConcurrently类处理
throw new RuntimeException(e);
}
//此时如果不开启重试机制,则默认ACK了
if (isRetry()) {
handleRetry(message);
}
}
}

protected void handleRetry(T message) {
// 获取子类RocketMQMessageListener注解拿到topic和tag
RocketMQMessageListener annotation = this.getClass().getAnnotation(RocketMQMessageListener.class);
if (annotation == null) {
return;
}
//重新构建消息体
String messageSource = message.getSource();
if(!messageSource.startsWith(EnhanceMessageConstant.RETRY_PREFIX)){
message.setSource(EnhanceMessageConstant.RETRY_PREFIX + messageSource);
}
message.setRetryTimes(message.getRetryTimes() + 1);

SendResult sendResult;

try {
// 如果消息发送不成功,则再次重新发送,如果发送异常则抛出由MQ再次处理(异常时不走延迟消息)
sendResult = rocketMQEnhanceTemplate.send(annotation.topic(), annotation.selectorExpression(), message, getDelayLevel());
} catch (Exception ex) {
// 此处捕获之后,相当于此条消息被消息完成然后重新发送新的消息
//由生产者直接发送
throw new RuntimeException(ex);
}
// 发送失败的处理就是不进行ACK,由RocketMQ重试
if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
throw new RuntimeException("重试消息发送失败");
}

}
}

使用模版设计模式定义了消息消费的骨架,实现了日志打印,异常处理,异常重试等公共逻辑,消息过滤(查重)、业务处理则交由子类实现。

3.2.4 基础配置类

@Configuration
@EnableConfigurationProperties(RocketEnhanceProperties.class)
public class RocketMQEnhanceAutoConfiguration {

/**
* 注入增强的RocketMQEnhanceTemplate
*/
@Bean
public RocketMQEnhanceTemplate rocketMQEnhanceTemplate(RocketMQTemplate rocketMQTemplate){
return new RocketMQEnhanceTemplate(rocketMQTemplate);
}

/**
* 解决RocketMQ Jackson不支持Java时间类型配置
* 源码参考:{@link org.apache.rocketmq.spring.autoconfigure.MessageConverterConfiguration}
*/
@Bean
@Primary
public RocketMQMessageConverter enhanceRocketMQMessageConverter(){
RocketMQMessageConverter converter = new RocketMQMessageConverter();
CompositeMessageConverter compositeMessageConverter = (CompositeMessageConverter) converter.getMessageConverter();
List<MessageConverter> messageConverterList = compositeMessageConverter.getConverters();
for (MessageConverter messageConverter : messageConverterList) {
if(messageConverter instanceof MappingJackson2MessageConverter){
MappingJackson2MessageConverter jackson2MessageConverter = (MappingJackson2MessageConverter) messageConverter;
ObjectMapper objectMapper = jackson2MessageConverter.getObjectMapper();
objectMapper.registerModules(new JavaTimeModule());
}
}
return converter;
}


/**
* 环境隔离配置
*/
@Bean
@ConditionalOnProperty(name="rocketmq.enhance.enabledIsolation", havingValue="true")
public EnvironmentIsolationConfig environmentSetup(RocketEnhanceProperties rocketEnhanceProperties){
return new EnvironmentIsolationConfig(rocketEnhanceProperties);
}

}
public class EnvironmentIsolationConfig implements BeanPostProcessor {
private RocketEnhanceProperties rocketEnhanceProperties;

public EnvironmentIsolationConfig(RocketEnhanceProperties rocketEnhanceProperties) {
this.rocketEnhanceProperties = rocketEnhanceProperties;
}


/**
* 在装载Bean之前实现参数修改
*/
@Override
public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {
if(bean instanceof DefaultRocketMQListenerContainer){

DefaultRocketMQListenerContainer container = (DefaultRocketMQListenerContainer) bean;

if(rocketEnhanceProperties.isEnabledIsolation() && StringUtils.hasText(rocketEnhanceProperties.getEnvironment())){
container.setTopic(String.join("_", container.getTopic(),rocketEnhanceProperties.getEnvironment()));
}
return container;
}
return bean;
}
}
@ConfigurationProperties(prefix = "rocketmq.enhance")
@Data
public class RocketEnhanceProperties {

private boolean enabledIsolation;

private String environment;
}

3.3 封装后的使用

3.3.1 引入依赖

<dependency>
<groupId>com.jianzh5</groupId>
<artifactId>cloud-rocket-starter</artifactId>
</dependency>

3.3.2 自定义配置

rocketmq:
...
enhance:
# 启动隔离,用于激活配置类EnvironmentIsolationConfig
# 启动后会自动在topic上拼接激活的配置文件,达到自动隔离的效果
enabledIsolation: true
# 隔离环境名称,拼接到topic后,topic_dev,默认空字符串
environment: dev

3.3.3 发送消息

@RestController
@RequestMapping("enhance")
@Slf4j
public class EnhanceProduceController {

//注入增强后的模板,可以自动实现环境隔离,日志记录
@Setter(onMethod_ = @Autowired)
private RocketMQEnhanceTemplate rocketMQEnhanceTemplate;

private static final String topic = "rocket_enhance";
private static final String tag = "member";

/**
* 发送实体消息
*/
@GetMapping("/member")
public SendResult member() {
String key = UUID.randomUUID().toString();
MemberMessage message = new MemberMessage();
// 设置业务key
message.setKey(key);
// 设置消息来源,便于查询
message.setSource("MEMBER");
// 业务消息内容
message.setUserName("Java日知录");
message.setBirthday(LocalDate.now());

return rocketMQEnhanceTemplate.send(topic, tag, message);
}
}

注意这里使用的是封装后的模板工具类,一旦在配置文件中启动环境隔离,则生产者的消息也自动发送到隔离后的topic中。

3.3.4 消费者

@Slf4j
@Component
@RocketMQMessageListener(
consumerGroup = "enhance_consumer_group",
topic = "rocket_enhance",
selectorExpression = "*",
consumeThreadMax = 5 //默认是64个线程并发消息,配置 consumeThreadMax 参数指定并发消费线程数,避免太大导致资源不够
)
public class EnhanceMemberMessageListener extends EnhanceMessageHandler<MemberMessage> implements RocketMQListener<MemberMessage> {

@Override
protected void handleMessage(MemberMessage message) throws Exception {
// 此时这里才是最终的业务处理,代码只需要处理资源类关闭异常,其他的可以交给父类重试
System.out.println("业务消息处理:"+message.getUserName());
}

@Override
protected void handleMaxRetriesExceeded(MemberMessage message) {
// 当超过指定重试次数消息时此处方法会被调用
// 生产中可以进行回退或其他业务操作
log.error("消息消费失败,请执行后续处理");
}


/**
* 是否执行重试机制
*/
@Override
protected boolean isRetry() {
return true;
}

@Override
protected boolean throwException() {
// 是否抛出异常,false搭配retry自行处理异常
return false;
}

@Override
protected boolean filter() {
// 消息过滤
return false;
}

/**
* 监听消费消息,不需要执行业务处理,委派给父类做基础操作,父类做完基础操作后会调用子类的实际处理类型
*/
@Override
public void onMessage(MemberMessage memberMessage) {
super.dispatchMessage(memberMessage);
}
}

为了方便消费者对RocketMQ中的消息进行处理,我们可以使用EnhanceMessageHandler来进行消息的处理和逻辑的处理。

消费者实现了RocketMQListener的同时,可以继承EnhanceMessageHandler来进行公共逻辑的处理,而核心业务逻辑需要自己实现handleMessage方法。 如果需要对消息进行过滤或者去重的处理,则可以重写父类的filter方法进行实现。这样可以更加方便地对消息进行处理,减轻开发者的工作量。

以上,就是今天的主要内容,希望对你有所帮助!

有关SpringBoot整合RocketMQ,老鸟们都是这么玩的!的更多相关文章

  1. ruby - 为什么在 Ruby 中一切都是 Class 的实例? - 2

    这个问题在这里已经有了答案:Rubymetaclassconfusion(4个答案)关闭7年前。我对Ruby对象模型不太了解。首先,Ruby中的一切都是Class的实例吗??这些都产生true:pObject.instance_of?(Class)pClass.instance_of?(Class)pModule.instance_of?(Class)pBasicObject.instance_of?(Class)classHello;endpHello.instance_of?(Class)我不太明白这怎么可能,如果Object是Class的父类(superclass),它怎么可能都

  2. ruby - 如何将文本文件读入数组数组(每个子数组都是文本文件中的一行?) - 2

    所以我在Ruby方面几乎是个新手,我整理了一个代码来解决MinCut问题(对于一个作业,是的——我整理并测试了那部分代码),并且我无法弄清楚如何读取文件并将其放入数组数组中。我有一个文本文件要阅读,其中包含不同长度的列,如下所示137791642123134348123134109我想将它读入一个二维数组,其中每一行和每一列都被拆分,每一行都进入一个数组。因此,上述示例的结果数组将是:[[1,37,79,164],[2,123,134],[3,48,123,134,109]]我读取文本文件的代码如下:defread_array(file,count)int_array=[]File.f

  3. springboot定时任务 - 2

    如果您希望在Spring中启用定时任务功能,则需要在主类上添加 @EnableScheduling 注解。这样Spring才会扫描 @Scheduled 注解并执行定时任务。在大多数情况下,只需要在主类上添加 @EnableScheduling 注解即可,不需要在Service层或其他类中再次添加。以下是一个示例,演示如何在SpringBoot中启用定时任务功能:@SpringBootApplication@EnableSchedulingpublicclassApplication{publicstaticvoidmain(String[]args){SpringApplication.ru

  4. 基于SpringBoot的线上日志阅读器 - 2

    软件特点部署后能通过浏览器查看线上日志。支持Linux、Windows服务器。采用随机读取的方式,支持大文件的读取。支持实时打印新增的日志(类终端)。支持日志搜索。使用手册基本页面配置路径配置日志所在的目录,配置后按回车键生效,下拉框选择日志名称。选择日志后点击生效,即可加载日志。windows路径E:\java\project\log-view\logslinux路径/usr/local/XX历史模式历史模式下,不会读取新增的日志。针对历史文件可以分页读取,配置分页大小、跳转。历史模式下,支持根据关键词搜索。目前搜索引擎使用的是jdk自带类库,搜索速度相对较低,优点是比较简单。2G日志全文搜

  5. springboot使用validator进行参数校验 - 2

    1.依赖导入org.springframework.bootspring-boot-starter-weborg.springframework.bootspring-boot-starter-validation2.validation常用注解@Null被注释的元素必须为null@NotNull被注释的元素不能为null,可以为空字符串@AssertTrue被注释的元素必须为true@AssertFalse被注释的元素必须为false@Min(value)被注释的元素必须是一个数字,其值必须大于等于指定的最小值@Max(value)被注释的元素必须是一个数字,其值必须小于等于指定的最大值@D

  6. ruby - 如何将 Interactive Ruby 整合到我的开发过程中? - 2

    我正在尝试找到一种更好的方法将IRB与我的常规ruby​​开发集成。目前我很少在我的代码中使用IRB。我只用它来验证语法或尝试一些小的东西。我知道我可以将我自己的代码加载到ruby​​中作为一个require'mycode'但这通常不符合我的编程风格。有时我要检查的变量超出范围或在循环内。有没有一种简单的方法可以启动我的脚本并在IRB内的某个点卡住?我想我正在寻找一种更简单的方法来调试我的ruby​​代码而不破坏我的F5(编译)键。也许有经验的ruby开发者可以和我分享一个更精简的开发方法。 最佳答案 安装ruby​​-debugg

  7. ruby - 使用 Drupal 和 Ruby。有没有人整合两者? - 2

    我开始了一个小型网络项目并使用Drupal来构建它。到目前为止,还不错:您可以快速建立一个不错的面向CMS的网站,通过模块添加社交功能,并且您有一个广泛的API可以在一个架构良好的平台中进行自定义。现在问题来了:网站的增长超出了最初的计划,我发现自己正处于认真开始为它编写代码的境地。由于Drupal项目,我对PHP有了新的认识,但我想用Ruby来做。我会感觉更舒服,以后维护起来更容易,我可以在其他Ruby/Rails应用程序中重用它。随着时间的推移,我想我会用Ruby重写Drupal中的现有部分。基于此,问题是:是否有人将两者(成功或失败的故事)结合起来?这是一个相当大的决定,但我在G

  8. 停车系统源码-基于springboot+uniapp开源项目 - 2

    Iparking停车收费管理系统-可商用介绍Iparking是一款基于springBoot的停车收费管理系统,支持封闭车场和路边车场,支持微信支付宝多种支付渠道,支持多种硬件,涵盖了停车场管理系统的所有基础功能。技术栈Springboot,MybatisPlus,Beetl,Mysql,Redis,RabbitMQ,UniApp功能云端功能序号模块功能描述1系统管理菜单管理配置系统菜单2系统管理组织管理管理组织机构3系统管理角色管理配置系统角色,包含数据权限和功能权限配置4系统管理用户管理管理后台用户5系统管理租户管理多租户管理6系统管理公众号配置租户公众号配置7系统管理操作日志审计日志8系统

  9. Ruby,检查字符串是否都是有效的十六进制字符? - 2

    我必须检查4个字符的字符串是否都是有效的十六进制,我发现了另一个问题,它准确地演示了我想做什么,但它是Java:RegextocheckstringcontainsonlyHexcharacters我怎样才能做到这一点?我阅读了有关正则表达式的ruby​​文档,但我不明白如何根据此匹配项返回true或false? 最佳答案 在ruby​​正则表达式中,\h匹配一个十六进制数字,\H匹配一个非十六进制数字。所以!str[/\H/]就是您要查找的内容。 关于Ruby,检查字符串是否都是有效的

  10. ruby-on-rails - 为什么都是autoload,load_all!并要求全部用于 active_support.rb? - 2

    我正在查看active_support.rb以尝试了解它使用的加载过程。它使用三种加载方法:load_all!、autoload和require。为什么在同一个文件中使用三种不同的加载方式?moduleActiveSupportdefself.load_all![Dependencies,Deprecation,Gzip,MessageVerifier,Multibyte,SecureRandom,TimeWithZone]endautoload:BacktraceCleaner,'active_support/backtrace_cleaner'autoload:Base64,'ac

随机推荐