Kafka是Linkedin公司开源的高性能消息中间件,是一款基于发布/订阅模式的消息队列,由Scala和Java语言开发。Kafka设计之初的目标和传统MQ有一定区别,Kafka是为了解决超大量数据的实时传输,例如前端埋点统计信息、系统日志数据等,Kafka主要是为了考虑解决如下问题:
官方网站:https://kafka.apache.org/
注:Kafka有一套自己的API,因此其使用和JMS、AMQP等又有些不同。虽然消息中间件解决的问题都相似,但我们最好还是先了解其中的基本概念再使用,否则可能造成对一些功能的误用。
Kafka的安装包可以在官网下载,我这里使用的是3.2.0
版本。Kafka早期版本依赖Zookeeper集群进行分布式管理,但新版本可以不使用Zookeeper,而使用内置的Kraft模式,不仅性能更优,我们也省去了搭建Zookeeper集群的麻烦。我们这里准备了3台主机:
192.168.1.164
192.168.1.165
192.168.1.166
将压缩包解压缩后,我们可以找到config/kraft/server.properties
配置文件,基于Kraft搭建Kafka集群可以基于该配置文件进行修改。一个例子配置如下:
# 当前节点的角色,如下配置即使用Kraft模式
process.roles=broker,controller
# 当前节点ID,需要唯一
node.id=0
# 集群节点配置
controller.quorum.voters=0@192.168.1.164:9093,1@192.168.1.165:9093,2@192.168.1.166:9093
# 监听地址和端口
listeners=PLAINTEXT://:9092,CONTROLLER://:9093
inter.broker.listener.name=PLAINTEXT
advertised.listeners=PLAINTEXT://192.168.1.164:9092
controller.listener.names=CONTROLLER
listener.security.protocol.map=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
# 数据存储目录
log.dirs=/home/ubuntu/data
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
group.initial.rebalance.delay.ms=0
其中重要的配置都已经标出,Kafka默认使用9092
端口提供服务,9093
端口用于集群协调通信。我们这里将如上配置文件复制3份,注意要对应修改node.id
和监听地址IP,修改完成后分发到三台主机上即可。
修改好配置文件后,我们还需要生成数据存储目录。Kafka默认使用/tmp
分区下目录存储数据,然而一般我们是需要其数据持久化的,使用临时目录是不行的。我们在上面配置文件中将其修改为了/home/ubuntu/data
,但光有这个配置还不够,还需要在其中重新创建必要的文件。
生成随机UUID:
./bin/kafka-storage.sh random-uuid
用同一个UUID在3台主机上分别执行如下命令:
./bin/kafka-storage.sh format -t FvMlnvveS5iw8p4d1nyYtw -c ./config/server0.properties
执行后,我们的Kafka即启动成功。
Kafka没有官方提供的图形化管理界面,但安装包的bin
目录下自带了很多命令行工具,其中kafka-topics.sh
可用于创建Topic。例子如下:
./bin/kafka-topics.sh --create --bootstrap-server 192.168.1.164:9092 --topic TEST_TOPIC --partitions 1 --replication-factor 1
--bootstrap-server
:某一Kafka节点的主机和端口,可以指定任意一个,也可以逗号分隔指定多个(某一个节点宕机会自动切换到其它节点)--topic
:具体创建的Topic名--partitions
:Topic分区数--replication-factor
:Topic副本数创建完成后,我们可以用如下命令查看:
./bin/kafka-topics.sh --describe --bootstrap-server 192.168.1.164:9092 --topic TEST_TOPIC
bin
目录下也自带了收发消息的客户端,我们可以用其进行测试。启动消息消费客户端:
./bin/kafka-console-consumer.sh --bootstrap-server 192.168.1.164:9092 --topic TEST_TOPIC --group group0
--group
:该参数指定消费者组ID,可以随意起名,如果不指定默认会生成一个新的消费者组。Kafka内部会记录某个组当前消费到了哪条消息,如果生成新的消费者组,那么就只能从订阅后新产生的消息进行消费,而不会读取到积压的消息。启动发送消息的交互式客户端:
./bin/kafka-console-producer.sh --bootstrap-server 192.168.1.164:9092 --topic TEST_TOPIC
这样我们发送消息后,在接收端就可以接收到了。
Java中使用Kafka需要引入其客户端库,Maven依赖如下:
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.2.0</version>
</dependency>
除此之外,该库依赖SLF4J输出日志,因此我们的工程中还需要具备SLF4J以及具体的日志实现,这里就不多介绍了。
Kafka消息消费者:
// 创建KafkaConsumer
Properties props = new Properties();
// 集群节点
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.1.164:9092,192.168.1.165:9092,192.168.1.166:9092");
// 序列化器
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
// 消费者组ID
props.put(ConsumerConfig.GROUP_ID_CONFIG, "group0");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
// 接收消息
String topic = "TEST_TOPIC";
consumer.subscribe(Collections.singletonList(topic));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
System.out.println(record.value());
}
}
Kafka消息生产者:
// 创建KafkaProducer
Properties props = new Properties();
// 集群节点
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.1.164:9092,192.168.1.165:9092,192.168.1.166:9092");
// 序列化器
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
// 发送消息
String topic = "TEST_TOPIC";
String msg = "Hello, world!";
ProducerRecord<String, String> record = new ProducerRecord<>(topic, msg);
producer.send(record);
producer.close();
kafka在Java中使用比较简单,但这里要注意KafkaProducer是线程安全的,我们可以在多线程中共享KafkaProducer;但KafkaConsumer则不是线程安全的,多线程情况下我们需要创建多个KafkaConsumer。