消息消费者
Consumer(消费者)是Kafka数据流转的“目的地”,它负责从Topic中读取消息并进行具体的业务处理。这篇笔记我们详细学习Consumer的核心机制,并掌握如何编写稳定可靠的消费代码。
创建Kafka Consumer
下面例子创建了一个最基本的Consumer,它至少需要指定4个参数,Broker地址、消息Key反序列化器、消息Value反序列化器和消费者组ID。
package com.gacfox.demo;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.util.Collections;
import java.util.Properties;
public class Main {
public static void main(String[] args) {
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.GROUP_ID_CONFIG, "group0");
try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
consumer.subscribe(Collections.singletonList("test.topic"));
// ...
}
}
}
Consumer的配置中,前面3个配置参数在Producer章节我们已经详细介绍过了,至于消费者组ID,在Kafka中Consumer必须是成组的,即使只有一个Consumer也需要组ID。此外,Consumer必须订阅Topic,订阅的Topic可以是多个,使用List类型指定,此外订阅也支持正则表达式,例如consumer.subscribe(Pattern.compile("order.*"));。
消息的poll机制
Kafka Consumer采用的是pull(拉)模型,而不是push(推)模型。这意味着Consumer需要主动向Broker请求数据,而不是Broker主动推送。这样设计的好处是Consumer可以按自己的处理能力控制消费速度避免被压垮。Consumer拉取消息代码例子如下。
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.GROUP_ID_CONFIG, "group0");
try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
consumer.subscribe(Collections.singletonList("test.topic"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
log.info("partition={}, offset={}, key={}, value={}",
record.partition(), record.offset(), record.key(), record.value());
}
}
}
poll()方法的参数是超时时间,如果在这段时间内没有新消息方法会返回空集合。通常我们会在一个循环中持续调用poll(),这样Consumer就能不断地拉取并处理消息。这里一定要注意,poll()方法不仅仅是拉取消息,你必须定期调用poll(),否则超过一定时间后Consumer会被认为已离线,触发Rebalance。
Consumer Group机制
在Kafka中,Consumer必须属于某个消费者组(Consumer Group)。即使只有一个Consumer,它也需要指定group.id参数。Consumer Group的核心规则如下:
- 一条消息会被广播给所有Consumer Group,但在同一个Consumer Group内一条消息只会被组内的一个Consumer消费
- 同一个Partition同一时间只能被组内一个Consumer消费
我们看下面这个例子。
graph LR
T[Topic: ORDER<br/>3个分区] --> P0[Partition 0]
T --> P1[Partition 1]
T --> P2[Partition 2]
P0 --> C1A[Consumer A<br/>Group: group1]
P1 --> C1A
P2 --> C1B[Consumer B<br/>Group: group1]
P0 --> C2A[Consumer C<br/>Group: group2]
P1 --> C2A
P2 --> C2A
在这个例子中,group1有两个消费者,Partition 0和1分配给了Consumer A,Partition 2分配给了Consumer B;group2只有一个消费者Consumer C,它消费所有3个分区。两个组都能收到完整的消息,互不影响。
分区分配策略
当一个Consumer Group有多个Consumer时,Kafka需要决定哪个Consumer负责消费哪些Partition。Kafka提供了几种分区分配策略。
Range(范围分配):按分区序号范围分配,是分配策略的默认值。假设有10个分区,3个Consumer,则可能Consumer 0负责分区0-3,Consumer 1负责分区4-6,Consumer 2负责分区7-9。
RoundRobin(轮询分配):将所有分区依次轮流分配给Consumer,负载更均衡。
Sticky(粘性分配):尽可能保持之前的分配方案,减少Rebalance时的分区迁移。
代码中,我们可以通过配置指定分配策略。
props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, RoundRobinAssignor.class.getName());
Offset管理
什么是Offset
Offset是Kafka中每条消息在分区中的唯一序号,从0开始递增,Consumer可以通过记录自己消费到的Offset来跟踪消费进度。
graph LR
subgraph Partition 0
M0[offset=0] --> M1[offset=1] --> M2[offset=2] --> M3[offset=3] --> M4[offset=4]
end
M2 -.->|当前消费到| C[Consumer]
style M2 fill:#90EE90
Broker内部会对每个Consumer Group消费的每个Partition维护一个Offset,这样即使Consumer重启,也能从上次停止的位置继续消费。
Offset自动提交 vs Offset手动提交
Kafka默认开启Offset自动提交模式。
// 默认配置
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "5000");
如上配置,自动提交的工作机制为每隔auto.commit.interval.ms(默认5秒),客户端会根据配置定时提交Offset,提交是异步的,不会阻塞消费者线程。
自动提交的优点是简单,但缺点也很明显,如果在处理消息时发生异常而此时Offset已经自动提交,Consumer Group就无法再收到这条消息了。而Offset手动提交能让我们更精确的控制提交时机。
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
try {
// 假设这里指定处理消息的业务逻辑
processMessage(record.value());
} catch (Exception e) {
// 处理失败,不提交offset,下次会重新消费
log.error("处理消息失败", e);
continue;
}
}
// 所有消息都处理成功后再提交
consumer.commitSync();
}
值得注意的是,手动提交会有一些潜在的问题。我们可以看到,Kafka消费消息时poll的结果也是批量的,手动提交时,如果只有一条消息处理失败,其它消息业务处理成功了,但整个批次并没有一个“部分提交”功能,实际上其它Consumer后续还会收到已经处理过的消息;另一个小概率问题是两次poll之间发生了Rebalance,假如客户端对某个分区的权限恰好被收回了,客户端试图提交Offset时就会收到CommitFailedException异常,也没提交成功,其它Consumer又会重新收到这批消息,造成了消息的重复消费。这几种情况下通常都需要我们手动在Consumer实现消息的幂等性。
除此之外,手动提交还引出了一个经典的无限重试问题。代码中,我们对报错的处理是不提交直接continue,但如果某条消息就是触发了下游服务processMessage()中的Bug或消息数据本身存在异常,那一批消息里一旦带上这条消息,它就永远无法提交成功,且会永远循环出现,影响Consumer的消费。Kafka本身没有处理这种问题的机制,这需要我们手动记录某条消息的最大重试次数,如果一条消息连续3次或5次都没处理成功,那么就不能让再继续重复被消费了,实际开发中遇到这种情况一般是记录异常到数据库或实现DLQ(死信队列)请求人工介入。Kafka本身没有DLQ,一般做法是新建一个Topic作为DLQ,并允许超过最大重试次数的消息将原始信息写入其中,便于后续重放和问题定位。
前面consumer.commitSync()采用的是同步提交Offset,我们也可以使用异步提交。
consumer.commitAsync((offsets, exception) -> {
if (exception != null) {
log.error("提交Offset失败", exception);
}
});
但异步提交又可能产生另一个问题,“Offset回退”,即已经提交过的消费进度被更小的Offset覆盖了,导致消费者下次从更早的位置重新消费,从而出现重复消费。对于关键业务,同步提交可能更加可靠,不过无论哪种提交方式,我们都需要实现幂等性作为兜底机制。
earliest vs latest
当一个新的消费者组首次订阅Topic时,它不知道从哪里开始消费。此时auto.offset.reset配置会起作用。
// 从最早的消息开始消费(分区头部)
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
// 从最新的消息开始消费(分区尾部,默认值)
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
一般来说,数据处理任务、账单计算等需要处理全量数据的应使用earliest,实时监控、日志采集等只关心最新数据的使用latest。
注意:这个配置只在消费者组第一次订阅时才会生效,如果消费者组已有Offset记录,会从记录的Offset继续消费。
消费语义:至少一次、至多一次、精确一次
至少一次(At Least Once)
至少一次(At Least Once)表达消息可能被重复消费,但不会丢失。具体代码中,我们的逻辑是先处理,再提交Offset,如果处理成功但提交失败,下次会重新消费导致重复消费。
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
processMessage(record.value());
}
consumer.commitSync();
}
虽然至少一次(At Least Once)可能导致重复消费,但我们可以在业务逻辑中实现消费的幂等性,我们可以基于Key或某种业务主键标识消息,或者基于业务逻辑的状态机判断,使其只处理一次,达到幂等性的目的。
至多一次(At Most Once)
至多一次(At Most Once)中消息可能丢失,但不会重复消费。代码中,我们先提交Offset,后处理。
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
consumer.commitSync();
for (ConsumerRecord<String, String> record : records) {
processMessage(record.value());
}
}
精确一次(Exactly Once)
精确一次(Exactly Once)表达消息严格只处理一次,既不会丢失也不会重复,这需要Producer开启幂等性和事务支持,幂等保证了同一条消息即使重试也只写入一次,事务机制能让生产者操作和Offset提交在一次原子操作中,消费者的isolation.level也需要配合设置为read_committed,只消费已提交事务的消息。
Kafka的精确一次(Exactly Once)主要服务于流处理(read‑process‑write)场景,对于Consumer来说,通常涉及接收消息,处理后再调用Producer写入另一个Topic。
// 消费者配置关闭自动提交,设置事务隔离级别
consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
consumerProps.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
// 生产者配置事务 ID
producerProps.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "demo-service-producer-1");
producerProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps);
producer.initTransactions();
下面代码实现这个事务逻辑。
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
// 准备消费位移信息,用于事务提交
Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
producer.beginTransaction();
try {
for (ConsumerRecord<String, String> record : records) {
// 业务处理并产生输出消息
producer.send(new ProducerRecord<>("output.topic", record.key(), process(record.value())));
// 记录每个分区的消费位移
offsets.put(new TopicPartition(record.topic(), record.partition()),
new OffsetAndMetadata(record.offset() + 1));
}
// 将消费位移提交关联到当前事务
producer.sendOffsetsToTransaction(offsets, consumer.groupMetadata());
producer.commitTransaction();
} catch (Exception e) {
producer.abortTransaction();
log.error("事务异常,回滚", e);
}
}
如果你只是单纯的Consumer Only场景(无下游写入),精准一次需要外部状态存储配合(如在数据库中把业务操作和Offset记录放在同一本地事务中),Kafka本身的事务机制主要服务于“消费‑处理‑生产”这种流式链路,普通的Consumer Only可以用“手动提交 + 幂等业务处理”来实现等效于精确一次的效果。
处理Rebalance
Rebalance是Consumer Group重新分配Partition的过程,Rebalance会在以下情况触发:
- Consumer加入或离开
- Consumer被认为意外离线(心跳超时)
- Topic分区数变化
Rebalance期间,Consumer Group会短暂停止消费,即“stop-the-world”,如果Rebalance频繁发生,会严重影响消费吞吐量。如何减少Rebalance?我们可以考虑以下几种策略。
方式一:增加消费者的心跳超时时间。如果我们的Consumer负载很高,或者处理一条消息需要阻塞较长时间,可以适当增大心跳超时时间,避免Consumer被频繁误认为离线。
// 心跳超时时间,默认10秒
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
方式二:增加poll间隔,理由与方式一类似。
// 两次poll的最大间隔,默认5分钟
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "600000");
方式三:减少单次poll的数据量,降低单次poll的消息处理任务量。
// 单次poll返回的最大记录数,默认500
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "100");
此外在Consumer端,我们还可以监听Rebalance事件。
consumer.subscribe(Collections.singletonList("test.topic"), new ConsumerRebalanceListener() {
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
// 分区被回收前调用
log.info("分区被回收: {}", partitions);
}
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
// 分区分配后调用
log.info("分区被分配: {}", partitions);
}
});
消息积压
消息积压(Lag)是指生产速度大于消费速度,导致未消费消息堆积。我们可以通过Kafka UI或Kafka提供的命令脚本查看Lag。
./bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group test.group
消息积压主要有以下常见原因:
- Consumer处理逻辑太慢:处理单条消息耗时长(数据库写入、外部API调用等)或没有使用批量操作
- Consumer数量不足:Consumer数远少于Partition数,导致单个Consumer负载过高
- Consumer频繁Rebalance:导致Consumer反复暂停和恢复
- 网络或资源瓶颈:Consumer所在机器CPU、内存、网络打满
改善这种情况最直接的方法是增加Consumer实例,但注意消费者数不应超过分区数,否则多余的Consumer会空闲。其次是实现批量和异步的业务处理,Kafka Consumer收消息本身就是批量式的,处理消息时,例如写入数据库就可以实现批量写入方法,写入逻辑也可以提交为异步任务,提升消息的处理效率。