配置2个数据源:
application.properties
# First RabbitMQ broker: every value is resolved from the mq.* placeholders.
spring.rabbitmq.first.host=${mq.host}
spring.rabbitmq.first.port=${mq.port}
spring.rabbitmq.first.username=${mq.username}
spring.rabbitmq.first.password=${mq.password}
spring.rabbitmq.first.virtualHost=${mq.vhost}
# Second RabbitMQ broker: every value is resolved from the rabbitmq.* placeholders.
spring.rabbitmq.second.host=${rabbitmq.address}
spring.rabbitmq.second.port=${rabbitmq.port}
spring.rabbitmq.second.username=${rabbitmq.username}
spring.rabbitmq.second.password=${rabbitmq.password}
spring.rabbitmq.second.virtualHost=${rabbitmq.vhost}
配置MQ
RabbitConfig.java
import java.io.IOException;
import org.springframework.amqp.AmqpIOException;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
/**
 * Wires two independent RabbitMQ brokers into one application context.
 *
 * <p>For each broker this class exposes a {@link ConnectionFactory}, a {@link RabbitTemplate}
 * and a {@link SimpleRabbitListenerContainerFactory}. The beans of the first broker that are
 * injectable by type are marked {@link Primary}; everything that must talk to the second
 * broker has to select its beans by name.
 */
@Configuration
public class RabbitConfig {

    /** Name of the queue declared on both brokers. */
    private static final String QUEUE_NAME = "hello1";

    /**
     * Connection factory for the first broker; the default candidate for injection by type.
     *
     * @param host        broker host ({@code spring.rabbitmq.first.host})
     * @param port        broker port ({@code spring.rabbitmq.first.port})
     * @param username    login user ({@code spring.rabbitmq.first.username})
     * @param password    login password ({@code spring.rabbitmq.first.password})
     * @param virtualHost virtual host ({@code spring.rabbitmq.first.virtualHost})
     * @return a caching connection factory configured with the given values
     */
    @Bean(name = "firstConnectionFactory")
    @Primary
    public ConnectionFactory firstConnectionFactory(
            @Value("${spring.rabbitmq.first.host}") String host,
            @Value("${spring.rabbitmq.first.port}") int port,
            @Value("${spring.rabbitmq.first.username}") String username,
            @Value("${spring.rabbitmq.first.password}") String password,
            @Value("${spring.rabbitmq.first.virtualHost}") String virtualHost
    ) {
        return newConnectionFactory(host, port, username, password, virtualHost);
    }

    /**
     * Connection factory for the second broker; must be selected with
     * {@code @Qualifier("secondConnectionFactory")}.
     *
     * @param host        broker host ({@code spring.rabbitmq.second.host})
     * @param port        broker port ({@code spring.rabbitmq.second.port})
     * @param username    login user ({@code spring.rabbitmq.second.username})
     * @param password    login password ({@code spring.rabbitmq.second.password})
     * @param virtualHost virtual host ({@code spring.rabbitmq.second.virtualHost})
     * @return a caching connection factory configured with the given values
     */
    @Bean(name = "secondConnectionFactory")
    public ConnectionFactory secondConnectionFactory(
            @Value("${spring.rabbitmq.second.host}") String host,
            @Value("${spring.rabbitmq.second.port}") int port,
            @Value("${spring.rabbitmq.second.username}") String username,
            @Value("${spring.rabbitmq.second.password}") String password,
            @Value("${spring.rabbitmq.second.virtualHost}") String virtualHost
    ) {
        return newConnectionFactory(host, port, username, password, virtualHost);
    }

    /**
     * Template that publishes to the first broker; the default candidate for injection by type.
     *
     * @param connectionFactory the first broker's connection factory
     * @return a template bound to that connection factory
     */
    @Bean(name = "firstRabbitTemplate")
    @Primary
    public RabbitTemplate firstRabbitTemplate(
            @Qualifier("firstConnectionFactory") ConnectionFactory connectionFactory
    ) {
        return new RabbitTemplate(connectionFactory);
    }

    /**
     * Template that publishes to the second broker.
     *
     * @param connectionFactory the second broker's connection factory
     * @return a template bound to that connection factory
     */
    @Bean(name = "secondRabbitTemplate")
    public RabbitTemplate secondRabbitTemplate(
            @Qualifier("secondConnectionFactory") ConnectionFactory connectionFactory
    ) {
        return new RabbitTemplate(connectionFactory);
    }

    /**
     * Listener container factory for the first broker.
     *
     * @param configurer        Spring Boot configurer applied to the new factory
     * @param connectionFactory the first broker's connection factory
     * @return the configured listener container factory
     */
    @Bean(name = "firstFactory")
    public SimpleRabbitListenerContainerFactory firstFactory(
            SimpleRabbitListenerContainerFactoryConfigurer configurer,
            @Qualifier("firstConnectionFactory") ConnectionFactory connectionFactory
    ) {
        return newListenerFactory(configurer, connectionFactory);
    }

    /**
     * Listener container factory for the second broker.
     *
     * @param configurer        Spring Boot configurer applied to the new factory
     * @param connectionFactory the second broker's connection factory
     * @return the configured listener container factory
     */
    @Bean(name = "secondFactory")
    public SimpleRabbitListenerContainerFactory secondFactory(
            SimpleRabbitListenerContainerFactoryConfigurer configurer,
            @Qualifier("secondConnectionFactory") ConnectionFactory connectionFactory
    ) {
        return newListenerFactory(configurer, connectionFactory);
    }

    /**
     * Declares queue {@code hello1} as a bean.
     *
     * <p>No connection factory is named here, so the default one is used, i.e. the
     * {@link Primary} {@code firstConnectionFactory}. As a result this bean only creates the
     * queue on the first (default) broker; it does NOT create {@code hello1} on the second one.
     *
     * @return the queue definition
     */
    @Bean
    public Queue firstQueue() {
        System.out.println("configuration firstQueue ........................");
        return new Queue(QUEUE_NAME);
    }

    /**
     * Creates queue {@code hello1} on the second broker as well, which requires naming
     * {@code secondConnectionFactory} explicitly.
     *
     * <p>The declaration goes through a {@link RabbitAdmin} so that the channel used for it is
     * handed back when the call finishes, instead of opening a raw channel that is never closed.
     * An I/O failure while declaring is reported and otherwise ignored, so the bean is still
     * created.
     *
     * @param connectionFactory the second broker's connection factory
     * @return the queue name, {@code "hello1"}
     */
    @Bean
    public String chargeQueue(@Qualifier("secondConnectionFactory") ConnectionFactory connectionFactory) {
        try {
            // durable, non-exclusive, non-auto-delete, no extra arguments
            new RabbitAdmin(connectionFactory).declareQueue(new Queue(QUEUE_NAME, true, false, false));
        } catch (AmqpIOException e) {
            // Best effort: keep the context starting, but say what failed and where.
            System.err.println("Failed to declare queue " + QUEUE_NAME + " on the second broker");
            e.printStackTrace();
        }
        return QUEUE_NAME;
    }

    /** Builds a caching connection factory from plain connection settings. */
    private static ConnectionFactory newConnectionFactory(
            String host, int port, String username, String password, String virtualHost) {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
        connectionFactory.setHost(host);
        connectionFactory.setPort(port);
        connectionFactory.setUsername(username);
        connectionFactory.setPassword(password);
        connectionFactory.setVirtualHost(virtualHost);
        return connectionFactory;
    }

    /** Creates a listener container factory and lets the Boot configurer bind it to the connection factory. */
    private static SimpleRabbitListenerContainerFactory newListenerFactory(
            SimpleRabbitListenerContainerFactoryConfigurer configurer,
            ConnectionFactory connectionFactory) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        configurer.configure(factory, connectionFactory);
        return factory;
    }
}
注意
请注意上面创建队列的方式:创建队列时如果没有指定ConnectionFactory,队列只会在默认的(即标记了@Primary的)ConnectionFactory所对应的MQ中创建;如果要在其他MQ中创建队列,则必须显式指定对应的ConnectionFactory。
配置监听消费端
Listener1.java
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
 * Consumer for queue {@code hello1}, created through the {@code firstFactory}
 * listener container factory.
 */
@Component
@RabbitListener(
        queues = "hello1",
        containerFactory = "firstFactory"
)
public class Listener1 {

    /**
     * Prints every received text message to standard output.
     *
     * @param hello the message payload
     */
    @RabbitHandler
    public void process(String hello) {
        final String line = "queue1-Receiver : " + hello;
        System.out.println(line);
    }
}
Listener2.java
/**
 * Consumer for queue {@code hello1}, created through the {@code secondFactory}
 * listener container factory.
 */
@Component
@RabbitListener(
        queues = "hello1",
        containerFactory = "secondFactory"
)
public class Listener2 {

    /**
     * Prints every received text message to standard output.
     *
     * @param hello the message payload
     */
    @RabbitHandler
    public void process(String hello) {
        final String line = "queue2-Receiver : " + hello;
        System.out.println(line);
    }
}
模拟发送
Sender.java
import java.util.Date;
import javax.annotation.Resource;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;
/** Publishes a demo message through the {@code firstRabbitTemplate} bean. */
@Component
public class Sender {

    /** Template looked up by bean name rather than by type. */
    @Resource(name = "firstRabbitTemplate")
    private RabbitTemplate firstRabbitTemplate;

    /**
     * Builds a greeting stamped with the current date, echoes it to standard output
     * and hands it to the template under the key {@code hello1}.
     */
    public void send1() {
        final Date now = new Date();
        final String message = "quene1:hello1 " + now;
        System.out.println("Sender : " + message);
        firstRabbitTemplate.convertAndSend("hello1", message);
    }
}
Sender2.java
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.Date;
@Component
public class Sender2 {
@Resource(name="secondRabbitTemplate")
private RabbitTemplate secondRabbitTemplate;
public void send1() {
String context = "quene2:hello1 " + new Date();
System.out.println("Sender : " + context);
this.secondRabbitTemplate.convertAndSend("hello1", context);
}
}
测试
TestDemo01.java
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
/**
 * Boots the application context and triggers one send per sender bean.
 * The tests make no assertions; results are observed through console output.
 */
@RunWith(SpringJUnit4ClassRunner.class)
@SpringBootTest(classes = Application.class)
public class TestDemo01 {

    /** Sender under test for the first scenario, injected by type. */
    @Autowired
    private Sender firstSender;

    /** Sender under test for the second scenario, injected by type. */
    @Autowired
    private Sender2 secondSender;

    /** Sends one message via {@link Sender}. */
    @Test
    public void hello() throws Exception {
        firstSender.send1();
    }

    /** Sends one message via {@link Sender2}. */
    @Test
    public void hello2() throws Exception {
        secondSender.send1();
    }
}
|