草庐IT

(17)不重启服务动态调整RabbitMQ消费者数量

没头脑遇到不高兴 2023-07-11 原文

        我们使用springboot集成rabbitmq时会配置消费者数量,然而我们想调整这个数量时却每次都要重启,这样就很麻烦。如果能在不重启服务的情况下,可以动态调整消费者数量的话就会是分方便了。

        先看下springboot中关于rabbitmq的自动配置类,RabbitAutoConfiguration,

@Configuration
@ConditionalOnClass({ RabbitTemplate.class, Channel.class })
@EnableConfigurationProperties(RabbitProperties.class)
@Import(RabbitAnnotationDrivenConfiguration.class)
public class RabbitAutoConfiguration {
    。。。。。。
}

        @Import导入了RabbitAnnotationDrivenConfiguration

@Configuration
@ConditionalOnClass(EnableRabbit.class)
class RabbitAnnotationDrivenConfiguration {
    ......
}

        RabbitAnnotationDrivenConfiguration上面有个EnableRabbit,打开看一下EnableRabbit是一个注解,里面又导入了RabbitBootstrapConfiguration

@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Import(RabbitBootstrapConfiguration.class)
public @interface EnableRabbit {
}

        RabbitBootstrapConfiguration类内容如下:

@Configuration
public class RabbitBootstrapConfiguration {

	@Bean(name = RabbitListenerConfigUtils.RABBIT_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME)
	@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
	public RabbitListenerAnnotationBeanPostProcessor rabbitListenerAnnotationProcessor() {
		return new RabbitListenerAnnotationBeanPostProcessor();
	}

	@Bean(name = RabbitListenerConfigUtils.RABBIT_LISTENER_ENDPOINT_REGISTRY_BEAN_NAME)
	public RabbitListenerEndpointRegistry defaultRabbitListenerEndpointRegistry() {
		return new RabbitListenerEndpointRegistry();
	}

}

        RabbitListenerAnnotationBeanPostProcessor用于处理@RabbitListener注解修饰的方法

        RabbitListenerEndpointRegistry用于创建和管理消息监听容器MessageListenerContainer,重点看这里。。。。。

        RabbitListenerEndpointRegistry类中有个registerListenerContainer注册消息监听容器的方法,该方法被RabbitListenerEndpointRegistrar的registerAllEndpoints调用,endpointDescriptors是前面的RabbitListenerAnnotationBeanPostProcessor获取的@RabbitListener注解修饰的消息消费处理的方法集合。

@Override
public void afterPropertiesSet() {
	registerAllEndpoints();
}

protected void registerAllEndpoints() {
	Assert.state(this.endpointRegistry != null, "No registry available");
	synchronized (this.endpointDescriptors) {
		for (AmqpListenerEndpointDescriptor descriptor : this.endpointDescriptors) {
			this.endpointRegistry.registerListenerContainer(// NOSONAR never null
					descriptor.endpoint, resolveContainerFactory(descriptor));
		}
		this.startImmediately = true;  // trigger immediate startup
	}
}

        这里不再深入探究具体的源码了,感兴趣的话可以自己翻看一下。大致调用顺序为:

  1. RabbitListenerAnnotationBeanPostProcessor.postProcessAfterInitialization(获取被@RabbitListener注解修饰的方法)————>RabbitListenerEndpointRegistrar.registerEndpoint(添加到endpointDescriptors list集合)
  2. RabbitListenerAnnotationBeanPostProcessor.afterSingletonsInstantiated()————>RabbitListenerEndpointRegistrar.afterPropertiesSet————>RabbitListenerEndpointRegistrar.registerAllEndpoints————>RabbitListenerEndpointRegistry.registerListenerContainer

         RabbitListenerEndpointRegistry.registerListenerContainer方法如下,将所有创建的消息监听容器MessageListenerContainer都放到了listenerContainers这个map中。

private final Map<String, MessageListenerContainer> listenerContainers =
			new ConcurrentHashMap<String, MessageListenerContainer>();
public void registerListenerContainer(RabbitListenerEndpoint endpoint, RabbitListenerContainerFactory<?> factory,
				boolean startImmediately) {
	Assert.notNull(endpoint, "Endpoint must not be null");
	Assert.notNull(factory, "Factory must not be null");

	String id = endpoint.getId();
	Assert.hasText(id, "Endpoint id must not be empty");
	synchronized (this.listenerContainers) {
		Assert.state(!this.listenerContainers.containsKey(id),
				"Another endpoint is already registered with id '" + id + "'");
		MessageListenerContainer container = createListenerContainer(endpoint, factory);
		this.listenerContainers.put(id, container);
		if (StringUtils.hasText(endpoint.getGroup()) && this.applicationContext != null) {
			List<MessageListenerContainer> containerGroup;
			if (this.applicationContext.containsBean(endpoint.getGroup())) {
				containerGroup = this.applicationContext.getBean(endpoint.getGroup(), List.class);
			}
			else {
				containerGroup = new ArrayList<MessageListenerContainer>();
				this.applicationContext.getBeanFactory().registerSingleton(endpoint.getGroup(), containerGroup);
			}
			containerGroup.add(container);
		}
		if (startImmediately) {
			startIfNecessary(container);
		}
	}
}

        我们遍历listenerContainers就能拿到对应的消息监听容器MessageListenerContainer,然后调用MessageListenerContainer的setConcurrentConsumers、setMaxConcurrentConsumers方法就可以调整消费者数量了。

        RabbitListenerEndpointRegistry.getListenerContainers可以获取所有消费监听容器

public void setConcurrentConsumers(final int concurrentConsumers) {
	Assert.isTrue(concurrentConsumers > 0, "'concurrentConsumers' value must be at least 1 (one)");
	Assert.isTrue(!isExclusive() || concurrentConsumers == 1,
			"When the consumer is exclusive, the concurrency must be 1");
	if (this.maxConcurrentConsumers != null) {
		Assert.isTrue(concurrentConsumers <= this.maxConcurrentConsumers,
				"'concurrentConsumers' cannot be more than 'maxConcurrentConsumers'");
	}
	synchronized (this.consumersMonitor) {
		if (logger.isDebugEnabled()) {
			logger.debug("Changing consumers from " + this.concurrentConsumers + " to " + concurrentConsumers);
		}
		int delta = this.concurrentConsumers - concurrentConsumers;
		this.concurrentConsumers = concurrentConsumers;
		if (isActive()) {
			adjustConsumers(delta);
		}
	}
}


public void setMaxConcurrentConsumers(int maxConcurrentConsumers) {
	Assert.isTrue(maxConcurrentConsumers >= this.concurrentConsumers,
			"'maxConcurrentConsumers' value must be at least 'concurrentConsumers'");
	Assert.isTrue(!isExclusive() || maxConcurrentConsumers == 1,
			"When the consumer is exclusive, the concurrency must be 1");
	Integer oldMax = this.maxConcurrentConsumers;
	this.maxConcurrentConsumers = maxConcurrentConsumers;
	if (oldMax != null && isActive()) {
		int delta = oldMax - maxConcurrentConsumers;
		if (delta > 0) { // only decrease, not increase
			adjustConsumers(delta);
		}
	}

}

        废话不多说,直接上代码示例:

@Resource
RabbitListenerEndpointRegistry rabbitListenerEndpointRegistry;

@RequestMapping(value = "/modifyMqConsumerNum")
@ApiOperation(value = "更新队列消费者数量接口")
public Response modifyMqConsumerNum(@RequestParam(value = "queueName", required = false) String queueName,
									@RequestParam(value = "concurrentConsumers") Integer concurrentConsumers,
									@RequestParam(value = "maxConcurrentConsumers") Integer maxConcurrentConsumers) {
	Collection<MessageListenerContainer> listenerContainers = rabbitListenerEndpointRegistry.getListenerContainers();
	for (MessageListenerContainer container : listenerContainers) {
		SimpleMessageListenerContainer con = (SimpleMessageListenerContainer) container;
		//消息监听容器要消费的队列名称集合
		List<String> queueNamesList = Arrays.asList(con.getQueueNames());
		//判断容器中的队列名称是否包含需要调整的队列名参数
		if (queueNamesList.contains(queueName)) {
			//注意先设置最大的消费者数量,再设置最小的消费者数量,因为先修改最小数量超过修改前的最大数量时会报异常修改失败
			con.setMaxConcurrentConsumers(maxConcurrentConsumers);
			con.setConcurrentConsumers(concurrentConsumers);
		}
	}
	return Response.success();
}

        调用RabbitListenerEndpointRegistry.getListenerContainers获取所有消费者监听容器,判断是否包含要调整的队列名称,如果包含则进行调整。

        注意:先设置最大的消费者数量,再设置最小的消费者数量,因为先修改最小数量超过修改前的最大数量时会报异常修改失败。例如之前最小最大分别是2和4,如果先将最小改成5则会报参数异常,即最小数量超过了最大数量。

java.lang.IllegalArgumentException: 'concurrentConsumers' cannot be more than 'maxConcurrentConsumers'
    at org.springframework.util.Assert.isTrue(Assert.java:118)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.setConcurrentConsumers(SimpleMessageListenerContainer.java:161)

         另外,这里通过接口修改只能改服务的一个实例的消费者数量,生产上面一个服务都是集群部署的,可以结合配置中心(Nacos、Apollo等)进行处理。程序中监听配置中心的对应队列的消费者数量,如果数值发生了变化,则调用上面的方法进行变更就好了,这里就不再进行实现了。

有关(17)不重启服务动态调整RabbitMQ消费者数量的更多相关文章

  1. ruby - 使用 ruby​​ 和 savon 的 SOAP 服务 - 2

    我正在尝试使用ruby​​和Savon来使用网络服务。测试服务为http://www.webservicex.net/WS/WSDetails.aspx?WSID=9&CATID=2require'rubygems'require'savon'client=Savon::Client.new"http://www.webservicex.net/stockquote.asmx?WSDL"client.get_quotedo|soap|soap.body={:symbol=>"AAPL"}end返回SOAP异常。检查soap信封,在我看来soap请求没有正确的命名空间。任何人都可以建议我

  2. ruby - 具有身份验证的私有(private) Ruby Gem 服务器 - 2

    我想安装一个带有一些身份验证的私有(private)Rubygem服务器。我希望能够使用公共(public)Ubuntu服务器托管内部gem。我读到了http://docs.rubygems.org/read/chapter/18.但是那个没有身份验证-如我所见。然后我读到了https://github.com/cwninja/geminabox.但是当我使用基本身份验证(他们在他们的Wiki中有)时,它会提示从我的服务器获取源。所以。如何制作带有身份验证的私有(private)Rubygem服务器?这是不可能的吗?谢谢。编辑:Geminabox问题。我尝试“捆绑”以安装新的gem..

  3. ruby-on-rails - rails 目前在重启后没有安装 - 2

    我有一个奇怪的问题:我在rvm上安装了ruby​​onrails。一切正常,我可以创建项目。但是在我输入“railsnew”时重新启动后,我有“程序'rails'当前未安装。”。SystemUbuntu12.04ruby-v"1.9.3p194"gemlistactionmailer(3.2.5)actionpack(3.2.5)activemodel(3.2.5)activerecord(3.2.5)activeresource(3.2.5)activesupport(3.2.5)arel(3.0.2)builder(3.0.0)bundler(1.1.4)coffee-rails(

  4. ruby-on-rails - 如何优雅地重启 thin + nginx? - 2

    我的瘦服务器配置了nginx,我的ROR应用程序正在它们上运行。在我发布代码更新时运行thinrestart会给我的应用程序带来一些停机时间。我试图弄清楚如何优雅地重启正在运行的Thin实例,但找不到好的解决方案。有没有人能做到这一点? 最佳答案 #Restartjustthethinserverdescribedbythatconfigsudothin-C/etc/thin/mysite.ymlrestartNginx将继续运行并代理请求。如果您将Nginx设置为使用多个上游服务器,例如server{listen80;server

  5. ruby-on-rails - 启动 Rails 服务器时 ImageMagick 的警告 - 2

    最近,当我启动我的Rails服务器时,我收到了一长串警告。虽然它不影响我的应用程序,但我想知道如何解决这些警告。我的估计是imagemagick以某种方式被调用了两次?当我在警告前后检查我的git日志时。我想知道如何解决这个问题。-bcrypt-ruby(3.1.2)-better_errors(1.0.1)+bcrypt(3.1.7)+bcrypt-ruby(3.1.5)-bcrypt(>=3.1.3)+better_errors(1.1.0)bcrypt和imagemagick有关系吗?/Users/rbchris/.rbenv/versions/2.0.0-p247/lib/ru

  6. ruby-on-rails - s3_direct_upload 在生产服务器中不工作 - 2

    在Rails4.0.2中,我使用s3_direct_upload和aws-sdkgems直接为s3存储桶上传文件。在开发环境中它工作正常,但在生产环境中它会抛出如下错误,ActionView::Template::Error(noimplicitconversionofnilintoString)在View中,create_cv_url,:id=>"s3_uploader",:key=>"cv_uploads/{unique_id}/${filename}",:key_starts_with=>"cv_uploads/",:callback_param=>"cv[direct_uplo

  7. ruby - 用 Ruby 编写一个简单的网络服务器 - 2

    我想在Ruby中创建一个用于开发目的的极其简单的Web服务器(不,不想使用现成的解决方案)。代码如下:#!/usr/bin/rubyrequire'socket'server=TCPServer.new('127.0.0.1',8080)whileconnection=server.acceptheaders=[]length=0whileline=connection.getsheaders想法是从命令行运行这个脚本,提供另一个脚本,它将在其标准输入上获取请求,并在其标准输出上返回完整的响应。到目前为止一切顺利,但事实证明这真的很脆弱,因为它在第二个请求上中断并出现错误:/usr/b

  8. ruby-on-rails - 在 Rails 中调试生产服务器 - 2

    您如何在Rails中的实时服务器上进行有效调试,无论是在测试版/生产服务器上?我试过直接在服务器上修改文件,然后重启应用,但是修改好像没有生效,或者需要很长时间(缓存?)我也试过在本地做“脚本/服务器生产”,但是那很慢另一种选择是编码和部署,但效率很低。有人对他们如何有效地做到这一点有任何见解吗? 最佳答案 我会回答你的问题,即使我不同意这种热修补服务器代码的方式:)首先,你真的确定你已经重启了服务器吗?您可以通过跟踪日志文件来检查它。您更改的代码显示的View可能会被缓存。缓存页面位于tmp/cache文件夹下。您可以尝试手动删除

  9. HBase Region 简介和建议数量&大小 - 2

    Region是HBase数据管理的基本单位,region有一点像关系型数据的分区。region中存储这用户的真实数据,而为了管理这些数据,HBase使用了RegionSever来管理region。Region的结构hbaseregion的大小设置默认情况下,每个Table起初只有一个Region,随着数据的不断写入,Region会自动进行拆分。刚拆分时,两个子Region都位于当前的RegionServer,但处于负载均衡的考虑,HMaster有可能会将某个Region转移给其他的RegionServer。RegionSplit时机:当1个region中的某个Store下所有StoreFile

  10. ruby - 在 Ruby 中动态创建数组 - 2

    有没有办法在Ruby中动态创建数组?例如,假设我想遍历用户输入的书籍数组:books=gets.chomp用户输入:"TheGreatGatsby,CrimeandPunishment,Dracula,Fahrenheit451,PrideandPrejudice,SenseandSensibility,Slaughterhouse-Five,TheAdventuresofHuckleberryFinn"我把它变成一个数组:books_array=books.split(",")现在,对于用户输入的每一本书,我想用Ruby创建一个数组。伪代码来做到这一点:x=0books_array.

随机推荐