【rabbitMQ】MQ过期时间、死信队列实现-狂飙
提示:文章写完后,目录可以自动生成,如何生成可参考右边的帮助文档
一、队列参数
简单认识通过@Bean 创建消息队列的同时,设置对象的参数,完成的不同的效果
| 参数名 | 类型 | 作用 |
|---|---|---|
| x-message-tti(Time-To-Live) | int,毫秒 | 对消息设置预期的时间,过期将被丢弃 |
| x-max-length | int | 限制队列最大长度,个数,新增后删除最早的 |
| x-expires | int,毫秒 | 队列没有访问超时时,自动删除时间 |
| x-max-length-bytes | int | 限制队列最大容量 |
| x-dead-letter-exchange | string | 指定死信交换机 |
| x-dead-letter-routing-key | string | 死信路由,指定routingKey |
| x-max-priority | int | 队列优先级 |
| x-queue-mode | 默认"lazy" | 对列模式,默认lazy(将数据放入磁盘,消费时放入内存) |
| x-queue-master-locator | QueueBuilder.MasterLocator | 主队列选择策略,min-masters:选择承载最小绑定主机数量的节点,client-local:选择客户机声明队列连接到的节点,min-masters:随机选择一个节点 |
二、过期时间TTL
2.1、队列过期时间
设置队列的TTL后,每个被放入此队列的消息都有一个过期时间,时间一过,这条消息就会被移除了。
// 创建 direct 交换机
@Bean
public DirectExchange directExchange() {
return new DirectExchange("TTLExchange", true, false);
}
@Bean
public Queue TTLQueue() {
//使用map进行多值的设置,同时可以使用链式进行相同的操作
Map<String, Object> map = new HashMap<>();
// 队列设置消息过期时间 60 秒
map.put("x-message-ttl", 60 * 1000);
// ....
//参数一: 队列的名称
//参数二: 队列是都持久化
//参数三: 队列是否具有排他性(只有同一连接共享此队列,且连接断开时队列删除)排他队列详细说明
//参数四: 队列是否自动删除
//参数五: 封装队列的一些参数
return new Queue("TTLQueue", true, false, false, map);
}
// 绑定关系
@Bean
public Binding smsBinding() {
// 绑定关系。并且设置 'ttl' 的routingKey
return BindingBuilder.bind(TTLQueue()).to(directExchange()).with("ttl");
}
2.1、消息过期时间
设置消息的TTL后,每个消息在生产出来后都有一个过期时间,时间一过,这条消息就会被移除了。
@Test
public void TTLQueue() {
MessageProperties messageProperties = new MessageProperties();
//给消息设置过期时间
messageProperties.setExpiration("6000");
Message msg = new Message("TTL发送的内容".getBytes(), messageProperties );
rabbitTemplate.convertAndSend("TTLExchange", "ttl", msg);
}
需要注意的是:当同时指定了 queue 和 message 的 TTL 值,则两者中较小的那个才会起作用。
三、死信队列
3.1、生产者
DLX,全称为 Dead-Letter-Exchange , 可以称之为死信交换机,也有人称之为死信邮箱。当消息在一个队列中变成死信(dead message)之后,它能被重新发送到另一个交换机中,这个交换机就是 DLX ,绑定 DLX 的队列就称之为死信队列。
消息变成死信,可能是由于以下的原因:
- 消息被消费者给拒绝
- 设置的过期时间的消息到期
- 队列达到最大长度
DLX 也是一个正常的交换机,和一般的交换机没有区别,它能在任何的队列上被指定,实际上就是设置某一个队列的属性。当这个队列中存在死信时,Rabbitmq 就会自动地将这个消息重新发布到设置的 DLX 上去,进而被路由到另一个队列,即死信队列。

创建死信和正常交换机、队列进行绑定
//创建死信交换,正常的交换
@Bean
public DirectExchange ttlExchange(){
return new DirectExchange("ttlExchange",true,false);
}
//创建队列,指定死信交换机
@Bean
public Queue ttlQueue2(){
Map<String,Object> map = new HashMap<>();
map.put("x-dead-letter-exchange","DLXExchange");//执行死信交换机
map.put("x-dead-letter-routing-key","DLX");//执行 死信路由
return new Queue("ttlQueue2", true, false, false, map);
}
//绑定死信交换机和队列值之间的关系,并指定routingKey:DLX
@Bean
public Binding bindingttlQueue(Queue ttlQueue2,DirectExchange ttlExchange){
return BindingBuilder.bind(ttlQueue2).to(ttlExchange).with("DLX");
}
//创建普通的交换机
@Bean
public DirectExchange DLXExchange(){
return new DirectExchange("DLXExchange",true,false);
}
//创建普通的队列
@Bean
public Queue DLXQueue() {
return new Queue("DLXQueue");
}
//绑定其关系
@Bean
public Binding bindingDLXQueue(Queue DLXQueue,DirectExchange DLXExchange){
return BindingBuilder.bind(DLXQueue).to(DLXExchange).with("DLX");
}
发送过期时间为6000ms的消息到正常的交换内,当过期时间到期后,如自动转到的死信交换内,并传递到的死信队列中,监听死信队列的消费者可以直接消费
@Test
public void DLXQueue2() {
MessageProperties messageProperties = new MessageProperties();
messageProperties.setExpiration("6000");
Message msg = new Message("测试死信队列".getBytes(), messageProperties);
//注意第二个参数,和死信队列routingKey中保持一致,才能有效实现其期待结果
rabbitTemplate.convertAndSend("ttlExchange", "DLX", msg);
}
3.2、消费者
此时正常监听正常队列的消费拿不到期望的值的,因为在消息生产的同时设置过期时间,而在规定的时间内没有进行消费掉,则到期后消息自动传递到死信交换内并发送到死信队列中,正在监听死信队列的消费进行消费
/*
* 监听死信队列 -->> ttlQueue
*
* */
@RabbitListener(queues = "DLXQueue")
public void demo2(String msg) throws InterruptedException {
System.out.println("拿到死信队列钟的消息" +msg );
}