SpringKafka
spring-kafka是Spring提供的一个Kafka Java SDK的封装库,SpringBoot项目中推荐使用这个库实现Spring与Kafka的集成,这篇笔记我们以SpringBoot3.x为例进行介绍spring-kafka的使用。spring-kafka非常简单,毕竟Kafka的难点其实不在SDK的使用上,如果你是从很早的传统Spring工程时代走过来的,甚至很有可能写过一模一样的框架。
注意:这篇笔记假设你已经非常了解Kafka和Kafka Java SDK了,对于一些基础概念我们不会重复介绍,如果你对这些还不了解,可以参考软件工程/应用架构和中间件/kafka相关章节。
引入Maven依赖
SpringBoot通过Dependency Management维护了spring-kafka的版本号,我们引入如下依赖即可。
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
使用spring-kafka
spring-kafka对Kafka的Producer和Consumer进行了封装。对于消息的生产者,spring-kafka提供了KafkaTemplate类,我们可以在任意Spring Bean中依赖注入这个类向Topic写入消息;对于消息的消费者,spring-kafka提供了相关的注解,我们可以将一个Spring Bean的方法标注为消息消费者,框架会自动从Topic中拉取消息并回调我们的方法。
框架配置
spring-kafka内部实现了自动配置,我们通过简单配置即可使用。
application.properties
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
配置中,Broker地址如果是集群模式可以使用逗号分隔配置多个地址,此外我们还配置了Producer和Consumer的序列化和反序列化器,这些配置其实和Kafka Java SDK的配置都是类似的。对于一些进阶的配置,spring-kafka把它封装到了一个Map<String, String>类型的properties字段里面,我们也都可以在配置文件中编写,例如开启生产者的幂等性机制。
spring.kafka.producer.properties.enable.idempotence=true
实现Producer
对于Producer,我们直接依赖注入KafkaTemplate,并调用其send()方法即可。
package com.gacfox.demo.service;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
@Service("messageProducerService")
public class MessageProducerService {
private final KafkaTemplate<String, String> kafkaTemplate;
@Autowired
public MessageProducerService(KafkaTemplate<String, String> kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
}
public void sendMessage(String topic, String msg) {
kafkaTemplate.send(topic, msg);
}
}
如果需要带消息Key发送,send()也有对应的重载方法。
public CompletableFuture<SendResult<K, V>> send(String topic, K key, @Nullable V data);
同步式发送消息
如上方式发送消息默认是异步且不管发送结果如何的,send()方法返回值的CompletableFuture<SendResult<K, V>>,如果想要同步发送消息,可以使用get()阻塞,这样我们就能得到具体的返回值SendResult<K, V>,而这个SendResult<K, V>其实就内部封装了Kafka Java SDK的RecordMetadata,我们可以从这个对象中获取很多实用的信息。
package com.gacfox.demo.service;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Service;
import java.util.concurrent.ExecutionException;
@Slf4j
@Service("messageProducerService")
public class MessageProducerService {
private final KafkaTemplate<String, String> kafkaTemplate;
@Autowired
public MessageProducerService(KafkaTemplate<String, String> kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
}
public void sendMessage(String topic, String msg) {
try {
SendResult<String, String> result = kafkaTemplate.send(topic, msg).get();
log.info("发送成功, 分区: {}, offset: {}", result.getRecordMetadata().partition(), result.getRecordMetadata().offset());
} catch (InterruptedException | ExecutionException e) {
log.error("发送失败: ", e);
}
}
}
异步式带回调发送消息
如果需要异步发送并带回调,也是基于CompletableFuture机制实现。
package com.gacfox.demo.service;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
@Slf4j
@Service("messageProducerService")
public class MessageProducerService {
private final KafkaTemplate<String, String> kafkaTemplate;
@Autowired
public MessageProducerService(KafkaTemplate<String, String> kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
}
public void sendMessage(String topic, String msg) {
kafkaTemplate.send(topic, msg)
.thenApply(result -> {
log.info("发送成功, 分区: {}, offset: {}", result.getRecordMetadata().partition(), result.getRecordMetadata().offset());
return result;
})
.exceptionally(ex -> {
log.error("发送失败: ", ex);
return null;
});
}
}
实现Consumer
spring-kafka将Consumer封装为了采用注解注册的机制,如下添加@KafkaListener后,Consumer会在应用程序启动时注册。
package com.gacfox.demo.consumer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
@Slf4j
@Component
public class MessageConsumer {
@KafkaListener(topics = "test.topic", groupId = "group0")
public void listen(String message) {
log.info("收到消息: {}", message);
}
}
@KafkaListener标注的方法中,参数就是消息对象,并且最简单的方式就是直接实用消息的Value类型作为参数,由于我们的消息Value是字符串类型,因此这里也采用String类型接收。
不过如果你想获得更多信息,可以直接使用Kafka Java SDK的ConsumerRecord对象来接收消息。
package com.gacfox.demo.consumer;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
@Slf4j
@Component
public class MessageConsumer {
@KafkaListener(topics = "test.topic", groupId = "group0")
public void listen(ConsumerRecord<String, String> record) {
log.info("===收到消息===");
log.info("key: {}", record.key());
log.info("value: {}", record.value());
log.info("partition: {}", record.partition());
log.info("offset: {}", record.offset());
}
}
开启批量处理
我们可以看到,spring-kafka其实把Consumer封装成单条消息处理的模式了,如果你还是想批量处理,需要额外添加以下配置。
spring.kafka.listener.type=batch
package com.gacfox.demo.consumer;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
import java.util.List;
@Slf4j
@Component
public class MessageConsumer {
@KafkaListener(topics = "test.topic", groupId = "group0")
public void listen(List<ConsumerRecord<String, String>> records) {
for (ConsumerRecord<String, String> record : records) {
log.info("===收到消息===");
log.info("key: {}", record.key());
log.info("value: {}", record.value());
log.info("partition: {}", record.partition());
log.info("offset: {}", record.offset());
}
}
}
注意spring.kafka.listener.type的默认值是single,如果你没有配置这个选项而是参数直接使用List会报错。此外,即使配置为single,虽然spring-kafka会把消息处理的模式封装成单条,但底层Kafka客户端的Consumer还是批量poll的。
手动提交Offset
spring-kafka对手动提交Offset做了进一步封装,要实现手动提交Offset,我们需要先添加相关配置,因为Offset默认是自动提交的。
spring.kafka.consumer.enable-auto-commit=false
spring.kafka.listener.ack-mode=manual
enable-auto-commit设置为false表示不自动提交,ack-mode设置为manual表示手动控制ACK。对于第二个配置,实际上我们知道Kafka Java SDK的Consumer拉取消息的批量式的,而spring-kafka对其进行了包装,我们可以逐条处理消息,ack-mode使用默认值batch时,一次poll的所有消息处理完成后直接触发ACK,而设置为manual表示需要手动标记消息已处理,一次poll的所有消息都被手动标记完成后再触发ACK。
具体提交时,我们还需要在消费方法上添加一个Acknowledgment对象,提交Offset调用acknowledgment.acknowledge()方法。
package com.gacfox.demo.consumer;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;
@Slf4j
@Component
public class MessageConsumer {
@KafkaListener(topics = "test.topic", groupId = "group0")
public void listen(ConsumerRecord<String, String> record, Acknowledgment acknowledgment) {
log.info("===收到消息===");
log.info("key: {}", record.key());
log.info("value: {}", record.value());
log.info("partition: {}", record.partition());
log.info("offset: {}", record.offset());
acknowledgment.acknowledge();
}
}
并发消费
spring-kafka对并发消费也做了封装,我们直接在@KafkaListener注解上指定concurrency参数就可以控制启动的消费线程数,它们可以被视为Consumer Group中的多个Consumer。注意这个参数是字符串类型的,这不是设计失误,而是为了支持SpEL表达式,我们这里也可以使用类似@KafkaListener(concurrency = "${app.consumer-service.concurrency}")的写法。
package com.gacfox.demo.consumer;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;
@Slf4j
@Component
public class MessageConsumer {
@KafkaListener(topics = "test.topic", groupId = "group0", concurrency = "3")
public void listen(ConsumerRecord<String, String> record, Acknowledgment acknowledgment) {
log.info("===收到消息===");
log.info("key: {}", record.key());
log.info("value: {}", record.value());
log.info("partition: {}", record.partition());
log.info("offset: {}", record.offset());
acknowledgment.acknowledge();
}
}
重试和死信队列(DLQ)
Kafka本身并没有内置Consumer处理失败超过最大重试次数时发送DLQ的机制,但spring-kafka在框架层面帮我们实现了。@RetryableTopic注解实现了基于kafka Topic的延迟重试机制,值得注意的是它底层其实不是简单的阻塞并延迟重试,而是消费失败 → 生产到retry topic → 再由Consumer重新消费retry topic的循环,因此并不会阻塞Partation的消费。这也意味着使用@RetryableTopic需要新建2个Topic,一个是retry topic,另一个作为DLQ。
我们这里新建test.topic.retry和test.topic.dlt(毕竟在Kafka里没有Queue,叫dlq有点奇怪,因此起名qlt)。这个起名是有意而为之的,因为@RetryableTopic直接中可以直接配置这两个Topic的后缀。下面例子代码演示了相关用法,如下配置会最多进行3次重试,默认每隔1秒重试1次。进入死信队列的消息会被@DltHandler标注的方法消费,它不需要手动提交Offset。
package com.gacfox.demo.consumer;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.annotation.RetryableTopic;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;
@Slf4j
@Component
public class MessageConsumer {
@RetryableTopic(
attempts = "3",
dltTopicSuffix = ".dlt",
retryTopicSuffix = ".retry"
)
@KafkaListener(topics = "test.topic", groupId = "group0", concurrency = "3")
public void listen(ConsumerRecord<String, String> record, Acknowledgment acknowledgment) {
log.info("===收到消息===");
log.info("key: {}", record.key());
log.info("value: {}", record.value());
log.info("partition: {}", record.partition());
log.info("offset: {}", record.offset());
// 假设这里无论如何都报错
throw new RuntimeException("因某种原因失败");
// acknowledgment.acknowledge();
}
@DltHandler
public void handleDlt(ConsumerRecord<String, String> record) {
log.error("===进入死信队列===");
log.info("key: {}", record.key());
log.info("value: {}", record.value());
log.info("partition: {}", record.partition());
log.info("offset: {}", record.offset());
// ... 具体业务处理,例如发送告警通知等
}
}
重试的时间策略是可配置的,它支持指数退避策略,虽然默认是每隔1秒重试1次,这类似于multiplier设置为了1.0,你也可以调整其中的参数。
@RetryableTopic(
attempts = "3",
dltTopicSuffix = ".dlt",
retryTopicSuffix = ".retry",
backoff = @Backoff(
delay = 1000,
multiplier = 2.0,
maxDelay = 10000
)
)
重试的异常类型也是可配置的,下面例子中,我们只对RuntimeException(及其子类)重试,其它异常类型不会重试而是直接进入DLQ。
@RetryableTopic(
attempts = "3",
dltTopicSuffix = ".dlt",
retryTopicSuffix = ".retry",
backoff = @Backoff(
delay = 1000,
multiplier = 2.0,
maxDelay = 10000
),
include = {RuntimeException.class}
)