【rabbitMQ】MQ过期时间、死信队列实现-狂飙

提示:文章写完后,目录可以自动生成,如何生成可参考右边的帮助文档


一、队列参数

简单认识通过@Bean 创建消息队列的同时,设置对象的参数,完成的不同的效果

参数名类型作用
x-message-tti(Time-To-Live)int,毫秒对消息设置预期的时间,过期将被丢弃
x-max-lengthint限制队列最大长度,个数,新增后删除最早的
x-expiresint,毫秒队列没有访问超时时,自动删除时间
x-max-length-bytesint限制队列最大容量
x-dead-letter-exchangestring指定死信交换机
x-dead-letter-routing-keystring死信路由,指定routingKey
x-max-priorityint队列优先级
x-queue-mode默认"lazy"对列模式,默认lazy(将数据放入磁盘,消费时放入内存)
x-queue-master-locatorQueueBuilder.MasterLocator主队列选择策略,min-masters:选择承载最小绑定主机数量的节点,client-local:选择客户机声明队列连接到的节点,min-masters:随机选择一个节点

二、过期时间TTL

2.1、队列过期时间

设置队列的TTL后,每个被放入此队列的消息都有一个过期时间,时间一过,这条消息就会被移除了。

  // 创建 direct 交换机
    @Bean
    public DirectExchange directExchange() {
        return new DirectExchange("TTLExchange", true, false);
    }
    
    @Bean
    public Queue TTLQueue() {
    	//使用map进行多值的设置,同时可以使用链式进行相同的操作
        Map<String, Object> map = new HashMap<>();
        // 队列设置消息过期时间 60 秒
        map.put("x-message-ttl", 60 * 1000);
        // ....
        //参数一: 队列的名称
        //参数二: 队列是都持久化
        //参数三: 队列是否具有排他性(只有同一连接共享此队列,且连接断开时队列删除)排他队列详细说明
        //参数四: 队列是否自动删除
        //参数五: 封装队列的一些参数
        return new Queue("TTLQueue", true, false, false, map);
    }
       // 绑定关系
    @Bean
    public Binding smsBinding() {
        // 绑定关系。并且设置 'ttl' 的routingKey
        return BindingBuilder.bind(TTLQueue()).to(directExchange()).with("ttl");
    }

2.1、消息过期时间

设置消息的TTL后,每个消息在生产出来后都有一个过期时间,时间一过,这条消息就会被移除了。

    @Test
    public void TTLQueue() {
        MessageProperties messageProperties = new MessageProperties();
        //给消息设置过期时间
        messageProperties.setExpiration("6000");
 
 		Message msg = new Message("TTL发送的内容".getBytes(), messageProperties );
 		
        rabbitTemplate.convertAndSend("TTLExchange", "ttl", msg);
    }

需要注意的是:当同时指定了 queue 和 message 的 TTL 值,则两者中较小的那个才会起作用。

三、死信队列

3.1、生产者

DLX,全称为 Dead-Letter-Exchange , 可以称之为死信交换机,也有人称之为死信邮箱。当消息在一个队列中变成死信(dead message)之后,它能被重新发送到另一个交换机中,这个交换机就是 DLX ,绑定 DLX 的队列就称之为死信队列。

消息变成死信,可能是由于以下的原因:

  • 消息被消费者给拒绝
  • 设置的过期时间的消息到期
  • 队列达到最大长度

DLX 也是一个正常的交换机,和一般的交换机没有区别,它能在任何的队列上被指定,实际上就是设置某一个队列的属性。当这个队列中存在死信时,Rabbitmq 就会自动地将这个消息重新发布到设置的 DLX 上去,进而被路由到另一个队列,即死信队列。
在这里插入图片描述
创建死信和正常交换机、队列进行绑定

    //创建死信交换,正常的交换
    @Bean
    public DirectExchange ttlExchange(){
        return new DirectExchange("ttlExchange",true,false);
    }
    //创建队列,指定死信交换机
    @Bean
    public Queue ttlQueue2(){
        Map<String,Object> map = new HashMap<>();
        map.put("x-dead-letter-exchange","DLXExchange");//执行死信交换机
        map.put("x-dead-letter-routing-key","DLX");//执行 死信路由
        return new Queue("ttlQueue2", true, false, false, map);
    }
    //绑定死信交换机和队列值之间的关系,并指定routingKey:DLX
    @Bean
    public Binding bindingttlQueue(Queue ttlQueue2,DirectExchange ttlExchange){
        return BindingBuilder.bind(ttlQueue2).to(ttlExchange).with("DLX");
    }
    
    //创建普通的交换机
    @Bean
    public DirectExchange DLXExchange(){
        return new DirectExchange("DLXExchange",true,false);
    }
    //创建普通的队列
    @Bean
    public Queue DLXQueue() {
        return new Queue("DLXQueue");
    }
    //绑定其关系
    @Bean
    public Binding bindingDLXQueue(Queue DLXQueue,DirectExchange DLXExchange){
        return BindingBuilder.bind(DLXQueue).to(DLXExchange).with("DLX");
    }

发送过期时间为6000ms的消息到正常的交换内,当过期时间到期后,如自动转到的死信交换内,并传递到的死信队列中,监听死信队列的消费者可以直接消费

    @Test
    public void DLXQueue2() {
        MessageProperties messageProperties = new MessageProperties();
        messageProperties.setExpiration("6000");
        
        Message msg = new Message("测试死信队列".getBytes(), messageProperties);
        //注意第二个参数,和死信队列routingKey中保持一致,才能有效实现其期待结果
        rabbitTemplate.convertAndSend("ttlExchange", "DLX", msg);
    }

3.2、消费者

此时正常监听正常队列的消费拿不到期望的值的,因为在消息生产的同时设置过期时间,而在规定的时间内没有进行消费掉,则到期后消息自动传递到死信交换内并发送到死信队列中,正在监听死信队列的消费进行消费


    /*
    * 监听死信队列  -->>  ttlQueue
    *
    * */
    @RabbitListener(queues = "DLXQueue")
    public void demo2(String msg) throws InterruptedException {
        System.out.println("拿到死信队列钟的消息" +msg );
    }