Analysis of the Kafka Message Consumption Process

Preface

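Spring Boot is currently very popular, so knowing how to use Kafka in a Spring Boot environment has become important as well. This article introduces how to configure
and use Kafka with Spring Boot.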

Producer acknowledgement strategies

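On the sending side, Kafka offers three acknowledgement strategies. The strategy is set through the acks configuration property when the KafkaProducer is created.
It accepts the following three values:

  1. 0: do not wait for any acknowledgement. In this case the retries setting has no effect.
  2. 1: an acknowledgement from the partition leader is enough. The message may not yet have been replicated to the other replicas, so it can be lost if the leader fails.
  3. all: wait until all in-sync replicas have acknowledged the message. This provides strong durability guarantees.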
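As a sketch, the three strategies map onto producer properties as shown below. To compile without the kafka-clients library it uses the literal property names (acks, retries, bootstrap.servers, key.serializer, value.serializer) instead of the ProducerConfig constants. ProducerAckConfig is a hypothetical helper, and the retries value of 3 is an arbitrary assumption; in a real project the resulting Properties would be passed to new KafkaProducer<>(props).

```java
import java.util.Properties;
import java.util.Set;

// Hypothetical helper: builds producer settings for a chosen acks strategy.
final class ProducerAckConfig {
    // Values accepted by the producer "acks" setting ("-1" is an alias for "all").
    private static final Set<String> VALID_ACKS = Set.of("0", "1", "all", "-1");

    static Properties producerProps(String bootstrapServers, String acks) {
        if (!VALID_ACKS.contains(acks)) {
            throw new IllegalArgumentException("acks must be one of " + VALID_ACKS + ", got " + acks);
        }
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("acks", acks);
        props.put("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // retries only matters when the producer waits for acknowledgements,
        // so it is left unset for acks=0 (3 is an arbitrary example value).
        if (!"0".equals(acks)) {
            props.put("retries", "3");
        }
        return props;
    }
}
```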

Consumer

Consumer is the interface for consuming Kafka messages; its main implementation class is KafkaConsumer.

ConsumerFactory

ConsumerFactory is a strategy interface for creating Consumer instances. Its default implementation is DefaultKafkaConsumerFactory, which returns a KafkaConsumer
instance.

```java
// Constructors of DefaultKafkaConsumerFactory (excerpt from Spring Kafka)
public DefaultKafkaConsumerFactory(Map<String, Object> configs) {
    this(configs, null, null);
}

public DefaultKafkaConsumerFactory(Map<String, Object> configs,
        Deserializer<K> keyDeserializer,
        Deserializer<V> valueDeserializer) {
    this.configs = new HashMap<>(configs);
    this.keyDeserializer = keyDeserializer;
    this.valueDeserializer = valueDeserializer;
}

// Consumer configuration (in the test class; group is a field holding the consumer group id)
private Map<String, Object> consumerProps() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(ConsumerConfig.GROUP_ID_CONFIG, group);
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
    // Only used when auto-commit is enabled
    props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
    props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    return props;
}

// Consumer factory instance
DefaultKafkaConsumerFactory<Integer, String> cf = new DefaultKafkaConsumerFactory<>(consumerProps());
```

MessageListenerContainer

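MessageListenerContainer is Spring Kafka's internal abstraction for message consumption and generally should not be implemented by outside classes.
It represents a container for message listeners.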

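It has two important implementations: KafkaMessageListenerContainer and ConcurrentMessageListenerContainer.
The former is single-threaded; the latter supports multiple threads by delegating to several KafkaMessageListenerContainer instances.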

When a MessageListenerContainer starts, it actively fetches messages from Kafka through a Consumer and invokes the message listener
to process them.
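To make the poll-and-dispatch idea concrete, here is a deliberately simplified, single-threaded model in the spirit of KafkaMessageListenerContainer. It is a sketch under stated assumptions, not Spring Kafka code: a BlockingQueue stands in for the Kafka Consumer, a java.util.function.Consumer stands in for the message listener, and MiniListenerContainer is a hypothetical name.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical, heavily simplified listener container (not Spring Kafka code).
final class MiniListenerContainer<T> {
    private final BlockingQueue<T> source;   // stands in for the Kafka Consumer
    private final Consumer<T> listener;      // stands in for the MessageListener
    private volatile boolean running;
    private Thread worker;

    MiniListenerContainer(BlockingQueue<T> source, Consumer<T> listener) {
        this.source = source;
        this.listener = listener;
    }

    // Like container.start(): launch the polling thread.
    synchronized void start() {
        if (running) {
            return;
        }
        running = true;
        worker = new Thread(this::pollLoop, "mini-container");
        worker.setDaemon(true);
        worker.start();
    }

    // Like container.stop(): ask the loop to finish and wait for it.
    synchronized void stop() throws InterruptedException {
        running = false;
        if (worker != null) {
            worker.join();
        }
    }

    // Poll with a timeout (as consumer.poll(pollTimeout) does) and hand each record to the listener.
    private void pollLoop() {
        while (running) {
            try {
                T record = source.poll(100, TimeUnit.MILLISECONDS);
                if (record != null) {
                    listener.accept(record);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            } catch (RuntimeException e) {
                // Log and keep polling, roughly what a logging error handler does
                System.err.println("Listener failed: " + e);
            }
        }
    }
}
```

Because the listener runs on the polling thread here, a slow listener directly delays the next poll.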

KafkaDataListener

KafkaDataListener is a marker interface that identifies a message listener. When AbstractMessageListenerContainer starts, it checks
the type of the listener it contains.
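The marker-interface pattern and the start-up type check can be sketched in plain Java. DataListener, SimpleListener, AckingListener and ListenerTypeCheck are hypothetical stand-ins for KafkaDataListener, MessageListener and AcknowledgingMessageListener. The rule that the manual ack modes require an acknowledging listener is taken from the ack-mode descriptions in this article; how Spring Kafka actually performs the check may differ.

```java
// Hypothetical stand-ins for Spring Kafka's listener types.
interface DataListener {
    // Marker interface: no methods, it only tags an object as a message listener
}

interface SimpleListener<T> extends DataListener {
    void onMessage(T record);
}

interface AckingListener<T> extends DataListener {
    void onMessage(T record, Runnable acknowledgment);
}

final class ListenerTypeCheck {
    enum Kind { SIMPLE, ACKNOWLEDGING }

    // Decide at start-up how the listener will be invoked, based on its runtime type.
    static Kind classify(DataListener listener) {
        if (listener instanceof AckingListener) {
            return Kind.ACKNOWLEDGING;
        }
        if (listener instanceof SimpleListener) {
            return Kind.SIMPLE;
        }
        throw new IllegalArgumentException("Unsupported listener type: " + listener.getClass().getName());
    }

    // MANUAL and MANUAL_IMMEDIATE need a listener that receives an acknowledgment handle.
    static void validate(DataListener listener, boolean manualAckMode) {
        if (manualAckMode && classify(listener) != Kind.ACKNOWLEDGING) {
            throw new IllegalStateException("Manual ack modes require an acknowledging listener");
        }
    }
}
```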

Message consumption example

```java
@Test
public void testAutoCommit() throws Exception {
    logger.info("Start auto");
    ContainerProperties containerProps = new ContainerProperties(topic1);
    final CountDownLatch latch = new CountDownLatch(4);
    // Set the message listener
    containerProps.setMessageListener((MessageListener<Integer, String>) message -> {
        logger.info("received: " + message);
        latch.countDown();
    });
    // createContainer(...) is a helper in the same test class (not shown)
    KafkaMessageListenerContainer<Integer, String> container = createContainer(containerProps);
    container.setBeanName("testAuto");
    container.start();
    Thread.sleep(1000); // wait a bit for the container to start
    // Send the test messages (testSendMsgSysn() is a helper in the same test class)
    testSendMsgSysn();

    assertTrue(latch.await(60, TimeUnit.SECONDS));
    container.stop();
    logger.info("Stop auto");
}
```

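Kafka message consumption acknowledgement modes

Acknowledgement modes

In Spring Kafka, the way consumed offsets are committed is defined by the AckMode enum:

RECORD

Commit after each record is processed.

BATCH

Commit once per batch of records returned by poll(); the commit frequency therefore depends on how often poll() is called.

TIME

Commit every ackTime milliseconds.

COUNT

Commit once ackCount acknowledgements have accumulated.

COUNT_TIME

Commit as soon as either the ackTime or the ackCount condition is met, whichever comes first.

MANUAL

The listener is responsible for acknowledging, but the acknowledged offsets are still committed in batches behind the scenes. The listener must be an AcknowledgingMessageListener.

MANUAL_IMMEDIATE

The listener is responsible for acknowledging, and each acknowledgement is committed immediately. The listener must be an AcknowledgingMessageListener.

The default ackMode in ContainerProperties is AckMode.BATCH.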
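The commit decision for each mode can be condensed into a single function. This is a hypothetical sketch modelled on the processCommits method analysed later in this article: the enum is redeclared locally, and CommitPolicy and its parameters are illustrative names, not Spring Kafka API.

```java
// Local copy of the mode names, so the sketch has no Spring dependency.
enum AckMode { RECORD, BATCH, TIME, COUNT, COUNT_TIME, MANUAL, MANUAL_IMMEDIATE }

final class CommitPolicy {
    // Returns true when the pending offsets should be committed on this pass of the poll loop.
    static boolean shouldCommit(AckMode mode, int count, int ackCount,
                                long nowMs, long lastCommitMs, long ackTimeMs) {
        boolean countExceeded = count >= ackCount;
        boolean elapsed = nowMs - lastCommitMs > ackTimeMs;
        switch (mode) {
            case RECORD:
            case BATCH:
            case MANUAL:
                return true;           // commit whatever is pending on every pass
            case COUNT:
                return countExceeded;  // enough acks accumulated
            case TIME:
                return elapsed;        // ackTime has passed since the last commit
            case COUNT_TIME:
                return countExceeded || elapsed;
            case MANUAL_IMMEDIATE:
                return false;          // each ack is committed on its own, not by the loop
            default:
                throw new IllegalArgumentException("Unknown mode: " + mode);
        }
    }
}
```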

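Analysis of how Kafka messages are fetched

The fetch process

The poll loop is driven by the run() method of ListenerConsumer, the inner class of KafkaMessageListenerContainer: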

```java
@Override
public void run() {
    if (this.autoCommit && this.theListener instanceof ConsumerSeekAware) {
        ((ConsumerSeekAware) this.theListener).registerSeekCallback(this);
    }
    this.count = 0;
    this.last = System.currentTimeMillis();
    if (isRunning() && this.definedPartitions != null) {
        initPartitionsIfNeeded();
        // we start the invoker here as there will be no rebalance calls to
        // trigger it, but only if the container is not set to autocommit
        // otherwise we will process records on a separate thread
        if (!this.autoCommit) {
            startInvoker();
        }
    }
    long lastReceive = System.currentTimeMillis();
    long lastAlertAt = lastReceive;
    while (isRunning()) {
        try {
            // Not autoCommit: commit the pending consumption progress first
            if (!this.autoCommit) {
                processCommits();
            }
            processSeeks();
            if (this.logger.isTraceEnabled()) {
                this.logger.trace("Polling (paused=" + this.paused + ")...");
            }
            // Fetch records; poll() blocks for at most pollTimeout (1000 ms by default)
            ConsumerRecords<K, V> records = this.consumer.poll(this.containerProperties.getPollTimeout());
            if (records != null && this.logger.isDebugEnabled()) {
                this.logger.debug("Received: " + records.count() + " records");
            }
            if (records != null && records.count() > 0) {
                if (this.containerProperties.getIdleEventInterval() != null) {
                    lastReceive = System.currentTimeMillis();
                }
                // With autoCommit, the listener is invoked on this (polling) thread
                if (this.autoCommit) {
                    invokeListener(records);
                }
                // Otherwise the records are handed to the listener invoker thread through a queue
                else {
                    // A true result means the records could not be handed off: pause fetching
                    if (sendToListener(records)) {
                        if (this.assignedPartitions != null) {
                            // avoid group management rebalance due to a slow
                            // consumer
                            this.consumer.pause(this.assignedPartitions);
                            this.paused = true;
                            this.unsent = records;
                        }
                    }
                }
            }
            // No records: check whether an idle event should be published
            else {
                if (this.containerProperties.getIdleEventInterval() != null) {
                    long now = System.currentTimeMillis();
                    if (now > lastReceive + this.containerProperties.getIdleEventInterval()
                            && now > lastAlertAt + this.containerProperties.getIdleEventInterval()) {
                        publishIdleContainerEvent(now - lastReceive);
                        lastAlertAt = now;
                        if (this.theListener instanceof ConsumerSeekAware) {
                            seekPartitions(getAssignedPartitions(), true);
                        }
                    }
                }
            }
            this.unsent = checkPause(this.unsent);
        }
        catch (WakeupException e) {
            this.unsent = checkPause(this.unsent);
        }
        catch (Exception e) {
            if (this.containerProperties.getGenericErrorHandler() != null) {
                this.containerProperties.getGenericErrorHandler().handle(e, null);
            }
            else {
                this.logger.error("Container exception", e);
            }
        }
    }
    if (this.listenerInvokerFuture != null) {
        stopInvokerAndCommitManualAcks();
    }
    try {
        this.consumer.unsubscribe();
    }
    catch (WakeupException e) {
        // No-op. Continue process
    }
    this.consumer.close();
    if (this.logger.isInfoEnabled()) {
        this.logger.info("Consumer stopped");
    }
}
```
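The idle-event branch publishes an event only when no records have arrived for a full idleEventInterval and no idle event was published during the previous interval either. A small hypothetical helper (IdleDetector is not a Spring Kafka class) isolates that rule so it can be checked with fixed timestamps:

```java
// Hypothetical helper mirroring the idle check in run().
final class IdleDetector {
    private final long idleIntervalMs; // like ContainerProperties.getIdleEventInterval()
    private long lastReceive;          // last time a non-empty batch arrived
    private long lastAlertAt;          // last time an idle event was published

    IdleDetector(long idleIntervalMs, long startMs) {
        this.idleIntervalMs = idleIntervalMs;
        this.lastReceive = startMs;
        this.lastAlertAt = startMs;
    }

    // Call when poll() returned at least one record.
    void onRecords(long nowMs) {
        lastReceive = nowMs;
    }

    // Call when poll() returned nothing. Returns the idle time to report, or -1 if no event is due.
    long checkIdle(long nowMs) {
        if (nowMs > lastReceive + idleIntervalMs && nowMs > lastAlertAt + idleIntervalMs) {
            lastAlertAt = nowMs;
            return nowMs - lastReceive;
        }
        return -1;
    }
}
```

The second condition (lastAlertAt) throttles the events: while the container stays idle, at most one event is published per interval.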

How auto-commit works in Kafka

If enable.auto.commit=true is configured, a scheduled task is registered when the ConsumerCoordinator is created:
every auto.commit.interval.ms, the AutoCommitTask delayed task commits the consumption progress.

```java
private class AutoCommitTask implements DelayedTask {
    public void run(final long now) {
        // Commit the consumption progress asynchronously; subscriptions.allConsumed()
        // returns the current position of every assigned partition
        commitOffsetsAsync(subscriptions.allConsumed(), new OffsetCommitCallback() {
            @Override
            public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
                // Whether or not the commit succeeded, the next one is scheduled interval ms later
                if (exception == null) {
                    reschedule(now + interval);
                } else {
                    log.warn("Auto offset commit failed for group {}: {}", groupId, exception.getMessage());
                    reschedule(now + interval);
                }
            }
        });
    }
}
```
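The self-rescheduling behaviour can be modelled outside the Kafka client. The sketch below is not the client's code: it uses a ScheduledExecutorService in place of the consumer's delayed-task queue, and AutoCommitter, allConsumed and commit are hypothetical names. Like AutoCommitTask, it schedules the next run whether or not the commit failed.

```java
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Supplier;

// Hypothetical model of a self-rescheduling auto-commit task (not Kafka client code).
final class AutoCommitter {
    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor(r -> {
                Thread t = new Thread(r, "auto-commit");
                t.setDaemon(true);
                return t;
            });
    private final Supplier<Map<String, Long>> allConsumed; // like subscriptions.allConsumed()
    private final Consumer<Map<String, Long>> commit;       // like commitOffsetsAsync(...)
    private final long intervalMs;                          // like auto.commit.interval.ms

    AutoCommitter(Supplier<Map<String, Long>> allConsumed, Consumer<Map<String, Long>> commit, long intervalMs) {
        this.allConsumed = allConsumed;
        this.commit = commit;
        this.intervalMs = intervalMs;
    }

    void start() {
        scheduleNext();
    }

    void shutdown() {
        scheduler.shutdownNow();
    }

    private void scheduleNext() {
        try {
            scheduler.schedule(this::runOnce, intervalMs, TimeUnit.MILLISECONDS);
        } catch (RejectedExecutionException e) {
            // The scheduler was shut down concurrently: stop rescheduling
        }
    }

    private void runOnce() {
        try {
            commit.accept(allConsumed.get());
        } catch (RuntimeException e) {
            // Like AutoCommitTask: log the failure instead of giving up
            System.err.println("Auto offset commit failed: " + e.getMessage());
        }
        // Reschedule whether or not the commit succeeded
        scheduleNext();
    }
}
```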

SubscriptionState

```java
/* the list of partitions currently assigned */
// TopicPartitionState holds the consumption position, which is updated on every fetch
private final Map<TopicPartition, TopicPartitionState> assignment;
```

Fetcher

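```java
private int append(Map<TopicPartition, List<ConsumerRecord<K, V>>> drained,
                   PartitionRecords<K, V> partitionRecords,
                   int maxRecords) {
    // ... (excerpt: draining logic omitted; nextOffset is the offset of the last drained record + 1)
    // Update the latest consumption position
    subscriptions.position(partitionRecords.partition, nextOffset);
    // ...
}
```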
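SubscriptionState and Fetcher together keep one position per assigned partition: after records are drained, the position becomes the last drained offset plus one, and allConsumed() exposes a snapshot that AutoCommitTask commits. The following is a simplified model under that assumption; PositionTracker is a hypothetical class that uses plain String partition names instead of TopicPartition.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of per-partition position tracking (SubscriptionState + Fetcher, simplified).
final class PositionTracker {
    // partition -> position, i.e. the offset of the next record to hand to the application
    private final Map<String, Long> positions = new HashMap<>();

    void assign(String partition, long startOffset) {
        positions.put(partition, startOffset);
    }

    // Like Fetcher.append(): after draining records up to lastOffset, move the position past them.
    void onDrained(String partition, long lastOffset) {
        if (!positions.containsKey(partition)) {
            throw new IllegalStateException("Partition not assigned: " + partition);
        }
        positions.put(partition, lastOffset + 1);
    }

    // Like subscriptions.allConsumed(): an immutable snapshot that an auto-commit can send.
    Map<String, Long> allConsumed() {
        return Collections.unmodifiableMap(new HashMap<>(positions));
    }
}
```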

How Kafka consumption works without autoCommit

processCommits

```java
private void processCommits() {
    handleAcks();
    this.count += this.acks.size();
    long now;
    AckMode ackMode = this.containerProperties.getAckMode();
    if (!this.isManualImmediateAck) {
        // The Map holding the consumption progress is updated again here
        if (!this.isManualAck) {
            updatePendingOffsets();
        }
        boolean countExceeded = this.count >= this.containerProperties.getAckCount();
        if (this.isManualAck || this.isBatchAck || this.isRecordAck
                || (ackMode.equals(AckMode.COUNT) && countExceeded)) {
            if (this.logger.isDebugEnabled() && ackMode.equals(AckMode.COUNT)) {
                this.logger.debug("Committing in AckMode.COUNT because count " + this.count
                        + " exceeds configured limit of " + this.containerProperties.getAckCount());
            }
            commitIfNecessary();
            this.count = 0;
        }
        // Handle AckMode.TIME and AckMode.COUNT_TIME
        else {
            now = System.currentTimeMillis();
            boolean elapsed = now - this.last > this.containerProperties.getAckTime();
            // The time since the last commit exceeds the configured ackTime
            if (ackMode.equals(AckMode.TIME) && elapsed) {
                if (this.logger.isDebugEnabled()) {
                    this.logger.debug("Committing in AckMode.TIME " +
                            "because time elapsed exceeds configured limit of " +
                            this.containerProperties.getAckTime());
                }
                // Commit the offsets stored in the offsets map (synchronously or asynchronously)
                commitIfNecessary();
                this.last = now;
            }
            // Either the time since the last commit exceeds ackTime, or the number of
            // pending acks has reached ackCount
            else if (ackMode.equals(AckMode.COUNT_TIME) && (elapsed || countExceeded)) {
                if (this.logger.isDebugEnabled()) {
                    if (elapsed) {
                        this.logger.debug("Committing in AckMode.COUNT_TIME " +
                                "because time elapsed exceeds configured limit of " +
                                this.containerProperties.getAckTime());
                    }
                    else {
                        this.logger.debug("Committing in AckMode.COUNT_TIME " +
                                "because count " + this.count + " exceeds configured limit of " +
                                this.containerProperties.getAckCount());
                    }
                }
                // Commit the offsets stored in the offsets map (synchronously or asynchronously)
                commitIfNecessary();
                this.last = now;
                this.count = 0;
            }
        }
    }
}
```

handleAcks

Processes the pending acknowledged records one at a time.

```java
private void handleAcks() {
    ConsumerRecord<K, V> record = this.acks.poll();
    while (record != null) {
        if (this.logger.isTraceEnabled()) {
            this.logger.trace("Ack: " + record);
        }
        processAck(record);
        record = this.acks.poll();
    }
}

private void processAck(ConsumerRecord<K, V> record) {
    // AckMode is MANUAL_IMMEDIATE
    if (ListenerConsumer.this.isManualImmediateAck) {
        try {
            // Commit this record's offset to Kafka right away, synchronously
            // (syncCommits defaults to true) or asynchronously
            ackImmediate(record);
        }
        catch (WakeupException e) {
            // ignore - not polling
        }
    }
    // Any other mode: only store the offset in a Map; it is committed later by processCommits()
    else {
        addOffset(record);
    }
}
```
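Outside MANUAL_IMMEDIATE mode, acknowledgements only accumulate in a map until processCommits decides to commit. The sketch below models that map. It is a hypothetical simplification: partitions are plain strings, it stores record offset + 1 right away (the offset Kafka commits is the offset of the next record to consume), and it keeps the highest value per partition; Spring Kafka's own addOffset and commit code may arrange these steps differently.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of the pending-offset map filled by addOffset and drained by a commit.
final class PendingOffsets {
    // partition -> offset to commit (next record to consume)
    private final Map<String, Long> offsets = new HashMap<>();

    // Record an acknowledged record; keep the highest committed position per partition.
    void addOffset(String partition, long recordOffset) {
        offsets.merge(partition, recordOffset + 1, Long::max);
    }

    boolean isEmpty() {
        return offsets.isEmpty();
    }

    // Stand-in for commitIfNecessary(): hand the pending offsets to a committer and clear them.
    Map<String, Long> drainForCommit() {
        Map<String, Long> toCommit = new HashMap<>(offsets);
        offsets.clear();
        return toCommit;
    }
}
```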

### @EnableKafka

Used together with `@Configuration`, the `@EnableKafka` annotation activates Kafka's annotation-driven message consumption.

```java
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Import(KafkaBootstrapConfiguration.class)
public @interface EnableKafka {
}
```

KafkaBootstrapConfiguration

KafkaBootstrapConfiguration has two main responsibilities:

  1. Registering KafkaListenerAnnotationBeanPostProcessor, a bean that mainly processes @KafkaListener annotations.
  2. Registering KafkaListenerEndpointRegistry, a bean that manages MessageListenerContainer instances.

```java
@Configuration
public class KafkaBootstrapConfiguration {

    @SuppressWarnings("rawtypes")
    @Bean(name = KafkaListenerConfigUtils.KAFKA_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME)
    @Role(BeanDefinition.ROLE_INFRASTRUCTURE)
    public KafkaListenerAnnotationBeanPostProcessor kafkaListenerAnnotationProcessor() {
        return new KafkaListenerAnnotationBeanPostProcessor();
    }

    @Bean(name = KafkaListenerConfigUtils.KAFKA_LISTENER_ENDPOINT_REGISTRY_BEAN_NAME)
    public KafkaListenerEndpointRegistry defaultKafkaListenerEndpointRegistry() {
        return new KafkaListenerEndpointRegistry();
    }
}
```

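Summary

  1. ackOnError defaults to true: in non-autoCommit mode, records whose processing failed are still added to the pending-ack queue.
  2. The default errorHandler of ListenerConsumer is LoggingErrorHandler, which only logs the error.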