48. 从零开始学springboot: 接入RocketMQ 您所在的位置:网站首页 goc_notepad下载 48. 从零开始学springboot: 接入RocketMQ

48. 从零开始学springboot: 接入RocketMQ

#48. 从零开始学springboot: 接入RocketMQ| 来源: 网络整理| 查看: 265

前言

微服务的架构越来越流行, 很多老旧项目面临着解耦重构, 复杂项目的解耦通常会引入一些中间件来帮助我们更好的完成工作, 本章, 我们就来通过实例了解下消息中间件的用法.

市面上比较流行的消息中间件如下 image.png

因为鱼哥的项目上了阿里的云, 所以选择很简单, 就用RocketMQ即可, 看官们根据实际情况择优选择.

RocketMQ

注意, 本文使用了4.0sdk,截止到文章发表, ali已推出5.0SDK 关于阿里的RocketMQ的介绍官方有详细的文档, 这里就不啰嗦了

官方已SDK提供了简单的接入方式, 需要注意的是, 官方提供了两种协议的SDK

image.png

HTTP协议 采用RESTful风格,方便易用,快速接入,跨网络能力强. 支持Java、C++、.NET、Go、Python、Node.js和PHP七种语言客户端

TCP协议 区别于HTTP简单的接入方式,提供更为专业、可靠、稳定的TCP协议的SDK接入服务. 支持的语言包括Java、C/C++ 以及.NET

注意, 使用的话需要自费开通对应的服务, 通过后才能使用, 大部分管理功能都能在阿里云的控制台中找到.

实例

下面我们就springboot来实际使用下RocketMQ. 为了较好的使用, 我们同时演示tcp何http两种方式

pom引入依赖

com.aliyun.openservices ons-client ${ali-mq-tcp.version} com.aliyun.mq mq-http-sdk ${ali-mq-http.version} jar-with-dependencies 接入tcp方式

定义mq配置类

@Getter @Setter @Component @ConfigurationProperties(prefix = "aliyun.rocketmq.tcp") public class MqTcpProperties { //AccessKey ID,身份验证,在RAM控制台创建 private String accessKeyId; //AccessKey Secret,身份验证,在RAM控制台创建 private String accessKeySecret; //实例TCP协议接入地址(内网) private String nameSrvAddr; //发送超时时间: 3s private String sendMsgTimeoutMillis = "3000"; //线程数目:默认20 private String consumeThreadNums = "20"; //订阅方式: 集群 private String messageModel = "CLUSTERING"; public Properties getMqProperties() { Properties properties = new Properties(); properties.setProperty(PropertyKeyConst.AccessKey, this.accessKeyId); properties.setProperty(PropertyKeyConst.SecretKey, this.accessKeySecret); properties.setProperty(PropertyKeyConst.NAMESRV_ADDR, this.nameSrvAddr); // 设置发送超时时间,单位毫秒 properties.setProperty(PropertyKeyConst.SendMsgTimeoutMillis, this.sendMsgTimeoutMillis); // 设置消费者线程数为20个(默认20) properties.setProperty(PropertyKeyConst.ConsumeThreadNums, this.consumeThreadNums); // 订阅方式设置 properties.put(PropertyKeyConst.MessageModel, this.messageModel); return properties; } }

定义Topic配置类

@Getter @Setter @Configuration @ConfigurationProperties(prefix = "aliyun.rocketmq.tcp") public class MqTcpTopicProperties { private String topic; private String groupId; private String tag; }

封装一个工具类

@Slf4j public class MqTcpUtil { @Autowired private ProducerBean producer; @Autowired private MqTcpProperties mqTcpProperties; @Autowired private MqTcpTopicProperties mqTcpTopicProperties; private ConsumerBean consumerBean; /** * 同步发送消息 - 配置默认topic * * @param msgTag 标签,可用于消息小分类标注 * @param messageBody 消息body内容,生产者自定义内容 * @param msgKey 消息key值,建议设置全局唯一,可不传,不影响消息投递 * @return success:SendResult or error:null */ public SendResult sendMsg(String msgTag, String messageBody, String msgKey) { Message msg = new Message(mqTcpTopicProperties.getTopic(), msgTag, msgKey, messageBody.getBytes()); return this.send(msg, Boolean.FALSE); } /** * 同步发送消息 - 配置默认topic - 重试次数 * * @param msgTag 标签,可用于消息小分类标注 * @param messageBody 消息body内容,生产者自定义内容 * @param msgKey 消息key值,建议设置全局唯一,可不传,不影响消息投递 * @param retryTimes 重试次数,注意实际请求次数为 retryTimes + 1 * @return success:SendResult or error:null */ public SendResult sendMsg(String msgTag, String messageBody, String msgKey, Integer retryTimes) { Message msg = new Message(mqTcpTopicProperties.getTopic(), msgTag, msgKey, messageBody.getBytes()); SendResult result = this.send(msg, Boolean.FALSE); if (ObjectUtil.isNotEmpty(result) || retryTimes == 0) { return result; } return this.sendMsg(msgTag, messageBody, msgKey, --retryTimes); } /** * 同步发送消息 * * @param topic topic名 * @param msgTag 标签,可用于消息小分类标注 * @param messageBody 消息body内容,生产者自定义内容 * @param msgKey 消息key值,建议设置全局唯一,可不传,不影响消息投递 * @return success:SendResult or error:null */ public SendResult sendMsg(String topic, String msgTag, String messageBody, String msgKey) { Message msg = new Message(topic, msgTag, msgKey, messageBody.getBytes()); return this.send(msg, Boolean.FALSE); } /** * 同步发送单向消息 * * @param topic topic名 * @param msgTag 标签,可用于消息小分类标注 * @param messageBody 消息body内容,生产者自定义内容 * @param msgKey 消息key值,建议设置全局唯一,可不传,不影响消息投递 */ public void sendOneWayMsg(String topic, String msgTag, String messageBody, String msgKey) { Message msg = new Message(topic, msgTag, msgKey, messageBody.getBytes()); this.send(msg, Boolean.TRUE); } /** * 发送普通消息 * * @param msg 消息 * @param isOneWay 是否单向发送 */ private SendResult send(Message msg, Boolean isOneWay) { try { if (isOneWay) { //由于在 oneway 方式发送消息时没有请求应答处理,一旦出现消息发送失败,则会因为没有重试而导致数据丢失。 //若数据不可丢,建议选用同步或异步发送方式。 producer.sendOneway(msg); success(msg, "单向消息MsgId不返回"); return null; } else { //可靠同步发送 SendResult sendResult = producer.send(msg); //获取发送结果,不抛异常即发送成功 assert sendResult != null; success(msg, sendResult.getMessageId()); return sendResult; } } catch (Exception e) { error(msg, e); return null; } } /** * 成功日志打印 * * @param msg * @param messageId */ private void success(Message msg, String messageId) { log.info("发送MQ消息成功 -- Topic:{} ,msgId:{} , Key:{}, tag:{}, body:{}" , msg.getTopic(), messageId, msg.getKey(), msg.getTag(), new String(msg.getBody())); } /** * 异常日志打印 * * @param msg * @param e */ private void error(Message msg, Exception e) { log.error("发送MQ消息失败-- Topic:{}, Key:{}, tag:{}, body:{}" , msg.getTopic(), msg.getKey(), msg.getTag(), new String(msg.getBody())); log.error("errorMsg", e); } @PostConstruct public void init() { log.info("[Init] tcp consumerBean init"); consumerBean = new ConsumerBean(); } public ConsumerBean getConsumer() { return consumerBean; } public ConsumerBean getDefaultConsumer(MessageListener messageListener) { //配置文件 Properties properties = mqTcpProperties.getMqProperties(); //消费者 properties.setProperty(PropertyKeyConst.GROUP_ID, mqTcpTopicProperties.getGroupId()); //设置消费者线程数为20个(默认20) properties.setProperty(PropertyKeyConst.ConsumeThreadNums, mqTcpProperties.getConsumeThreadNums()); // 广播订阅方式设置 properties.put(PropertyKeyConst.MessageModel, mqTcpProperties.getMessageModel()); consumerBean.setProperties(properties); //订阅消息 Map subscriptionTable = new HashMap(); //订阅消息 Subscription smsSubscription = new Subscription(); smsSubscription.setTopic(mqTcpTopicProperties.getTopic()); smsSubscription.setExpression(mqTcpTopicProperties.getTag()); subscriptionTable.put(smsSubscription, messageListener); consumerBean.setSubscriptionTable(subscriptionTable); return consumerBean; } }

自动注入类

/** * rocketMq服务自动配置类 * 业务系统要使用rocketMq服务服务,需要在配置文件中增加如下配置 * 【 * aliyun.rocketmq.tcp.enable=true * aliyun.rocketmq.tcp.accessKeyId= * aliyun.rocketmq.tcp.accessKeySecret= * aliyun.rocketmq.tcp.nameSrvAddr= * aliyun.rocketmq.tcp.default.topic= * aliyun.rocketmq.tcp.default.groupId= * aliyun.rocketmq.tcp.default.tag= */ @Slf4j @Configuration @EnableConfigurationProperties({MqTcpProperties.class, MqTcpTopicProperties.class}) @ConditionalOnClass({ProducerBean.class, ConsumerBean.class}) @ConditionalOnProperty(prefix = "aliyun.rocketmq.tcp", value = "enable", havingValue = "true") public class MqTcpDefaultAutoConfiguration { @Autowired private MqTcpProperties mqTcpProperties; @PostConstruct public void init() { log.info("[Auto Config] MqTcpDefaultAutoConfiguration loading......"); } @Bean(initMethod = "start", destroyMethod = "shutdown") @ConditionalOnMissingBean public ProducerBean buildProducer() { ProducerBean producer = new ProducerBean(); producer.setProperties(mqTcpProperties.getMqProperties()); return producer; } @Bean public MqTcpUtil mqTcpUtil() { return new MqTcpUtil(); } }

核心功能基本完成, 我们可以通过MqTcpUtil执行消息的发送了 新增个测试类

@Test public void mqTcpTest() { // 自定义一条body内容 JSONObject body = new JSONObject(); UUID uuid = UUID.randomUUID(); body.put("notice", "这是一条tcp通知类信息"); //同步发送消息-不带返回值的(一般使用该方法) log.info(String.valueOf(mqTcpUtil.sendMsg("topic", "tag_dev", body.toJSONString(), String.valueOf(uuid)))); }

最后我们加上Ali RocketMq的配置即可

## AK aliyun.rocketmq.tcp.enable=true aliyun.rocketmq.tcp.accessKeyId= aliyun.rocketmq.tcp.accessKeySecret= ## TCP aliyun.rocketmq.tcp.nameSrvAddr= aliyun.rocketmq.tcp.sendMsgTimeoutMillis=3000 aliyun.rocketmq.tcp.consumeThreadNums=20 aliyun.rocketmq.tcp.messageModel=CLUSTERING ## topic aliyun.rocketmq.tcp.topic= aliyun.rocketmq.tcp.groupId= aliyun.rocketmq.tcp.tag= 消费者

以上完成了核心功能的接入与开发, 我们已经能正常发送MQ消息了, 通俗点讲就是生产者就绪了, 我们还需要消费者去消费这些消息.

合理的拆分是生产者和消费者分服务部署, 这里为了演示, 鱼哥就直接自产自销了.

定义消费者监听器MqMessageListener

@Slf4j @Component public class MqMessageListener implements MessageListener { @Override public Action consume(Message message, ConsumeContext context) { log.info("接收到MQ详细信息:{}", message); log.info("解析MQ-Body自定义内容:{}", new String(message.getBody())); try { //do something.. return Action.CommitMessage; } catch (Exception e) { log.error("消费MQ消息失败,msgId:" + message.getMsgID() + ",ExceptionMsg:" + e.getMessage()); return Action.ReconsumeLater; } } }

申明消费者类, 这里我们定义两种消费者, 一种是默认配置的, 一种是主定义配置的,两者选一即可

@Component public class MqConsumerClient { // @Autowired // private MqTcpProperties mqTcpProperties; @Autowired private MqMessageListener mqMessageListener; // @Autowired // private MqTcpTopicProperties mqTcpTopicProperties; @Autowired private MqTcpUtil mqTcpUtil; // //自定义消费者 // @Bean(initMethod = "start", destroyMethod = "shutdown") // public ConsumerBean messageBuildConsumer() { // ConsumerBean consumerBean = mqTcpUtil.getConsumer(); // //配置文件 // Properties properties = mqTcpProperties.getMqProperties(); // //消费者 // properties.setProperty(PropertyKeyConst.GROUP_ID, mqTopicProperties.getGroupId()); // //设置消费者线程数为20个(默认20) // properties.setProperty(PropertyKeyConst.ConsumeThreadNums, "20"); // consumerBean.setProperties(properties); // //订阅消息 // Map subscriptionTable = new HashMap(); // // 广播订阅方式设置 // properties.put(PropertyKeyConst.MessageModel, PropertyValueConst.BROADCASTING); // //订阅消息 // Subscription smsSubscription = new Subscription(); // smsSubscription.setTopic(mqTopicProperties.getTopic()); // smsSubscription.setExpression(mqTopicProperties.getTag()); // subscriptionTable.put(smsSubscription, mqMessageListener); // consumerBean.setSubscriptionTable(subscriptionTable); // return consumerBean; // } // 默认消费者 @Bean(initMethod = "start", destroyMethod = "shutdown") public ConsumerBean defaultConsumer() { ConsumerBean consumerBean = mqTcpUtil.getDefaultConsumer(mqMessageListener); return consumerBean; } }

最后启动服务, 即可看到消息消费的输出信息

image.png image.png

接入http方式

定义mq配置类

@Getter @Setter @Configuration @ConfigurationProperties(prefix = "aliyun.rocketmq.http") public class MqHttpProperties { //AccessKey ID,身份验证,在RAM控制台创建 private String accessKeyId; //AccessKey Secret,身份验证,在RAM控制台创建 private String accessKeySecret; //实例TCP协议接入地址(内网) private String nameSrvAddr; //实例TCP协议接入地址(内网) private String sendMsgTimeoutMillis = "3000"; //线程数目:默认20 private String consumeThreadNums = "20"; //线程数目:默认20 private String messageModel = "CLUSTERING"; public Properties getMqProperties() { Properties properties = new Properties(); properties.setProperty(PropertyKeyConst.AccessKey, this.accessKeyId); properties.setProperty(PropertyKeyConst.SecretKey, this.accessKeySecret); properties.setProperty(PropertyKeyConst.NAMESRV_ADDR, this.nameSrvAddr); // 设置发送超时时间,单位毫秒 properties.setProperty(PropertyKeyConst.SendMsgTimeoutMillis, this.sendMsgTimeoutMillis); // 设置消费者线程数为20个(默认20) properties.setProperty(PropertyKeyConst.ConsumeThreadNums, this.consumeThreadNums); // 广播订阅方式设置 properties.put(PropertyKeyConst.MessageModel, this.messageModel); return properties; }

定义Topic配置类

@Getter @Setter @Configuration @ConfigurationProperties(prefix = "aliyun.rocketmq.http") public class MqHttpTopicProperties { private String topic; private String groupId; private String tag; }

定义操作工具类

@Slf4j public class MqHttpUtil { @Autowired private MqHttpProperties mqHttpProperties; @Autowired private MqHttpTopicProperties mqHttpTopicProperties; private MQProducer producer; private MQConsumer consumer; private MQClient mqClient; @PostConstruct public void init() { log.info("[Init] http producer init"); mqClient = new MQClient( // 设置HTTP协议客户端接入点,进入消息队列RocketMQ版控制台实例详情页面的接入点区域查看。 mqHttpProperties.getNameSrvAddr(), // AccessKey ID,身份验证,在阿里云RAM控制台创建。 mqHttpProperties.getAccessKeyId(), // AccessKey Secret,身份验证,在阿里云RAM控制台创建。 mqHttpProperties.getAccessKeySecret() ); } /** * 同步发送消息 * * @param topic topic名 * @param msgTag 标签,可用于消息小分类标注 * @param messageBody 消息body内容,生产者自定义内容 * @param msgKey 消息key值,建议设置全局唯一,可不传,不影响消息投递 * @return success:SendResult or error:null */ public TopicMessage sendMsg(String topic, String msgTag, String messageBody, String msgKey) { producer = mqClient.getProducer(topic); TopicMessage msg = new TopicMessage( messageBody.getBytes(), msgTag); msg.setMessageId(msgKey); try { //可靠同步发送 TopicMessage sendResult = producer.publishMessage(msg); //获取发送结果,不抛异常即发送成功 assert sendResult != null; success(topic, msg); return sendResult; } catch (Exception e) { error(topic, msg, e); return null; } } public MQClient getMqClient() { return mqClient; } public MQConsumer getDefaultConsumer() { consumer = mqClient.getConsumer(mqHttpTopicProperties.getTopic(), mqHttpTopicProperties.getGroupId(), mqHttpTopicProperties.getTag()); return consumer; } /** * 成功日志打印 * * @param msg */ private void success(String topic, TopicMessage msg) { log.info("发送MQ消息成功 -- Topic:{} ,msgId:{} , tag:{}, body:{}" , topic, msg.getMessageId(), msg.getMessageTag(), msg.getMessageBodyString()); } /** * 异常日志打印 * * @param msg * @param e */ private void error(String topic, TopicMessage msg, Exception e) { log.error("发送MQ消息失败-- Topic:{}, msgId:{}, tag:{}, body:{}" , topic, msg.getMessageId(), msg.getMessageTag(), msg.getMessageBodyString()); log.error("errorMsg", e); }

自动注入

/** * rocketMq服务自动配置类 * 业务系统要使用rocketMq服务服务,需要在配置文件中增加如下配置 * 【 * aliyun.rocketmq.http.enable=true * aliyun.rocketmq.http.accessKeyId= * aliyun.rocketmq.http.accessKeySecret= * aliyun.rocketmq.http.nameSrvAddr= * aliyun.rocketmq.http.default.topic= * aliyun.rocketmq.http.default.groupId= * aliyun.rocketmq.http.default.tag= */ @Slf4j @Configuration @ConditionalOnProperty(prefix = "aliyun.rocketmq.http", value = "enable", havingValue = "true") public class MqHttpDefaultAutoConfiguration { @PostConstruct public void init() { log.info("[Auto Config] MqHttpDefaultAutoConfiguration loading......"); } @Bean public MqHttpUtil mqHttpUtil() { return new MqHttpUtil(); } }

最后测试下

@Test public void mqHttpTest() { // 自定义一条body内容 JSONObject body = new JSONObject(); UUID uuid = UUID.randomUUID(); body.put("notice", "这是一条http通知类信息"); //同步发送消息-不带返回值的(一般使用该方法) log.info(String.valueOf(mqHttpUtil.sendMsg("topic", "tag_dev", body.toJSONString(), String.valueOf(uuid)))); }

最后增加配置

#***********************MQ-http********************************* ## AK aliyun.rocketmq.http.enable=true aliyun.rocketmq.http.accessKeyId= aliyun.rocketmq.http.accessKeySecret= ## HTTP aliyun.rocketmq.http.nameSrvAddr= aliyun.rocketmq.http.sendMsgTimeoutMillis=3000 aliyun.rocketmq.http.consumeThreadNums=20 aliyun.rocketmq.http.messageModel=CLUSTERING ## topic aliyun.rocketmq.http.topic=topic aliyun.rocketmq.http.groupId=group_dev aliyun.rocketmq.http.tag=tag_dev

运行测试 image.png image.png

项目地址

注意: 案例需要配置你自己申请的RocketMQ配置,否则无法启动.

https://github.com/MrCoderStack/SpringBootDemo/tree/master/sb-alibaba-rocketmq

请关注我的订阅号

订阅号.png



【本文地址】

公司简介

联系我们

今日新闻

    推荐新闻

    专题文章
      CopyRight 2018-2019 实验室设备网 版权所有