文章目录
RabbitListener是Springboot RabbitMq中经常用到的一个注解,将被RabbitListener注解的类和方法封装成MessageListener注入MessageListenerContainer





通过自动配置类RabbitAutoConfiguration将EnableRabbit引入,而EnableRabbit又通过import注解引入了配置类RabbitBootstrapConfiguration
public class RabbitBootstrapConfiguration implements ImportBeanDefinitionRegistrar {
@Override
public void registerBeanDefinitions(@Nullable AnnotationMetadata importingClassMetadata,
BeanDefinitionRegistry registry) {
if (!registry.containsBeanDefinition(
RabbitListenerConfigUtils.RABBIT_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME)) {
registry.registerBeanDefinition(RabbitListenerConfigUtils.RABBIT_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME,
new RootBeanDefinition(RabbitListenerAnnotationBeanPostProcessor.class));
}
if (!registry.containsBeanDefinition(RabbitListenerConfigUtils.RABBIT_LISTENER_ENDPOINT_REGISTRY_BEAN_NAME)) {
registry.registerBeanDefinition(RabbitListenerConfigUtils.RABBIT_LISTENER_ENDPOINT_REGISTRY_BEAN_NAME,
new RootBeanDefinition(RabbitListenerEndpointRegistry.class));
}
}
}
容器Ioc中注入RabbitListenerAnnotationBeanPostProcessor和RabbitListenerEndpointRegistry

RabbitListenerAnnotationBeanPostProcessor类实现了BeanPostProcessor, Ordered, BeanFactoryAware, BeanClassLoaderAware, EnvironmentAware, SmartInitializingSingleton接口,Ordered表示处理顺序,BeanFactoryAware, BeanClassLoaderAware, EnvironmentAware主要用于获取对应的BeanFactory,BeanClassLoader, Environment属性,我们主要关注从SmartInitializingSingleton和BeanPostProcessor继承的方法

public void afterSingletonsInstantiated() {
this.registrar.setBeanFactory(this.beanFactory);
if (this.beanFactory instanceof ListableBeanFactory) {
Map<String, RabbitListenerConfigurer> instances =
((ListableBeanFactory) this.beanFactory).getBeansOfType(RabbitListenerConfigurer.class);
for (RabbitListenerConfigurer configurer : instances.values()) {
configurer.configureRabbitListeners(this.registrar);
}
}
if (this.registrar.getEndpointRegistry() == null) {
if (this.endpointRegistry == null) {
Assert.state(this.beanFactory != null,
"BeanFactory must be set to find endpoint registry by bean name");
this.endpointRegistry = this.beanFactory.getBean(
RabbitListenerConfigUtils.RABBIT_LISTENER_ENDPOINT_REGISTRY_BEAN_NAME,
RabbitListenerEndpointRegistry.class);
}
this.registrar.setEndpointRegistry(this.endpointRegistry);
}
if (this.containerFactoryBeanName != null) {
this.registrar.setContainerFactoryBeanName(this.containerFactoryBeanName);
}
// Set the custom handler method factory once resolved by the configurer
MessageHandlerMethodFactory handlerMethodFactory = this.registrar.getMessageHandlerMethodFactory();
if (handlerMethodFactory != null) {
this.messageHandlerMethodFactory.setMessageHandlerMethodFactory(handlerMethodFactory);
}
// Actually register all listeners
this.registrar.afterPropertiesSet();
// clear the cache - prototype beans will be re-cached.
this.typeCache.clear();
}
初始化工作,主要是基于自定义配置RabbitListenerConfigurer进行RabbitListenerAnnotationBeanPostProcessor(尤其是registrar元素)的初始化



@Override
public Object postProcessAfterInitialization(final Object bean, final String beanName) throws BeansException {
Class<?> targetClass = AopUtils.getTargetClass(bean);
final TypeMetadata metadata = this.typeCache.computeIfAbsent(targetClass, this::buildMetadata);
for (ListenerMethod lm : metadata.listenerMethods) {
for (RabbitListener rabbitListener : lm.annotations) {
processAmqpListener(rabbitListener, lm.method, bean, beanName);
}
}
if (metadata.handlerMethods.length > 0) {
processMultiMethodListeners(metadata.classAnnotations, metadata.handlerMethods, bean, beanName);
}
return bean;
}
对RabbitListener注解查找和解析
RabbitListenerAnnotationBeanPostProcessor#buildMetadata
private TypeMetadata buildMetadata(Class<?> targetClass) {
Collection<RabbitListener> classLevelListeners = findListenerAnnotations(targetClass);
final boolean hasClassLevelListeners = classLevelListeners.size() > 0;
final List<ListenerMethod> methods = new ArrayList<>();
final List<Method> multiMethods = new ArrayList<>();
ReflectionUtils.doWithMethods(targetClass, method -> {
Collection<RabbitListener> listenerAnnotations = findListenerAnnotations(method);
if (listenerAnnotations.size() > 0) {
methods.add(new ListenerMethod(method,
listenerAnnotations.toArray(new RabbitListener[listenerAnnotations.size()])));
}
if (hasClassLevelListeners) {
RabbitHandler rabbitHandler = AnnotationUtils.findAnnotation(method, RabbitHandler.class);
if (rabbitHandler != null) {
multiMethods.add(method);
}
}
}, ReflectionUtils.USER_DECLARED_METHODS);
if (methods.isEmpty() && multiMethods.isEmpty()) {
return TypeMetadata.EMPTY;
}
return new TypeMetadata(
methods.toArray(new ListenerMethod[methods.size()]),
multiMethods.toArray(new Method[multiMethods.size()]),
classLevelListeners.toArray(new RabbitListener[classLevelListeners.size()]));
}
RabbitListenerAnnotationBeanPostProcessor就是针对每一个bean类进行解析,针对类上的RabbitListener注解、方法上的RabbitHandle注解和方法上的RabbitListener注解解析后封装到TypeMetadata类中
通过RabbitListenerAnotationBeanPostProcessor#buildMetadata查找并封装成TypeMetadata分别交给processAmqpListener和processMultiMethodListeners进行解析
protected void processAmqpListener(RabbitListener rabbitListener, Method method, Object bean, String beanName) {
Method methodToUse = checkProxy(method, bean);
MethodRabbitListenerEndpoint endpoint = new MethodRabbitListenerEndpoint();
endpoint.setMethod(methodToUse);
endpoint.setBeanFactory(this.beanFactory);
endpoint.setReturnExceptions(resolveExpressionAsBoolean(rabbitListener.returnExceptions()));
String errorHandlerBeanName = resolveExpressionAsString(rabbitListener.errorHandler(), "errorHandler");
if (StringUtils.hasText(errorHandlerBeanName)) {
endpoint.setErrorHandler(this.beanFactory.getBean(errorHandlerBeanName, RabbitListenerErrorHandler.class));
}
processListener(endpoint, rabbitListener, bean, methodToUse, beanName);
}
private void processMultiMethodListeners(RabbitListener[] classLevelListeners, Method[] multiMethods,
Object bean, String beanName) {
List<Method> checkedMethods = new ArrayList<Method>();
for (Method method : multiMethods) {
checkedMethods.add(checkProxy(method, bean));
}
for (RabbitListener classLevelListener : classLevelListeners) {
MultiMethodRabbitListenerEndpoint endpoint = new MultiMethodRabbitListenerEndpoint(checkedMethods, bean);
endpoint.setBeanFactory(this.beanFactory);
processListener(endpoint, classLevelListener, bean, bean.getClass(), beanName);
}
}
RabbitListenerAnnotationBeanPostProcessor#processAmqpListener针对被RabbitListener注解的方法进行解析,
RabbitListenerAnnotationBeanPostProcessot#processMultiMethodListeners针对RabbitListener注解的类中被RabbitHandle注解的方法进行解析
新建MultiMethodRabbitListenerEndpoint对象,针对两种方式的差异进行部分属性的初始化后交给RabbitListenerAnnotationBeanPostProcessor进行后续处理processListener
protected void processListener(MethodRabbitListenerEndpoint endpoint, RabbitListener rabbitListener, Object bean,
Object adminTarget, String beanName) {
endpoint.setBean(bean);
endpoint.setMessageHandlerMethodFactory(this.messageHandlerMethodFactory);
endpoint.setId(getEndpointId(rabbitListener));
endpoint.setQueueNames(resolveQueues(rabbitListener));
endpoint.setConcurrency(resolveExpressionAsStringOrInteger(rabbitListener.concurrency(), "concurrency"));
String group = rabbitListener.group();
if (StringUtils.hasText(group)) {
Object resolvedGroup = resolveExpression(group);
if (resolvedGroup instanceof String) {
endpoint.setGroup((String) resolvedGroup);
}
}
String autoStartup = rabbitListener.autoStartup();
if (StringUtils.hasText(autoStartup)) {
endpoint.setAutoStartup(resolveExpressionAsBoolean(autoStartup));
}
endpoint.setExclusive(rabbitListener.exclusive());
String priority = resolve(rabbitListener.priority());
if (StringUtils.hasText(priority)) {
try {
endpoint.setPriority(Integer.valueOf(priority));
}
catch (NumberFormatException ex) {
throw new BeanInitializationException("Invalid priority value for " +
rabbitListener + " (must be an integer)", ex);
}
}
String rabbitAdmin = resolve(rabbitListener.admin());
if (StringUtils.hasText(rabbitAdmin)) {
Assert.state(this.beanFactory != null, "BeanFactory must be set to resolve RabbitAdmin by bean name");
try {
endpoint.setAdmin(this.beanFactory.getBean(rabbitAdmin, RabbitAdmin.class));
}
catch (NoSuchBeanDefinitionException ex) {
throw new BeanInitializationException("Could not register rabbit listener endpoint on [" +
adminTarget + "], no " + RabbitAdmin.class.getSimpleName() + " with id '" +
rabbitAdmin + "' was found in the application context", ex);
}
}
RabbitListenerContainerFactory<?> factory = null;
String containerFactoryBeanName = resolve(rabbitListener.containerFactory());
if (StringUtils.hasText(containerFactoryBeanName)) {
Assert.state(this.beanFactory != null, "BeanFactory must be set to obtain container factory by bean name");
try {
factory = this.beanFactory.getBean(containerFactoryBeanName, RabbitListenerContainerFactory.class);
}
catch (NoSuchBeanDefinitionException ex) {
throw new BeanInitializationException("Could not register rabbit listener endpoint on [" +
adminTarget + "] for bean " + beanName + ", no " + RabbitListenerContainerFactory.class.getSimpleName() + " with id '" +
containerFactoryBeanName + "' was found in the application context", ex);
}
}
this.registrar.registerEndpoint(endpoint, factory);
}
根据RabbitListener注解的属性进行MethodRabbitListenerEndpoint 的属性设置和校验,最后通过RabbitListenerEndpointRegistrar#registerEndpoint方法将MethodRabbitListenerEndpoint 注入容器RabbitListenerContainerFactory

@Override
public void afterPropertiesSet() {
registerAllEndpoints();
}
protected void registerAllEndpoints() {
synchronized (this.endpointDescriptors) {
for (AmqpListenerEndpointDescriptor descriptor : this.endpointDescriptors) {
this.endpointRegistry.registerListenerContainer(
descriptor.endpoint, resolveContainerFactory(descriptor));
}
this.startImmediately = true; // trigger immediate startup
}
}
RabbitListenerEndpointRegistrar#registerEndpoint
public void registerEndpoint(RabbitListenerEndpoint endpoint, RabbitListenerContainerFactory<?> factory) {
Assert.notNull(endpoint, "Endpoint must be set");
Assert.hasText(endpoint.getId(), "Endpoint id must be set");
// Factory may be null, we defer the resolution right before actually creating the container
AmqpListenerEndpointDescriptor descriptor = new AmqpListenerEndpointDescriptor(endpoint, factory);
synchronized (this.endpointDescriptors) {
if (this.startImmediately) { // Register and start immediately
this.endpointRegistry.registerListenerContainer(descriptor.endpoint,
resolveContainerFactory(descriptor), true);
}
else {
this.endpointDescriptors.add(descriptor);
}
}
}
RabbitListenerEndpointRegistry#registerListenerContainer进行注册监听器的容器
RabbitListenerEndpointRegistry#registerListenerContainer
public void registerListenerContainer(RabbitListenerEndpoint endpoint, RabbitListenerContainerFactory<?> factory,
boolean startImmediately) {
Assert.notNull(endpoint, "Endpoint must not be null");
Assert.notNull(factory, "Factory must not be null");
String id = endpoint.getId();
Assert.hasText(id, "Endpoint id must not be empty");
synchronized (this.listenerContainers) {
Assert.state(!this.listenerContainers.containsKey(id),
"Another endpoint is already registered with id '" + id + "'");
MessageListenerContainer container = createListenerContainer(endpoint, factory);
this.listenerContainers.put(id, container);
if (StringUtils.hasText(endpoint.getGroup()) && this.applicationContext != null) {
List<MessageListenerContainer> containerGroup;
if (this.applicationContext.containsBean(endpoint.getGroup())) {
containerGroup = this.applicationContext.getBean(endpoint.getGroup(), List.class);
}
else {
containerGroup = new ArrayList<MessageListenerContainer>();
this.applicationContext.getBeanFactory().registerSingleton(endpoint.getGroup(), containerGroup);
}
containerGroup.add(container);
}
if (startImmediately) {
startIfNecessary(container);
}
}
}
基于RabbitListenerEndpoint根据监听器的容器工厂类生成一个监听器的容器,并且整个注册过程是同步的,同时最多只能有一个endpoint在注册
RabbitListenerEndpointRegistry#start
@Override
public void start() {
for (MessageListenerContainer listenerContainer : getListenerContainers()) {
startIfNecessary(listenerContainer);
}
}
private void startIfNecessary(MessageListenerContainer listenerContainer) {
if (this.contextRefreshed || listenerContainer.isAutoStartup()) {
listenerContainer.start();
}
}
调用MessageListenerContainer#start方法, 监听器的启动。
我有一个字符串input="maybe(thisis|thatwas)some((nice|ugly)(day|night)|(strange(weather|time)))"Ruby中解析该字符串的最佳方法是什么?我的意思是脚本应该能够像这样构建句子:maybethisissomeuglynightmaybethatwassomenicenightmaybethiswassomestrangetime等等,你明白了......我应该一个字符一个字符地读取字符串并构建一个带有堆栈的状态机来存储括号值以供以后计算,还是有更好的方法?也许为此目的准备了一个开箱即用的库?
我主要使用Ruby来执行此操作,但到目前为止我的攻击计划如下:使用gemsrdf、rdf-rdfa和rdf-microdata或mida来解析给定任何URI的数据。我认为最好映射到像schema.org这样的统一模式,例如使用这个yaml文件,它试图描述数据词汇表和opengraph到schema.org之间的转换:#SchemaXtoschema.orgconversion#data-vocabularyDV:name:namestreet-address:streetAddressregion:addressRegionlocality:addressLocalityphoto:i
我正在使用ruby1.9解析以下带有MacRoman字符的csv文件#encoding:ISO-8859-1#csv_parse.csvName,main-dialogue"Marceu","Giveittohimóhe,hiswife."我做了以下解析。require'csv'input_string=File.read("../csv_parse.rb").force_encoding("ISO-8859-1").encode("UTF-8")#=>"Name,main-dialogue\r\n\"Marceu\",\"Giveittohim\x97he,hiswife.\"\
简而言之错误:NOTE:Gem::SourceIndex#add_specisdeprecated,useSpecification.add_spec.Itwillberemovedonorafter2011-11-01.Gem::SourceIndex#add_speccalledfrom/opt/local/lib/ruby/site_ruby/1.8/rubygems/source_index.rb:91./opt/local/lib/ruby/gems/1.8/gems/rails-2.3.8/lib/rails/gem_dependency.rb:275:in`==':und
一、引擎主循环UE版本:4.27一、引擎主循环的位置:Launch.cpp:GuardedMain函数二、、GuardedMain函数执行逻辑:1、EnginePreInit:加载大多数模块int32ErrorLevel=EnginePreInit(CmdLine);PreInit模块加载顺序:模块加载过程:(1)注册模块中定义的UObject,同时为每个类构造一个类默认对象(CDO,记录类的默认状态,作为模板用于子类实例创建)(2)调用模块的StartUpModule方法2、FEngineLoop::Init()1、检查Engine的配置文件找出使用了哪一个GameEngine类(UGame
我正在使用ruby2.1.0我有一个json文件。例如:test.json{"item":[{"apple":1},{"banana":2}]}用YAML.load加载这个文件安全吗?YAML.load(File.read('test.json'))我正在尝试加载一个json或yaml格式的文件。 最佳答案 YAML可以加载JSONYAML.load('{"something":"test","other":4}')=>{"something"=>"test","other"=>4}JSON将无法加载YAML。JSON.load("
我想用Nokogiri解析HTML页面。页面的一部分有一个表,它没有使用任何特定的ID。是否可以提取如下内容:Today,3,455,34Today,1,1300,3664Today,10,100000,3444,Yesterday,3454,5656,3Yesterday,3545,1000,10Yesterday,3411,36223,15来自这个HTML:TodayYesterdayQntySizeLengthLengthSizeQnty345534345456563113003664354510001010100000344434113622315
我使用的第一个解析器生成器是Parse::RecDescent,它的指南/教程很棒,但它最有用的功能是它的调试工具,特别是tracing功能(通过将$RD_TRACE设置为1来激活)。我正在寻找可以帮助您调试其规则的解析器生成器。问题是,它必须用python或ruby编写,并且具有详细模式/跟踪模式或非常有用的调试技术。有人知道这样的解析器生成器吗?编辑:当我说调试时,我并不是指调试python或ruby。我指的是调试解析器生成器,查看它在每一步都在做什么,查看它正在读取的每个字符,它试图匹配的规则。希望你明白这一点。赏金编辑:要赢得赏金,请展示一个解析器生成器框架,并说明它的
我有这样的HTML代码:Label1Value1Label2Value2...我的代码不起作用。doc.css("first").eachdo|item|label=item.css("dt")value=item.css("dd")end显示所有首先标记,然后标记标签,我需要“标签:值” 最佳答案 首先,您的HTML应该有和中的元素:Label1Value1Label2Value2...但这不会改变您解析它的方式。你想找到s并遍历它们,然后在每个你可以使用next_element得到;像这样:doc=Nokogiri::HTML(
我想禁用HTTP参数的自动XML解析。但我发现命令仅适用于Rails2.x,它们都不适用于3.0:config.action_controller.param_parsers.deleteMime::XML(application.rb)ActionController::Base.param_parsers.deleteMime::XMLRails3.0中的等价物是什么? 最佳答案 根据CVE-2013-0156的最新安全公告你可以将它用于Rails3.0。3.1和3.2ActionDispatch::ParamsParser::