基本概念和管理操作

正确使用Kafka提供的功能需要了解一些基本概念,这篇笔记我们简单介绍使用kafka涉及的组件概念,以及相关管理操作。

基本概念

Broker:一个Kafka处理节点就是一个Broker,消息生产者和消费者都需要和Broker通信,Kafka集群通常由多个Broker组成。

Topic:主题,可以理解为一个“消息队列”,之前所说的发送消息和消费消息都是需要针对某一个指定的Topic进行的。

Partition:分区或者叫分片,分布式系统中常用分区的概念,比如数据有20TB,Kafka集群有3个节点,然而每个Kafka节点的硬盘只有8TB,那么我们就需要创建至少3个分区,让数据落在不同节点上。

Replica:副本,分布式系统为了实现高可用,当一个节点宕机后仍能正常工作,数据通常会存储副本,Kafka中可以将一个分区的数据同步存储到其它节点上,实现副本数据。

Producer:生产者,即产生消息的客户端。

Consumer Group:消息消费者组,Kafka中没有单独的消费者,消费者是基于groupId定义成组的。消费者必须指定groupId参数,即使只有一个消费者,只要它有唯一的groupId,也会自动被识别为一个消费者组。Kafka的消息是发布/订阅模式,一条消息会被广播给所有消费者组,但一个组内只有一个消费者会收到消息。

关于消费者组,我们还需要知道,Kafka会持久化消息数据,但其内部会维护一个offset变量,记录某一个组消费到了哪条消息,新订阅的组默认会从订阅后新产生的消息开始消费,而已经注册的组即使消费者组全部宕机,重新启动后也能读取到未处理的消息。

管理操作

Kafka默认没有提供图形界面管理工具,只能使用命令行工具进行一些信息的查看和配置,因此不是很方便。不过目前有一些第三方的管理面板,这里不多介绍。

集群管理

启动Kafka服务(守护进程模式):

./bin/kafka-server-start.sh -daemon ./config/server.properties
  • -daemon:指定该参数以守护进程模式启动,默认会启动为前台进程

查看kafka进程可以使用JDK的jps工具:

关闭Kafka服务:

./bin/kafka-server-stop.sh

Topic管理

创建Topic:

./bin/kafka-topics.sh --create --bootstrap-server 192.168.1.164:9092 --topic TEST_TOPIC --partitions 1 --replication-factor 1
  • --bootstrap-server:某一Kafka节点的主机和端口,可以指定任意一个,也可以逗号分隔指定多个(某一个节点宕机会自动切换到其它节点)
  • --topic:具体创建的Topic名
  • --partitions:Topic分区数
  • --replication-factor:Topic副本数

查看所有Topic:

./bin/kafka-topics.sh --bootstrap-server 192.168.1.164:9092 --list

该命令会列出所有Topic。

查看Topic详细信息:

./bin/kafka-topics.sh --bootstrap-server 192.168.1.164:9092 --describe --topic TEST_TOPIC

删除Topic(该命令需谨慎操作):

./bin/kafka-topics.sh --bootstrap-server 192.168.1.164:9092 --delete --topic TEST2_TOPIC

我们使用Java等客户端向Kafka发送消息时,默认如果Topic不存在会自动创建,其实这里不建议这样做。我们一般都希望Topic由运维人员手动创建,具体的Java工程中如果配置的Topic不存在直接报错即可,否则一旦笔误写错Topic名比较难调试。在Kafka配置文件中,如下配置可以关闭Topic自动创建:

auto.create.topics.enable=false

ConsumerGroup管理

查询消费者组列表:

./bin/kafka-consumer-groups.sh --bootstrap-server 192.168.1.164:9092 --list

查看指定消费者状态:

./bin/kafka-consumer-groups.sh --bootstrap-server 192.168.1.164:9092 --describe --group group0

输出结果如下:

  • CURRENT-OFFSET:当前消费者偏移
  • LOG-END-OFFSET:消息偏移
  • LAG:积压消息

如果线上环境积压消息越来越大且没有好转的迹象,说明消息消费者的性能已经无法满足当前系统的消息生产量,我们就得想办法解决了。

删除一个消费者组(该命令需谨慎操作):

./bin/kafka-consumer-groups.sh --bootstrap-server 192.168.1.164:9092 --delete --group group0

将消费者组的偏移量归零(该命令需谨慎操作):

./bin/kafka-consumer-groups.sh --bootstrap-server 192.168.1.164:9092 --group group0 --reset-offsets --all-topics --to-earliest --execute

偏移量归零后,该消费者组会从Kafka持久化的最初的消息开始拉取。

作者:Gacfox
版权声明:本网站为非盈利性质,文章如非特殊说明均为原创,版权遵循知识共享协议CC BY-NC-ND 4.0进行授权,转载必须署名,禁止用于商业目的或演绎修改后转载。