草庐IT

spring cloud stream使用

磊哥 低调 2023-07-28 原文

技术兴起的原因:为了解决系统中不同中间件的适配问题,出现了cloud stream,采用适配绑定的方式,自动给不同的MQ之间进行切换。

屏蔽底层消息中间件的差异,降低切换成本,统一消息的编程模型。

inputs是消费者,outputs是生产者

Stream中的消息通信方式遵循了发布-订阅模式,用 Topic 主题进行广播(在RabbitMQ就是Exchange,在Kafka中就是Topic)

其主要流程如下图

Binder:很方便的连接中间件,屏蔽差异。

Channel:通道,是队列Queue的一种抽象,在消息通讯系统中就是实现存储和转发的媒介,通过channel对队列进行配置。

Source和Sink:简单理解为消息的生产者和消费者。

消息分组

可以让每个消息组都获取消息,而每个组内,只能有一个消费者消费这条消息。
Spring Cloud Stream 其实是发布订阅模型,如果一个topic有多个订阅实例 ,消息就会被这些消息消费者接收到,这样就会带来一个问题,那就是消息的重复消费,这种问题在很多业务场景下是不允许的,我们这时候需要给消息消费者加个分组信息,这样多个消费者实例在一个组下面就不会再出现消息重复消费。
多个消息消费者只有其中一个可以消费(同一个group,多个消费者之间是竞争的关系)。
简单说,生产者发送10条,其中一个消费者收到4条,另一个消费者可能收到6条

Spring Cloud Stream 当使用的消息中间件为Rabbit MQ的时候,配置input的时候,如果没有指定消息分组,那么生成的队列名称就是匿名的,并且当连接断开的时候会自动删除对应的队列。

在Rabbit MQ可以看到对应队列如下,第一个队列就是没有指定消息分组属性group生成的队列,可以看到生成的队列特性auto-delete:true,exclusive:true,也就是队列是排他性的,只有当前连接可见,并且当连接断开的时候队列会自动删除。


如果指定的分组,那么生成的队列名称就是destination+group,例如如下配置生成的队列名字就是css_test.css_test_queue,队列是配置:

spring:
  cloud:
    stream:
      bindings:
        css_test:
          destination: css_test
          binder: rabbit1
          group: css_test_queue      

在Rabbit MQ可以看到对应队列如下,队列是durable: true,队列是持久的,不会自动删除。


1、队列持久化的概念
队列的声明默认是存放到内存中的,若是rabbitmq重启会丢失,若是想重启以后还存在就要使队列持久化,保存到Erlang自带的Mnesia数据库中,当rabbitmq重启以后会读取该数据库。数据库

2、排他
简单理解就是在链接关闭时是否会删除队列(不管队列中有没有消息) 性能

3、自动删除
当队列中有消息时,不管是否排他,关闭链接都不会删除队列,此时消费者消费完消息后再断开消费者,队列会被自动删除。(这里若是有多个消费者消费同一个队列,则须要全部消费者都断开后才能自动删除)

Spring Cloud Stream为一些供应商的消息中间件产品提供了个性化的自动配置,引用了发布、订阅、消费、分区的三个核心概念。

官方版本目前仅仅支持RabbitMQ和Kafka。

MQ相关术语
Message:生产者/消费者之间靠消息媒介传递信息内容

MessageChannel:消息必须走特定的通道

消息通道的子接口SubscribableChannel,由MessageHandle消息处理器所订阅。

相关注解
Middleware:中间件,目前只支持RabbitMQ和Kafka
Binder:应用层和消息中间件之间的封装,实现了Kafka和RabbitMQ的Binder,通过Binder可以很方便的连接中间件,可以动态的改变消息类型,这些可以通过配置文件修改。
Input:表示输入通道,消息进入该通道传到应用程序。
Output:注解标识输出通道,发布的消息将通过该通道离开应用程序。
StreamListener:监听队列,用于消费者的队列的消息接收。
EnableBinding:将信道channel和exchange绑定在一起。

如果多次重复消费依然不成功,总要有个兜底的,那就给它丢到死信队列(Dead Letter Queue, DLQ)里就好了。

在死信队列中的消息,不会主动向消费者发送消息,需要我们人工处理,比如将它们取出再次消费等。

在测试时,可以抛出异常试一下,会发现超过阈值(本地重试max-attempts=2)后会抛异常,并将消息放入死信队列中。如下图:

dlq-topic.dlq-group只是正常的队列。死信队列名字的格式为topicName.queueName.dlq。也就是说这里实际的死信队列是dlq-topic.dlq-group.dlq。

而死信队列也可以通过move message把消息重新移入正常队列中进行重新消费。(其实正常队列也可以move,不过没必要)


Spring Cloud Stream核心架构1

两个比较重要的地方:inputs(输入)消息接收端、outputs(输出)消息发送端

一个 Spring Cloud Stream 应用以消息中间件为核心,应用通过Spring Cloud Stream注入的输入/输出通道 channels 与外部进行通信。channels 通过特定的Binder实现与外部消息中间件进行通信。

Spring Cloud Stream核心架构2

黄色:表示RabbitMQ

绿色:插件,消息的输入输出都套了一层插件,插件可以用于各种各样不同的消息,也可以用于消息中间件的替换。

核心概念:

Barista接口:Barista接口是定义来作为后面类的参数,这一接口定义来通道类型和通道名称,通道名称是作为配置用,通道类型则决定了app会使用这一通道进行发送消息还是从中接收消息。

通道接口如何定义:

@Output:输出注解,用于定义发送消息接口

@Input:输入注解,用于定义消息的消费者接口

@StreamListener:用于定义监听方法的注解

使用Spring Cloud Stream 非常简单,只需要使用好这3个注解即可,在实现高性能消息的生产和消费的场景非常合适,但是使用SpringCloudStream框架有一个非常大的问题,就是不能实现可靠性的投递,也就是没法保证消息的100%可靠性,会存在少量消息丢失的问题。目前SpringCloudStream整合了RabbitMQ与Kafka,我们都知道Kafka是无法进行消息可靠性投递的,这个原因是因为SpringCloudStream框架为了和Kafka兼顾所以在实际工作中使用它的目的就是针对高性能的消息通信的!这点就是在当前版本SpringCloudStream的定位。

因此在实际的工作中,可以采用SpringCloudStream,如果需要保证可靠性投递,也可以单独采用RabbitMQ,也是可以的。

yl-jts-edi-tiktok项目发送消息的yl-jts-edi-tiktok-rabbitmq.yaml配置

spring:
  cloud:
    stream:
      bindings:
        test-trace-push-input:
          destination: test-trace-push-phl
          binder: mq-edi
          group: jts-edi-tiktok
          consumer:
            maxAttempts: 3
            backOffInitialInterval: 10000
            backOffMaxInterval: 200000
            backOffMultiplier: 3.0

        tiktok-trace-push-input:
          destination: tiktok-trace-push
          binder: mq-edi
          group: jts-edi-tiktok
          consumer:
            maxAttempts: 3
            backOffInitialInterval: 10000
            backOffMaxInterval: 200000
            backOffMultiplier: 3.0
            
        # 测试轨迹推送
        test-trace-push-output:
          destination: test-trace-push-phl
          binder: mq-edi
        #下单
        tiktok-order-create-output:
          destination: tiktok-order-create
          binder: mq-order

      rabbit:
        bindings:
          test-trace-push-input:
            consumer:
              concurrency: 4
              max-concurrency: 8
              prefetch: 10
              #这个参数为true的时候会自动为当前的队列创建一个死信队列,以dlq结尾
              auto-bind-dlq: true
              republish-to-dlq: true

          tiktok-trace-push-input:
            consumer:
              concurrency: 4
              max-concurrency: 8
              prefetch: 10
              #这个参数为true的时候会自动为当前的队列创建一个死信队列,以dlq结尾
              auto-bind-dlq: true
              republish-to-dlq: true
              #消费者开启延时队列支持
              delayed-exchange: true
            #生产者开启延时队列支持
            producer:
              delayed-exchange: true
              
      defaultBinder: mq-edi
      binders:
        mq-edi:
          type: rabbit
          environment:
            spring:
              rabbitmq:
                host: 172.31.44.118
                port: 5672
                username: admin
                password: 123456
                virtual-host: /jts-phl-job

        mq-order:
          type: rabbit
          environment:
            spring:
              rabbitmq:
                host: 172.31.44.118
                port: 5672
                username: admin
                password: 123456
                virtual-host: /jts-phl-order

jts-phl-openplatform-consumer项目消费的jts-phl-common-mq.yaml配置

spring:
  rabbitmq:
    addresses: 172.31.44.118:5672,172.31.44.119:5672,172.31.44.120:5672
    username: admin
    password: 123456
    virtual-host: /jts-phl-order
    connection-timeout: 3000
  cloud:
    stream:
      bindings:
        #创建订单输出通道
        create-order-out-put:
          destination: createOrder
          group: order_create_group
          content-type: application/json
        #创建订单输入通道
        create-order-in-put:
          destination: createOrder
          group: order_create_group
          content-type: application/json
          consumer:
            #最大重试次数,默认为3
            maxAttempts: 3
            #初始/最少/空闲时 消费者数量。默认1
            concurrency: 1          
        #取消订单输出通道
        cancel-order-out-put:
          destination: cancelOrder
          group: order_cancel_group
          content-type: application/json
        #取消订单输入通道
        cancel-order-in-put:
          destination: cancelOrder
          group: order_cancel_group
          content-type: application/json
          consumer:
            #最大重试次数,默认为3
            maxAttempts: 3  
            #初始/最少/空闲时 消费者数量。默认1
            concurrency: 1                     
        #订单日志输出通道
        order-log-out-put:
          destination: orderLog
          group: order_log_group
          content-type: application/json
        #订单日志输入通道
        order-log-in-put:
          destination: orderLog
          group: order_log_group
          content-type: application/json
          consumer:
            #最大重试次数,默认为3
            maxAttempts: 3  
            #初始/最少/空闲时 消费者数量。默认1
            concurrency: 1
        #取消包裹状态输出通道
        cancel-package-status-out-put:
          destination: cancelPackageStatus
          group: cancel_package_status_group
          content-type: application/json
        #取消包裹状态输入通道
        cancel-package-status-in-put:
          destination: cancelPackageStatus
          group: cancel_package_status_group
          content-type: application/json
          consumer:
            #最大重试次数,默认为3
            maxAttempts: 3  
            #初始/最少/空闲时 消费者数量。默认1
            concurrency: 1  

        #tiktok创建订单
        tiktok-order-create-input:
          destination: tiktok-order-create
          group: tiktok-order-create
          content-type: application/json
          consumer:
            #最大重试次数,默认为3
            maxAttempts: 3  
            #初始/最少/空闲时 消费者数量。默认1
            concurrency: 5                                    
      rabbit:
        bindings:
          #创建订单输入通道 启用死信队列
          create-order-in-put: 
            consumer:
              #限制consumer在消费消息时,一次能同时获取的消息数量,默认:1
              prefetch: 10
              #默认:1。queue的消费者的最大数量。当前消费者数量不足以及时消费消息时, 会动态增加消费者数量, 直到到达最大数量.
              maxConcurrency: 10              
              #是否自动声明死信队列(DLQ)并将其绑定到死信交换机(DLX)。默认是false。
              autoBindDlq: true
              republishToDlq: true
          #取消订单输入通道 启用死信队列
          cancel-order-in-put: 
            consumer:
              #限制consumer在消费消息时,一次能同时获取的消息数量,默认:1
              prefetch: 10
              #默认:1。queue的消费者的最大数量。当前消费者数量不足以及时消费消息时, 会动态增加消费者数量, 直到到达最大数量.
              maxConcurrency: 10              
              #是否自动声明死信队列(DLQ)并将其绑定到死信交换机(DLX)。默认是false。
              autoBindDlq: true
              republishToDlq: true
          #订单日志输入通道
          order-log-in-put:
            consumer:
              #限制consumer在消费消息时,一次能同时获取的消息数量,默认:1
              prefetch: 10
              #默认:1。queue的消费者的最大数量。当前消费者数量不足以及时消费消息时, 会动态增加消费者数量, 直到到达最大数量.
              maxConcurrency: 10
              #是否自动声明死信队列(DLQ)并将其绑定到死信交换机(DLX)。默认是false。
              autoBindDlq: true
              republishToDlq: true
          #取消包裹状态输入通道
          cancel-package-status-in-put:
            consumer:
              #限制consumer在消费消息时,一次能同时获取的消息数量,默认:1
              prefetch: 10
              #默认:1。queue的消费者的最大数量。当前消费者数量不足以及时消费消息时, 会动态增加消费者数量, 直到到达最大数量.
              maxConcurrency: 10
              #是否自动声明死信队列(DLQ)并将其绑定到死信交换机(DLX)。默认是false。
              autoBindDlq: true
              republishToDlq: true   

          #tiktok创建订单 启用死信队列
          tiktok-order-create-input:
            consumer:
              #限制consumer在消费消息时,一次能同时获取的消息数量,默认:1
              prefetch: 10
              #默认:1。queue的消费者的最大数量。当前消费者数量不足以及时消费消息时, 会动态增加消费者数量, 直到到达最大数量.
              maxConcurrency: 10
              #是否自动声明死信队列(DLQ)并将其绑定到死信交换机(DLX)。默认是false。
              autoBindDlq: true
              republishToDlq: true             

springCloud使用stream配置rabbitMq实现延时消息

先安装rabbitMq延时插件
参考我另一篇文章

https://blog.csdn.net/weixin_43944305/article/details/120828003

上依赖

<dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-web</artifactId>
</dependency>

<dependency>
   <groupId>org.springframework.cloud</groupId>
   <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>

消息通道

package com.fchan.springcloudstream.service;

import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel;

public interface MyMessageChannel {

    String out = "out";
    String in = "in";
    @Output(out)
    MessageChannel out();
    @Input(in)
    SubscribableChannel in();
}

// 发送延迟消息

@PostMapping("/delayed")
public String sendDelayedMessage(@RequestParam("body") String body,
                                 @RequestParam("seconds") Integer seconds) {

    Map<String,Object> message = new HashMap<>();
    message.put("body", body);
    myMessageChannel.out().send(
            MessageBuilder.withPayload(message)
                    .setHeader("x-delay", seconds * 1000)
                    .build()
    );
    log.info("发送延迟消息成功");
    return "SUCCESS";
}

延时消息接收

package com.fchan.springcloudstream.service;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;

import java.util.Map;

@Component
@EnableBinding({MyMessageChannel.class})
public class MyConsumer {
    Logger log = LoggerFactory.getLogger(MyConsumer.class);
    @StreamListener(MyMessageChannel.in)
    public void input(Message<Map<String,Object>> message){
        log.info("收到消息:{}", message.getPayload());
    }
}

yml配置

spring:
  rabbitmq:
    host: 110.40.181.73
    port: 35672
    username: root
    password: 10086
    virtual-host: /fchan
  cloud:
    stream:
      rabbit:
        bindings:
		  #消费者开启延时队列支持
          in:
            consumer:
              delayed-exchange: true
		  #生产者开启延时队列支持
          out:
            producer:
              delayed-exchange: true
      bindings:
        in:
		   #指定消息所属exchange
          destination: test
		  #指定消费者分组,在多实例的时候必需指定,防止重复消费
          group: myIn
        out:
          destination: test

再展示一个例子配合 Spring Cloud Stream 使用延迟交换机

首先来看一下延迟交换机如何配置:

spring:
  cloud:
    stream:
      bindings:
        delayedQueueOutput:
          destination: delayedQueueTopic
          content-type: application/json
          binder: rabbit

        delayedQueueInput:
          destination: delayedQueueTopic
          content-type: application/json
          group: ${spring.application.name}
          binder: rabbit

      rabbit:
        bindings:
          delayedQueueOutput:
            producer:
              delayedExchange: true # 是否将目标exchange声明为一个延迟消息交换机,默认false。即消息productor发布消息到延迟exchange后,延迟n长时间后才将消息推送到指定的queue中。 -RabbitMQ需要安装/启用插件: rabbitmq-delayed-message-exchange

          delayedQueueInput:
            consumer:
              delayedExchange: true # 是否将目标exchange声明为一个延迟消息交换机,默认false。即消息productor发布消息到延迟exchange后,延迟n长时间后才将消息推送到指定的queue中。 -RabbitMQ需要安装/启用插件: rabbitmq-delayed-message-exchange

重点关注2个配置:spring.cloud.stream.rabbit.bindings.ChannelName.producer.delayedExchange 和 spring.cloud.stream.rabbit.bindings.ChannelName.consumer.delayedExchange。

这2个配置分别属于生产者和消费者的配置,但都是用于告诉 Spring Cloud Stream 是否将交换机声明为一个延迟消息交换机。这2个是成对出现,如果少配置了一个,服务启动时会报一个警告.

有关spring cloud stream使用的更多相关文章

  1. ruby - 如何使用 Nokogiri 的 xpath 和 at_xpath 方法 - 2

    我正在学习如何使用Nokogiri,根据这段代码我遇到了一些问题:require'rubygems'require'mechanize'post_agent=WWW::Mechanize.newpost_page=post_agent.get('http://www.vbulletin.org/forum/showthread.php?t=230708')puts"\nabsolutepathwithtbodygivesnil"putspost_page.parser.xpath('/html/body/div/div/div/div/div/table/tbody/tr/td/div

  2. ruby - 使用 RubyZip 生成 ZIP 文件时设置压缩级别 - 2

    我有一个Ruby程序,它使用rubyzip压缩XML文件的目录树。gem。我的问题是文件开始变得很重,我想提高压缩级别,因为压缩时间不是问题。我在rubyzipdocumentation中找不到一种为创建的ZIP文件指定压缩级别的方法。有人知道如何更改此设置吗?是否有另一个允许指定压缩级别的Ruby库? 最佳答案 这是我通过查看ruby​​zip内部创建的代码。level=Zlib::BEST_COMPRESSIONZip::ZipOutputStream.open(zip_file)do|zip|Dir.glob("**/*")d

  3. ruby - 为什么我可以在 Ruby 中使用 Object#send 访问私有(private)/ protected 方法? - 2

    类classAprivatedeffooputs:fooendpublicdefbarputs:barendprivatedefzimputs:zimendprotecteddefdibputs:dibendendA的实例a=A.new测试a.foorescueputs:faila.barrescueputs:faila.zimrescueputs:faila.dibrescueputs:faila.gazrescueputs:fail测试输出failbarfailfailfail.发送测试[:foo,:bar,:zim,:dib,:gaz].each{|m|a.send(m)resc

  4. ruby-on-rails - 使用 Ruby on Rails 进行自动化测试 - 最佳实践 - 2

    很好奇,就使用ruby​​onrails自动化单元测试而言,你们正在做什么?您是否创建了一个脚本来在cron中运行rake作业并将结果邮寄给您?git中的预提交Hook?只是手动调用?我完全理解测试,但想知道在错误发生之前捕获错误的最佳实践是什么。让我们理所当然地认为测试本身是完美无缺的,并且可以正常工作。下一步是什么以确保他们在正确的时间将可能有害的结果传达给您? 最佳答案 不确定您到底想听什么,但是有几个级别的自动代码库控制:在处理某项功能时,您可以使用类似autotest的内容获得关于哪些有效,哪些无效的即时反馈。要确保您的提

  5. ruby - 在 Ruby 中使用匿名模块 - 2

    假设我做了一个模块如下:m=Module.newdoclassCendend三个问题:除了对m的引用之外,还有什么方法可以访问C和m中的其他内容?我可以在创建匿名模块后为其命名吗(就像我输入“module...”一样)?如何在使用完匿名模块后将其删除,使其定义的常量不再存在? 最佳答案 三个答案:是的,使用ObjectSpace.此代码使c引用你的类(class)C不引用m:c=nilObjectSpace.each_object{|obj|c=objif(Class===objandobj.name=~/::C$/)}当然这取决于

  6. ruby - 使用 ruby​​ 和 savon 的 SOAP 服务 - 2

    我正在尝试使用ruby​​和Savon来使用网络服务。测试服务为http://www.webservicex.net/WS/WSDetails.aspx?WSID=9&CATID=2require'rubygems'require'savon'client=Savon::Client.new"http://www.webservicex.net/stockquote.asmx?WSDL"client.get_quotedo|soap|soap.body={:symbol=>"AAPL"}end返回SOAP异常。检查soap信封,在我看来soap请求没有正确的命名空间。任何人都可以建议我

  7. python - 如何使用 Ruby 或 Python 创建一系列高音调和低音调的蜂鸣声? - 2

    关闭。这个问题是opinion-based.它目前不接受答案。想要改进这个问题?更新问题,以便editingthispost可以用事实和引用来回答它.关闭4年前。Improvethisquestion我想在固定时间创建一系列低音和高音调的哔哔声。例如:在150毫秒时发出高音调的蜂鸣声在151毫秒时发出低音调的蜂鸣声200毫秒时发出低音调的蜂鸣声250毫秒的高音调蜂鸣声有没有办法在Ruby或Python中做到这一点?我真的不在乎输出编码是什么(.wav、.mp3、.ogg等等),但我确实想创建一个输出文件。

  8. ruby-on-rails - 'compass watch' 是如何工作的/它是如何与 rails 一起使用的 - 2

    我在我的项目目录中完成了compasscreate.和compassinitrails。几个问题:我已将我的.sass文件放在public/stylesheets中。这是放置它们的正确位置吗?当我运行compasswatch时,它不会自动编译这些.sass文件。我必须手动指定文件:compasswatchpublic/stylesheets/myfile.sass等。如何让它自动运行?文件ie.css、print.css和screen.css已放在stylesheets/compiled。如何在编译后不让它们重新出现的情况下删除它们?我自己编译的.sass文件编译成compiled/t

  9. ruby - 使用 ruby​​ 将 HTML 转换为纯文本并维护结构/格式 - 2

    我想将html转换为纯文本。不过,我不想只删除标签,我想智能地保留尽可能多的格式。为插入换行符标签,检测段落并格式化它们等。输入非常简单,通常是格式良好的html(不是整个文档,只是一堆内容,通常没有anchor或图像)。我可以将几个正则表达式放在一起,让我达到80%,但我认为可能有一些现有的解决方案更智能。 最佳答案 首先,不要尝试为此使用正则表达式。很有可能你会想出一个脆弱/脆弱的解决方案,它会随着HTML的变化而崩溃,或者很难管理和维护。您可以使用Nokogiri快速解析HTML并提取文本:require'nokogiri'h

  10. ruby - 在 64 位 Snow Leopard 上使用 rvm、postgres 9.0、ruby 1.9.2-p136 安装 pg gem 时出现问题 - 2

    我想为Heroku构建一个Rails3应用程序。他们使用Postgres作为他们的数据库,所以我通过MacPorts安装了postgres9.0。现在我需要一个postgresgem并且共识是出于性能原因你想要pggem。但是我对我得到的错误感到非常困惑当我尝试在rvm下通过geminstall安装pg时。我已经非常明确地指定了所有postgres目录的位置可以找到但仍然无法完成安装:$envARCHFLAGS='-archx86_64'geminstallpg--\--with-pg-config=/opt/local/var/db/postgresql90/defaultdb/po

随机推荐