SpringKafka

spring-kafka是Spring提供的一个Kafka Java SDK的封装库,SpringBoot项目中推荐使用这个库实现Spring与Kafka的集成,这篇笔记我们以SpringBoot3.x为例进行介绍spring-kafka的使用。spring-kafka非常简单,毕竟Kafka的难点其实不在SDK的使用上,如果你是从很早的传统Spring工程时代走过来的,甚至很有可能写过一模一样的框架。

注意:这篇笔记假设你已经非常了解Kafka和Kafka Java SDK了,对于一些基础概念我们不会重复介绍,如果你对这些还不了解,可以参考软件工程/应用架构和中间件/kafka相关章节。

引入Maven依赖

SpringBoot通过Dependency Management维护了spring-kafka的版本号,我们引入如下依赖即可。

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

使用spring-kafka

spring-kafka对Kafka的Producer和Consumer进行了封装。对于消息的生产者,spring-kafka提供了KafkaTemplate类,我们可以在任意Spring Bean中依赖注入这个类向Topic写入消息;对于消息的消费者,spring-kafka提供了相关的注解,我们可以将一个Spring Bean的方法标注为消息消费者,框架会自动从Topic中拉取消息并回调我们的方法。

框架配置

spring-kafka内部实现了自动配置,我们通过简单配置即可使用。

application.properties

spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer

配置中,Broker地址如果是集群模式可以使用逗号分隔配置多个地址,此外我们还配置了Producer和Consumer的序列化和反序列化器,这些配置其实和Kafka Java SDK的配置都是类似的。对于一些进阶的配置,spring-kafka把它封装到了一个Map<String, String>类型的properties字段里面,我们也都可以在配置文件中编写,例如开启生产者的幂等性机制。

spring.kafka.producer.properties.enable.idempotence=true

实现Producer

对于Producer,我们直接依赖注入KafkaTemplate,并调用其send()方法即可。

package com.gacfox.demo.service;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service("messageProducerService")
public class MessageProducerService {
    private final KafkaTemplate<String, String> kafkaTemplate;

    @Autowired
    public MessageProducerService(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public void sendMessage(String topic, String msg) {
        kafkaTemplate.send(topic, msg);
    }
}

如果需要带消息Key发送,send()也有对应的重载方法。

public CompletableFuture<SendResult<K, V>> send(String topic, K key, @Nullable V data);

同步式发送消息

如上方式发送消息默认是异步且不管发送结果如何的,send()方法返回值的CompletableFuture<SendResult<K, V>>,如果想要同步发送消息,可以使用get()阻塞,这样我们就能得到具体的返回值SendResult<K, V>,而这个SendResult<K, V>其实就内部封装了Kafka Java SDK的RecordMetadata,我们可以从这个对象中获取很多实用的信息。

package com.gacfox.demo.service;

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Service;

import java.util.concurrent.ExecutionException;

@Slf4j
@Service("messageProducerService")
public class MessageProducerService {
    private final KafkaTemplate<String, String> kafkaTemplate;

    @Autowired
    public MessageProducerService(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public void sendMessage(String topic, String msg) {
        try {
            SendResult<String, String> result = kafkaTemplate.send(topic, msg).get();
            log.info("发送成功, 分区: {}, offset: {}", result.getRecordMetadata().partition(), result.getRecordMetadata().offset());
        } catch (InterruptedException | ExecutionException e) {
            log.error("发送失败: ", e);
        }
    }
}

异步式带回调发送消息

如果需要异步发送并带回调,也是基于CompletableFuture机制实现。

package com.gacfox.demo.service;

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Slf4j
@Service("messageProducerService")
public class MessageProducerService {
    private final KafkaTemplate<String, String> kafkaTemplate;

    @Autowired
    public MessageProducerService(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public void sendMessage(String topic, String msg) {
        kafkaTemplate.send(topic, msg)
                .thenApply(result -> {
                    log.info("发送成功, 分区: {}, offset: {}", result.getRecordMetadata().partition(), result.getRecordMetadata().offset());
                    return result;
                })
                .exceptionally(ex -> {
                    log.error("发送失败: ", ex);
                    return null;
                });
    }
}

实现Consumer

spring-kafka将Consumer封装为了采用注解注册的机制,如下添加@KafkaListener后,Consumer会在应用程序启动时注册。

package com.gacfox.demo.consumer;

import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class MessageConsumer {
    @KafkaListener(topics = "test.topic", groupId = "group0")
    public void listen(String message) {
        log.info("收到消息: {}", message);
    }
}

@KafkaListener标注的方法中,参数就是消息对象,并且最简单的方式就是直接实用消息的Value类型作为参数,由于我们的消息Value是字符串类型,因此这里也采用String类型接收。

不过如果你想获得更多信息,可以直接使用Kafka Java SDK的ConsumerRecord对象来接收消息。

package com.gacfox.demo.consumer;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class MessageConsumer {
    @KafkaListener(topics = "test.topic", groupId = "group0")
    public void listen(ConsumerRecord<String, String> record) {
        log.info("===收到消息===");
        log.info("key: {}", record.key());
        log.info("value: {}", record.value());
        log.info("partition: {}", record.partition());
        log.info("offset: {}", record.offset());
    }
}

开启批量处理

我们可以看到,spring-kafka其实把Consumer封装成单条消息处理的模式了,如果你还是想批量处理,需要额外添加以下配置。

spring.kafka.listener.type=batch
package com.gacfox.demo.consumer;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

import java.util.List;

@Slf4j
@Component
public class MessageConsumer {
    @KafkaListener(topics = "test.topic", groupId = "group0")
    public void listen(List<ConsumerRecord<String, String>> records) {
        for (ConsumerRecord<String, String> record : records) {
            log.info("===收到消息===");
            log.info("key: {}", record.key());
            log.info("value: {}", record.value());
            log.info("partition: {}", record.partition());
            log.info("offset: {}", record.offset());
        }
    }
}

注意spring.kafka.listener.type的默认值是single,如果你没有配置这个选项而是参数直接使用List会报错。此外,即使配置为single,虽然spring-kafka会把消息处理的模式封装成单条,但底层Kafka客户端的Consumer还是批量poll的。

手动提交Offset

spring-kafka对手动提交Offset做了进一步封装,要实现手动提交Offset,我们需要先添加相关配置,因为Offset默认是自动提交的。

spring.kafka.consumer.enable-auto-commit=false
spring.kafka.listener.ack-mode=manual

enable-auto-commit设置为false表示不自动提交,ack-mode设置为manual表示手动控制ACK。对于第二个配置,实际上我们知道Kafka Java SDK的Consumer拉取消息的批量式的,而spring-kafka对其进行了包装,我们可以逐条处理消息,ack-mode使用默认值batch时,一次poll的所有消息处理完成后直接触发ACK,而设置为manual表示需要手动标记消息已处理,一次poll的所有消息都被手动标记完成后再触发ACK。

具体提交时,我们还需要在消费方法上添加一个Acknowledgment对象,提交Offset调用acknowledgment.acknowledge()方法。

package com.gacfox.demo.consumer;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class MessageConsumer {
    @KafkaListener(topics = "test.topic", groupId = "group0")
    public void listen(ConsumerRecord<String, String> record, Acknowledgment acknowledgment) {
        log.info("===收到消息===");
        log.info("key: {}", record.key());
        log.info("value: {}", record.value());
        log.info("partition: {}", record.partition());
        log.info("offset: {}", record.offset());

        acknowledgment.acknowledge();
    }
}

并发消费

spring-kafka对并发消费也做了封装,我们直接在@KafkaListener注解上指定concurrency参数就可以控制启动的消费线程数,它们可以被视为Consumer Group中的多个Consumer。注意这个参数是字符串类型的,这不是设计失误,而是为了支持SpEL表达式,我们这里也可以使用类似@KafkaListener(concurrency = "${app.consumer-service.concurrency}")的写法。

package com.gacfox.demo.consumer;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class MessageConsumer {
    @KafkaListener(topics = "test.topic", groupId = "group0", concurrency = "3")
    public void listen(ConsumerRecord<String, String> record, Acknowledgment acknowledgment) {
        log.info("===收到消息===");
        log.info("key: {}", record.key());
        log.info("value: {}", record.value());
        log.info("partition: {}", record.partition());
        log.info("offset: {}", record.offset());

        acknowledgment.acknowledge();
    }
}

重试和死信队列(DLQ)

Kafka本身并没有内置Consumer处理失败超过最大重试次数时发送DLQ的机制,但spring-kafka在框架层面帮我们实现了。@RetryableTopic注解实现了基于kafka Topic的延迟重试机制,值得注意的是它底层其实不是简单的阻塞并延迟重试,而是消费失败 → 生产到retry topic → 再由Consumer重新消费retry topic的循环,因此并不会阻塞Partation的消费。这也意味着使用@RetryableTopic需要新建2个Topic,一个是retry topic,另一个作为DLQ。

我们这里新建test.topic.retrytest.topic.dlt(毕竟在Kafka里没有Queue,叫dlq有点奇怪,因此起名qlt)。这个起名是有意而为之的,因为@RetryableTopic直接中可以直接配置这两个Topic的后缀。下面例子代码演示了相关用法,如下配置会最多进行3次重试,默认每隔1秒重试1次。进入死信队列的消息会被@DltHandler标注的方法消费,它不需要手动提交Offset。

package com.gacfox.demo.consumer;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.annotation.RetryableTopic;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class MessageConsumer {
    @RetryableTopic(
            attempts = "3",
            dltTopicSuffix = ".dlt",
            retryTopicSuffix = ".retry"
    )
    @KafkaListener(topics = "test.topic", groupId = "group0", concurrency = "3")
    public void listen(ConsumerRecord<String, String> record, Acknowledgment acknowledgment) {
        log.info("===收到消息===");
        log.info("key: {}", record.key());
        log.info("value: {}", record.value());
        log.info("partition: {}", record.partition());
        log.info("offset: {}", record.offset());

        // 假设这里无论如何都报错
        throw new RuntimeException("因某种原因失败");

        // acknowledgment.acknowledge();
    }

    @DltHandler
    public void handleDlt(ConsumerRecord<String, String> record) {
        log.error("===进入死信队列===");
        log.info("key: {}", record.key());
        log.info("value: {}", record.value());
        log.info("partition: {}", record.partition());
        log.info("offset: {}", record.offset());

        // ... 具体业务处理,例如发送告警通知等
    }
}

重试的时间策略是可配置的,它支持指数退避策略,虽然默认是每隔1秒重试1次,这类似于multiplier设置为了1.0,你也可以调整其中的参数。

@RetryableTopic(
        attempts = "3",
        dltTopicSuffix = ".dlt",
        retryTopicSuffix = ".retry",
        backoff = @Backoff(
                delay = 1000,
                multiplier = 2.0,
                maxDelay = 10000
        )
)

重试的异常类型也是可配置的,下面例子中,我们只对RuntimeException(及其子类)重试,其它异常类型不会重试而是直接进入DLQ。

@RetryableTopic(
        attempts = "3",
        dltTopicSuffix = ".dlt",
        retryTopicSuffix = ".retry",
        backoff = @Backoff(
                delay = 1000,
                multiplier = 2.0,
                maxDelay = 10000
        ),
        include = {RuntimeException.class}
)
作者:Gacfox
版权声明:本网站为非盈利性质,文章如非特殊说明均为原创,版权遵循知识共享协议CC BY-NC-ND 4.0进行授权,转载必须署名,禁止用于商业目的或演绎修改后转载。