RabbitMQ高级特性之消息确认机制 您所在的位置:网站首页 rabbitmq保证消息一致性 RabbitMQ高级特性之消息确认机制

RabbitMQ高级特性之消息确认机制

2023-05-12 04:19| 来源: 网络整理| 查看: 265

前言

为了有了消息发送方确认ack,以及消息队列,交换机,消息持久化。还需要消费者确认机制呢?

生产者发送确认机制和消息的持久化存储机制,然而这依然无法完全保证整个过程的可靠性,因为如果消息被消费过程中业务处理失败了但是消息却已经出列了(被标记为已消费了),我们又没有任何重试,那结果跟消息丢失没什么分别。

mq中消息已经被我们消费了,但是我们又没有将失败的消息再放到mq中,不能保证消息以及数据的一致性。

解决

针对消费者端消费消息失败情况

我们可以增加消费重试机制,也可以将失败的消息保存下来,定时器再进行生产等。 RabbitMQ在消费端会有Ack机制,即消费端消费消息后需要发送Ack确认报文给Broker端,告知自己是否已消费完成,否则可能会一直重发消息直到消息过期(AUTO模式)。 一般而言,我们有如下处理手段: 采用NONE模式,消费的过程中自行捕获异常,引发异常后直接记录日志并落到异常恢复表,再通过后台定时任务扫描异常恢复表尝试做重试动作。 如果业务不自行处理则有丢失数据的风险 采用AUTO(自动Ack)模式,不主动捕获异常,当消费过程中出现异常时会将消息放回Queue中, 然后消息会被重新分配到其他消费者节点(如果没有则还是选择当前节点)重新被消费,默认会一直重发消息并直到消费完成返回Ack或者一直到过期 采用MANUAL(手动Ack)模式,消费者自行控制流程并手动调用channel相关的方法返回Ack 具体实现

首先我们定义生产者

package gaojitexing.ack; import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import java.io.IOException; import java.util.concurrent.TimeoutException; /** * 生产者生产消息 */ public class Produce { public static void main(String[] args) throws IOException, TimeoutException { //连接工厂 ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("localhost"); connectionFactory.setVirtualHost("/"); connectionFactory.setUsername("guest"); connectionFactory.setPassword("guest"); //amqp 协议端口 connectionFactory.setPort(5672); //简历连接 Connection connection = connectionFactory.newConnection(); //获取通道 Channel channel = connection.createChannel(); //声明队列 channel.queueDeclare("queue.ack", true, false, false, null); //声明交换器 交换器名称, 交换器类型,是否持久化,是否自动删除的, 属性map几乎 channel.exchangeDeclare("ex.ack", BuiltinExchangeType.DIRECT, false, false, false, null); //发送消息 交换器 路由key,属性,消息 -> amqp协议会将消息发送出去 channel.queueBind("queue.ack", "ex.ack", "ack.key"); String aeration = "hello-"; for (int i = 0; i < 8; i++) { channel.basicPublish("ex.ack", "ack.key", null, (aeration + " --- > " + i).getBytes()); } channel.close(); connection.close(); } } 复制代码

用于生产消息

而我们消费端进行消费消息,消费端既可以分为推消息和拉消息

package gaojitexing.ack; import com.rabbitmq.client.*; import java.io.IOException; import java.net.URISyntaxException; import java.security.KeyManagementException; import java.security.NoSuchAlgorithmException; import java.util.concurrent.TimeoutException; public class LisenterComsumer { public static void main(String[] args) throws NoSuchAlgorithmException, KeyManagementException, URISyntaxException, IOException, TimeoutException { //连接工厂 ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setUri("amqp://guest:guest@localhost:5672/%2f"); // connectionFactory.setVirtualHost("/"); // connectionFactory.setUsername("guest"); // connectionFactory.setPassword("guest"); // // //amqp 协议端口 // connectionFactory.setPort(5672); //建立连接 Connection connection = connectionFactory.newConnection(); //获取通道 Channel channel = connection.createChannel(); //定义消息队列 channel.queueDeclare("queue.ack", true, false, false, null); //声明交换器 channel.exchangeDeclare("ex.ack", BuiltinExchangeType.DIRECT, false, false, false, null); //绑定队列到交换器 路由 debug 的日志 channel.queueBind("queue.ack", "ex.ack", "ack.key"); //监听消费(用于推消息) //false 表示不自动确认 channel.basicConsume("queue.ack", false, "myConsume", new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("获取到的消息" + new String(body)); //envelope.getDeliveryTag() deliveryTag表示消息的唯一标志, //手动确认消息 第二个参数表示表示是否是批量确认 // channel.basicAck(envelope.getDeliveryTag(), false); //手动确认 者处理失败 //第二个参数表示不确认多个还是一个消息,最后一个参数表示不确认的消息是否重新放回队列 //可以拒收多条消息 // channel.basicNack(envelope.getDeliveryTag(),false,true); //手动拒绝消息(只能拒收一条消息)。第二个参数表示是否重新入列,是否重新入列,然后重发会一直重发重试 // channel.basicReject(envelope.getDeliveryTag(), true); } }); // channel.close(); // connection.close(); } } 复制代码

拉消息模式

package gaojitexing.ack; import com.rabbitmq.client.*; import java.io.IOException; import java.net.URISyntaxException; import java.security.KeyManagementException; import java.security.NoSuchAlgorithmException; import java.util.concurrent.TimeoutException; public class GetComsumer { public static void main(String[] args) throws NoSuchAlgorithmException, KeyManagementException, URISyntaxException, IOException, TimeoutException { //连接工厂 ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setUri("amqp://guest:guest@localhost:5672/%2f"); // connectionFactory.setVirtualHost("/"); // connectionFactory.setUsername("guest"); // connectionFactory.setPassword("guest"); // // //amqp 协议端口 // connectionFactory.setPort(5672); //建立连接 Connection connection = connectionFactory.newConnection(); //获取通道 Channel channel = connection.createChannel(); //定义消息队列 channel.queueDeclare("queue.ack", true, false, false, null); //声明交换器 channel.exchangeDeclare("ex.ack", BuiltinExchangeType.DIRECT, false, false, false, null); //绑定队列到交换器 路由 debug 的日志 channel.queueBind("queue.ack", "ex.ack", "ack.key"); //消费(用于拉消息) GetResponse getResponse = channel.basicGet("queue.ack", false); System.out.println("拉取的消息: " + new String(getResponse.getBody())); Envelope envelope = getResponse.getEnvelope(); //拒绝消息 第二个表示表示是否需要重新入列 // channel.basicReject(envelope.getDeliveryTag(), false); // channel.basicReject(envelope.getDeliveryTag(), true); //拒绝消息 第二个表示是否拒绝多个,第三个表示是否需要重新入列 channel.basicNack(envelope.getDeliveryTag(), false, false); //确认消息 第二个参数是否确认多个 // channel.basicAck(envelope.getDeliveryTag(), false); channel.close(); connection.close(); } } 复制代码


【本文地址】

公司简介

联系我们

今日新闻

    推荐新闻

    专题文章
      CopyRight 2018-2019 实验室设备网 版权所有