RabbitMQ的核心组件和五种常用模式

您所在的位置:网站首页 组件有哪些版型组成的 RabbitMQ的核心组件和五种常用模式

RabbitMQ的核心组件和五种常用模式

2024-07-16 17:40:54| 来源: 网络整理| 查看: 265

RabbitMQ的核心组件

在这里插入图片描述

Broker组件 virtual host组件 Connection组件 Channel组件 Exchange组件 Queue组件 Binding组件 Broker组件 Broker:接受和分发消息的应用,RabbitMQ Server就是Message Broker virtual host组件 出于多租户和安全因素设计的,把 AMQP 的基本组件划分到一个虚拟的分组中,类似于网络中的 namespace 概念。当多个不同的用户使用同一个 RabbitMQ server 提供的服务时,可以划分出多个vhost,每个用户在自己的 vhost 创建 exchange/queue 等 Connection组件 publisher/consumer 和 broker 之间的 TCP 连接 Channel组件 如果每一次访问 RabbitMQ 都建立一个 Connection,在消息量大的时候建立 TCP Connection的开销将是巨大的,效率也较低。Channel 是在 connection 内部建立的逻辑连接,如果应用程序支持多线程,通常每个thread创建单独的 channel 进行通讯,AMQP method 包含了channel id 帮助客户端和message broker 识别 channel,所以 channel 之间是完全隔离的。Channel 作为轻量级的 Connection 极大减少了操作系统建立 TCP connection 的开销 Exchange组件 message 到达 broker 的第一站,根据分发规则,匹配查询表中的 routing key,分发消息到queue 中去。常用的类型有:direct (point-to-point), topic (publish-subscribe) and fanout (multicast) Queue组件 消息最终被送到这里等待 consumer 取走 Binding组件 exchange 和 queue 之间的虚拟连接,binding 中可以包含 routing key。Binding 信息被保存到 exchange 中的查询表中,用于 message 的分发依据 1,简单模式

在这里插入图片描述 生产者Producer:

package com.lp.test; import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import java.io.IOException; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeoutException; public class Producter { public static void main(String[] args) { //创建连接工厂 --配置连接信息 ConnectionFactory factory = new ConnectionFactory(); //设置连接的ip地址 factory.setHost("192.168.31.181"); try { //新建连接 Connection connection = factory.newConnection(); //创建通道 Channel channel = connection.createChannel(); //创建队列 /** * String queue 队列的名称 * boolean durable 是否该队列持久化rabbitMq服务重启后该存放是否存在 * boolean exclusive 是否独占false * boolean autoDelete 是否自动删除 如果长时间没有发生消息,则自动删除 * Map arguments 额外参数,可以给空 */ channel.queueDeclare("lipeng", true, false,false, null); //发生消息 /** * String exchange: 交换机的名称 如果没有则使用”“ 它会自动采用默认 * String routingKey 路由key,如果没有交换机的绑定 使用队列的名称 * BasicProperties props 消息的一些额外配置 可以为null * byte[] body 消息的内容 */ String msg="hello world, I am coming!!!,Nice to meet you"; channel.basicPublish("","lipeng",null,msg.getBytes()); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } } }

消费者Consumer:

package com.lp.test; import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; public class Consumeer { public static void main(String[] args) { //创建连接工厂 ---配置连接信息 ConnectionFactory factory = new ConnectionFactory(); //设置连接ip factory.setHost("192.168.31.181"); try { //创建连接对象 Connection connection = factory.newConnection(); //创建通道 Channel channel = connection.createChannel(); //接受消息 /** * String queue 对列的名称 * boolean autoAck 是否自动确认 * Consumer callback 回调函数:当队列中存在信息,会自动触发回调函数 */ DefaultConsumer consumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { //body 接受的信息 System.out.println("消息内容是"+new String(body)); } }; channel.basicConsume("lipeng",true,consumer); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } } } 2,工作者模式

在这里插入图片描述 生产者Producer:

package com.lp.worker; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import java.io.IOException; import java.util.concurrent.TimeoutException; public class Producter { public static void main(String[] args) { //声明全局队列的变量 ---queue1 String queue1="lipeng_worker01"; //创建连接工厂 ---配置连接信息 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.31.181"); try { //创建连接 Connection connection = factory.newConnection(); //创建信道 Channel channel = connection.createChannel(); //创建队列 /** * String queue 队列的名称 * boolean durable 是否该队列持久化rabbitMq服务重启后该存放是否存在 * boolean exclusive 是否独占false * boolean autoDelete 是否自动删除 如果长时间没有发生消息,则自动删除 * Map arguments 额外参数,可以给空 */ channel.queueDeclare(queue1,true,false,false,null); //发生消息 /** * String exchange: 交换机的名称 如果没有则使用”“ 它会自动采用默认 * String routingKey 路由key,如果没有交换机的绑定 使用队列的名称 * BasicProperties props 消息的一些额外配置 可以为null * byte[] body 消息的内容 */ for (int i = 0; i e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } } }

消费者1Consumer01:

package com.lp.worker; import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; public class Consumeer01 { public static void main(String[] args) { //创建连接工厂 ---配置连接信息 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.31.181"); try { //创建连接 Connection connection = factory.newConnection(); //创建信道 Channel channel = connection.createChannel(); //接受消息 /** * String queue 对列的名称 * boolean autoAck 是否自动确认 * Consumer callback 回调函数:当队列中存在信息,会自动触发回调函数 */ DefaultConsumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { //body 接受的信息 System.out.println("消息01:"+new String(body)); } }; channel.basicConsume("lipeng_worker01",true,consumer); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } } }

消费者2Consumer02:

package com.lp.worker; import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; public class Consumeer02 { public static void main(String[] args) { //创建连接工厂 ---配置连接信息 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.31.181"); try { //创建连接 Connection connection = factory.newConnection(); //创建信道 Channel channel = connection.createChannel(); //接受消息 /** * String queue 对列的名称 * boolean autoAck 是否自动确认 * Consumer callback 回调函数:当队列中存在信息,会自动触发回调函数 */ DefaultConsumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { //body 接受的信息 System.out.println("消息02:"+new String(body)); } }; channel.basicConsume("lipeng_worker01",true,consumer); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } } } 3,广播模式

在这里插入图片描述 生产者Producer:

package com.lp.fanout; import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import java.io.IOException; import java.util.concurrent.TimeoutException; public class Producter { public static void main(String[] args) { //创建连接工厂 ---配置连接信息 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.31.181"); try { //创建连接 Connection connection = factory.newConnection(); //创建信道 Channel channel = connection.createChannel(); //声明队列 channel.queueDeclare("lipeng_fanout01",true,false,false,null); channel.queueDeclare("lipeng_fanout02",true,false,false,null); //声明交换机 channel.exchangeDeclare("lipeng_exchange_fanout", BuiltinExchangeType.FANOUT,true); //绑定交换机 channel.queueBind("lipeng_fanout01","lipeng_exchange_fanout",""); channel.queueBind("lipeng_fanout02","lipeng_exchange_fanout",""); for (int i = 0; i e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } } }

消费者1Consumer01:

package com.lp.fanout; import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; public class Consumeer01 { public static void main(String[] args) { //创建连接工厂 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.31.181"); try { //创建连接 Connection connection = factory.newConnection(); //创建信道 Channel channel = connection.createChannel(); DefaultConsumer consumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { //body 接受的消息 System.out.println("消息01"+"=="+new String(body)); } }; channel.basicConsume("lipeng_fanout01",true,consumer); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } } }

消费者2Consumer02:

package com.lp.fanout; import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; public class Consumeer02 { public static void main(String[] args) { //创建连接工厂 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.31.181"); try { //创建连接 Connection connection = factory.newConnection(); //创建信道 Channel channel = connection.createChannel(); DefaultConsumer consumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { //body 接受消息 System.out.println("消息02"+"=="+new String(body)); } }; channel.basicConsume("lipeng_fanout02",true,consumer); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } } } 4,路由模式

在这里插入图片描述 生产者Producer:

package com.lp.routing; import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import java.io.IOException; import java.util.concurrent.TimeoutException; public class Producter { public static void main(String[] args) { //声明用作全局变量的队列变量和交换机变量 String queue1="lp_routing01"; String queue2="lp_routing02"; String exchange1="lp_exchange_direct"; //创建连接工厂 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.31.181"); try { //创建连接 Connection connection = factory.newConnection(); //创建信道 Channel channel = connection.createChannel(); //声明对列 channel.queueDeclare(queue1,true,false,false,null); channel.queueDeclare(queue2,true,false,false,null); //声明交换机 channel.exchangeDeclare(exchange1, BuiltinExchangeType.DIRECT,true); //绑定交换机 channel.queueBind(queue1,exchange1,"error"); channel.queueBind(queue2,exchange1,"error"); channel.queueBind(queue2,exchange1,"info"); channel.queueBind(queue2,exchange1,"warning"); //发生消息 for (int i = 0; i e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } } }

消费者1Consumer01:

package com.lp.routing; import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; public class Consumeer01 { public static void main(String[] args) { //创建连接对象 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.31.181"); try { //创建连接 Connection connection = factory.newConnection(); //创建信道 Channel channel = connection.createChannel(); //接受消息 DefaultConsumer consumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { //body 接受的消息内容 System.out.println("消息01:"+new String(body)); } }; channel.basicConsume("lp_routing01",true,consumer); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } } }

消费者2Consumer02:

package com.lp.routing; import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; public class Consumeer02 { public static void main(String[] args) { //创建连接对象 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.31.181"); try { //创建连接 Connection connection = factory.newConnection(); //创建信道 Channel channel = connection.createChannel(); //接受消息 DefaultConsumer consumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { //body 接受的消息内容 System.out.println("消息02:"+new String(body)); } }; channel.basicConsume("lp_routing01",true,consumer); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } } } 5,主题模式

在这里插入图片描述 生产者Producer:

package com.lp.topic; import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import java.io.IOException; import java.util.concurrent.TimeoutException; public class Producter { public static void main(String[] args) { //声明用作全局变量的队列变量和交换价变量 String queue1="lp_topic01"; String queue2="lp_topic02"; String exchange1="lp_exchange_topic"; //创建连接工厂 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.31.181"); try { //创建连接 Connection connection = factory.newConnection(); //创建信道 Channel channel = connection.createChannel(); //声明队列 channel.queueDeclare(queue1,true,false,false,null); channel.queueDeclare(queue2,true,false,false,null); //声明交换机 channel.exchangeDeclare(exchange1, BuiltinExchangeType.TOPIC,true); //绑定队列 channel.queueBind(queue1,exchange1,"*.orange.*"); channel.queueBind(queue2,exchange1,"*.*.rabbit"); channel.queueBind(queue2,exchange1,"lazy.#"); //发生消息 for (int i = 0; i e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } } }

消费者1Consumer01:

package com.lp.topic; import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; public class Consumeer01 { public static void main(String[] args) { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.31.181"); try { Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); DefaultConsumer consumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("消息01:"+new String(body)); } }; channel.basicConsume("lp_topic01",true,consumer); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } } }

消费者2Consumer02:

package com.lp.topic; import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; public class Consumeer02 { public static void main(String[] args) { String queue1="lp_topic02"; ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.31.181"); try { Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); DefaultConsumer consumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("消息02:"+new String(body)); } }; channel.basicConsume(queue1,true,consumer); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } } }

每天努力一点点,终会有所收获!!!!



【本文地址】

公司简介

联系我们

今日新闻


点击排行

实验室常用的仪器、试剂和
说到实验室常用到的东西,主要就分为仪器、试剂和耗
不用再找了,全球10大实验
01、赛默飞世尔科技(热电)Thermo Fisher Scientif
三代水柜的量产巅峰T-72坦
作者:寞寒最近,西边闹腾挺大,本来小寞以为忙完这
通风柜跟实验室通风系统有
说到通风柜跟实验室通风,不少人都纠结二者到底是不
集消毒杀菌、烘干收纳为一
厨房是家里细菌较多的地方,潮湿的环境、没有完全密
实验室设备之全钢实验台如
全钢实验台是实验室家具中较为重要的家具之一,很多

推荐新闻


图片新闻

实验室药品柜的特性有哪些
实验室药品柜是实验室家具的重要组成部分之一,主要
小学科学实验中有哪些教学
计算机 计算器 一般 打孔器 打气筒 仪器车 显微镜
实验室各种仪器原理动图讲
1.紫外分光光谱UV分析原理:吸收紫外光能量,引起分
高中化学常见仪器及实验装
1、可加热仪器:2、计量仪器:(1)仪器A的名称:量
微生物操作主要设备和器具
今天盘点一下微生物操作主要设备和器具,别嫌我啰嗦
浅谈通风柜使用基本常识
 众所周知,通风柜功能中最主要的就是排气功能。在

专题文章

    CopyRight 2018-2019 实验室设备网 版权所有 win10的实时保护怎么永久关闭