kafka

~信~仰~ 2019-06-09

Consumer zookeeper kafka TOPIC kafka-manager

kafka架构

Kafka是分布式、分区的、多副本的、多订阅者,高吞吐率,支持水平扩展,基于zookeeper协调的分布式消息系统。常见用于web/nginx日志、访问日志,消息服务等。主要应用场景是:日志收集系统和消息系统。

一个典型的 kafka 集群包含若干 Producer,若干个 Broker(kafka )、若干个 Consumer Group,以及一个 zookeeper 集群。kafka 通过 zookeeper 管理集群配置及服务协同。Producer 使用 push 模式将消息发布到 broker,consumer 通过监听使用 pull 模式从 broker 订阅并消费消息。

多个 broker 协同工作,producer 和 consumer 部署在各个业务逻辑中,三者通过 zookeeper 管理协调请求和转发。这样就组成了一个高性能的分布式消息发布和订阅系统。

和其他 mq 中间件不同的是,producer 发送消息到 broker 的过程是 push,而 consumer 从 broker 消费消息的过程是 pull,consumer主动去拉数据,而不是 broker 把数据主动发送给 consumer。
1

必须了解的概念

Topic

kafka中没有queue的概念,只有topic,topic 是逻辑存储概念,是一系列消息的集合。每条发送到 kafka 集群的消息都会属于一个topic。物理上来说,不同的 topic 的消息是分开存储的,每个 topic 可以有多个生产者向它发送消息,也可以有多个消费者去消费其中的消息。

partition

一个topic可以有多个partition。topic理解为表,而partition为分区,同mysql的分区概念。同一 topic 下的不同partition中包含的消息是不同的。每个消息在被添加到分区时,都会被分配一个 offset(称之为偏移量),它是消息在此分区中的唯一编号,kafka 通过 offset保证消息在分区内的顺序,offset 的顺序不跨分区,即 kafka只保证在同一个分区内的消息是有序的。

设置多个partition后,这些partition会接近均匀的分布在kafka各个节点之上。

每一条消息发送到 broker 时,会根据 partition 的规则选择存储到哪一个 partition。如果 partition 规则设置合理,那么所有的消息会均匀的分布在不同的partition中。

Broker

Kafka集群包含一个或多个服务器,这种服务器被称为broker;

Producer

负责发布消息到Kafka broker;

Consumer

消息消费者,从Kafka broker读取消息的客户端;

Consumer Group

逻辑上的订阅者,每个Consumer都从属于一个特定的Consumer Group,消息的单播和多播都是基于消费组来实现的,消费组中的消费者不是越多越好,消费者数量超过分区数量时,回导致消费者分配不到资源,造成资源浪费。

Offset

:每个partition都由一系列有序的、不可变的消息组成,这些消息被连续的追加到partition中。partition中的每个消息都有一个连续的序列号叫做offset,用于partition唯一标识一条消息。

Message

消息是Kafka中最基本的数据单元,主要由key和value构成;真正有效的消息是value数据,key只作为消息路由分区使用,kafka根据key决定将当前消息存储在哪个分区。

消息处理发送与分区存储

消息发送

从kafka1.0以后默认异步发送,将消息放入后台队列中,然后由单独线程去从队列中取出消息然后发送。

消息分区路由

消息要保存到分区中,分区选择算法为:

如果在发消息的时候指定了分区,则消息投递到指定的分区

如果没有指定分区,但是消息的key不为空,则基于key的哈希值来选择一个分区

如果既没有指定分区,且消息的key也是空,则用轮询的方式选择一个分区根据key进行哈希取

消息存储

kafka消息全部持久化到磁盘,其使用日志文件的方式来保存。Partition 以文件的形式存储在文件系统中,partition命名规则为:

<topic_name>-<partition_id>

比如创建一个名为 firstTopic 的 topic,其中有 3 个 partition,那么在kafka 的数据目录(/data/kafka-log)中就有 3 个文件:

firstTopic-0

firstTopic-1

firstTopic-2

消息消费

同一时刻,一条消息只能被group中的一个消费者实例消费,一个topic下的每个partition只从属于group中的一个消费者,不可能出现group中的两个消费者消费同一个分区。

为了提高消费端的消费能力,一般会通过多个consumer 去消费同一个 topic ,如果 分区为1、2、3,消费者也为1、2、3,那么会给每个消费者分配且仅分配一个分区来消费。如果消费者还有个4,那么4会空闲,因为每个分区都有了一个消费者。如果消费者为1、2,那么其中有一个消费者消费两个分区,另一个消费一个分区。

所以消费者的数量不要大于分区,否则会造成资源浪费。消费者的数量最好控制为分区的数量,这样能为消费者合理分配分区。

为什么一个分区只能由一个消费者消费?

Kafka消息在分区内有序,消费者消费消息时也要按照分区内消息顺序进行消费,有序消费就要保证消息是由消费者主动拉取的(pull),其次还要保证一个分区只能由一个消费者负责。kafka消费者自己可以控制读取消息的offset,如果两个消费者负责同一个分区,就有可能C1读到2,C1还没处理完,C2已经读到3了,因为这就相当于多线程读取同一个消息,造成消息处理的重复,且不能保证消息的顺序,这就跟主动推送(push)无异。

消费者的offset是保存在zookeeper中的,通过kafka监控工具可以看到。

新建的group没有offset,这时候就可以选择none或latest、earlist策略。如果是earlist,新建group时,会从最早的一条消息开始取,即使这些消息可能被其他group已经消费过了,又因为kafka消息是全部持久化到磁盘,所以还能拿到。

Rebalance

kafka consuemr 的 rebalance 机制规定了一个 consumer group 下的所有 consumer 如何达成一致来分配订阅 topic的每个分区。
当出现以下几种情况时,kafka 会进行一次rebalance分区分配操作:

1. 同一个 consumer group 内新增了消费者

2. 消费者离开当前所属的 consumer group,比如主动停机或者宕机

3. topic 新增了分区(也就是分区数量发生了变化)

coordinator 角色来执行 Rebalance 以及管理 group,当 group 中的 consumer 启动的时候,它会去和 kafka server 确定谁是它们组的 coordinator。sever会返回一个负载最 小 的 broker 节点的 id 作为该组的 coordinator,该组内所有成员都会和该 coordinator 进行协调通信。

rebalance 的过程分为两个步骤,Join 和 Sync:

join: 在这一步中,所有的成员都会向 coordinator 发送 joinGroup 的请求。一旦所有成员都发送了 joinGroup 请求,那么 coordinator 会选择一个 consumer 担任 leader 角色,并把组成员信息和订阅信息发送消费者。

sync:leader制定分区分配方案发送给coordinator,coordinator将分配方案发送到组中所有消费者。这样所有成员都知道自己应该消费哪个分区。

generation_id:年代信息,类似于 zookeeper 的 epoch,每一轮 rebalance 都会导致 generation_id 递增,防止脑裂问题。

保存消费端的消费位置

kafka 通过 offset 保证消息在分区内的顺序,对于应用层的消费来说,每次消费一个消息并且提交以后,会保存当前消费到的最近的一个 offset。

kafka 提供了__consumer_offsets_* 的 topic , 保存了每个 consumer group 某一时刻提交的 offset 信息。__consumer_offsets 默认有50 个分区。

查看指定group offset信息保存在哪个__consumer_offsets公式:

Math.abs(“groupid”.hashCode())%groupMetadataTopicPartitionCount

零拷贝

kafka在磁盘上是顺序写入的,这样读取时效率更高。然而还有其他方式提高读写性能。例如消费者获取消息时,服务器先从硬盘读取数据到内存,然后把内存中的数据原封不动的通过 socket 发送给消费者。步骤如下:

▪ 操作系统将数据从磁盘读入到内核空间的页缓存

▪ 应用程序将数据从内核空间读入到用户空间缓存中

▪ 应用程序将数据写回到内核空间到 socket 缓存中

▪ 操作系统将数据从 socket 缓冲区复制到网卡缓冲区,以便将数据经网络发出

这个过程涉及到 4 次上下文切换以及 4 次数据复制,但过程中数据没有变化,仅仅是从磁盘复制到网卡缓冲区。
2

“零拷贝”可以去掉这些没必要的数据复制操作,同时也会减少上下文切换次数。Linux 中通过 sendfile 系统调用来完成。Java 提供了访问这个系统调用的方法:

FileChannel.transferTo API

这样只需要一次拷贝,操作系统将数据直接从页缓存发送到网络上。

消息的文件存储机制

kafka日志的存储,例如查看 test-num-0 分区日志:

root@bogon:/data/kafka-logs/test-num-0# ll
total 20488
drwxr-xr-x  2 root root     4096 May 30 06:31 ./
drwxr-xr-x 27 root root     4096 May 31 20:02 ../
-rw-r--r--  1 root root 10485760 May 30 06:32 00000000000000000000.index
-rw-r--r--  1 root root        0 May 30 06:31 00000000000000000000.log
-rw-r--r--  1 root root 10485756 May 30 06:32 00000000000000000000.timeindex
-rw-r--r--  1 root root        0 May 30 06:31 leader-epoch-checkpoint

producer 不断发送消息,会引起 partition 文件的无限扩张,这样对消息文件的维护以及日志清理带来很大困难。因此 kafka 通过分段的方式将 Log 分为多个 LogSegment,LogSegment 是逻辑上的概念,一个 LogSegment 对应磁盘上的日志文件和一个索引文件,其中日志文件用来记录消息。索引文件是用来保存消息的索引。

相当于一个巨型 log 文件被平均分配到多个大小相等的 segment 数据文件中(每个 segment 文件中的消息数不一定相等),这种特性方便清理已经被消费的消息,提高磁盘利用率。

segment file 由两部分组成,index file 和 data file,.index 和 .timeindex 文件为索引文件,.data 文件为数据文件。
segment 文件命名规则:partion 全局的第一个 segment 从 0 开始,后续每个 segment 文件名为上一个 segment 文件最后一条消息的 offset 值进行递增,例如如下三个文件(假设每个log存储5000消息):

00000000000000000000.log

00000000000000005001.log

00000000000000010001.log

查看 kafka 消息日志的内容:

sh /server/deployment/kafka_2.12-1.1.0/bin/kafka-run-class.sh  kafka.tools.DumpLogSegments --files /data/kafka-logs/test-num-0/00000000000000000000.log --print-data-log

为了提高查找消息的性能,为每一个日志文件添加 2 个索引文件:OffsetIndex 和 TimeIndex,分别对应 .index 以及 .timeindex 文件, TimeIndex 索引文件格式是映射时间戳和相对 offset,index 中存储了索引以及物理偏移量。
3

查 看 索 引 内 容 :

sh /server/deployment/kafka_2.12-1.1.0/bin/kafka-run-class.sh  kafka.tools.DumpLogSegments --files /data/kafka-logs/test-num-0/00000000000000000000.index --print-data-log

例如查找 offset 为 7 的message:

首先用二分查找确定它是在哪个 LogSegment 中,自然是在第一个 Segment 中;

打开这个 Segment 的index文件,二分查找找到 offset 小于或者等于指定offset的索引条目中最大的那个offset,为图中 offset 为 6 的索引,通过索引文件知道 offset 为 6 的 Message 的position为1407,即在数据文件中的位置为1407;

打开数据文件,从位置为1407处开始顺序扫描直到找到 offset 为 7 的那条Message。

即 [6, 1407] 在 log 文件中,对应的是第 6 条记录,其物理偏移量(position)为 1407。得到 position 后,再到对应的 log 文件中,从 position 为1407 处开始顺序查找 offset 对应的消息,将每条消息的 offset 与目标 offset 进行比较,直到找到消息。

这套机制是建立在offset是有序的。索引文件被映射到内存中,所以查找的速度还是很快的。

日志的清除策略以及压缩策略

日志清除策略

1. 根据消息的保留时间,当消息在 kafka 中保存的时间超过了指定的时间,就会触发清理过程
2. 根据 topic 存储的数据大小,当 topic 所占的日志文件大小大于一定的阀值,则可以开始删除最旧的消息。

kafka 会启动一个后台线程,定期检查是否存在可以删除的消息,通过 log.retention.bytes 和 log.retention.hours 这两个参数来设置,当其中任意一个达到要求,都会执行删除。

相关参数:

# The minimum age of a log file to be eligible for deletion due to age
log.retention.hours=168 # 默认的保留时间是:7 天

# A size-based retention policy for logs. Segments are pruned from the log unless the remaining
# segments drop below log.retention.bytes. Functions independently of log.retention.hours.
#log.retention.bytes=1073741824

# The maximum size of a log segment file. When this size is reached a new log segment will be created.
log.segment.bytes=1073741824 # 每个segment 的 log文件大小,超过此大小则新建segment

# The interval at which log segments are checked to see if they can be deleted according
# to the retention policies
log.retention.check.interval.ms=300000

日志压缩策略

Kafka 还提供了“日志压缩(Log Compaction)”功能,通过这个功能可以有效的减少日志文件的大小,缓解磁盘紧张的情况,在很多实际场景中,消息的 key 和 value 的值之间的对应关系是不断变化的,就像数据库中的数据会不断被修改一样,消费者只关心 key 对应的最新的 value。因此,我们可以开启 kafka 的日志压缩功能,服务端会在后台启动启动Cleaner 线程池,定期将相同的 key 进行合并,只保留最新的 value 值。

Kafka 中的 Log Compaction 功能是指在默认的日志删除(Log Deletion)规则之外提供的一种清理过时数据的方式,通过这个功能可以有效的减少日志文件的大小,缓解磁盘紧张的情况。

在实际场景中,kafka 消息的 key 对应的 value 值是不断变化的,就像数据库中的数据会不断被 update 一样。Log Compaction 对于有相同 key 的不同 value 值,只保留最后一个版本。

如果应用只关心key对应的最新 value 值,可以开启Kafka的日志清理功能,Kafka会定期将相同key的消息进行合并,只保留最新的value值。其原理如下:
4

partition 的高可用副本机制

kafka 通过副本机制来实现冗余备份。

每个分区可以有多个副本,并且在副本集合中会存在一个 leader 的副本,所有的读写请求都是由 leader 副本来进行处理。其他副本为 follower 副本,follower 副本 会 从 leader 副本同步消息日志。

一般情况下,同一个分区的多个副本会被均匀分配到集群中的不同 broker 上,当 leader 副本所在的 broker 出现故障后,可以重新选举新的 leader 副本继续对外提供服务。通过这样的副本机制来提高 kafka 集群的可用性。

副本分配算法

kafka 副本机制中的几个概念

leader 副本:响应 clients 端读写请求的副本

follower 副本:被动地备份 leader 副本中的数据,不能响应 clients 端读写请求

ISR 副本:包含了 leader 副本和所有与 leader 副本保持同步的 follower 副本

在 zookeeper 服务器获取分区的状态信息:

[zk: localhost:2181(CONNECTED) 0] get /brokers/topics/test-num/partitions/0/state
{"controller_epoch":10,"leader":1,"version":1,"leader_epoch":4,"isr":[1,2,0]}

leader 表示当前分区的 leader 的 broker-id。如果 leader 发生故障或挂掉,一个新 leader 会被选举并被接受客户端的消息写入。Kafka 确保从同步副本列表中选举一个副本为 leader。

leader 负责维护和跟踪 ISR(in-Sync replicas , 副本同步队列)中所有 follower 的状态。当 producer 发送一条消息到 broker 后,leader 写入消息并提交之后才会复制到所有的同步 follower 副本中。

既然有副本机制,就一定涉及到数据同步的概念,那接下来分析下数据是如何同步的?

每个 Kafka 副本对象都有两个重要的属性:LEO 和 HW。注意是所有的副本,而不只是 leader 副本。

LEO

即日志末端位移(log end offset),记录了该副本底层日志(log)中下一条消息的位移值。也就是说,如果 LEO=10,那么表示该副本保存了 10 条消息,位移值范围是[0, 9]。另外,leader LEO 和 follower LEO 的更新是有区别的。

HW

水位值。HW 标记了一个特殊的 offset,当消费者处理消息的时候,只能拉去到 HW 之前的消息,HW 之后的消息对消费者来说是不可见的。也就是说,取 partition 对应 ISR 中最小的 LEO 作为 HW,consumer 最多只能消费到 HW 所在的位置。每个 replica 都有 HW,leader 和 follower 各自维护更新自己的 HW 的状态。

一条消息只有被 ISR 里的所有 Follower 都从 Leader 复制过去才会被认为已提交。这样就避免了部分数据被写进了 Leader,还没来得及被任何 Follower 复制就宕机了,而造成数据丢失。而对于 roducer 而言,它可以选择是否等待消息 commit,这可以通过 acks 来设置。这种机制确保了只要 ISR 有一个或以上的 Follower,一条被 commit 的消息就不会丢失。

对于同一个副本对象而言,其HW 值不会大于 LEO 值。小于等于 HW 值的所有消息都被认为是“已备份”的(replicated)。

副本协同机制

写请求首先由 Leader 副本处理,之后 follower 副本会从 leader 上拉取写入的消息,这个过程会有一定的延迟,导致 follower 副本中保存的消息略少于 leader 副本,但是只要没有超出阈值都可以容忍。但是如果一个 follower 副本出现异常,比如宕机、网络断开等原因长时间没有同步到消息,那这个时候,leader 就会把它踢出去。

ISR

kafka 通过 ISR 集合来维护一个分区副本信息。ISR 表示目前“可用且消息量与 leader 相差不多的副本集合,这是整个副本集合的一个子集”。

ISR 集合中的副本必须满足两个条件:

1. 副本所在节点必须维持着与 zookeeper 的连接

2. 副本最后一条消息的 offset 与 leader 副本的最后一条消息的 offset 之 间 的 差 值 不 能 超 过 指定 的阈值

replica.lag.time.max.ms

如果该 follower 在此时间间隔内一直没有追上过 leader 的所有消息,则该 follower 就会被剔除 isr 列表。

数据的同步过程

了解了副本的协同过程以后,还有一个最重要的机制,就是数据的同步过程。它需要解决

1. 怎么传播消息

2. 在向消息发送端返回 ack 之前需要保证多少个 Replica 已经接收到这个消息

数据的处理过程

Producer 在 发 布 消 息 到 某 个 Partition 时 , 先 通 过 ZooKeeper 找到该 Partition 的 Leader:

get /brokers/topics/<topic>/partitions/2/state

然后无论该 Partition 有多少个 Replica,Producer 只将该消息发送到该 Partition 的 Leader。Leader 会将该消息写入其本地 Log。每个 Follower 都从 Leader pull 数据。这种方式上,Follower 存储的数据顺序与 Leader 保持一致。Follower 在收到该消息并写入其Log 后,向 Leader 发送 ACK。

一旦 Leader 收到了 ISR 中的所有 Replica 的 ACK,该消息就被认为已经 commit 了,Leader 将增加 HW 并且向 Producer 发送 ACK。

初始状态

初始状态下,leader 和 follower 的 HW 和 LEO 都是 0,leader 副本会保存 remote LEO,表示所有 follower LEO 也会被初始化为 0,这个时候,producer 没有发送消息。

follower 会不断地个 leader 发送 FETCH 请求,但是因为没有数据,这个请求会被 leader 寄存,当在指定的时间之后会 强 制 完 成 请 求 , 这 个 时 间 配 置 是:

replica.fetch.wait.max.ms

如果在指定时间内 producer 有消息发送过来,那么 kafka 会唤醒 fetch 请求,让 leader 继续处理。

数据丢失的问题

当且仅当 acks 参数设置为-1 (表示需要所有副本确认)时,如下参数会被启用:

min.insync.replicas=1

设定 ISR 中至少需要多少个副本同步才能表示消息是提交的,默认值为 1。当 min.insync.replicas=1 的时候一旦消息被写入 leader 端 log 即被认为是“已提交”。

但是 follower 的 HW 值是异步延迟更新的,倘若此时 leader 发生变更,那么成为新 leader 的 follower 的 HW 值就有可能是过期的,使得 clients 端认为是成功提交的消息被删除。

kafka 使用 leader epoch 来解决数据丢失问题,leader epoch 实际上是一对(epoch,offset),epoch 表示 leader 版本号,当 leader 变更时 epoch 就会+1,而 offset 则对应于该 epoch 版本的 leader 写入第一条消息的位移。

例如 (0,0) ; (1,50); 表示第一个 leader 从 offset=0 开始写消息,一共写了 50 条,第二个 leader 版本号是 1,从 50 条处开始写消息。这个信息保存在对应分区的本地磁盘文件中,文 件 名 为 :

/data/kafka-log/topic/leader-epoch-checkpoint

每次副本重新成为 leader 时会查询这部分缓存,获取出对应 leader 版本的 offset,其中offset较大的副本会增加成为leader的概率,因为它的消息数最全,所以,集群中 min.insync.replicas 参数最好要大于 1,这样leader挂了之后仍有其他副本包含最新 offset 的消息。

如何处理所有的 Replica 不工作的情况

在 ISR 中至少有一个 follower 时,Kafka 可以确保已经 commit 的数据不丢失,但如果某个 Partition 的所有 Replica 都宕机了,就无法保证数据不丢失了,此时可以:

1. 等待 ISR 中的任一个 Replica“活”过来,并且选它作为 Leader

2. 选择第一个“活”过来的 Replica(不一定是 ISR 中的)作为 Leader

这就需要在可用性和一致性当中作出一个简单的折衷。

如果一定要等待 ISR 中的 Replica“活”过来,那不可用的时间就可能会相对较长。而且如果 ISR 中的所有 Replica 都无法“活”过来了,或者数据都丢失了,这个 Partition 将永远不可用。

选择第一个“活”过来的 Replica 作为 Leader,而这个 Replica 不是 ISR 中的 Replica,那即使它并不保证已经包含了所有已 commit 的消息,它也会成为 Leader 而作为 consumer 的数据源。

ISR 的设计原理

同步复制

如果采用同步复制,那么需要要求所有能工作的 Follower 副本都复制完,这条消息才会被认为提交成功,一旦有一个 follower 副本出现故障,就会导致 HW 无法完成递增,消息就无法提交,消费者就获取不到消息。这种情况下,故障的 Follower 副本会拖慢整个系统的性能,导致系统不可用。

异步复制

如果采用异步复制,leader 副本收到生产者推送的消息后,就认为次消息提交成功。follower 副本则异步从 leader 副本同步。这种设计虽然避免了同步复制的问题,但是假设所有 follower 副本的同步速度都比较慢,则它们保存的消息量远远落后于 leader 副本。而此时 leader 副本所在的 broker 突然宕机,则会重新选举新的 leader 副本,而新的 leader 副本中没有原来 leader 副本的消息。这就出现了消息的丢失。

kafka 权衡了同步和异步的两种策略,采用 ISR 集合,巧妙解决了两种方案的缺陷:当 follower 副本延迟过高,leader 副本则会把该 follower 副本踢出 ISR 集合,消息依然可以快速提交。

当 leader 副本所在的 broker 突然宕机,会优先将 ISR 集合中 follower 副本选举为 leader,新 leader 副本包含了 HW 之前的全部消息,这样就避免了消息的丢失。

登录 后评论
下一篇
阿里巴巴云原生小助手
7651人浏览
2020-02-11
相关推荐
Kafka监控系统Kafka Eagle剖析
1708人浏览
2018-07-27 01:02:00
OpenStack 上搭建 Kafka 集群
1551人浏览
2018-10-15 09:09:00
Kafka
1034人浏览
2019-04-20 21:08:59
kafka简单安装部署
624人浏览
2016-07-02 21:39:00
kafka原理及Docker环境部署
16911人浏览
2018-10-26 13:11:38
Kafka常用命令收录
1127人浏览
2018-09-09 18:08:19
mac kafka 环境搭建
506人浏览
2017-03-15 16:31:13
Apache Kafka常用命令
2754人浏览
2019-07-26 11:19:01
安装和测试Kafka
3022人浏览
2016-04-08 13:25:50
Kafka Java API示例
722人浏览
2016-04-01 13:21:00
kafka集群的搭建
1040人浏览
2017-11-16 20:29:00
1
2
0
2676