SpringBoot+RabbitMQ 您所在的位置:网站首页 rabbitmq消息确认机制手动和自动的区别 SpringBoot+RabbitMQ

SpringBoot+RabbitMQ

2023-07-21 23:57| 来源: 网络整理| 查看: 265

一、前言

使用RabbitMQ,会有知晓消息是否成功、如果消费失败重试的需求,这篇文章主要讲的是消息确认机制(ACK)和消息重试机制。

二、消息确认机制

RabbitMQ的消费者确认机制用来确认消费者是否成功消费了队列中的消息。

消息确认分为几种情况:

AcknowledgeMode.NONE:不确认,不发送任何ack确认;只要消息发送完成会立即在队列移除,不会重发。 AcknowledgeMode.AUTO:自动确认,消费消息后会自动发送ack确认;在消费者发生异常时,消息会不断的被重发,直到处理成功。不会丢失消息,即便服务挂掉,没有处理完成的消息会重回队列,但是异常会让消息不断重试。 AcknowledgeMode.MANUAL:手动确认,手动发送ack确认;只有服务端收channel.basicAck(message.getMessageProperties().getDeliveryTag(),false)的确认信号,消息才会移除,确认成功后不管后面是异常还是断开服务消息已经被移除了。如果在确认之前抛出异常,消息不会移除,也不会重试,监听程序会因为异常停止不再处理消息 ,如果此时断开服务,消息重新回到队列。

具体实现:

1、开启消息手动确认,修改application配置文件

1spring.rabbitmq.listener.simple.acknowledge-mode=manual

2、消费者

123456789101112131415161718    @RabbitListener(queues = "meat_queue")     @RabbitHandler     public void processMeatOne(String content, Channel channel, Message message) throws IOException{         try {             System.out.println("processMeatOne---开始消费队列meat_queue的消息: " + content);             // 模拟执行任务             Thread.sleep(1000);             // 模拟异常             String is = null;             is.toString();             // 确认收到消息,false只确认当前consumer一个消息收到,true确认所有consumer获得的消息             channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);         } catch (Exception  e) {             // 为了避免MQ中Unacked的消息堆积,消费失败也进行ack确认             channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);             e.printStackTrace();         }     }

但是异常有可能是网络等问题,还想再重试一次,再决定是否确认或者抛弃这个异常,我们在异常捕获里进行处理,

12345678910111213    catch (Exception  e) {             System.out.println("=====================异常了========================");             if (message.getMessageProperties().getRedelivered()) {                 System.out.println("================消息已重复处理失败,拒绝再次接收======================" + content);                 // 拒绝消息,requeue=false 表示不再重新入队,如果配置了死信队列则进入死信队列                 channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);             } else {                 System.out.println("====================消息即将再次返回队列处理=========================" + content);                 // requeue为是否重新回到队列,true重新入队                 channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);             }             //e.printStackTrace();         }

如果单纯的做channel.basicNack处理,可能造成死循环。

在手动确认的模式下,不管是消费成功还是消费失败,一定要记得确认消息,不然消息会一直处于unack状态,直到消费者进程重启或者停止。

3、方法详解:

确认收到一个或多个消息:

1void basicAck(long deliveryTag, boolean multiple) throws IOException;

deliveryTag:消息的传递标识。

multiple: 如果为false,只确认当前consumer一个消息;如果为true,则确认所有consumer获得的所有小于deliveryTag的消息。

拒绝一个或多个消息:

1void basicNack(long deliveryTag, boolean multiple, boolean requeue) throws IOException;

deliveryTag:消息的传递标识。

multiple: 如果为true,则拒绝所有consumer获得的小于deliveryTag的消息。

requeue: 设置为true 会把消费失败的消息从新添加到队列的尾端,设置为false不会重新回到队列。

拒绝一个消息:

1void basicReject(long deliveryTag, boolean requeue) throws IOException;

deliveryTag:消息的传递标识。

requeue: 设置为false 表示不再重新入队,如果配置了死信队列则进入死信队列。

channel.basicNack 与 channel.basicReject 的区别在于basicNack可以批量拒绝多条消息,而basicReject一次只能拒绝一条消息。

三、消息重试

重试并不是RabbitMQ重新发送了消息,仅仅是消费者内部进行的重试,换句话说就是重试跟mq没有任何关系;

自动ack模式下,如果超出了重试次数,队列中的数据会被ack掉;

自动ack模式下,使用catch捕获异常也是会导致不触发重试的;因此消费者代码不能添加try{}catch(){}捕获异常,一旦捕获了异常,就相当于消息正确处理了,消息直接被确认掉了,不会触发重试;

具体实现:

1、开启消息自动确认,开启消息重试,修改application配置文件

123456789101112# 开启重试,默认是false spring.rabbitmq.listener.simple.retry.enabled=true # 重试次数,默认为3次 spring.rabbitmq.listener.simple.retry.max-attempts=5 # 重试最大间隔时间 spring.rabbitmq.listener.simple.retry.max-interval=10000 # 重试初始间隔时间 spring.rabbitmq.listener.simple.retry.initial-interval=2000 # 间隔时间乘子,间隔时间*乘子=下一次的间隔时间,最大不能超过设置的最大间隔时间 spring.rabbitmq.listener.simple.retry.multiplier=2 # 消费自动确认 spring.rabbitmq.listener.simple.acknowledge-mode=auto

2、消费者

123456789    @RabbitListener(queues = "meat_queue")     @RabbitHandler     public void processMeatTwo(String message) throws InterruptedException {         System.out.println("processMeatTwo消费了队列meat_queue的消息:" + message);         Thread.sleep(1000);         //模拟异常         String is = null;         is.toString();     }

3、运行结果

控制台中可以看到,消息一共重试了5次,之后会抛出ListenerExecutionFailedException的异常,后面附带着Retry Policy Exhausted文字,提示我们重试次数已经用尽了。

消息重试次数用尽后,消息就会被抛弃,但是我们不想消息被抛弃,可以采用死信队列的方式处理重试失败的消息。

4、死信队列

声明死信交换机dlx_exchange,死信队列dlx_queue,业务队列添加死信交换机以及死信路由键的配置。队列,交换机,以及绑定关系的声明如下:

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869@Configuration public class RabbitConfig {     /**      * 声明2个业务队列      **/     @Bean     public Queue fruitsQueue(){         return new Queue("fruits_queue");     }     @Bean     public Queue meatQueue(){         Map params = new HashMap();         params.put("x-dead-letter-exchange","dlx_exchange");//声明当前队列绑定的死信交换机         params.put("x-dead-letter-routing-key","dlx");//声明当前队列的死信路由键         return QueueBuilder.durable("meat_queue").withArguments(params).build();     }     /**      * 死信队列      * @return      */     @Bean     public Queue dlxQueue(){         return new Queue("dlx_queue");     }     /**      * 声明一个Direct类型的交换机      **/     @Bean     DirectExchange directExchange(){         return new DirectExchange("xue_exchange");     }         /**      * 死信交换机      * @return      */     @Bean     public DirectExchange dlxExchange(){         return new DirectExchange("dlx_exchange");     }     /**      * 将上面的2个队列绑定到Direct交换机,在绑定的时候指定BindingKey      **/     @Bean     Binding bindExchangeFruits(Queue fruitsQueue, DirectExchange directExchange){         return BindingBuilder.bind(fruitsQueue).to(directExchange).with("fruits");     }     @Bean     Binding bindExchangeMeat(Queue meatQueue,DirectExchange directExchange){         return BindingBuilder.bind(meatQueue).to(directExchange).with("meat");     }     /**      * 死信队列绑定死信交换机      * @param dlxQueue 队列      * @param dlxExchange 交换机      * @return      */     @Bean     Binding dlcBinding(Queue dlxQueue, DirectExchange dlxExchange){         return BindingBuilder.bind(dlxQueue).to(dlxExchange).with("dlx");     } }

5、测试一下

正在消费时:

重试次数使用完之后,会看到dlx_queue队列中有一条待消费的消息:

也就是说重试次数使用完之后,消息会从业务队列中删除,同时发送到死信队列中。

会发现meat_queue队列比fruits_queue队列多出了DLX和DLK两个标识。

DLX ,指的是x-dead-letter-exchange 这个属性,全称为 Dead-Letter-Exchange ,可以称之为死信交换机。当消息在一个队列中变成死信 (dead message) 之后,它能被重新被发送到另一个交换机中,这个交换机就是 DLX,绑定 DLX 的队列就称之为死信队列。DLX 也是一个正常的交换机,和一般的交换机没有区别,它能在任何的队列上被指定 , 实际上就是设置某个队列的属性。当这个队列中存在死信时 , RabbitMQ 就会自动地将这个消息重新发布到设置的 DLX 上去 ,进而被路由到另一个队列,即死信队列,可以监听这个队列中消息做相应的处理。

DLK,指的是x-dead-letter-routing-key 这个属性,关联的是routingKey,也就是路由。

~OVER~



【本文地址】

公司简介

联系我们

今日新闻

    推荐新闻

    专题文章
      CopyRight 2018-2019 实验室设备网 版权所有