Kafka简介和集群搭建

Kafka是Linkedin公司开源的高性能消息中间件,是一款基于发布/订阅模式的消息队列,由Scala和Java语言开发。Kafka设计之初的目标和传统MQ有一定区别,Kafka是为了解决超大量数据的实时传输,例如前端埋点统计信息、系统日志数据等,Kafka主要是为了考虑解决如下问题:

  1. 吞吐量/延时
  2. 消息持久化
  3. 负载均衡和故障转移
  4. 伸缩性

官方网站:https://kafka.apache.org/

注:Kafka有一套自己的API,因此其使用和JMS、AMQP等又有些不同。虽然消息中间件解决的问题都相似,但我们最好还是先了解其中的基本概念再使用,否则可能造成对一些功能的误用。

Kafka集群搭建

Kafka的安装包可以在官网下载,我这里使用的是3.2.0版本。Kafka早期版本依赖Zookeeper集群进行分布式管理,但新版本可以不使用Zookeeper,而使用内置的Kraft模式,不仅性能更优,我们也省去了搭建Zookeeper集群的麻烦。我们这里准备了3台主机:

192.168.1.164
192.168.1.165
192.168.1.166

将压缩包解压缩后,我们可以找到config/kraft/server.properties配置文件,基于Kraft搭建Kafka集群可以基于该配置文件进行修改。一个例子配置如下:

# 当前节点的角色,如下配置即使用Kraft模式
process.roles=broker,controller
# 当前节点ID,需要唯一
node.id=0
# 集群节点配置
controller.quorum.voters=0@192.168.1.164:9093,1@192.168.1.165:9093,2@192.168.1.166:9093

# 监听地址和端口
listeners=PLAINTEXT://:9092,CONTROLLER://:9093
inter.broker.listener.name=PLAINTEXT
advertised.listeners=PLAINTEXT://192.168.1.164:9092
controller.listener.names=CONTROLLER
listener.security.protocol.map=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600

# 数据存储目录
log.dirs=/home/ubuntu/data
num.partitions=1
num.recovery.threads.per.data.dir=1

offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1

log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000

group.initial.rebalance.delay.ms=0

其中重要的配置都已经标出,Kafka默认使用9092端口提供服务,9093端口用于集群协调通信。我们这里将如上配置文件复制3份,注意要对应修改node.id和监听地址IP,修改完成后分发到三台主机上即可。

修改好配置文件后,我们还需要生成数据存储目录。Kafka默认使用/tmp分区下目录存储数据,然而一般我们是需要其数据持久化的,使用临时目录是不行的。我们在上面配置文件中将其修改为了/home/ubuntu/data,但光有这个配置还不够,还需要在其中重新创建必要的文件。

生成随机UUID:

./bin/kafka-storage.sh random-uuid

用同一个UUID在3台主机上分别执行如下命令:

./bin/kafka-storage.sh format -t FvMlnvveS5iw8p4d1nyYtw -c ./config/server0.properties

执行后,我们的Kafka即启动成功。

创建Topic

Kafka没有官方提供的图形化管理界面,但安装包的bin目录下自带了很多命令行工具,其中kafka-topics.sh可用于创建Topic。例子如下:

./bin/kafka-topics.sh --create --bootstrap-server 192.168.1.164:9092 --topic TEST_TOPIC --partitions 1 --replication-factor 1
  • --bootstrap-server:某一Kafka节点的主机和端口,可以指定任意一个,也可以逗号分隔指定多个(某一个节点宕机会自动切换到其它节点)
  • --topic:具体创建的Topic名
  • --partitions:Topic分区数
  • --replication-factor:Topic副本数

创建完成后,我们可以用如下命令查看:

./bin/kafka-topics.sh --describe --bootstrap-server 192.168.1.164:9092 --topic TEST_TOPIC

收发消息

bin目录下也自带了收发消息的客户端,我们可以用其进行测试。启动消息消费客户端:

./bin/kafka-console-consumer.sh --bootstrap-server 192.168.1.164:9092 --topic TEST_TOPIC --group group0
  • --group:该参数指定消费者组ID,可以随意起名,如果不指定默认会生成一个新的消费者组。Kafka内部会记录某个组当前消费到了哪条消息,如果生成新的消费者组,那么就只能从订阅后新产生的消息进行消费,而不会读取到积压的消息。

启动发送消息的交互式客户端:

./bin/kafka-console-producer.sh --bootstrap-server 192.168.1.164:9092 --topic TEST_TOPIC

这样我们发送消息后,在接收端就可以接收到了。

Java中使用Kafka

Java中使用Kafka需要引入其客户端库,Maven依赖如下:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>3.2.0</version>
</dependency>

除此之外,该库依赖SLF4J输出日志,因此我们的工程中还需要具备SLF4J以及具体的日志实现,这里就不多介绍了。

Kafka消息消费者:

// 创建KafkaConsumer
Properties props = new Properties();
// 集群节点
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.1.164:9092,192.168.1.165:9092,192.168.1.166:9092");
// 序列化器
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
// 消费者组ID
props.put(ConsumerConfig.GROUP_ID_CONFIG, "group0");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

// 接收消息
String topic = "TEST_TOPIC";
consumer.subscribe(Collections.singletonList(topic));
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        System.out.println(record.value());
    }
}

Kafka消息生产者:

// 创建KafkaProducer
Properties props = new Properties();
// 集群节点
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.1.164:9092,192.168.1.165:9092,192.168.1.166:9092");
// 序列化器
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
KafkaProducer<String, String> producer = new KafkaProducer<>(props);

// 发送消息
String topic = "TEST_TOPIC";
String msg = "Hello, world!";
ProducerRecord<String, String> record = new ProducerRecord<>(topic, msg);
producer.send(record);

producer.close();

kafka在Java中使用比较简单,但这里要注意KafkaProducer是线程安全的,我们可以在多线程中共享KafkaProducer;但KafkaConsumer则不是线程安全的,多线程情况下我们需要创建多个KafkaConsumer。

作者:Gacfox
版权声明:本网站为非盈利性质,文章如非特殊说明均为原创,版权遵循知识共享协议CC BY-NC-ND 4.0进行授权,转载必须署名,禁止用于商业目的或演绎修改后转载。
Copyright © 2017-2024 Gacfox All Rights Reserved.
Build with NextJS | Sitemap