ActiveMQ 消息重发策略

背景

在使用ActiveMQ时,配置了消息重发策略。 但因为对配置项的理解不够深刻,导致虽然消息重新被投递了,单因为时间间隔太小,最终被放入DLQ中。

注意: 我使用的ActiveMQ版本是5.8

错误配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
@Bean
public RedeliveryPolicy redeliveryPolicy() {
RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
//是否在每次尝试重新发送失败后,增长这个等待时间
redeliveryPolicy.setUseExponentialBackOff(true);
//重发次数,默认为6次 这里设置为10次
redeliveryPolicy.setMaximumRedeliveries(10);
//重发时间间隔,默认为1秒
redeliveryPolicy.setInitialRedeliveryDelay(1);
//第一次失败后重新发送之前等待1秒,第二次失败再等待1 * 2秒,这里的2就是value
redeliveryPolicy.setBackOffMultiplier(2);
//是否避免消息碰撞
redeliveryPolicy.setUseCollisionAvoidance(false);
//设置重发最大拖延时间-1 表示没有拖延只有UseExponentialBackOff(true)为true时生效
redeliveryPolicy.setMaximumRedeliveryDelay(-1);
return redeliveryPolicy;
}

REST API 版本化

要管理应用接口业务逻辑的复杂性,需要你对API进行版本管理。版本控制可帮助你在需要变更服务逻辑时快速迭代。

随着系统业务发展或逻辑复杂度的不断提高,API的变化是不可避免的。 当变更会破坏现有系统客户端的集成时,管理这种变化的影响可能是相当大的挑战。

何时对REST API进行版本化

只有在重大破坏性变更发生时才考虑对API进行版本升级。破坏性变更包括:

  • 一次或多次调用的响应数据格式发生了变化
  • 响应数据类型发生变化(例如,将整数变为浮点数)
  • 删除API返回数据的部分内

发生破坏性变更时,应该总是修改API的主版本号。

非破坏性变更(如添加新的REST端点或新的响应参数)不需要更改主版本号。

SpringBoot应用部署模式

背景

越来越多的人开始使用SpringBoot来实现项目的快速开发。每个团队都会面临一个同样的问题:如何部署SpringBoot应用?大部分人会想:这还需要考虑?当然是同步Fat Jar的方式部署喽。现实是残酷的!可能的原因有:

  • 已有的发布系统不支持
  • 团队成员习惯了war包的部署方式
  • 外置的Servlet容器更容易配置
  • 文件路径相关的代码调整
  • 其它原因

基于Spring构建RESTFUL风格的controller

前言

Spring为开发REST服务提供一流的支持。在本文中,我们将使用Spring 4 @RestController注解开发基于Spring 4 MVC的RESTful JSON服务和RESTful XML服务。

Spring在内部使用HttpMessageConverters将响应转换为所需的格式[JSON / XML / etc ..],这些格式基于类路径中可用的某些库,并可以选择使用请求中的Accept Headers

为了服务JSON,我们将使用Jackson库[jackson-databind.jar]。 对于XML,我们将使用Jackson XML扩展[jackson-dataformat-xml.jar]。 只有在类路径中存在这些库才会触发Spring以所需格式转换输出。 此外,我们将进一步通过使用JAXB批注注释域类来支持XML,以防Jackson的XML扩展库由于某种原因而不可用。

注意:如果你通过在浏览器中输入网址发送请求,则可以添加后缀[.xml / .json],以帮助确定要提供的内容的类型。

spring对缓存的支持及相关注解说明

Spring 3.1 引入了基于注释(annotation)的缓存(cache)技术,但是它本质上不是一个具体的缓存实现方案(例如 EHCache 或者 OSCache),而是一个对缓存使用的抽象,通过在既有代码中添加少量它定义的各种annotation,即能够达到缓存方法的返回对象的效果。

Spring 的缓存技术还具备相当的灵活性,不仅能够使用 SpEL(Spring Expression Language)来定义缓存的 key和各种condition,还提供开箱即用的缓存临时存储方案,也支持和主流的专业缓存例如EHCache集成。

linux系统如何启动

背景知识

intel CPU发展历史简介

  • 1971年,Intel 发布了第一款的微处理器4004。它是一个4位的微处理器。
  • 1972年,Intel 发布了第一款八位处理器8008。它是一个8位的微处理器,地址总线(address bus)是14位的,就是说可以访问到16K的内存空间。
  • 1974年4月,Intel 发布了第二款八位处理器8080。它是8008是增强版,增加了几个累加器,使它可以访问16位(8+8)的内存地址,即64K范围内的地址空间。而且它也是公认的“第一款真正可用的微处理器”。8080的架构对8086产生了很大的影响,并且为 x86系列奠定了基础。
  • 1976年开始设计,1978年中旬Intel 发布了8086。标志了x86王朝的开始。它是一款16位的微处理器,却被设计成可以访问1MB的内存(即20位的地址空间)
  • 1982年,Intel 的80286面世了。它是第一款采用保护模式的x86微处理器。地址总线增加到24位使它可以访问到16M 的内存空间。
  • 1985年,Intel 发布了80386。一个拥有32位的微处理器。并且地址总数(address bus)也是32位的,寻址能力大幅提高到4G。

spring扩展点及springboot自动配置总结

spring扩展点和自动配置

BeanFactoryPostProcessor

1
2
3
4
5
6
7
8
9
10
11
12
13
public interface BeanFactoryPostProcessor {  

/**
* Modify the application context's internal bean factory after its standard
* initialization. All bean definitions will have been loaded, but no beans
* will have been instantiated yet. This allows for overriding or adding
* properties even to eager-initializing beans.
* @param beanFactory the bean factory used by the application context
* @throws org.springframework.beans.BeansException in case of errors
*/
void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) throws BeansException;

}

通过该扩展点,我们修改,新增BeanDefinition。因为此时所有的BeanDefinition已经加载,但是没有Bean被创建。一般用在需要覆盖或替换Bean的属性时。

kafka发送消息过程分析

前言

前面的一篇文章分析了消息发送时topic分区选择的问题,本文就分析一下后续的发送逻辑。

消息发送涉及的类

Producer

Producer类是Kafka消息的发送的入口

ProducerFactory

ProducerFactory 是一个策略接口,用来创佳Producer, 默认的实现是:DefaultKafkaProducerFactory,其创建
ProducerCloseSafeProducer, CloseSafeProducerKafkaProducer的一个代理类。

DefaultKafkaProducerFactory 的构造方法接收一个Map对象,用来指的创建Producer的配置。

1
2
3
4
5
6
7
8
9
10
public DefaultKafkaProducerFactory(Map<String, Object> configs) {
truethis(configs, null, null);
}

public DefaultKafkaProducerFactory(Map<String, Object> configs, Serializer<K> keySerializer,
Serializer<V> valueSerializer) {
truethis.configs = new HashMap<>(configs);
truethis.keySerializer = keySerializer;
truethis.valueSerializer = valueSerializer;
}

KafkaTemplate

KafkaTemplate是一个模板类,提供了操作Kafka的高级Api。

1
2
3
4
5
6
7
private KafkaTemplate<Integer, String> createTemplate() {
trueMap<String, Object> senderProps = senderProps();
trueProducerFactory<Integer, String> pf =
truetruetruenew DefaultKafkaProducerFactory<>(senderProps);
trueKafkaTemplate<Integer, String> template = new KafkaTemplate<>(pf);
truereturn template;
}

发送消息实例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
// 生产者配置项
private Map<String, Object> senderProps() {
trueMap<String, Object> props = new HashMap<>();
trueprops.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
trueprops.put(ProducerConfig.RETRIES_CONFIG, 0);
trueprops.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
trueprops.put(ProducerConfig.LINGER_MS_CONFIG, 1);
trueprops.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
trueprops.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
trueprops.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
truereturn props;
}
// 创建生产者工厂实例
ProducerFactory<Integer, String> pf = new DefaultKafkaProducerFactory<>(senderProps);

@Test
public void testSendMsgSysn() throws Exception {
trueKafkaTemplate<Integer, String> kafkaTemplate = createTemplate();
truefor (int i = 0; i < 10; i++) {
truetrue// 同步发送
truetrue//kafkaTemplate.send(topic1, "hello ===> " + i).get();
truetrue// 异步发送
truetruekafkaTemplate.send(topic1, "hello ===> " + i);
true}
}

消息发送过程分析

Producer 的一个重要方法就是send, 方法签名如下:

1
public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback);

从方法的签名我们能够推断出,Kafka的消息发送是异步的。 这个从后面的分析可以得证。

kafka 消费消息过程分析

前言

Springboot目前非常火热,如果在Springboot环境下使用Kafka也变得非常重要。本文就Springboot环境下如何配置
和使用kafka进行一下介绍。

Producer 发送消息确认策略

Kafka在消息的发送端提供了三种确认策略,改策略是通过是通过创建KafkaProducer的配置项ack来进行设置的。
它的取值有下面所示的三种值:

  1. 0 不等待确认。此时 retries 重试配置不起作用。
  2. 1 Partition 的Leader进行确认就可以了。 但此时消息没有被同步到其它副本,有丢失消息的风险。
  3. all 等待所有副本都确认了才行。 改机制能保证很强的数据安全性。

Consumer

Consumer是消费Kafka消息的接口,主要的实现类是KafkaConsumer

ConsumerFactory

ConsumerFactory 是一个策略接口,用来创佳Consumer, 默认的实现是:DefaultKafkaConsumerFactory,返回一个KafkaConsumer
的实例。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public DefaultKafkaConsumerFactory(Map<String, Object> configs) {
truethis(configs, null, null);
}

public DefaultKafkaConsumerFactory(Map<String, Object> configs,
Deserializer<K> keyDeserializer,
Deserializer<V> valueDeserializer) {
truethis.configs = new HashMap<>(configs);
truethis.keyDeserializer = keyDeserializer;
truethis.valueDeserializer = valueDeserializer;
}

// 消费者配置项
private Map<String, Object> consumerProps() {
trueMap<String, Object> props = new HashMap<>();
trueprops.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
trueprops.put(ConsumerConfig.GROUP_ID_CONFIG, group);
trueprops.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
trueprops.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
trueprops.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
trueprops.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
trueprops.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
truereturn props;
}

// 消费者工厂实例
DefaultKafkaConsumerFactory<Integer, String> cf = new DefaultKafkaConsumerFactory<>(props);

MessageListenerContainer

MessageListenerContainer是Spring框架内部对消息消费的一个抽象,一般不应该被外面的类进行实现。
它用来表示一个消息监听者容器。

它有两个重要的实现类:KafkaMessageListenerContainerConcurrentMessageListenerContainer
前者是单线程的,后者支持多线程。

MessageListenerContainer 在启动的时候会主动通过Consumer来获取Kafka上面的消息,并调用消息监听器
来处理消息。

KafkaDataListener

KafkaDataListener是一个标识接口,表示一个消息监听器AbstractMessageListenerContainer在启动时会检查
它所包含的监听器的类型。

消费消息例子

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
 @Test
public void testAutoCommit() throws Exception {
truelogger.info("Start auto");
trueContainerProperties containerProps = new ContainerProperties(topic1);
truefinal CountDownLatch latch = new CountDownLatch(4);
true//设置消息监听器
truecontainerProps.setMessageListener((MessageListener<Integer, String>) message -> {
truetruelogger.info("received: " + message);
truetruelatch.countDown();
true});
trueKafkaMessageListenerContainer<Integer, String> container = createContainer(containerProps);
truecontainer.setBeanName("testAuto");
truecontainer.start();
trueThread.sleep(1000); // wait a bit for the container to start
true//send msg
truetestSendMsgSysn();

trueassertTrue(latch.await(60, TimeUnit.SECONDS));
truecontainer.stop();
truelogger.info("Stop auto");
}

Kafka消息消费确认类型

确认类型

Kafka 中消息消费的offset的commit类型由枚举类AckMode来定义,具体如下:

RECORD

每处理一条commit一次

BATCH

每次poll的时候批量提交一次,频率取决于每次poll的调用频率

TIME

每次间隔ackTime的时间去commit

COUNT

累积达到ackCount次的ack去commit

COUNT_TIME

ackTime或ackCount哪个条件先满足,就commit

MANUAL

用户负负责ack,但是背后也是批量上去。此时的监听器类型必须是AcknowledgingMessageListener

MANUAL_IMMEDIATE

listner负责ack,每调用一次,就立即commit。此时的监听器类型必须是AcknowledgingMessageListener

ContainerProperties 中ackMode的默认值为:AckMode.BATCH

Kafka获取消息逻辑分析

获取消息过程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
@Override
public void run() {
trueif (this.autoCommit && this.theListener instanceof ConsumerSeekAware) {
truetrue((ConsumerSeekAware) this.theListener).registerSeekCallback(this);
true}
truethis.count = 0;
truethis.last = System.currentTimeMillis();
trueif (isRunning() && this.definedPartitions != null) {
truetrueinitPartitionsIfNeeded();
truetrue// we start the invoker here as there will be no rebalance calls to
truetrue// trigger it, but only if the container is not set to autocommit
truetrue// otherwise we will process records on a separate thread
truetrueif (!this.autoCommit) {
truetruetruestartInvoker();
truetrue}
true}
truelong lastReceive = System.currentTimeMillis();
truelong lastAlertAt = lastReceive;
truewhile (isRunning()) {
truetruetry {
truetruetrue// 不是 autoCommit,则处理消费进度
truetruetrueif (!this.autoCommit) {
truetruetruetrueprocessCommits();
truetruetrue}
truetruetrueprocessSeeks();
truetruetrueif (this.logger.isTraceEnabled()) {
truetruetruetruethis.logger.trace("Polling (paused=" + this.paused + ")...");
truetruetrue}
truetruetrue// 拉取待消费的消息, 默认一秒拉取一次
truetruetrueConsumerRecords<K, V> records = this.consumer.poll(this.containerProperties.getPollTimeout());
truetruetrueif (records != null && this.logger.isDebugEnabled()) {
truetruetruetruethis.logger.debug("Received: " + records.count() + " records");
truetruetrue}
truetruetrueif (records != null && records.count() > 0) {
truetruetruetrueif (this.containerProperties.getIdleEventInterval() != null) {
truetruetruetruetruelastReceive = System.currentTimeMillis();
truetruetruetrue}
truetruetruetrue// 如果autoCommit设置为true,则在该线程进行消息的处理
truetruetruetrueif (this.autoCommit) {
truetruetruetruetrueinvokeListener(records);
truetruetruetrue}
truetruetruetrue// 否则将记录提交到一个缓存队列中
truetruetruetrueelse {
truetruetruetruetrueif (sendToListener(records)) {
truetruetruetruetruetrueif (this.assignedPartitions != null) {
truetruetruetruetruetruetrue// avoid group management rebalance due to a slow
truetruetruetruetruetruetrue// consumer
truetruetruetruetruetruetruethis.consumer.pause(this.assignedPartitions);
truetruetruetruetruetruetruethis.paused = true;
truetruetruetruetruetruetruethis.unsent = records;
truetruetruetruetruetrue}
truetruetruetruetrue}
truetruetruetrue}
truetruetrue}
truetruetrue// 处理idle事件
truetruetrueelse {
truetruetruetrueif (this.containerProperties.getIdleEventInterval() != null) {
truetruetruetruetruelong now = System.currentTimeMillis();
truetruetruetruetrueif (now > lastReceive + this.containerProperties.getIdleEventInterval()
truetruetruetruetruetruetrue&& now > lastAlertAt + this.containerProperties.getIdleEventInterval()) {
truetruetruetruetruetruepublishIdleContainerEvent(now - lastReceive);
truetruetruetruetruetruelastAlertAt = now;
truetruetruetruetruetrueif (this.theListener instanceof ConsumerSeekAware) {
truetruetruetruetruetruetrueseekPartitions(getAssignedPartitions(), true);
truetruetruetruetruetrue}
truetruetruetruetrue}
truetruetruetrue}
truetruetrue}
truetruetruethis.unsent = checkPause(this.unsent);
truetrue}
truetruecatch (WakeupException e) {
truetruetruethis.unsent = checkPause(this.unsent);
truetrue}
truetruecatch (Exception e) {
truetruetrueif (this.containerProperties.getGenericErrorHandler() != null) {
truetruetruetruethis.containerProperties.getGenericErrorHandler().handle(e, null);
truetruetrue}
truetruetrueelse {
truetruetruetruethis.logger.error("Container exception", e);
truetruetrue}
truetrue}
true}
trueif (this.listenerInvokerFuture != null) {
truetruestopInvokerAndCommitManualAcks();
true}
truetry {
truetruethis.consumer.unsubscribe();
true}
truecatch (WakeupException e) {
truetrue// No-op. Continue process
true}
truethis.consumer.close();
trueif (this.logger.isInfoEnabled()) {
truetruethis.logger.info("Consumer stopped");
true}
}

kafka 消息确认之autoCommit原理分析

如果配置了enable.auto.commit=true, 那么在创建ConsumerCoordinator时,会提交一个定时任务,
每隔固定的时间auto.commit.interval.ms就会通过后台线程AutoCommitTask来完成消费进度的提交。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
private class AutoCommitTask implements DelayedTask {
truepublic void run(final long now) {
truetrue//异步提交 消费进度, subscriptions.allConsumed() 获取每个Partition的消费进度
truetruecommitOffsetsAsync(subscriptions.allConsumed(), new OffsetCommitCallback() {
truetruetrue@Override
truetruetruepublic void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
truetruetruetrueif (exception == null) {
truetruetruetruetruereschedule(now + interval);
truetruetruetrue} else {
truetruetruetruetruelog.warn("Auto offset commit failed for group {}: {}", groupId, exception.getMessage());
truetruetruetruetruereschedule(now + interval);
truetruetruetrue}
truetruetrue}
truetrue});
true}
}

SubscriptionState

SubscriptionState
1
2
3
/* the list of partitions currently assigned */
// TopicPartitionState 里面包含了消费的进度, 它的值是在每次拉取数据时更新的。
private final Map<TopicPartition, TopicPartitionState> assignment;

Fetcher

1
2
3
4
5
6
private int append(Map<TopicPartition, List<ConsumerRecord<K, V>>> drained,
PartitionRecords<K, V> partitionRecords,
int maxRecords) {
true// 更新最新的消费进度
truesubscriptions.position(partitionRecords.partition, nextOffset);
}

kafka 消息非autoCommit实现分析

processCommits
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
private void processCommits() {
truehandleAcks();
truethis.count += this.acks.size();
truelong now;
trueAckMode ackMode = this.containerProperties.getAckMode();
trueif (!this.isManualImmediateAck) {
truetrue//这里有重复更新保存消费进度数据的Map
truetrueif (!this.isManualAck) {
truetruetrueupdatePendingOffsets();
truetrue}
truetrueboolean countExceeded = this.count >= this.containerProperties.getAckCount();
truetrueif (this.isManualAck || this.isBatchAck || this.isRecordAck
truetruetruetrue|| (ackMode.equals(AckMode.COUNT) && countExceeded)) {
truetruetrueif (this.logger.isDebugEnabled() && ackMode.equals(AckMode.COUNT)) {
truetruetruetruethis.logger.debug("Committing in AckMode.COUNT because count " + this.count
truetruetruetruetruetrue+ " exceeds configured limit of " + this.containerProperties.getAckCount());
truetruetrue}
truetruetruecommitIfNecessary();
truetruetruethis.count = 0;
truetrue}
truetrue// 处理 AckMode.TIME 或 AckMode.COUNT_TIME
truetrueelse {
truetruetruenow = System.currentTimeMillis();
truetruetrueboolean elapsed = now - this.last > this.containerProperties.getAckTime();
truetruetrue// 如果 当前时间距离上次ack的时间间隔大于配置项ackTime设置的值
truetruetrueif (ackMode.equals(AckMode.TIME) && elapsed) {
truetruetruetrueif (this.logger.isDebugEnabled()) {
truetruetruetruetruethis.logger.debug("Committing in AckMode.TIME " +
truetruetruetruetruetruetrue"because time elapsed exceeds configured limit of " +
truetruetruetruetruetruetruethis.containerProperties.getAckTime());
truetruetruetrue}
truetruetruetrue// 提交保存在 offsets 中的数据 (同步或异步)
truetruetruetruecommitIfNecessary();
truetruetruetruethis.last = now;
truetruetrue}
truetruetrue// 如果 当前时间距离上次ack的时间间隔大于配置项 ackTime 设置的值 或 待ack的记录数大于
truetruetrue// ackCount设置的值
truetruetrueelse if (ackMode.equals(AckMode.COUNT_TIME) && (elapsed || countExceeded)) {
truetruetruetrueif (this.logger.isDebugEnabled()) {
truetruetruetruetrueif (elapsed) {
truetruetruetruetruetruethis.logger.debug("Committing in AckMode.COUNT_TIME " +
truetruetruetruetruetruetruetrue"because time elapsed exceeds configured limit of " +
truetruetruetruetruetruetruetruethis.containerProperties.getAckTime());
truetruetruetruetrue}
truetruetruetruetrueelse {
truetruetruetruetruetruethis.logger.debug("Committing in AckMode.COUNT_TIME " +
truetruetruetruetruetruetruetrue"because count " + this.count + " exceeds configured limit of" +
truetruetruetruetruetruetruetruethis.containerProperties.getAckCount());
truetruetruetruetrue}
truetruetruetrue}
truetruetruetrue// 提交保存在 offsets 中的数据 (同步或异步)
truetruetruetruecommitIfNecessary();
truetruetruetruethis.last = now;
truetruetruetruethis.count = 0;
truetruetrue}
truetrue}
true}
}

handleAcks

逐条处理每一个待ack的记录

handleAcks
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
private void handleAcks() {
trueConsumerRecord<K, V> record = this.acks.poll();
truewhile (record != null) {
truetrueif (this.logger.isTraceEnabled()) {
truetruetruethis.logger.trace("Ack: " + record);
truetrue}
truetrueprocessAck(record);
truetruerecord = this.acks.poll();
true}
}

private void processAck(ConsumerRecord<K, V> record) {
true// 如果AckMode的值是`AckMode.MANUAL_IMMEDIATE`
trueif (ListenerConsumer.this.isManualImmediateAck) {
truetruetry {
truetruetrue// 则直接同步更新(syncCommits默认值为true) 或 异步更新 zookeeper中保存的offset的值
truetruetrueackImmediate(record);
truetrue}
truetruecatch (WakeupException e) {
truetruetrue// ignore - not polling
truetrue}
true}
true// 如果不是 `AckMode.MANUAL_IMMEDIATE` 则只是将消费进度offset的值保存到一个Map中
trueelse {
truetrueaddOffset(record);
true}
}
```

### @EnableKafka

`@EnableKafka`注解配合`@Configuration`注解一起只用,会激活Kafka的基于注解驱动的消息消费功能。

```java EnableKafka
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Import(KafkaBootstrapConfiguration.class)
public @interface EnableKafka {
}

KafkaBootstrapConfiguration

KafkaBootstrapConfiguration 的主要功能有两个:

  1. 注册KafkaListenerAnnotationBeanPostProcessor, 该Bean主要用来处理@KafkaListener注解
  2. 注册KafkaListenerEndpointRegistry, 该Bean用于管理MessageListenerContainer
KafkaBootstrapConfiguration
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@Configuration
public class KafkaBootstrapConfiguration {

true@SuppressWarnings("rawtypes")
true@Bean(name = KafkaListenerConfigUtils.KAFKA_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME)
true@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
truepublic KafkaListenerAnnotationBeanPostProcessor kafkaListenerAnnotationProcessor() {
truetruereturn new KafkaListenerAnnotationBeanPostProcessor();
true}

true@Bean(name = KafkaListenerConfigUtils.KAFKA_LISTENER_ENDPOINT_REGISTRY_BEAN_NAME)
truepublic KafkaListenerEndpointRegistry defaultKafkaListenerEndpointRegistry() {
truetruereturn new KafkaListenerEndpointRegistry();
true}
}

总结

  1. ackOnError 的默认值为true, 非autoCommit情况下,消费失败的记录会被添加到待ack的队列中。
  2. ListenerConsumererrorHandler 默认值是 LoggingErrorHandler,只会打印日志
|