springboot配置activemq

前言

网上有好多介绍springboot集成activemq的文章,看了一些文章感觉比较零散,还是抽时间自己详细总结一个如何使用,需要注意哪些点。尤其是关于连接池的配置,需要重点关注,否则在消息量大的情况下会把服务器搞挂。

快速配置

如果你只是连接一个activemq集群或节点,那么配置非常简单(这也是springboot便捷的原因)。

如下:

1
2
3
spring.activemq.broker-url=tcp://127.0.0.1:61616?connectionTimeout=3000&soTimeout=500&tcpNoDelay=true&jms.redeliveryPolicy.maximumRedeliveries=1&jms.redeliveryPolicy.initialRedeliveryDelay=10
spring.activemq.user=admin
spring.activemq.password=admin

就这么简单!有了上面的配置你就可以发送消息了(通过JmsTemplate)。这背后的原理是通过springboot提供的ActiveMQAutoConfiguration来实现的。

1
2
3
4
5
6
7
8
9
10
11
@Configuration
@AutoConfigureBefore(JmsAutoConfiguration.class)
@AutoConfigureAfter({ JndiConnectionFactoryAutoConfiguration.class })
@ConditionalOnClass({ ConnectionFactory.class, ActiveMQConnectionFactory.class })
@ConditionalOnMissingBean(ConnectionFactory.class)
@EnableConfigurationProperties(ActiveMQProperties.class)
@Import({ ActiveMQXAConnectionFactoryConfiguration.class,
truetrueActiveMQConnectionFactoryConfiguration.class })
public class ActiveMQAutoConfiguration {

}

ActiveMQAutoConfiguration的代码能得知,只要你的classpath里面存在ConnectionFactory.class和ActiveMQConnectionFactory.class 并且容器里面没有类型为ConnectionFactory.class的Bean,那么该自动配置组件就会生效。

通过ActiveMQAutoConfiguration,我们在spring容器中就能自动获取一个类型为ConnectionFactory.class的Bean 和 JmsTemplate.class的Bean。

发送消息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@RunWith(SpringRunner.class)
@SpringBootTest
@ActiveProfiles(profiles = {"dev"})
public class TestSendAmqMsg {

@Resource
private JmsTemplate jmsTemplate;

@Test
public void testSendMsgDefault() throws Exception {
jmsTemplate.convertAndSend("java");
//等待消费,因为是自发自消费
Thread.sleep(1000 * 20);
}
}

上面的代码其实是不能正常工作的。原因是jmsTemplate.convertAndSend没有指定Destination

Destination的指定有两种方式,一种是通过方法参数指定。如下所示:

1
jmsTemplate.convertAndSend("hello-jms-queue", "java");

一种是通过在application.properties文件中指定一个默认值:

1
spring.jms.template.default-destination=hello-jms-default

还有一点需要注意的是Destination类型,是Topic还是Queue。默认是Queue。Destination类型也可以通过两种方式设置。

一种是通过在application.properties文件中指定一个默认值:

1
2
# false 表示是Queue
spring.jms.pub-sub-domain=false

一种是通过API

1
jmsTemplate.setPubSubDomain(false);

注意:上面的例子虽然能实现消息的发送和接收,但是非常有局限性。一个ActiveMQ上既有Topic也有Queue,我们通过JmsTemplate发送和消费消息时,最好是通过参数Destination来指定目的地,热不是一个字符串(不知道是具体是什么类型,只能通过全局配置)。

消费消息

有了上面的配置,我们可以有两种消费消息的方式。

  1. 通过JmsTemplate的API来主动消费。这个就不详细讲了。
  2. 通过@JmsListener来被动消费

通过@JmsListener来实现消息消费,配置如下。

1
2
3
4
5
6
7
8
9
@Configuration
@EnableJms
public class JmsConfig {

@JmsListener(containerFactory = "jmsListenerContainerFactory", destination = "hello-jms")
public void consumerMsg(String msg) {
System.out.println("############# Received message is : [" + msg + "]*************");
}
}

@EnableJms的作用是启用spring的Jms的注解驱动能力。注册了JmsListenerAnnotationBeanPostProcessorBean。原理如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Import(JmsBootstrapConfiguration.class)
public @interface EnableJms {
}

@Configuration
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public class JmsBootstrapConfiguration {

@Bean(name = JmsListenerConfigUtils.JMS_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME)
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public JmsListenerAnnotationBeanPostProcessor jmsListenerAnnotationProcessor() {
truetruereturn new JmsListenerAnnotationBeanPostProcessor();
true}

true@Bean(name = JmsListenerConfigUtils.JMS_LISTENER_ENDPOINT_REGISTRY_BEAN_NAME)
truepublic JmsListenerEndpointRegistry defaultJmsListenerEndpointRegistry() {
truetruereturn new JmsListenerEndpointRegistry();
true}

}

JmsAnnotationDrivenConfiguration该配置类非常关键:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
@Configuration
@ConditionalOnClass(EnableJms.class)
class JmsAnnotationDrivenConfiguration {
@Bean
@ConditionalOnMissingBean
public DefaultJmsListenerContainerFactoryConfigurer jmsListenerContainerFactoryConfigurer() {
DefaultJmsListenerContainerFactoryConfigurer configurer = new DefaultJmsListenerContainerFactoryConfigurer();
configurer.setDestinationResolver(this.destinationResolver.getIfUnique());
configurer.setTransactionManager(this.transactionManager.getIfUnique());
configurer.setMessageConverter(this.messageConverter.getIfUnique());
configurer.setJmsProperties(this.properties);
return configurer;
}

true@Bean
true@ConditionalOnMissingBean(name = "jmsListenerContainerFactory")
truepublic DefaultJmsListenerContainerFactory jmsListenerContainerFactory(
DefaultJmsListenerContainerFactoryConfigurer configurer,
ConnectionFactory connectionFactory) {
truetrueDefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
truetrueconfigurer.configure(factory, connectionFactory);
truetruereturn factory;
true}
}

配置JmsTemplate属性

1
2
3
4
5
6
7
8
spring.jms.template.default-destination=hello-jms-default
spring.jms.template.delivery-mode=non_persistent
spring.jms.template.priority=100
spring.jms.template.qos-enabled=true
spring.jms.template.time-to-live=50
# 设置消息延迟投递时间 需要jms 2.0 支持
#spring.jms.template.delivery-delay=1
spring.jms.template.receive-timeout=100

配置消费属性

1
2
3
4
5
# 消息消费
spring.jms.listener.acknowledge-mode=client
spring.jms.listener.auto-startup=true
spring.jms.listener.concurrency=10
spring.jms.listener.max-concurrency=20

连接池配置

通过上面的学习,我们已经能实现消息的发送和消费了。但是有一个问题就是,我们会和ActiveMQ Broker建立大量的短连接。在高并发下肯定是不可以的。通过在application.properties中简单配置,我们就能获得连接池能力。

添加依赖

1
2
3
4
5
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-pool</artifactId>
<version>5.14.3</version>
</dependency>

修改配置

1
2
3
4
5
6
7
8
9
10
11
12
13
# 只有配置了该选项,才会启用activemq连接池功能,参见:ActiveMQConnectionFactoryConfiguration
spring.activemq.pool.enabled=true
# 配置连接池参数
spring.activemq.pool.configuration.max-connections=10
spring.activemq.pool.configuration.idle-timeout=30000
spring.activemq.pool.configuration.expiry-timeout=0

spring.activemq.pool.configuration.create-connection-on-startup=false
spring.activemq.pool.configuration.time-between-expiration-check-millis=60000
spring.activemq.pool.configuration.maximum-active-session-per-connection=100
spring.activemq.pool.configuration.reconnect-on-exception=true
spring.activemq.pool.configuration.block-if-session-pool-is-full=true
spring.activemq.pool.configuration.block-if-session-pool-is-full-timeout=3000

连接池自动配置实现原理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
@ConditionalOnClass(PooledConnectionFactory.class)
static class PooledConnectionFactoryConfiguration {

true@Bean(destroyMethod = "stop")
true@ConditionalOnProperty(prefix = "spring.activemq.pool", name = "enabled", havingValue = "true", matchIfMissing = false)
true@ConfigurationProperties(prefix = "spring.activemq.pool.configuration")
truepublic PooledConnectionFactory pooledJmsConnectionFactory(
ActiveMQProperties properties) {
truetruePooledConnectionFactory pooledConnectionFactory = new PooledConnectionFactory(
truetruetruetruenew ActiveMQConnectionFactoryFactory(properties)
truetruetruetruetruetrue.createConnectionFactory(ActiveMQConnectionFactory.class));

truetrueActiveMQProperties.Pool pool = properties.getPool();
truetruepooledConnectionFactory.setMaxConnections(pool.getMaxConnections());
truetruepooledConnectionFactory.setIdleTimeout(pool.getIdleTimeout());
truetruepooledConnectionFactory.setExpiryTimeout(pool.getExpiryTimeout());
truetruereturn pooledConnectionFactory;
true}
}

有了这个实现作为参考,如果我们不想使用springboot提供的ActiveMQ自动配置功能,我们自己写代码配置,也能实现连接池的功能,无非就是普通的ActiveMQConnectionFactoryFactory进行包装而已。

有一个细节需要注意:前置为spring.activemq.pool.configuration的配置属性是如何设置到PooledConnectionFactory的呢?但是是通过ConfigurationPropertiesBindingPostProcessor 该类会处理注解ConfigurationProperties 指定的属性,通过反射设置到生成的Bean中(在Bean初始化前)。

总结

相关代码地址:springboot-learn

文章目录
  1. 1. 前言
  2. 2. 快速配置
    1. 2.1. 发送消息
    2. 2.2. 消费消息
  3. 3. 配置JmsTemplate属性
  4. 4. 配置消费属性
  5. 5. 连接池配置
    1. 5.1. 添加依赖
    2. 5.2. 修改配置
    3. 5.3. 连接池自动配置实现原理
  6. 6. 总结
|