RabbitMQ的核心组件和五种常用模式 |
您所在的位置:网站首页 › 组件有哪些版型组成的 › RabbitMQ的核心组件和五种常用模式 |
RabbitMQ的核心组件
消费者Consumer: package com.lp.test; import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; public class Consumeer { public static void main(String[] args) { //创建连接工厂 ---配置连接信息 ConnectionFactory factory = new ConnectionFactory(); //设置连接ip factory.setHost("192.168.31.181"); try { //创建连接对象 Connection connection = factory.newConnection(); //创建通道 Channel channel = connection.createChannel(); //接受消息 /** * String queue 对列的名称 * boolean autoAck 是否自动确认 * Consumer callback 回调函数:当队列中存在信息,会自动触发回调函数 */ DefaultConsumer consumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { //body 接受的信息 System.out.println("消息内容是"+new String(body)); } }; channel.basicConsume("lipeng",true,consumer); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } } } 2,工作者模式
消费者1Consumer01: package com.lp.worker; import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; public class Consumeer01 { public static void main(String[] args) { //创建连接工厂 ---配置连接信息 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.31.181"); try { //创建连接 Connection connection = factory.newConnection(); //创建信道 Channel channel = connection.createChannel(); //接受消息 /** * String queue 对列的名称 * boolean autoAck 是否自动确认 * Consumer callback 回调函数:当队列中存在信息,会自动触发回调函数 */ DefaultConsumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { //body 接受的信息 System.out.println("消息01:"+new String(body)); } }; channel.basicConsume("lipeng_worker01",true,consumer); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } } }消费者2Consumer02: package com.lp.worker; import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; public class Consumeer02 { public static void main(String[] args) { //创建连接工厂 ---配置连接信息 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.31.181"); try { //创建连接 Connection connection = factory.newConnection(); //创建信道 Channel channel = connection.createChannel(); //接受消息 /** * String queue 对列的名称 * boolean autoAck 是否自动确认 * Consumer callback 回调函数:当队列中存在信息,会自动触发回调函数 */ DefaultConsumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { //body 接受的信息 System.out.println("消息02:"+new String(body)); } }; channel.basicConsume("lipeng_worker01",true,consumer); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } } } 3,广播模式
消费者1Consumer01: package com.lp.fanout; import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; public class Consumeer01 { public static void main(String[] args) { //创建连接工厂 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.31.181"); try { //创建连接 Connection connection = factory.newConnection(); //创建信道 Channel channel = connection.createChannel(); DefaultConsumer consumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { //body 接受的消息 System.out.println("消息01"+"=="+new String(body)); } }; channel.basicConsume("lipeng_fanout01",true,consumer); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } } }消费者2Consumer02: package com.lp.fanout; import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; public class Consumeer02 { public static void main(String[] args) { //创建连接工厂 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.31.181"); try { //创建连接 Connection connection = factory.newConnection(); //创建信道 Channel channel = connection.createChannel(); DefaultConsumer consumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { //body 接受消息 System.out.println("消息02"+"=="+new String(body)); } }; channel.basicConsume("lipeng_fanout02",true,consumer); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } } } 4,路由模式
消费者1Consumer01: package com.lp.routing; import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; public class Consumeer01 { public static void main(String[] args) { //创建连接对象 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.31.181"); try { //创建连接 Connection connection = factory.newConnection(); //创建信道 Channel channel = connection.createChannel(); //接受消息 DefaultConsumer consumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { //body 接受的消息内容 System.out.println("消息01:"+new String(body)); } }; channel.basicConsume("lp_routing01",true,consumer); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } } }消费者2Consumer02: package com.lp.routing; import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; public class Consumeer02 { public static void main(String[] args) { //创建连接对象 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.31.181"); try { //创建连接 Connection connection = factory.newConnection(); //创建信道 Channel channel = connection.createChannel(); //接受消息 DefaultConsumer consumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { //body 接受的消息内容 System.out.println("消息02:"+new String(body)); } }; channel.basicConsume("lp_routing01",true,consumer); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } } } 5,主题模式
消费者1Consumer01: package com.lp.topic; import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; public class Consumeer01 { public static void main(String[] args) { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.31.181"); try { Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); DefaultConsumer consumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("消息01:"+new String(body)); } }; channel.basicConsume("lp_topic01",true,consumer); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } } }消费者2Consumer02: package com.lp.topic; import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; public class Consumeer02 { public static void main(String[] args) { String queue1="lp_topic02"; ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.31.181"); try { Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); DefaultConsumer consumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("消息02:"+new String(body)); } }; channel.basicConsume(queue1,true,consumer); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } } }每天努力一点点,终会有所收获!!!! |
今日新闻 |
点击排行 |
|
推荐新闻 |
图片新闻 |
|
专题文章 |
CopyRight 2018-2019 实验室设备网 版权所有 win10的实时保护怎么永久关闭 |