草庐IT

使用java代码调用rabbitmq接口进行新增编辑mq用户、虚拟机vhost、动态创建交换机exchange、队列queue以及设置权限,绑定vhost与exchange等操作

苍绮 2023-04-17 原文

使用java代码操作rabbitmq时,首先需要一个有创建用户等权限的管理员账号,需要在rabbitmq的后台管理页面手动创建这个账号,系统推荐的这几个tag可以让账号有rabbitmq后台管理页面的访问权限

                                                    图一 

 

管理账号创建完成后就可以在代码中操作新增编辑mq账号及vhost等等了,点击rabbitmq后台管理页面左下角的HTTP API(见上文图一左下角)可以查看所有API接口

 

 以下代码中,rabbitmqUsername 为管理员账号的用户名,rabbitmqPassword为管理员账号的密码,rabbitmqUrl为rabbitmq服务器接口地址(例:http://127.0.0.1:15672/api/)

1. 新增用户或修改用户密码

1.1 API

 

 1.2 代码示例

 以下代码中,yourUsername为新增账号的用户名,yourPassword为新增账号的密码,guest为新增账号的tag,可以自定义,也可以使用rabbitmq提供的tag(见上文图一),该接口也可以用来修改已有账号的密码

  //add user
  String enc = new String( Base64.encodeBase64((rabbitmqUsername + ":" + rabbitmqPassword).getBytes() ) );
  HttpPut putCriaUsuario = new HttpPut( rabbitmqUrl+"users/"+yourUserName );
  // RabbitMQ requires a user with create permission, create it mannually first
  putCriaUsuario.addHeader( "Authorization", "Basic " + enc );
  putCriaUsuario.addHeader( "content-type", "application/json" );
  putCriaUsuario.setEntity( new StringEntity( "{\"password\":\""+yourPassword+"\",\"tags\":\"guest\"}" ) );
  CloseableHttpClient client = HttpClients.createDefault();
  client.execute( putCriaUsuario );

2. 新增vhost

 2.1 API

 

 2.2 代码示例

 以下代码中 yourVhost 为新增vhost的名称,guest为自定义的tag

 // 管理员账号用户名密码
 String enc = new String( Base64.encodeBase64((rabbitmqUsername + ":" + rabbitmqPassword).getBytes() ) );
 //add vhost
HttpPut putVhost = new HttpPut( rabbitmqUrl+"vhosts/"+yourVhost );
 putVhost.addHeader( "Authorization", "Basic " + enc ); 
 putVhost.addHeader( "content-type", "application/json" );
 putVhost.setEntity( new StringEntity( "{\"tags\":\"guest\"}" ) );
 CloseableHttpClient putVhostClient = HttpClients.createDefault();
 putVhostClient.execute( putVhost );

3. mq用户绑定vhost并设置权限

3.1 API

 

 

 3.2 代码示例

以下代码中,yourVhost 与yourUsername为绑定的mq用户与vhost的名称,代码示例中该用户对该vhost只开启了read权限,如果需要开启全部的configure(配置),write(写入),read(读取)权限,参数需要写成:

"{\"configure\":\".*\",\"write\":\".*\",\"read\":\".*\"}"
//管理员账号用户名密码 
String enc = new String( Base64.encodeBase64((rabbitmqUsername + ":" + rabbitmqPassword).getBytes() ) );
//add permissions and bind user&vhost
HttpPut putPermissions = new HttpPut( rabbitmqUrl+"permissions/"+yourVhost+"/"+yourUsername);
putPermissions.addHeader( "Authorization", "Basic " + enc );
putPermissions.addHeader( "content-type", "application/json" );
putPermissions.setEntity( new StringEntity( "{\"configure\":\"\",\"write\":\"\",\"read\":\".*\"}" ) );
CloseableHttpClient putPermissionsClient = HttpClients.createDefault();
putPermissionsClient.execute( putPermissions );

4. 动态创建exchange交换机和queue队列,并绑定指定vhost虚拟机

            //add exchange, queue and bind vhost
            RabbitModuleInfo rabbitModuleInfo = new RabbitModuleInfo();
            rabbitModuleInfo.setVhost(vhost);
            RabbitModuleInfo.Queue queue = new RabbitModuleInfo.Queue();
            Map<String, Object> arguments = new HashMap<>();
            //消息过期时间
            arguments.put("x-message-ttl",3600000);
            queue.setName(queueName);
            queue.setArguments(arguments);
            rabbitModuleInfo.setQueue(queue);

            RabbitModuleInfo.Exchange exchange = new RabbitModuleInfo.Exchange();
            exchange.setName(exchangeName);
            rabbitModuleInfo.setExchange(exchange);
            rabbitModuleInfo.setRoutingKey(queueName);

            rabbitModuleInitializer.declareRabbitModule(rabbitModuleInfo);
/**
 * RabbitMQ队列初始化器
 */
public class RabbitModuleInitializer{

    private AmqpAdmin amqpAdmin;
    private RealtimePushProducer realtimePushProducer;

    public RabbitModuleInitializer(AmqpAdmin amqpAdmin,RealtimePushProducer realtimePushProducer) {
        this.amqpAdmin = amqpAdmin;
        this.realtimePushProducer = realtimePushProducer;
    }


    /**
     * RabbitMQ 根据配置动态创建和绑定队列、交换机
     */
    public void declareRabbitModule(RabbitModuleInfo rabbitModuleInfo) {
        configParamValidate(rabbitModuleInfo);

        // 队列
        Queue queue = convertQueue(rabbitModuleInfo.getQueue());
        // 交换机
        Exchange exchange = convertExchange(rabbitModuleInfo.getExchange());
        // 绑定关系
        String routingKey = rabbitModuleInfo.getRoutingKey();
        String queueName = rabbitModuleInfo.getQueue().getName();
        String exchangeName = rabbitModuleInfo.getExchange().getName();
        Binding binding = new Binding(queueName, Binding.DestinationType.QUEUE, exchangeName, routingKey, null);

        realtimePushProducer.bindVhostExchangeQueue(rabbitModuleInfo.getVhost(),exchange,queue,binding);
    }

    /**
     * RabbitMQ动态配置参数校验
     *
     * @param rabbitModuleInfo
     */
    public void configParamValidate(RabbitModuleInfo rabbitModuleInfo) {

        String routingKey = rabbitModuleInfo.getRoutingKey();

        Assert.isTrue(StrUtil.isNotBlank(routingKey), "RoutingKey 未配置");

        Assert.isTrue(rabbitModuleInfo.getExchange() != null, "routingKey:{}未配置exchange", routingKey);
        Assert.isTrue(StrUtil.isNotBlank(rabbitModuleInfo.getExchange().getName()), "routingKey:{}未配置exchange的name属性", routingKey);

        Assert.isTrue(rabbitModuleInfo.getQueue() != null, "routingKey:{}未配置queue", routingKey);
        Assert.isTrue(StrUtil.isNotBlank(rabbitModuleInfo.getQueue().getName()), "routingKey:{}未配置exchange的name属性", routingKey);

    }

    /**
     * 转换生成RabbitMQ队列
     *
     * @param queue
     * @return
     */
    public Queue convertQueue(RabbitModuleInfo.Queue queue) {
        Map<String, Object> arguments = queue.getArguments();

        // 转换ttl的类型为long
        if (arguments != null && arguments.containsKey("x-message-ttl")) {
            arguments.put("x-message-ttl", Convert.toLong(arguments.get("x-message-ttl")));
        }

        // 是否需要绑定死信队列
        String deadLetterExchange = queue.getDeadLetterExchange();
        String deadLetterRoutingKey = queue.getDeadLetterRoutingKey();
        if (StrUtil.isNotBlank(deadLetterExchange) && StrUtil.isNotBlank(deadLetterRoutingKey)) {

            if (arguments == null) {
                arguments = new HashMap<>(4);
            }
            arguments.put("x-dead-letter-exchange", deadLetterExchange);
            arguments.put("x-dead-letter-routing-key", deadLetterRoutingKey);

        }

        return new Queue(queue.getName(), queue.isDurable(), queue.isExclusive(), queue.isAutoDelete(), arguments);
    }


    /**
     * 转换生成RabbitMQ交换机
     *
     * @param exchangeInfo
     * @return
     */
    public Exchange convertExchange(RabbitModuleInfo.Exchange exchangeInfo) {

        AbstractExchange exchange = null;

        RabbitExchangeTypeEnum exchangeType = exchangeInfo.getType();

        String exchangeName = exchangeInfo.getName();
        boolean isDurable = exchangeInfo.isDurable();
        boolean isAutoDelete = exchangeInfo.isAutoDelete();

        Map<String, Object> arguments = exchangeInfo.getArguments();

        switch (exchangeType) {
            case DIRECT:// 直连交换机
                exchange = new DirectExchange(exchangeName, isDurable, isAutoDelete, arguments);
                break;
            case TOPIC: // 主题交换机
                exchange = new TopicExchange(exchangeName, isDurable, isAutoDelete, arguments);
                break;
            case FANOUT: //扇形交换机
                exchange = new FanoutExchange(exchangeName, isDurable, isAutoDelete, arguments);
                break;
            case HEADERS: // 头交换机
                exchange = new HeadersExchange(exchangeName, isDurable, isAutoDelete, arguments);
                break;
        }
        return exchange;
    }
}
    /**
     * 根据配置动态创建和绑定队列、交换机
     * @param vhost
     * @param exchange
     * @param queue
     * @param binding
     */
    @Override
    public void bindVhostExchangeQueue(String vhost, Exchange exchange, Queue queue, Binding binding) {
        ConnectionFactory factory = queueConfig.pushConnectionFactory(rabbitProperties,vhost);
        RabbitAdmin rabbitAdmin = new RabbitAdmin(factory);
        log.debug("bind vhost={},exchange={},queue={}",vhost,exchange.getName(),queue.getName());
        // 创建队列
        rabbitAdmin.declareQueue(queue);
        // 创建交换机
        rabbitAdmin.declareExchange(exchange);
        // 队列 绑定 交换机
        rabbitAdmin.declareBinding(binding);
    }
    /**
     * 生成指定vhost的ConnectionFactory
     * @param rabbitProperties
     * @param vhost
     * @return
     */
    public ConnectionFactory pushConnectionFactory(RabbitProperties rabbitProperties, String vhost) {
        CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory();
        cachingConnectionFactory.setHost(rabbitProperties.getHost());
        cachingConnectionFactory.setPort(rabbitProperties.getPort());
        cachingConnectionFactory.setUsername(rabbitProperties.getUsername());
        cachingConnectionFactory.setPassword(rabbitProperties.getPassword());
        cachingConnectionFactory.setVirtualHost(vhost);
        return cachingConnectionFactory;
    }

 

有关使用java代码调用rabbitmq接口进行新增编辑mq用户、虚拟机vhost、动态创建交换机exchange、队列queue以及设置权限,绑定vhost与exchange等操作的更多相关文章

  1. ruby - 使用 RubyZip 生成 ZIP 文件时设置压缩级别 - 2

    我有一个Ruby程序,它使用rubyzip压缩XML文件的目录树。gem。我的问题是文件开始变得很重,我想提高压缩级别,因为压缩时间不是问题。我在rubyzipdocumentation中找不到一种为创建的ZIP文件指定压缩级别的方法。有人知道如何更改此设置吗?是否有另一个允许指定压缩级别的Ruby库? 最佳答案 这是我通过查看ruby​​zip内部创建的代码。level=Zlib::BEST_COMPRESSIONZip::ZipOutputStream.open(zip_file)do|zip|Dir.glob("**/*")d

  2. ruby-on-rails - 使用 Ruby on Rails 进行自动化测试 - 最佳实践 - 2

    很好奇,就使用ruby​​onrails自动化单元测试而言,你们正在做什么?您是否创建了一个脚本来在cron中运行rake作业并将结果邮寄给您?git中的预提交Hook?只是手动调用?我完全理解测试,但想知道在错误发生之前捕获错误的最佳实践是什么。让我们理所当然地认为测试本身是完美无缺的,并且可以正常工作。下一步是什么以确保他们在正确的时间将可能有害的结果传达给您? 最佳答案 不确定您到底想听什么,但是有几个级别的自动代码库控制:在处理某项功能时,您可以使用类似autotest的内容获得关于哪些有效,哪些无效的即时反馈。要确保您的提

  3. ruby-openid:执行发现时未设置@socket - 2

    我在使用omniauth/openid时遇到了一些麻烦。在尝试进行身份验证时,我在日志中发现了这一点:OpenID::FetchingError:Errorfetchinghttps://www.google.com/accounts/o8/.well-known/host-meta?hd=profiles.google.com%2Fmy_username:undefinedmethod`io'fornil:NilClass重要的是undefinedmethodio'fornil:NilClass来自openid/fetchers.rb,在下面的代码片段中:moduleNetclass

  4. ruby-on-rails - 按天对 Mongoid 对象进行分组 - 2

    在控制台中反复尝试之后,我想到了这种方法,可以按发生日期对类似activerecord的(Mongoid)对象进行分组。我不确定这是完成此任务的最佳方法,但它确实有效。有没有人有更好的建议,或者这是一个很好的方法?#eventsisanarrayofactiverecord-likeobjectsthatincludeatimeattributeevents.map{|event|#converteventsarrayintoanarrayofhasheswiththedayofthemonthandtheevent{:number=>event.time.day,:event=>ev

  5. ruby - 使用 C 扩展开发 ruby​​gem 时,如何使用 Rspec 在本地进行测试? - 2

    我正在编写一个包含C扩展的gem。通常当我写一个gem时,我会遵循TDD的过程,我会写一个失败的规范,然后处理代码直到它通过,等等......在“ext/mygem/mygem.c”中我的C扩展和在gemspec的“扩展”中配置的有效extconf.rb,如何运行我的规范并仍然加载我的C扩展?当我更改C代码时,我需要采取哪些步骤来重新编译代码?这可能是个愚蠢的问题,但是从我的gem的开发源代码树中输入“bundleinstall”不会构建任何native扩展。当我手动运行rubyext/mygem/extconf.rb时,我确实得到了一个Makefile(在整个项目的根目录中),然后当

  6. ruby-on-rails - 如何使用 instance_variable_set 正确设置实例变量? - 2

    我正在查看instance_variable_set的文档并看到给出的示例代码是这样做的:obj.instance_variable_set(:@instnc_var,"valuefortheinstancevariable")然后允许您在类的任何实例方法中以@instnc_var的形式访问该变量。我想知道为什么在@instnc_var之前需要一个冒号:。冒号有什么作用? 最佳答案 我的第一直觉是告诉你不要使用instance_variable_set除非你真的知道你用它做什么。它本质上是一种元编程工具或绕过实例变量可见性的黑客攻击

  7. ruby-on-rails - 使用 rails 4 设计而不更新用户 - 2

    我将应用程序升级到Rails4,一切正常。我可以登录并转到我的编辑页面。也更新了观点。使用标准View时,用户会更新。但是当我添加例如字段:name时,它​​不会在表单中更新。使用devise3.1.1和gem'protected_attributes'我需要在设备或数据库上运行某种更新命令吗?我也搜索过这个地方,找到了许多不同的解决方案,但没有一个会更新我的用户字段。我没有添加任何自定义字段。 最佳答案 如果您想允许额外的参数,您可以在ApplicationController中使用beforefilter,因为Rails4将参数

  8. ruby-on-rails - date_field_tag,如何设置默认日期? [ rails 上的 ruby ] - 2

    我想设置一个默认日期,例如实际日期,我该如何设置?还有如何在组合框中设置默认值顺便问一下,date_field_tag和date_field之间有什么区别? 最佳答案 试试这个:将默认日期作为第二个参数传递。youcorrectlysetthedefaultvalueofcomboboxasshowninyourquestion. 关于ruby-on-rails-date_field_tag,如何设置默认日期?[rails上的ruby],我们在StackOverflow上找到一个类似的问

  9. ruby - 如何进行排列以有效地定制输出 - 2

    这是一道面试题,我没有答对,但还是很好奇怎么解。你有N个人的大家庭,分别是1,2,3,...,N岁。你想给你的大家庭拍张照片。所有的家庭成员都排成一排。“我是家里的friend,建议家庭成员安排如下:”1岁的家庭成员坐在这一排的最左边。每两个坐在一起的家庭成员的年龄相差不得超过2岁。输入:整数N,1≤N≤55。输出:摄影师可以拍摄的照片数量。示例->输入:4,输出:4符合条件的数组:[1,2,3,4][1,2,4,3][1,3,2,4][1,3,4,2]另一个例子:输入:5输出:6符合条件的数组:[1,2,3,4,5][1,2,3,5,4][1,2,4,3,5][1,2,4,5,3][

  10. ruby - 即使失败也继续进行多主机测试 - 2

    我已经构建了一些serverspec代码来在多个主机上运行一组测试。问题是当任何测试失败时,测试会在当前主机停止。即使测试失败,我也希望它继续在所有主机上运行。Rakefile:namespace:specdotask:all=>hosts.map{|h|'spec:'+h.split('.')[0]}hosts.eachdo|host|begindesc"Runserverspecto#{host}"RSpec::Core::RakeTask.new(host)do|t|ENV['TARGET_HOST']=hostt.pattern="spec/cfengine3/*_spec.r

随机推荐