RabbitMQ系列【18】对象序列化机制 您所在的位置:网站首页 Rabbitmq配置序列化 RabbitMQ系列【18】对象序列化机制

RabbitMQ系列【18】对象序列化机制

2024-03-29 23:39| 来源: 网络整理| 查看: 265

有道无术,术尚可求,有术无道,止于术。

文章目录 前言发送对象接收对象使用Jackson 序列化

前言

使用RabbitMQ原生API,发送消息时,发送的是二进制byte[]数据。

void basicPublish(String var1, String var2, byte[] var4) throws IOException;

使用RabbitTemplate.send方法发送Message对象,也是二进制byte[]数据。

public Message(byte[] body) { this(body, new MessageProperties()); }

在接收时,需要将二进制数据转为你想要的数据格式。在JAVA 编程中都是基于对象操作,一般消息都是对象,比如订单、日志。

所以RabbitTemplate提供了convertAndSend方法,可以直接发送对象,那么对象在网络传输,就涉及到了序列化机制。

发送对象

首先我们看下RabbitTemplate.convertAndSend是如何工作及序列化对象的。

发送一个用户User 对象,该对象需要实现Serializable序列化接口。

User user = new User(); user.setName("张三"); rabbitTemplate.convertAndSend("bbdbdbdb","aaa.key",user);

convertAndSend也是调用send方法,只是多了一个convertMessageIfNecessary,将对象转为二进制数组,并封装到Message对象中。

public void convertAndSend(String exchange, String routingKey, Object object, @Nullable CorrelationData correlationData) throws AmqpException { // this.convertMessageIfNecessary(object) 将JAVA 消息对象转为`Message` this.send(exchange, routingKey, this.convertMessageIfNecessary(object), correlationData); }

convertMessageIfNecessary会判断当前消息是否是Message类型,如果是直接返回,不是则调用消息转换器进行转换。

protected Message convertMessageIfNecessary(Object object) { return object instanceof Message ? (Message)object : this.getRequiredMessageConverter().toMessage(object, new MessageProperties()); }

获取消息转换器,直接通过RabbitTemplate.getMessageConverter获取其成员属性,也就是SimpleMessageConverter,这是默认值。

private MessageConverter getRequiredMessageConverter() throws IllegalStateException { // private MessageConverter messageConverter = new SimpleMessageConverter(); MessageConverter converter = this.getMessageConverter(); if (converter == null) { throw new AmqpIllegalStateException("No 'messageConverter' specified. Check configuration of RabbitTemplate."); } else { return converter; } }

接着调用消息转换器的toMessage方法,

public final Message toMessage(Object object, @Nullable MessageProperties messagePropertiesArg, @Nullable Type genericType) throws MessageConversionException { // 1. 创建消息属性对象 MessageProperties messageProperties = messagePropertiesArg; if (messagePropertiesArg == null) { messageProperties = new MessageProperties(); } // 2. 创建Message对象 Message message = this.createMessage(object, messageProperties, genericType); messageProperties = message.getMessageProperties(); if (this.createMessageIds && messageProperties.getMessageId() == null) { messageProperties.setMessageId(UUID.randomUUID().toString()); } return message; }

createMessage创建Message 对象并返回。如果不是 byte[]、String类型,最后会查看消息对象是否实现了Serializable接口,如果是,则进行序列化,并设置ContentType:application/x-java-serialized-object,以上都是不是则会抛出IllegalArgumentException异常。

protected Message createMessage(Object object, MessageProperties messageProperties) throws MessageConversionException { byte[] bytes = null; // 1. byte[] 类型 if (object instanceof byte[]) { bytes = (byte[])object; // 设置消息属性 ContentType:application/octet-stream messageProperties.setContentType("application/octet-stream"); } else if (object instanceof String) { // 2. String 类型 try { // 转为字节 bytes = ((String)object).getBytes(this.defaultCharset); } catch (UnsupportedEncodingException var6) { throw new MessageConversionException("failed to convert to Message content", var6); } // 设置消息属性 ContentType:text/plain messageProperties.setContentType("text/plain"); // 设置消息属性 ContentEncoding:UTF-8 messageProperties.setContentEncoding(this.defaultCharset); } else if (object instanceof Serializable) { // 3. 实现了 Serializable接口 try { // 转为byte[] bytes = SerializationUtils.serialize(object); } catch (IllegalArgumentException var5) { throw new MessageConversionException("failed to convert to serialized Message content", var5); } // 设置消息属性 ContentType:application/x-java-serialized-object messageProperties.setContentType("application/x-java-serialized-object"); } if (bytes != null) { // 4. 设置长度 messageProperties.setContentLength((long)bytes.length); // 5. 返回`Message`对象 return new Message(bytes, messageProperties); } else { throw new IllegalArgumentException(this.getClass().getSimpleName() + " only supports String, byte[] and Serializable payloads, received: " + object.getClass().getName()); } }

Message 创建成功后,调用原生的channel.basicPublish方法,发送消息对象、属性。

protected void sendToRabbit(Channel channel, String exchange, String routingKey, boolean mandatory, Message message) throws IOException { AMQP.BasicProperties convertedMessageProperties = this.messagePropertiesConverter.fromMessageProperties(message.getMessageProperties(), this.encoding); channel.basicPublish(exchange, routingKey, mandatory, convertedMessageProperties, message.getBody()); }

查看控制台,可以看到对象消息的相关信息: 在这里插入图片描述

接收对象

在消费者接收消息时,可以直接接收业务对象。

@RabbitListener(queues = {"dsfsf"}) public void receive003(User user) { System.out.println("收到消息" + user); }

容器监听消息,调用消息转换器SimpleMessageConverter将二进制数据转为相应的对象。

调用的是SimpleMessageConverter.fromMessage方法。

public Object fromMessage(Message message) throws MessageConversionException { Object content = null; // 1. 处理消息属性 MessageProperties properties = message.getMessageProperties(); if (properties != null) { // 获取contentType ,这里为:application/x-java-serialized-object String contentType = properties.getContentType(); // 2. contentType 以text 开头(字符串),二进制转为字符串返回 if (contentType != null && contentType.startsWith("text")) { String encoding = properties.getContentEncoding(); if (encoding == null) { encoding = this.defaultCharset; } try { content = new String(message.getBody(), encoding); } catch (UnsupportedEncodingException var8) { throw new MessageConversionException("failed to convert text-based Message content", var8); } // 3. contentType为 application/x-java-serialized-object(序列化对象), } else if (contentType != null && contentType.equals("application/x-java-serialized-object")) { try { // 反序列化为对象 content = SerializationUtils.deserialize(this.createObjectInputStream(new ByteArrayInputStream(message.getBody()), this.codebaseUrl)); } catch (IllegalArgumentException | IllegalStateException | IOException var7) { throw new MessageConversionException("failed to convert serialized Message content", var7); } } } // 4. 以上都不是,直接返回二进制 if (content == null) { content = message.getBody(); } return content; } 使用Jackson 序列化

可是使用其他序列化方式,比如Jackson。

只需要在RabbitTemplate 、监听容器工厂RabbitListenerContainerFactory中设置转换器即可。

@Bean public RabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory){ SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); factory.setConnectionFactory(connectionFactory); factory.setMessageConverter(new Jackson2JsonMessageConverter()); return factory; } @Bean public RabbitTemplate rabbitTemplate(RabbitTemplateConfigurer configurer, ConnectionFactory connectionFactory) { RabbitTemplate template = new RabbitTemplate(); configurer.configure(template, connectionFactory); template.setMessageConverter(new Jackson2JsonMessageConverter()); return template; }


【本文地址】

公司简介

联系我们

今日新闻

    推荐新闻

    专题文章
      CopyRight 2018-2019 实验室设备网 版权所有