消息生产者
在上一章中,我们已经详细学习了Topic、Partition、副本、Offset和Consumer Group的基本概念,这篇笔记我们聚焦于消息的生产者,结合代码具体介绍消息生产者的基本用法、配置参数、可靠性等进阶内容。
创建Kafka Producer
使用Kafka客户端发送消息,核心类是KafkaProducer。创建它需要传入一个Properties配置对象,最基础的配置需要三项:集群地址、Key序列化器和Value序列化器。
package com.gacfox.demo;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
public class Main {
public static void main(String[] args) {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
// ...
}
}
}
代码中,bootstrap.servers配置了Broker的地址和端口,生产环境下Kafka一般采用集群部署,此时可以使用逗号分隔写多个地址,理论上这里其实不需要列出集群中所有节点,前面第一章我们搭建Kafka时就提到过客户端连接Broker是两阶段式的,Broker会返回集群元数据,因此客户端连接其中任意一个节点后会自动发现其余节点,但实际上我们还是建议全部列出或者至少列出2到3个地址,这是为了保证某个节点宕机时客户端仍能初始化成功。至于key.serializer和value.serializer配置了消息Key和消息值的序列化器,我们这里都是用字符串序列化器。
KafkaProducer是线程安全的,整个应用生命周期内只需要创建一个实例,在多个线程中共享使用即可,不过注意使用完毕后应调用producer.close()释放资源,KafkaProducer实现了Java的Closeable接口,上面代码中我们使用try-with-resource语法自动调用了close()方法。
序列化器
Kafka在网络上传输的消息本质是二进制字节数组,因此在发送前需要将Key和Value序列化,在消费端再反序列化还原。Kafka客户端内置了几种常见的序列化器,如下表格。
| 序列化器 | 说明 |
|---|---|
| StringSerializer | 字符串序列化器,将Java的String转为UTF-8编码的字节数组 |
| ByteArraySerializer | 字节数组序列化器,直接发送byte[]不做额外处理 |
| IntegerSerializer | Integer类型序列化器,将Integer类型转为4字节二进制(Big Endian)发送 |
| LongSerializer | Long类型序列化器,将Long类型转为8字节二进制(Big Endian)发送 |
可以看到Kafka客户端库并没有内置JSON等序列化器来处理自定义对象,实际开发中,使用Kafka传输消息通常使用JSON配合StringSerializer,或Avro、Protobuf配合ByteArraySerializer实现。前者简单直观、可读性好;后者性能更高,适合对吞吐量有更高要求的系统。
ProducerRecord
向Kafka发消息需要将消息封装为ProducerRecord对象,它有很多重载的构造函数,常用的包含以下几种。
// 最简形式,指定topic和消息值
new ProducerRecord<>(String topic, V value);
// 指定topic、消息键和消息值
new ProducerRecord<>(String topic, K key, V value);
// 指定topic、分区号、消息键和消息值
new ProducerRecord<>(String topic, Integer partition, K key, V value);
Kafka中对于简单的日志收集场景,使用最简单形式即可;但如果涉及订单等对可靠性有更多具体业务要求的场景,消息的Key(键)就变得非常重要了,它与消息写入哪个分区和消息的顺序性保证有关,我们将在后文详细介绍。
发送消息
Kafka提供了3种发送消息的写法。
异步发送(不等待返回结果)
Kafka中发送消息最简单的方式是异步发送(不等待返回结果),下面代码中调用send()后立刻返回,客户端侧不关心发送是否成功。这种方式吞吐量最高,但如果发送失败也不会得到任何通知,只适用于允许少量消息丢失的场景,例如日志采集、监控埋点等。
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
ProducerRecord<String, String> record = new ProducerRecord<>("test.topic", "Hello, world!");
producer.send(record);
}
异步发送带回调
这种方式也是异步发送消息,客户端不会阻塞主线程,但它支持注册回调函数,客户端的发送结果可通过回调函数通知我们。
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
ProducerRecord<String, String> record = new ProducerRecord<>("test.topic", "Hello, world!");
producer.send(record, (metadata, exception) -> {
if (exception != null) {
log.error("消息发送失败,topic: {}", record.topic(), exception);
} else {
log.info("发送成功,分区: {},Offset: {}", metadata.partition(), metadata.offset());
}
});
}
回调中,metadata包含了这条消息成功发送到Broker后的一些重要信息,它实际上是Kafka的RecordMetadata类型,包括Topic、Partition、Offset等,如果发送出错,exception参数会包含具体的异常信息。
这种发送方式有一点需要注意,Kafka Producer有自己的IO线程(内部称作Sender线程),它负责从缓冲区取出消息打包成请求发送给Broker,以及等待Broker响应。而对于异步发送消息带回调场景,Kafka客户端也会将回调函数的逻辑调度到内部的IO线程上执行。这可能导致一个问题,如果你在回调中做了耗时操作,这回阻塞IO线程,导致Producer的消息发送效率下降。所以建议在回调中只做轻量操作,比如日志记录或计数,不要做阻塞型的耗时IO操作,对于耗时操作,建议将其提交给单独的线程池中执行。
同步发送
异步发送消息时,send()方法的返回值其实是Future<RecordMetadata>,如果我们调用Future的get()方法,发送过程就变成同步式的了。
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
ProducerRecord<String, String> record = new ProducerRecord<>("test.topic", "Hello, world!");
try {
RecordMetadata metadata = producer.send(record).get();
log.info("发送成功,分区: {},Offset: {}", metadata.partition(), metadata.offset());
} catch (ExecutionException | InterruptedException e) {
log.error("消息发送失败,topic: {}", record.topic(), e);
}
}
metadata参数与之前的异步发送回调函数参数一样,包含了发送结果信息,但这里我们的发送是同步式的。同步发送能确保每条消息的发送结果都被客户端侧同步感知,但由于每次都需要等待Broker响应,这阻塞了客户端的主线程,在对性能有较高要求的场景中会造成吞吐量的大幅下降。
消息Key的作用
很多初学者在发送消息时会忽略Key参数,把它留空。实际上,Key是Kafka消息路由和顺序保障的核心,它有两个主要用途。
1. 决定消息写入哪个Partition:当消息指定了Key时,Kafka客户端会对Key做哈希然后对分区数取模,相同Key的消息总会落到同一个Partition。这意味着同一个订单号、同一个用户ID产生的消息会按顺序追加到同一个Partition中,这是Kafka实现局部顺序性的基础。
2. 包含业务语义:Key可以携带业务标识,例如订单ID、用户ID、设备ID等,方便消费端识别消息归属。
不指定Key时,消息会采用轮询策略分配到各个Partition,此时写入的负载是最均衡的,但前面我们说过,Kafka的消息在多个Partition间没有顺序保证,如果你要求消息必须按顺序处理,那就需要指定Key参数。
Producer进阶配置
Producer的各种配置都被封装在了ProducerConfig类中,一些文章中使用这个类的静态常量,一些文章直接用它的字符串值,这两种配置方式是完全等价的,我们可以在这个类的源码中搜索所有的配置字段。
发送确认 acks
acks参数用于控制Producer在认为一次发送成功之前,需要等待多少个副本确认收到消息,这是可靠性最关键的配置之一。
| acks取值 | 含义 | 可靠性 | 吞吐量 |
|---|---|---|---|
0 |
不等待任何确认,发完即认为成功 | 最低,消息可能丢失 | 最高 |
1(默认值) |
等待Leader副本写入成功 | 中等,Leader宕机可能丢消息 | 较高 |
all(或-1) |
等待所有ISR副本都写入成功 | 最高 | 较低 |
对于业务消息(订单、支付、通知等),建议使用acks=all;对于日志、埋点等允许少量丢失的场景,可以考虑acks=1甚至acks=0。
重试 retries
retries参数是发送失败后的重试次数,默认值在新版本Kafka客户端中是Integer.MAX_VALUE,配合delivery.timeout.ms(默认2分钟)共同控制重试行为。一般情况下不需要手动调整,但需要注意重试本身会带来消息重复发送的风险,这个问题在后面幂等性部分会详细讨论。
批量与缓冲
实际上,Kafka的Producer并不是每次调用send()就立刻发出一个网络请求,而是在底层有一套批量缓冲机制,这是它实现高吞吐的重要手段。
buffer.memory:Producer端缓冲区的总内存大小,默认32MB。如果缓冲区满了,send()调用会阻塞,直到超时抛出异常。
batch.size:每个批次的最大字节数,默认16KB。消息会先积攒到批次中,等批次满了再一起发送。调大这个值可以提高吞吐量,但会增加单条消息的平均延迟。
linger.ms:发送等待时间,默认为0(表示有消息就立即发)。配置为大于0的值后,Producer会等待最多linger.ms毫秒,让更多消息凑成一批再发送,这是吞吐量和延迟之间的经典权衡,对于追求吞吐的场景可以设置为5到100毫秒。不过注意即使配置为0也不代表没有批量缓冲,缓冲区仍会暂存消息,如果瞬间有大量消息涌入,它们可能被放入同一个批次发送,配置为0只是表达不主动延迟发送。
同时请求数 max.in.flight.requests.per.connection
这个配置用来控制每个连接上允许“未确认发送”的请求数量。具体来说,Kafka客户端其实允许生产者在等待服务端ACK的期间,把多个请求排队在同一个TCP连接上,虽然IO线程默认只有一个,但它也可以轮询多个未确认请求,利用异步非阻塞网络操作(NIO)实现“同时”发送,这也是一种提高系统吞吐量的手段。max.in.flight.requests.per.connection的默认值是5。
如果将其配置为1,每次只同时发送1条消息,即必须等待确认后才能发送下一条,这可以避免消息乱序,但吞吐量低;配置>1时,意味着客户端生产者可以同时发送多条消息,系统吞吐量更高,但在重试时可能出现消息乱序。
消息丢失场景分析
前面我们提到过acks,很多人以为将其配置为all就万无一失了,实际上消息丢失的风险点存在多个,Kafka本身不能保证消息100%不丢,严格来说在现实世界中没有分布式消息系统能保证真正100%不丢,我们只能说是通过配置大幅降低丢失概率,让它在绝大多数生产场景中接近“可靠”,这里我们来逐一分析。
Producer端丢消息:
场景一:acks=0或acks=1。acks=0时表示发完即走,Broker是否写入客户端侧完全不知道;acks=1时只等Leader确认,如果Leader在同步给Follower之前宕机,消息会丢失。
场景二:未调用close就退出。实际上Kafka客户端的Producer中,调用close()时内部会调用另一个flush()方法,前面我们说过Producer有批量和缓冲机制,这个方法正是用于清空缓冲区的,如果关闭Producer时没有正确调用close(),缓冲区中的消息还没发出进程就结束了,此时消息会丢失。
场景三:发送失败后没有重试或没有感知到失败。使用了异步发送但没有注册回调,发送失败无感知。
Broker端丢消息:
场景四:Broker使用了不安全的刷盘策略。Kafka默认依赖操作系统的Page Cache异步刷盘,如果Broker宕机时数据还在内存中尚未落盘,消息就会丢失。这可以通过配置flush.messages和flush.ms强制同步刷盘,但这会显著影响性能,通常不建议开启。
场景五:副本数配置为1。只有一个副本时,那台Broker宕机数据显然就丢失了。
一个相对安全的配置组合如下:在Topic层面设置副本数大于等于3,在Producer端配置acks=all,并配合Broker端的min.insync.replicas=2(要求至少2个ISR副本确认),这样即使1个Broker宕机,消息也不会丢失。
消息幂等性问题
开启重试是保障消息可靠性的必要手段,但重试会带来一个经典问题:如果消息已经写入Broker成功,但Broker在返回ACK时网络超时,Producer会认为发送失败并触发重试,导致同一条消息被写入两次。这在实际开发中可能造成很多严重问题,比如用户下单1次生成2个订单,用户转账1笔对方受到2笔转账等,在类似这些场景下,我们必须保证消息幂等性,好在Kafka内置提供了消息的幂等性机制。
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
开启幂等性后,Kafka客户端会为每个Producer实例分配1个唯一的Producer ID(PID),并为发送到每个Partition的每条消息维护一个单调递增的序列号(Sequence Number)。Broker收到消息后,会比对序列号:如果和预期一致就正常写入,如果是重复的序列号就直接丢弃,从而实现在单个Producer会话、单个Partition范围内的精确一次写入。
sequenceDiagram
participant P as Producer
participant B as Broker
P->>B: 发送消息 (PID=1, Seq=5)
B->>B: 写入成功
Note over B: ACK返回途中网络超时
P->>B: 重试发送 (PID=1, Seq=5)
B->>B: 发现Seq=5已存在,丢弃重复消息
B->>P: 返回ACK
开启幂等性时,Kafka会自动调整一些关联配置:acks会被强制设置为all,retries会被设置为最大值,max.in.flight.requests.per.connection会被限制为不超过5。这些调整都是为了配合幂等性机制正常工作,开发者无需手动干预。
当然,只是配置个enable.idempotence也并不代表着就万事大吉了,它有一些明确的限制,它只保证单个Producer实例的单个Partition内不重复,不能跨Partition、跨Producer实例保证幂等。如果Producer进程重启(PID会变化),幂等性保障就失效了,此时需要考虑事务机制或消费端的幂等设计。
消息顺序性问题
前面我们反复提到过,Kafka保证的是单个Partition内消息的顺序性,即消息在同一个Partition内按写入顺序追加,消费时也按同样的顺序读取。跨Partition之间没有任何顺序保证。要实现业务上的局部顺序(例如同一个订单的所有状态变更消息按顺序消费),正确的做法就是将业务标识作为Key,让同一订单的消息始终落在同一个Partition。
但有一种情况会破坏这个顺序保证:max.in.flight.requests.per.connection大于1时,如果第一批消息发送失败触发重试,第二批消息已经先发送成功了,重试的第一批消息就会排在第二批之后导致乱序。
解决方案有两种:
方案一:将max.in.flight.requests.per.connection设置为1,确保同一时刻只有一个请求在飞,但这会降低吞吐量。
方案二:开启幂等性,幂等性机制在允许max.in.flight.requests.per.connection最大为5的同时,也能保证Partition内消息的顺序性,Broker会对乱序到达的消息进行缓冲排序。
因此,开启幂等性是同时解决重复和乱序问题的推荐做法。
事务消息
Kafka的幂等性机制解决了单Partition内的幂等性和消息顺序性问题,但如果一次业务操作需要向多个Topic或多个Partition原子性地写入消息(要么全部成功,要么全部失败),此时就需要用到事务机制了。
Kafka事务的典型应用场景是“读-处理-写”链路,举例来说:从Topic A消费一条消息,处理后同时向Topic B和Topic C各写入一条消息,支持事务意味着要求这三个操作作为一个原子单元,即使中途发生失败,也不会出现部分写入的情况,这通常在流处理框架(如Kafka Streams、Flink)中会用到。
使用事务时,首先需要为Producer配置全局唯一的transactional.id(进程重启后也必须沿用同一个ID,Kafka会用它来恢复未完成的事务)并开启幂等性。
props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "demo-service-producer-1");
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
下面是一个例子。
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "demo-service-producer-1");
try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
// 初始化事务,只需调用一次
producer.initTransactions();
try {
// 开启事务
producer.beginTransaction();
// 在一个事务中向多个Topic写消息
producer.send(new ProducerRecord<>("order.topic", "ORDER_001", "{\"status\":\"PAID\"}"));
producer.send(new ProducerRecord<>("inventory.topic", "ORDER_001", "{\"action\":\"DEDUCT\"}"));
producer.send(new ProducerRecord<>("notification.topic", "ORDER_001", "{\"type\":\"SMS\"}"));
// 提交事务
producer.commitTransaction();
log.info("事务提交成功");
} catch (Exception e) {
// 发生异常,中止事务
producer.abortTransaction();
log.error("事务中止", e);
}
}
此外,消费端如果需要只读取已提交的事务消息,需要将isolation.level设置为read_committed(默认是read_uncommitted),这一点我们在Consumer章节再详细介绍。
Kafka的事务机制依赖以下几个关键角色。
消息生产者(Producer):消息生产者必须配置全局唯一的标识TRANSACTIONAL_ID_CONFIG,Kafka需要用它来恢复未完成的事务,此外还需要开启幂等性确保单Partition内不会重复消息。
事务协调器(Transaction Coordinator, TC):Kafka Broker负责管理事务状态,每个Producer会被分配对应一个TC,它会记录事务的生命周期Ongoing → Committing → Complete/Aborted。
事务日志(Transaction Log):Kafka在内部有一个特殊的Topic(通常叫 __transaction_state)来存储事务元数据,这个日志记录每个事务的状态和涉及的Partition元信息。
具体来说,事务协调大致流程如下。
首先Producer初始化的同时初始化事务。
producer.initTransactions();
- Producer向事务协调器注册,获取事务状态
- 如果之前有未完成的事务,事务协调器会帮助Producer恢复未提交的事务
- 每个
transactional.id对应一个epoch,用于防止旧事务与新事务冲突
接下来开启事务。
producer.beginTransaction();
- Producer向事务协调器标记事务状态为Ongoing
- Producer将即将发送的消息缓存在内部缓存,不直接对外可见
接下来发送消息。
producer.send(record);
- 消息写入对应Partition,消息带有事务标记
- Broker将消息临时存储,不会立即对Consumer可见
- TC会记录涉及的Partition列表
最后提交消息。
producer.commitTransaction();
- Producer告诉TC事务可以提交
- TC标记事务为
COMMITTING - TC更新
__transaction_state中的日志,写入事务状态(提交完成) - 当事务被标记为提交后,所有相关Partition上的消息才对Consumer可见
- 如果提交过程中发生异常,协调器会重试提交操作,保证事务最终一致性
如果发生终止事务。
producer.abortTransaction();
- Producer或事务协调器发现异常时,将事务标记为 Aborted
- 事务涉及的消息对Consumer不可见,Producer会丢弃未提交的消息