SpringBoot+RabbitMQ | 您所在的位置:网站首页 › rabbitmq消息确认机制手动和自动的区别 › SpringBoot+RabbitMQ |
一、前言
使用RabbitMQ,会有知晓消息是否成功、如果消费失败重试的需求,这篇文章主要讲的是消息确认机制(ACK)和消息重试机制。 二、消息确认机制RabbitMQ的消费者确认机制用来确认消费者是否成功消费了队列中的消息。 消息确认分为几种情况: AcknowledgeMode.NONE:不确认,不发送任何ack确认;只要消息发送完成会立即在队列移除,不会重发。 AcknowledgeMode.AUTO:自动确认,消费消息后会自动发送ack确认;在消费者发生异常时,消息会不断的被重发,直到处理成功。不会丢失消息,即便服务挂掉,没有处理完成的消息会重回队列,但是异常会让消息不断重试。 AcknowledgeMode.MANUAL:手动确认,手动发送ack确认;只有服务端收channel.basicAck(message.getMessageProperties().getDeliveryTag(),false)的确认信号,消息才会移除,确认成功后不管后面是异常还是断开服务消息已经被移除了。如果在确认之前抛出异常,消息不会移除,也不会重试,监听程序会因为异常停止不再处理消息 ,如果此时断开服务,消息重新回到队列。 具体实现: 1、开启消息手动确认,修改application配置文件 1spring.rabbitmq.listener.simple.acknowledge-mode=manual2、消费者 123456789101112131415161718 @RabbitListener(queues = "meat_queue") @RabbitHandler public void processMeatOne(String content, Channel channel, Message message) throws IOException{ try { System.out.println("processMeatOne---开始消费队列meat_queue的消息: " + content); // 模拟执行任务 Thread.sleep(1000); // 模拟异常 String is = null; is.toString(); // 确认收到消息,false只确认当前consumer一个消息收到,true确认所有consumer获得的消息 channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } catch (Exception e) { // 为了避免MQ中Unacked的消息堆积,消费失败也进行ack确认 channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); e.printStackTrace(); } }但是异常有可能是网络等问题,还想再重试一次,再决定是否确认或者抛弃这个异常,我们在异常捕获里进行处理, 12345678910111213 catch (Exception e) { System.out.println("=====================异常了========================"); if (message.getMessageProperties().getRedelivered()) { System.out.println("================消息已重复处理失败,拒绝再次接收======================" + content); // 拒绝消息,requeue=false 表示不再重新入队,如果配置了死信队列则进入死信队列 channel.basicReject(message.getMessageProperties().getDeliveryTag(), false); } else { System.out.println("====================消息即将再次返回队列处理=========================" + content); // requeue为是否重新回到队列,true重新入队 channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true); } //e.printStackTrace(); }如果单纯的做channel.basicNack处理,可能造成死循环。 在手动确认的模式下,不管是消费成功还是消费失败,一定要记得确认消息,不然消息会一直处于unack状态,直到消费者进程重启或者停止。 3、方法详解: 确认收到一个或多个消息: 1void basicAck(long deliveryTag, boolean multiple) throws IOException;deliveryTag:消息的传递标识。 multiple: 如果为false,只确认当前consumer一个消息;如果为true,则确认所有consumer获得的所有小于deliveryTag的消息。 拒绝一个或多个消息: 1void basicNack(long deliveryTag, boolean multiple, boolean requeue) throws IOException;deliveryTag:消息的传递标识。 multiple: 如果为true,则拒绝所有consumer获得的小于deliveryTag的消息。 requeue: 设置为true 会把消费失败的消息从新添加到队列的尾端,设置为false不会重新回到队列。 拒绝一个消息: 1void basicReject(long deliveryTag, boolean requeue) throws IOException;deliveryTag:消息的传递标识。 requeue: 设置为false 表示不再重新入队,如果配置了死信队列则进入死信队列。 channel.basicNack 与 channel.basicReject 的区别在于basicNack可以批量拒绝多条消息,而basicReject一次只能拒绝一条消息。 三、消息重试重试并不是RabbitMQ重新发送了消息,仅仅是消费者内部进行的重试,换句话说就是重试跟mq没有任何关系; 自动ack模式下,如果超出了重试次数,队列中的数据会被ack掉; 自动ack模式下,使用catch捕获异常也是会导致不触发重试的;因此消费者代码不能添加try{}catch(){}捕获异常,一旦捕获了异常,就相当于消息正确处理了,消息直接被确认掉了,不会触发重试; 具体实现: 1、开启消息自动确认,开启消息重试,修改application配置文件 123456789101112# 开启重试,默认是false spring.rabbitmq.listener.simple.retry.enabled=true # 重试次数,默认为3次 spring.rabbitmq.listener.simple.retry.max-attempts=5 # 重试最大间隔时间 spring.rabbitmq.listener.simple.retry.max-interval=10000 # 重试初始间隔时间 spring.rabbitmq.listener.simple.retry.initial-interval=2000 # 间隔时间乘子,间隔时间*乘子=下一次的间隔时间,最大不能超过设置的最大间隔时间 spring.rabbitmq.listener.simple.retry.multiplier=2 # 消费自动确认 spring.rabbitmq.listener.simple.acknowledge-mode=auto2、消费者 123456789 @RabbitListener(queues = "meat_queue") @RabbitHandler public void processMeatTwo(String message) throws InterruptedException { System.out.println("processMeatTwo消费了队列meat_queue的消息:" + message); Thread.sleep(1000); //模拟异常 String is = null; is.toString(); }3、运行结果 控制台中可以看到,消息一共重试了5次,之后会抛出ListenerExecutionFailedException的异常,后面附带着Retry Policy Exhausted文字,提示我们重试次数已经用尽了。 消息重试次数用尽后,消息就会被抛弃,但是我们不想消息被抛弃,可以采用死信队列的方式处理重试失败的消息。 4、死信队列 声明死信交换机dlx_exchange,死信队列dlx_queue,业务队列添加死信交换机以及死信路由键的配置。队列,交换机,以及绑定关系的声明如下: 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869@Configuration public class RabbitConfig { /** * 声明2个业务队列 **/ @Bean public Queue fruitsQueue(){ return new Queue("fruits_queue"); } @Bean public Queue meatQueue(){ Map params = new HashMap(); params.put("x-dead-letter-exchange","dlx_exchange");//声明当前队列绑定的死信交换机 params.put("x-dead-letter-routing-key","dlx");//声明当前队列的死信路由键 return QueueBuilder.durable("meat_queue").withArguments(params).build(); } /** * 死信队列 * @return */ @Bean public Queue dlxQueue(){ return new Queue("dlx_queue"); } /** * 声明一个Direct类型的交换机 **/ @Bean DirectExchange directExchange(){ return new DirectExchange("xue_exchange"); } /** * 死信交换机 * @return */ @Bean public DirectExchange dlxExchange(){ return new DirectExchange("dlx_exchange"); } /** * 将上面的2个队列绑定到Direct交换机,在绑定的时候指定BindingKey **/ @Bean Binding bindExchangeFruits(Queue fruitsQueue, DirectExchange directExchange){ return BindingBuilder.bind(fruitsQueue).to(directExchange).with("fruits"); } @Bean Binding bindExchangeMeat(Queue meatQueue,DirectExchange directExchange){ return BindingBuilder.bind(meatQueue).to(directExchange).with("meat"); } /** * 死信队列绑定死信交换机 * @param dlxQueue 队列 * @param dlxExchange 交换机 * @return */ @Bean Binding dlcBinding(Queue dlxQueue, DirectExchange dlxExchange){ return BindingBuilder.bind(dlxQueue).to(dlxExchange).with("dlx"); } }5、测试一下 正在消费时: 、 重试次数使用完之后,会看到dlx_queue队列中有一条待消费的消息: 也就是说重试次数使用完之后,消息会从业务队列中删除,同时发送到死信队列中。 会发现meat_queue队列比fruits_queue队列多出了DLX和DLK两个标识。 DLX ,指的是x-dead-letter-exchange 这个属性,全称为 Dead-Letter-Exchange ,可以称之为死信交换机。当消息在一个队列中变成死信 (dead message) 之后,它能被重新被发送到另一个交换机中,这个交换机就是 DLX,绑定 DLX 的队列就称之为死信队列。DLX 也是一个正常的交换机,和一般的交换机没有区别,它能在任何的队列上被指定 , 实际上就是设置某个队列的属性。当这个队列中存在死信时 , RabbitMQ 就会自动地将这个消息重新发布到设置的 DLX 上去 ,进而被路由到另一个队列,即死信队列,可以监听这个队列中消息做相应的处理。 DLK,指的是x-dead-letter-routing-key 这个属性,关联的是routingKey,也就是路由。 ~OVER~ |
CopyRight 2018-2019 实验室设备网 版权所有 |