rabbitmq消费端限流:一次只能消费一条消息

1. 什么是消费端的限流?

场景:在订单高峰期,rabbitmq上已经堆积了很多消息等待消费,如果没有任何限流措施,贸然启动一个消费者时,如此多的消息瞬间推送给消费者,消费者可能因无法处理这么多的消息而承受巨大压力,甚至崩溃!

2. 解决方案

rabbitmq 提供了basicQos方法实现了限流,也就是在关闭了消费端的自动ack的前提 下,我们可以设置阈值(出队)的消息数。 没有手动确认,那么就不会推送新的消息过来!可以有效防止消费者压力过大而崩溃。

 /**
  * 限流设置:  prefetchSize:每条消息大小的设置,0是无限制
  * prefetchCount:标识每次推送多少条消息 一般是一条
  * global:false标识channel级别的  true:标识消费者级别的
  */
 channel.basicQos(0,1,false);

3. 代码示例

生产者:

public class Producer {
    public static void main(String[] args) throws Exception{
        // 1、创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setVirtualHost("/");
        factory.setPort(5672);
        factory.setHost("192.168.200.130");
        factory.setUsername("mqs");
        factory.setPassword("mqs123");
        // 2、创建连接
        Connection connection = factory.newConnection();
        // 3、获取通道
        Channel channel = connection.createChannel();
        // 4、声明交换机和路由
        String exchangeName = "limit_exchange";
        String routingKey = "limit.key";
        //消息体
        String msg = "send message test limit mandatory ";
 
        // 5、生产者发送消息
        for (int i = 0; i < 6; i++){
            channel.basicPublish(exchangeName, routingKey, true, null, msg.getBytes());
        }
    }
}

消费者:

    public static void main(String[] args)throws Exception {
        // 1、创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setVirtualHost("/");
        factory.setPort(5672);
        factory.setHost("192.168.200.130");
        factory.setUsername("mqs");
        factory.setPassword("mqs123");
        // 2、创建连接
        Connection connection = factory.newConnection();
        // 3、获取通道
        Channel channel = connection.createChannel();
        // 4、声明
        String exchangeName = "limit_exchange";
        String routingKey = "limit.key";
        String exchangeType = "direct";
        String queueName = "limit_queue";
        // 5、声明一个交换器
        channel.exchangeDeclare(exchangeName, exchangeType, true, false, null);
        // 6、声明一个队列
        channel.queueDeclare(queueName, true, false, false, null);
        // 7、绑定队列到交换器
        channel.queueBind(queueName, exchangeName, routingKey);
       
        /**
         * 限流设置:  
         * prefetchSize:每条消息大小的设置,0是无限制
         * prefetchCount:标识每次推送多少条消息 一般是一条
         * global:false标识channel级别的  true:标识消费者级别的
         */
        channel.basicQos(0, 1, false);
       
        // 8、消费者,要想做限流必须将自动ack设置为false,代表手动ack,一条条的消费
        // MyConsumer  自定义消费者
        channel.basicConsume(queueName, false, new MyConsumer(channel));
 
    }
}

限流必须将自动ack设置为false,代表手动ack,不然限流是不生效的。
channel.basicConsume(queueName, false, new MyConsumer(channel));