相关文章推荐
憨厚的绿豆  ·  Canon : imagePROGRAF ...·  1 周前    · 
性感的桔子  ·  iOS Crash Dump ...·  3 周前    · 
跑龙套的台灯  ·  java - not able to ...·  8 月前    · 
健壮的大熊猫  ·  Binary Numbers List - ...·  1 年前    · 
任性的小熊猫  ·  reactjs - Type ...·  1 年前    · 

本文参考了 springboot官方文档 ,主要介绍依赖springboot-amqp模块实现与RabbitMQ服务端的连接。

ConnectionFactory

spring AMQP默认使用 CachingConnectionFactory 创建一个应用程序共享的连接工厂,也是用途最广泛的 ConnectionFactory 构建方法,也为Junit提供了 SingleConnectionFactory SingleConncetionFactory 不常用,不再赘述。

与AMQP通信的工作单元实际上是Channel,TCP连接可以共享。connectionFactory分为两种模式,一种是缓存channel,一种是缓存connection(同时也缓存该connection的channel)。默认是缓存channel的模式, 高可用集群场景下(镜像队列) ,通过负载均衡器连接至集群中不同的实例时,可以通过setCacheMode设置为缓存connection的模式。代码示例中给出了缓存connection的模式,同时也设置了 channelCacheSize

1
2
3
4
5
6
7
8
9
10
11
12
private CachingConnectionFactory buildConnFactory(String addresses, String username, String password, String vhost) {    
CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
connectionFactory.setAddresses(addresses);
connectionFactory.setUsername(username);
connectionFactory.setPassword(password);
connectionFactory.setVirtualHost(vhost);
connectionFactory.setCacheMode(CachingConnectionFactory.CacheMode.CONNECTION);
connectionFactory.setConnectionCacheSize(10);
connectionFactory.setChannelCacheSize(200);

return connectionFactory;
}

如果需要采用缓存Channel的模式,示例如下:

1
2
3
4
5
6
7
8
9
10
11
private CachingConnectionFactory buildConnFactory(String addresses, String username, String password, String vhost) {    
CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
connectionFactory.setAddresses(addresses);
connectionFactory.setUsername(username);
connectionFactory.setPassword(password);
connectionFactory.setVirtualHost(vhost);
connectionFactory.setCacheMode(CachingConnectionFactory.CacheMode.CHANNEL);
connectionFactory.setChannelCacheSize(channelCacheSize);

return connectionFactory;
}

在缓存connection模式下,不支持自动声明队列、exchange、binding等,rabbitmq-client默认只提供了5个线程处理connection,因此,当connection较多时,应该自定义线程池,并配置到 CachingConnectionFactory 中。自定义的线程池将会被所有connection共享,建议线程池的最大线程数设置的与预期connection数相等,因为可能存在对于大部分connection都有多个channel的情况。示例如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
ExecutorService executor = new 
ThreadPoolExecutor(corePoolSize,
maxPoolSize, keepAliveSeconds,TimeUnit.SECONDS,
new ArrayBlockingQueue<>(queueSize),
new ThreadFactory() {
@Override
public Thread newThread(Runnable r)
{
Thread thread = new Thread(r);
thread.setName("myThread" + autoInt.getAndIncrement());
thread.setDaemon(false);
return thread;
}
},
new ThreadPoolExecutor.CallerRunsPolicy());

connectionFactory.setExecutor(executor);

channelCacheSize

前面了解到,Spring AMQP通过缓存channel或connection提高吞吐量。 connectionFactory 分为两种模式,一种是缓存channel,一种是缓存connection(同时也缓存该connection的channel)。本节主要介绍 channelCacheSize ConnectionCacheSize 也是类似的,不赘述。

默认仅限制缓存的channelSize

默认地,最大channelSize是没有限制的,限制的仅仅是缓存的channelSize(connection也一样),默认值是25,缓存channel的目的是减小高并发多线程环境中频繁创建销毁channel的开销,比如:在某一时刻有100个channel处于工作状态,当channel空闲后,只会缓存channelSize个channel,剩下的都会被销毁。

通过RabbitMQ的Web管理插件观察到channel在频繁的被创建和销毁时,应及时的提高 channelCacheSize 。建议最少要保证线程数< channelCacheSize 。可以通过压力测试,观察高峰期channel动态平衡的数量,从而决定 channelCacheSize 的大小。

channelSize限制

也可以通过 channelCheckoutTimeout 参数设置 connectionFactory channelSize 限制,当该参数大于0时,表示最大的 channel数目 = channelCacheSize ,达到 channelCacheSize 的上限后,调用 createChannel 的线程会阻塞,直至有空闲 channel 出现或阻塞时间超过 chanelCheckoutTimeout ,在超时的情况下,抛出 AmqpTimeoutException ,可以设置一些Retry的策略来处理这些异常。

连接命名

通过 ConnectionNameStrategy 属性设置 connection 名称。

1
2
3
4
5
6
7
8
connectionFactory.setConnectionNameStrategy(new ConnectionNameStrategy() {   
@Override
public String obtainNewConnectionName(ConnectionFactory connectionFactory) {
return connectionFactory.getHost() + atomicInteger.getAndIncrement();
}
});

return connectionFactory;

使用RabbitMQ Java API提供的connectionFactory

Spring AMQP也支持使用RabbitMQ Java API提供的 connectionFactory ,即Rabbit Client的 connectionFactory ,位于包 com.rabbitmq.client.ConnectionFactory 中,通过构造器参数设置在 CachingConnectionFactory 中,示例如下:

1
2
ConnectionFactory factory = new ConnectionFactory();
CachingConnectionFactory connectionFactory = new CachingConnectionFactory(factory);

自定义RabbitClient的属性

上述示例为RabbitClient的 connectionFactory —–> Spring AMQP的 connectionFactiory 的转换,Spring也提供了Spring AMQP的 connectionFactiory —–> RabbitClient的 connectionFactory 的转换。

比如可以通过CachingConnectionFactory设置RabbitClient connectionFactory 的属性。

示例如下:

1
connectionFactory.getRabbitConnectionFactory().getClientProperties().put("foo", "bar");

连接恢复

Rabbit Client4.0后的 connectionFactory 和Spring AMQP的· connectionFactory 均默认提供了自动恢复连接的机制;虽然两者的自动恢复机制是兼容的,但使用构造注入 conncetionFactory 时建议关闭其中一个。

否则,在MQ服务器节点可用但连接尚未恢复时,出现 AutoRecoverConnectionNotCurrentlyOpenException 异常。比如:如果在 RabbitTemplate 中配置 RetryTemplate (Spring AMQP的手动恢复),甚至在故障转移到集群中的另一个代理时,可能抛出上述异常。

关闭Rabbit Client的自动恢复

由于Spring AMQP的自动恢复连接在计时器上恢复,因此可以使用SpringAMQP的恢复机制更快地恢复连接。

springboot-amqp1.7版本以后,默认关闭RabbitClient的 connectionFactory 的自动重连, 但是,在通过构造参数注入RabbitClient的 connectionFactory 时,是没有办法默认关闭的,需要手动设置

1
2
3
ConnectionFactory factory = new ConnectionFactory();
factory.setAutomaticRecoveryEnabled(false); // 显示关闭RabbitClient的自动重连
CachingConnectionFactory connectionFactory = new CachingConnectionFactory(factory);

一般地,关闭掉Rabbit Client的自动恢复,使用Spring AMQP可以满足绝大多数使用场景,而且框架提供的自动恢复机制已经很完善。

SSL连接

建议通过注入RabbitMQ client connectionFactory 的方式配置SSL连接,示例如下:

1
2
3
4
5
6
7
8
9
10
ConnectionFactory factory = new ConnectionFactory();

factory.setAutomaticRecoveryEnabled(false);

File keyFile = new File(keyPath);
File certFile = new File(cerPath);
SSLContexts sslCtx = SslContextBuilder.forServer(certFile, keyFile).build();
factory.useSslProtocol(sslCtx);

CachingConnectionFactory connectionFactory = new CachingConnectionFactory(factory);

避免Connection死锁

在内存不足或MQ服务端出现异常时,可能会出现连接阻塞,对于默认的CachingConnectionFactory,MQ服务端导致连接阻塞时,客户端会主动地关闭。

因此,如果生产者消费者共用同一个connectionFactory,MQ服务端导致生产者客户端与消费者客户端关闭,可能死锁的情况:生产者与消费者持有相同的连接资源时,MQ服务器异常触发生产者和消费者中断与服务端的连接,可能会出现死锁。

为避免死锁的产生, 建议对于生产者和消费者分别配置不同的connectionFactory 。需要注意的是:如果生产者消费者处于同一个事物时,不建议生产者消费者配置相同的connectionFactory,因为消费者(或生产者)需要复用对方的channel。

1
2
3
4
5
6
7
@Bean(name = "myRabbitTemplate")
public RabbitTemplate basicCloudRabbitTemplate(CachingConnectionFactory f) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(f);
rabbitTemplate.setUsePublisherConnection(true);

return rabbitTemplate;
}

Routing ConnectionFactory实现多数据源收发消息

spring AMQP提供 SimpleRoutingConnectionFactory负责在运行时根据查找键动态选择connectionFactory,通常,以线程的上下文作为查找键,比如地址、vHost 等, SimpleRoutingConnectionFactory 继承了 AbstractRoutingConnectionFactory ,通过SimpleResourceHolder获取当前线程的查找键。示例如下:

首先,为RabbitTemplate配置一个以vHost作为查找键的 SimpleRoutingConnectionFactory 。key分别为 factory1 virtualHost factory2 virtualHost ,当然,也可以使用addresses作为查找键。使用方式为 factoryMap.put("#{factory1.addresses}", factory1)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
private SimpleRoutingConnectionFactory myFactory() {    
SimpleRoutingConnectionFactory routingFactory = new SimpleRoutingConnectionFactory();
CachingConnectionFactory factory1 = new CachingConnectionFactory();
factory1.setAddresses("localhost:5672");
factory1.setUsername("username1");
factory1.setPassword("password1");
factory1.setVirtualHost("v1");

CachingConnectionFactory factory2 = new CachingConnectionFactory();
factory2.setAddresses("202.130.1.1:5672");
factory2.setUsername("username2");
factory2.setPassword("password2");
factory2.setVirtualHost("v2");

Map<Object, ConnectionFactory> factoryMap = new HashMap<>(5);
factoryMap.put("#{factory1.virtualHost}", factory1);
factoryMap.put("#{factory2.virtualHost}", factory2);

routingFactory.setTargetConnectionFactories(factoryMap);

return routingFactory;
}

@Bean("myRabbitTemplate")
public RabbitTemplate myRabbitTemplate() {
RabbitTemplate rabbitTemplate = new RabbitTemplate();
rabbitTemplate.setConnectionFactory(myFactory());
return rabbitTemplate;
}

使用方法如下,SimpleResourceHolder的bind和unbind都是必须的,分别指获取当前线程查找键,释放查找键。bind有两个参数,第一个为待获取的connectFactory,第二个为key

1
2
3
4
5
6
7
8
9
10
11
12
13
public class MyService {

@Autowired
@Qualifier("myRabbitTemplate")
private RabbitTemplate rabbitTemplate;

public void service(String vHost, String payload) {
SimpleResourceHolder.bind(rabbitTemplate.getConnectionFactory(), vHost);
rabbitTemplate.convertAndSend(payload);
SimpleResourceHolder.unbind(rabbitTemplate.getConnectionFactory());
}

}

消费者使用多数据源的方式略有不同 ,首先,配置两个不同的containerFactory

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
@Bean("myContainerFactory1")
public SimpleRabbitListenerContainerFactory containerFactory1() {
SimpleRabbitListenerContainerFactory containerFactory1 = new SimpleRabbitListenerContainerFactory();

CachingConnectionFactory factory1 = new CachingConnectionFactory();
factory1.setAddresses("localhost:5672");
factory1.setUsername("username1");
factory1.setPassword("password1");
factory1.setVirtualHost("v1");

containerFactory1.setConnectionFactory(factory1);

return containerFactory1;
}

@Bean("myContainerFactory2")
public SimpleRabbitListenerContainerFactory containerFactory2() {
SimpleRabbitListenerContainerFactory containerFactory2 = new SimpleRabbitListenerContainerFactory();

CachingConnectionFactory factory2 = new CachingConnectionFactory();
factory2.setAddresses("host2:5672");
factory2.setUsername("username2");
factory2.setPassword("password2");
factory2.setVirtualHost("v2");

containerFactory2.setConnectionFactory(factory2);

return containerFactory2;
}

使用@RabbitListener接收消息时指定containerFactory

1
2
3
4
5
6
7
8
9
10
11
12
13
@RabbitListener(queues = "queue.test", containerFactory = "myContainerFactory1")
public void receiveMsg1(Message message, Channel channel) {
String msg = new String(message.getBody(), StandardCharsets.UTF_8);
logger.debug("rcv msg {}", msg);
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}

@RabbitListener(queues = "queue.test", containerFactory = "myContainerFactory2")
public void receiveMsg2(Message message, Channel channel) {
String msg = new String(message.getBody(), StandardCharsets.UTF_8);
logger.debug("rcv msg {}", msg);
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}

集群中的连接管理

MQ的集群根据集群中各节点队列信息区分为镜像队列和普通队列。只有队列所在节点知道该队列的所有信息,默认情况下,MQ是普通队列,队列只存活于集群中的一个节点上,称为主队列。镜像队列与普通队列的相同点是:队列的主拷贝仅存在于一个节点上(主队列,master节点)。不同点是,镜像节点在集群中的其他节点上拥有从队列的拷贝。一旦队列主节点不可用,最老的从队列自动被选举为新的主队列。

镜像队列的原理:在非镜像队列的集群中,信道负责将消息路由至合适的队列。当加入镜像队列后,信道除了负责将消息路按照路由绑定规则路由至合适的队列外,它也要将消息投递到镜像队列的从拷贝,在某种程度上,可以将镜像队列视为拥有一个隐藏的fanout交换器,它指示着信道将消息分发到队列的从拷贝上。

无论对于普通队列还是镜像队列,所要面临的问题是:主节点崩溃时,消费者该与哪个节点建立连接。

普通队列

对于普通队列,使用 CachingConnectionFactory 就足够了,它支持配置多个连接地址,当一个连接失败时,会按顺序尝试与其他地址建立连接。

1
2
CachingConnectionFactory factory = new CachingConnectionFactory();
factory.setAddresses("host1:5672, host2:5672");

高可用队列(镜像队列)