Kafka 面试题整理

Kafka架构

Kafka 整体结构介绍

  • Producer :消息生产者,就是向kafka broker发消息的客户端。
  • Consumer :消息消费者,向kafka broker取消息的客户端
  • Topic :可以理解为一个队列。
  • Consumer Group (CG):这是kafka用来实现一个topic消息的广播(发给所有的consumer)和单播(发给任意一个consumer)的手段。一个topic可以有多个CG。topic的消息会复制(不是真的复制,是概念上的)到所有的CG,但每个partion只会把消息发给该CG中的一个consumer。如果需要实现广播,只要每个consumer有一个独立的CG就可以了。要实现单播只要所有的consumer在同一个CG。用CG还可以将consumer进行自由的分组而不需要多次发送消息到不同的topic。
  • Broker :一台kafka服务器就是一个broker。一个集群由多个broker组成。一个broker可以容纳多个topic。
  • Partition:为了实现扩展性,一个非常大的topic可以分布到多个broker(即服务器)上,一个topic可以分为多个partition,每个partition是一个有序的队列。partition中的每条消息都会被分配一个有序的id(offset)。kafka只保证按一个partition中的顺序将消息发给consumer,不保证一个topic的整体(多个partition间)的顺序。
  • Offset:kafka的存储文件都是按照offset.kafka来命名,用offset做名字的好处是方便查找。例如你想找位于2049的位置,只要找到2048.kafka的文件即可。当然the first offset就是00000000000.kafka

Consumer 与topic 的关系

本质上kafka只支持Topic;

  • 每个group中可以有多个consumer,每个consumer属于一个consumer group;

    通常情况下,一个group中会包含多个consumer,这样不仅可以提高topic中消息的并发消费能力,而且还能提高”故障容错”性,如果group中的某个consumer失效那么其消费的partitions将会有其他consumer自动接管。

  • 对于Topic中的一条特定的消息,只会被订阅此Topic的每个group中的其中一个consumer消费,此消息不会发送给一个group的多个consumer;

    那么一个group中所有的consumer将会交错的消费整个Topic,每个group中consumer消息消费互相独立,我们可以认为一个group是一个”订阅”者。

  • 在kafka中,一个partition中的消息只会被group中的一个consumer消费(同一时刻)

    一个Topic中的每个partions,只会被一个”订阅者”中的一个consumer消费,不过一个consumer可以同时消费多个partitions中的消息。

  • kafka的设计原理决定,对于一个topic,同一个group中不能有多于partitions个数的consumer同时消费,否则将意味着某些consumer将无法得到消息。

    kafka只能保证一个partition中的消息被某个consumer消费时是顺序的;事实上,从Topic角度来说,当有多个partitions时,消息仍不是全局有序的。

kafka 相对传统技术的优势

  • 快速:单一的Kafka代理可以处理成千上万的客户端,每秒处理数兆字节的读写操作。
  • 可伸缩:在一组机器上对数据进行分区和简化,以支持更大的数据
  • 持久:消息是持久性的,并在集群中进行复制,以防止数据丢失。
  • 设计:它提供了容错保证和持久性

Kafka 使用zookeeper 的作用

  • Zookeeper主要用于在集群中不同节点之间进行通信
  • controller的选举(/controller 分布式锁):controller负责集群节点的上下线监控,时刻关注leader是否down机,负责leader的选举,为ISR队列选举备胎
  • offset的存储(kafka0.9之前)
  • 保存broker节点信息
  • 除此之外,它还执行其他活动,如: leader检测、分布式同步、配置管理、识别新节点何时离开或连接、集群、节点实时状态等等。

0.9之前的数据保存在zk集群中,一定程度上增加了zk集群负担
0.9版本及之后默认将consumer offset保存在Kafka的一个内置topic(__consumer_offsets)中。

Kafka 判断一个节点是否还活着有那两个条件?

(1)节点必须可以维护和 ZooKeeper 的连接,Zookeeper 通过心跳机制检查每个节点的连接

(2)如果节点是个 follower,他必须能及时的同步 leader 的写操作,延时不能太久

Kafka分区

Kafka生产者分区机制

轮询策略

也称 Round-robin 策略,即顺序分配。比如一个主题下有 3 个分区,那么第一条消息被发送到分区 0,第二条被发送到分区 1,第三条被发送到分区 2,以此类推。当生产第 4 条消息时又会重新开始,即将其分配到分区 0。

随机策略

也称 Randomness 策略。所谓随机就是我们随意地将消息放置到任意一个分区上,如下面这张图所示。

按消息key保存策略

也称 Key-ordering 策略。这个可以理解为是自定义的策略之一。

Kafka 允许为每条消息定义消息键,简称为 Key。这个 Key 的作用非常大,它可以是一个有着明确业务含义的字符串,比如客户代码、部门编号或是业务 ID 等;也可以用来表征消息元数据。特别是在 Kafka 不支持时间戳的年代,在一些场景中,工程师们都是直接将消息创建时间封装进 Key 里面的。一旦消息被定义了 Key,那么你就可以保证同一个 Key 的所有消息都进入到相同的分区里面,由于每个分区下的消息处理都是有顺序的,故这个策略被称为按消息键保序策略

Kafka 生产者

producer 是否直接将数据发送到 broker 的 leader(主节点)?

producer 直接将数据发送到 broker 的 leader(主节点),不需要在多个节点进行分发。

为了帮助 producer 做到这点,所有的 Kafka 节点都可以及时的告知:哪些节点是活动的,目标topic 目标分区的 leader 在哪。这样 producer 就可以直接将消息发送到目的地了

Kafka 消费者

Kafa consumer 是否可以消费指定分区消息?

Kafa consumer 消费消息时,向 broker 发出”fetch”请求去消费特定分区的消息,consumer指定消息在日志中的偏移量(offset),就可以消费从这个位置开始的消息,customer 拥有了 offset 的控制权,可以向后回滚去重新消费之前的消息,这是很有意义的

Kafka 消息是采用 Pull 模式,还是 Push 模式?

Kafka 最初考虑的问题是,customer 应该从 brokes 拉取消息还是 brokers 将消息推送到consumer,也就是 pull 还 push。在这方面,Kafka 遵循了一种大部分消息系统共同的传统的设计:producer 将消息推送到 broker,consumer 从 broker 拉取消息

Kafka 数据一致性

ISR解释

分区中的所有副本统称为AR(Assigned Replicas)。所有与leader副本保持一定程度同步的副本(包括leader)组成ISR(in-sync replicas)。而与leader副本同步滞后过多的副本(不包括leader),组成OSR(out-sync replicas),所以,AR = ISR + OSR。在正常情况下,所有的follower副本都应该与leader副本保持一定程度的同步,即AR = ISR,OSR集合为空。

leader副本负责维护和跟踪ISR中所有follower的滞后状态,当follower落后太多或者长时间没有向leader发起同步请求,leader副本就会认为它出问题了,会把它从ISR中移除,这时候这个follower就会放入OSR集合中,直到某个时候这个follower同步跟上了leader,然后这个副本又会被加入到ISR中。此外,当leader副本挂了,只有ISR中的follower副本才有资格成为leader,OSR中的则没有资格。

Isr列表的维护

Leader维护了一个动态的ISR列表(同步副本的作用),只需要这个列表的中的follower和leader同步;当ISR中的follower完成数据的同步之后,leader就会给生产者发送ack,如果follower长时间未向leader同步数据,则该follower将被剔除ISR(为了保证同步的效率),这个时间阈值也是自定义的;同样leader故障后,就会从ISR中选举新的leader

怎么选择ISR的节点呢?

首先通信的时间要快,要和leader要可以很快的完成通信,这个时间默认是10s

然后就看leader数据差距,消息条数默认是10000条(后面版本被移除)

为什么移除:因为kafka发送消息是批量发送的,所以会一瞬间leader接受完成,但是follower还没有拉取,所以会频繁的踢出加入ISR,这个数据会保存到zk和内存中,所以会频繁的更新zk和内存。

但是对于某些不太重要的数据,对数据的可靠性要求不是很高,能够容忍数据的少量丢失,所以没必要等ISR中的follower全部接受成功

HW和LEO解释

这里介绍的数据一致性主要是说不论是老的 Leader 还是新选举的 Leader,Consumer 都能读到一样的数据。那么 Kafka 是如何实现的呢?

1594010991499

假设分区的副本为3,其中副本0是 Leader,副本1和副本2是 follower,并且在 ISR 列表里面。虽然副本0已经写入了 Message4,但是 Consumer 只能读取到 Message2。因为所有的 ISR 都同步了 Message2,只有 High Water Mark 以上的消息才支持 Consumer 读取,而 High Water Mark 取决于 ISR 列表里面偏移量最小的分区,对应于上图的副本2,这个很类似于木桶原理。

这样做的原因是还没有被足够多副本复制的消息被认为是“不安全”的,如果 Leader 发生崩溃,另一个副本成为新 Leader,那么这些消息很可能丢失了。如果我们允许消费者读取这些消息,可能就会破坏一致性。试想,一个消费者从当前 Leader(副本0) 读取并处理了 Message4,这个时候 Leader 挂掉了,选举了副本1为新的 Leader,这时候另一个消费者再去从新的 Leader 读取消息,发现这个消息其实并不存在,这就导致了数据不一致性问题。

当然,引入了 High Water Mark 机制,会导致 Broker 间的消息复制因为某些原因变慢,那么消息到达消费者的时间也会随之变长(因为我们会先等待消息复制完毕)。延迟时间可以通过参数 replica.lag.time.max.ms 参数配置,它指定了副本在复制消息时可被允许的最大延迟时间。

总结:

LEO(log end offset,每个副本中最大的offset),HW(high watermark,所有副本最小的offset)。对于消费者来说不能消费HW之后的数据。

Broker故障后ISR的处理

  1. follower故障
    follower发生故障后会被临时踢出ISR,待该follower恢复后,follower会读取本地磁盘记录的上次的HW,并将log文件高于HW的部分截取掉,从HW开始向leader进行同步。等该follower的LEO大于等于该Partition的HW,即follower追上leader之后,就可以重新加入ISR了。
  2. leader故障
    leader发生故障之后,会从ISR中选出一个新的leader,之后,为保证多个副本之间的数据一致性,其余的follower会先将各自的log文件高于HW的部分截掉,然后从新的leader同步数据。

Kafka 的ISR中有follower落后,怎么处理?

leader收到数据,所有follower都开始同步数据,但有一个follower,因为某种故障,迟迟不能与leader进行同步,那leader就要一直等下去,直到它完成同步,才能发送ack。这个问题怎么解决呢?
Leader维护了一个动态的in-sync replica set (ISR),意为和leader保持同步的follower集合。当ISR中的follower完成数据的同步之后,leader就会给follower发送ack。如果follower长时间未向leader同步数据,则该follower将被踢出ISR,该时间阈值由replica.lag.time.max.ms参数(默认10秒)设定。Leader发生故障之后,就会从ISR中选举新的leader。

kafka Broker 数据一致性

什么情况下broker会数据不一致
  1. 未设置ack=-1, leader和follower未完成同步,leader挂掉, 数据未来得及同步造成数据丢失
  2. 数据存储在page cache中, 还未来得及刷盘,这时宕机(极端情况)
  3. 生产者没有开启幂等的话, 会造成数据重复的情况
怎么实现broker数据一致性
  1. 首先由于leo、hw的机制会保证消费者消费的数据的一致性

  2. 设置ack=all, 加上isr队列会保证isr中的broker都完成了同步

    • 0: producer不会等待broker发送ack
    • 1: 当leader接收到消息之后发送ack
    • -1: 当所有的follower都同步消息成功后发送ack
  3. 如果需要保证数据不重复,可以开启幂等功能

怎么实现producer的数据一致性

什么情况下producer会数据不一致
  1. broker未采用同步发送的方式,使用默认异步发送
  2. 异步发送未设置ack=-1,未等待broker端isr全部同步
  3. 在发送的queue中的消息还没有完全发送, 这时发生了宕机(极端情况)
  4. 没有对超过重试次数的消息进行处理
  5. 没有开启幂等,在数据重试的情况下会产生数据重复的情况
怎么保证producer的数据一致
  1. 设置ack=all + 同步发送,可以基本保证数据不丢失,但是不推荐
  2. 设置ack=all + 异步发送 + 对超过重试次数的请求进行记录 or 重试 (在生产者宕机的极端情况,会丢失数据)
  3. 保证数据不重复只有开启幂等一种方法了

Kafka 文件系统

Kafka 零复制技术

正常情况下,先把数据读到内核空间,在从内核空间把数据读到用户空间,然后在调操作系统的io接口写到内核空间,最终在写到硬盘中

Kafka是这样做的,直接在内核空间流转io流,所以kafka的性能非常高

kafka利用了Linux的sendFile技术(NIO),省去了进程切换和一次数据拷贝,让性能变得更好。

Kafka 文件存储机制

Kafka 文件存储基本结构

  • 在Kafka文件存储中,同一个topic下有多个不同partition,每个partition为一个目录,partiton命名规则为topic名称+有序序号,第一个partiton序号从0开始,序号最大值为partitions数量减1。
  • 每个partion(目录)相当于一个巨型文件被平均分配到多个大小相等segment(段)数据文件中。但每个段segment file消息数量不一定相等,这种特性方便old segment file快速被删除。默认保留7天的数据。

1592037825676

  • 每个partiton只需要支持顺序读写就行了,segment文件生命周期由服务端配置参数决定。(什么时候创建,什么时候删除)

1592037834703

数据有序的讨论?

​ 一个partition的数据是否是有序的? 间隔性有序,不连续

​ 针对一个topic里面的数据,只能做到partition内部有序,不能做到全局有序。

​ 特别加入消费者的场景后,如何保证消费者消费的数据全局有序的?伪命题。

​ 只有一种情况下才能保证全局有序?就是只有一个partition。

Kafka Partition Segment

  • Segment file组成:由2大部分组成,分别为index file和data file,此2个文件一 一对应,成对出现,后缀”.index”和“.log”分别表示为segment索引文件、数据文件。

1592037845195

  • Segment文件命名规则:partion全局的第一个segment从0开始,后续每个segment文件名为上一个segment文件最后一条消息的offset值。数值最大为64位long大小,19位数字字符长度,没有数字用0填充。
  • 索引文件存储大量元数据,数据文件存储大量消息,索引文件中元数据指向对应数据文件中message的物理偏移地址。

1592037856146

​ 3,497:当前log文件中的第几条信息,存放在磁盘上的那个地方

​ 上述图中索引文件存储大量元数据,数据文件存储大量消息,索引文件中元数据指向对应数据文件中message的物理偏移地址。

​ 其中以索引文件中元数据3,497为例,依次在数据文件中表示第3个message(在全局partiton表示第368772个message)

​ 以及该消息的物理偏移地址为497。

  • segment data file由许多message组成, 物理结构如下:

1592037918624

Kafka 查找Message

读取offset=368776的message,需要通过下面2个步骤查找。

1592037926311

1.查找segment file

00000000000000000000.index表示最开始的文件,起始偏移量(offset)为0

00000000000000368769.index的消息量起始偏移量为368770 = 368769 + 1

00000000000000737337.index的起始偏移量为737338=737337 + 1

其他后续文件依次类推。

以起始偏移量命名并排序这些文件,只要根据offset 二分查找文件列表,就可以快速定位到具体文件。当offset=368776时定位到00000000000000368769.index和对应log文件。

2.通过segment file 查找 message

当offset=368776时,依次定位到00000000000000368769.index的元数据物理位置和00000000000000368769.log的物理偏移地址

然后再通过00000000000000368769.log顺序查找直到offset=368776为止。

生产者常用配置

bootstrap.servers=host:port1,host2:port2

key.serializer 和 value.serializer一般不建议自己实现,采用默认string实现即可

retries=3 若能接受数据丢失可配置为0

max.in.flight.requests.per.connection=1 越大则性能越高,但发生消息重试会打破消息有序性

acks=all 等待所有isr副本确认或配置1 仅leader确认即返回

buffer.memory=33554432 默认32MB

linger.ms=100 最慢0.1s会发送一个批次的消息

batch.size=163840 160k,取默认值的10倍,增大该配置可降低发送频次

compression.type=snappy 对消息做压缩,切记保持客户端和服务端版本及压缩格式一致 client.id=xxxx 标识生产者,可以为任意的一个字符串

send.buffer.bytes=327680 320kb,默认128kb,适量调整即可

receive.buffer.bytes=163840 160kb,默认32kb,适量调整即可

Kafka 幂等性

Producer 的幂等性

Producer 的幂等性指的是当发送同一条消息时,数据在 Server 端只会被持久化一次,数据不丟不重,但是这里的幂等性是有条件的:

  • 只能保证 Producer 在单个会话内不丟不重,如果 Producer 出现意外挂掉再重启是无法保证的(幂等性情况下,是无法获取之前的状态信息,因此是无法做到跨会话级别的不丢不重);
  • 幂等性不能跨多个 Topic-Partition,只能保证单个 partition 内的幂等性,当涉及多个 Topic-Partition 时,这中间的状态并没有同步。

如果需要跨会话、跨多个 topic-partition 的情况,需要使用 Kafka 的事务性来实现。

幂等性用来解决什么问题?

在 0.11.0 之前,Kafka 通过 Producer 端和 Server 端的相关配置可以做到数据不丢,也就是 at least once,但是在一些情况下,可能会导致数据重复,比如:网络请求延迟等导致的重试操作,在发送请求重试时 Server 端并不知道这条请求是否已经处理(没有记录之前的状态信息),所以就会有可能导致数据请求的重复发送,这是 Kafka 自身的机制(异常时请求重试机制)导致的数据重复。

幂等性的实现原理

PID

每个 Producer 在初始化时都会被分配一个唯一的 PID,这个 PID 对应用是透明的,完全没有暴露给用户。对于一个给定的 PID,sequence number 将会从0开始自增,每个 Topic-Partition 都会有一个独立的 sequence number。Producer 在发送数据时,将会给每条 msg 标识一个 sequence number,Server 也就是通过这个来验证数据是否重复。这里的 PID 是全局唯一的,Producer 故障后重新启动后会被分配一个新的 PID,这也是幂等性无法做到跨会话的一个原因。

Sequence number

在有了 PID 之后,在 PID + Topic-Partition 级别上添加一个 sequence numbers 信息,就可以实现 Producer 的幂等性了。ProducerBatch 也提供了一个 setProducerState() 方法,它可以给一个 batch 添加一些 meta 信息(pid、baseSequence、isTransactional),这些信息是会伴随着 ProduceRequest 发到 Server 端,Server 端也正是通过这些 meta 来做相应的判断,如下所示:

幂等性实现流程

Producer 幂等性的处理工作

在KafkaProducer 在初始化时会初始化一个TransactionManager实例, 它的左右有以下几部分:

  1. 记录本地事物状态(事务性必须)
  2. 记录一些状态信息以保证幂等性,比如:每个 topic-partition 对应的下一个 sequence numbers 和 last acked batch(最近一个已经确认的 batch)的最大的 sequence number 等;
  3. 记录ProducerIdAndEpoch信息(PID信息);

Producer 幂等性的发送流程

  1. 应用通过 KafkaProducer 的 send() 方法将数据添加到 RecordAccumulator 中,添加时会判断是否需要新建一个 ProducerBatch,这时这个 ProducerBatch 还是没有 PID 和 sequence number 信息的;
  2. Producer 后台发送线程 Sender,在 run() 方法中,会先根据 TransactionManager 判断当前的 PID 是否需要重置,重置的原因是因为:如果有 topic-partition 的 batch 重试多次失败最后因为超时而被移除,这时 sequence number 将无法做到连续,因为 sequence number 有部分已经分配出去,这时系统依赖自身的机制无法继续进行下去(因为幂等性是要保证不丢不重的),相当于程序遇到了一个 fatal 异常,PID 会进行重置,TransactionManager 相关的缓存信息被清空(Producer 不会重启),只是保存状态信息的 TransactionManager 做了 clear+new 操作,遇到这个问题时是无法保证 exactly once 的(有数据已经发送失败了,并且超过了重试次数);
  3. Sender 线程通过 maybeWaitForProducerId() 方法判断是否需要申请 PID,如果需要的话,这里会阻塞直到获取到相应的 PID 信息;
  4. Sender 线程通过 sendProducerData() 方法发送数据,整体流程与之前的 Producer 流程相似,不同的地方是在 RecordAccumulator 的 drain() 方法中,在加了幂等性之后,drain() 方法多了如下几步判断:
    1. 常规的判断:判断这个 topic-partition 是否可以继续发送(如果出现前面2中的情况是不允许发送的)、判断 PID 是否有效、如果这个 batch 是重试的 batch,那么需要判断这个 batch 之前是否还有 batch 没有发送完成,如果有,这里会先跳过这个 Topic-Partition 的发送,直到前面的 batch 发送完成,最坏情况下,这个 Topic-Partition 的 in-flight request 将会减少到1(这个涉及也是考虑到 server 端的一个设置,文章下面会详细分析)
    2. 如果这个ProdecerBatch 还没有这个相应的PID和sequence number 信息, 会在这里进行相应的设置;
  5. 最后 Sender 线程再调用 sendProduceRequests() 方法发送 ProduceRequest 请求,后面的就跟之前正常的流程保持一致了。

Server 幂等性的请求生产处理流程

  1. 如果请求设置了幂等性,检查是否对 ClusterResource 有 IdempotentWrite 权限,没有的话返回 CLUSTER_AUTHORIZATION_FAILED;
  2. 验证对 topic 是否有 Write 权限以及 Topic 是否存在,否则返回 TOPIC_AUTHORIZATION_FAILED 或 UNKNOWN_TOPIC_OR_PARTITION 异常;
  3. 检查是否有 PID 信息,没有的话走正常的写入流程;
  4. LOG 对象会在 analyzeAndValidateProducerState() 方法先根据 batch 的 sequence number 信息检查这个 batch 是否重复(server 端会缓存 PID 对应这个 Topic-Partition 的最近5个 batch 信息),如果有重复,这里当做写入成功返回(不更新 LOG 对象中相应的状态信息,比如这个 replica 的 the end offset 等);
  5. 有了 PID 信息,并且不是重复 batch 时,在更新 producer 信息时,会做以下校验:
    1. 检查该 PID 是否已经缓存中存在(主要是在 ProducerStateManager 对象中检查);
    2. 如果不存在,那么判断 sequence number 是否 从0 开始,是的话,在缓存中记录 PID 的 meta(PID,epoch, sequence number),并执行写入操作,否则返回 UnknownProducerIdException(PID 在 server 端已经过期或者这个 PID 写的数据都已经过期了,但是 Client 还在接着上次的 sequence number 发送数据);
    3. 如果该 PID 存在,先检查 PID epoch 与 server 端记录的是否相同;
    4. 如果不同并且 sequence number 不从 0 开始,那么返回 OutOfOrderSequenceException 异常;
    5. 如果不同并且 sequence number 从 0 开始,那么正常写入;
    6. 如果相同,那么根据缓存中记录的最近一次 sequence number(currentLastSeq)检查是否为连续(会区分为 0、Int.MaxValue 等情况),不连续的情况下返回 OutOfOrderSequenceException 异常。
  6. 下面与正常写入相同。

MAX_INFLIGHT 参数<5 的意义

如果这个 batch 有 PID 信息,会首先检查这个 batch 是否为重复的 batch 数据,其实现如下,batchMetadata 会缓存最新 5个 batch 的数据(如果超过5个,添加时会进行删除,这个也是幂等性要求 MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION 小于等于5 的原因,与这个值的设置有关),根据 batchMetadata 缓存的 batch 数据来判断这个 batch 是否为重复的数据。

思考: Producer 在设置幂等性时,为什么要求 MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION 小于等于 5,如果设置大于 5(不考虑 Producer 端参数校验的报错),会带来什么后果?

回答: server 端只会缓存最近 5 个 batch, 会导致异常数据不断重发

解释: 其实这里,要求 MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION 小于等于 5 的主要原因是:Server 端的 ProducerStateManager 实例会缓存每个 PID 在每个 Topic-Partition 上发送的最近 5 个batch 数据(这个 5 是写死的,至于为什么是 5,可能跟经验有关,当不设置幂等性时,当这个设置为 5 时,性能相对来说较高,社区是有一个相关测试文档,忘记在哪了),如果超过 5,ProducerStateManager 就会将最旧的 batch 数据清除。

假设应用将 MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION 设置为 6,假设发送的请求顺序是 1、2、3、4、5、6,这时候 server 端只能缓存 2、3、4、5、6 请求对应的 batch 数据,这时候假设请求 1 响应失败,需要重试,当重试的请求发送过来后,首先先检查是否为重复的 batch,这时候检查的结果是否,之后会开始 check 其 sequence number 值,这时候只会返回一个 OutOfOrderSequenceException 异常,client 在收到这个异常后,会再次进行重试,直到超过最大重试次数或者超时,这样不但会影响 Producer 性能,还可能给 Server 带来压力(相当于client 狂发错误请求)。

幂等性怎么保证有序

简单来说,其实现机制概括为:

  1. Server 端验证 batch 的 sequence number 值,不连续时,直接返回异常;
  2. Client 端请求重试时,batch 在 reenqueue 时会根据 sequence number 值放到合适的位置(有序保证之一);
  3. Sender 线程发送时,在遍历 queue 中的 batch 时,会检查这个 batch 是否是重试的 batch,如果是的话,只有这个 batch 是最旧的那个需要重试的 batch,才允许发送,否则本次发送跳过这个 Topic-Partition 数据的发送等待下次发送。

Kafka 副本Leader 的选举

1、选举原理
确实Kafka早期的版本就是直接用Zookeeper来完成选举的。利用了Zookeeper的Watch机制;节点不允许重复写入以及临时节点这些特性。这样实现比较简单,省事。但是也会存在一定的弊端。比如分区和副本数量过多,所有的副本都直接参与选举的话,一旦某个出现节点的增减,就会造成大量的Watch事件被触发,ZooKeeper的就会负载过重,不堪重负。

新版本的Kafka中换了一种实现方式。不是所有的Repalica都参与Leader选举,而是由其中的一个Broker统一来指挥,这个Broker的角色就叫做Controller控制器。

Kafka要先从所有Broker中选出唯一的一个Controller。

image-20230106165519993

所有的Broker会尝试在Zookeeper中创建临时节点/controller,谁先创建成功,谁就是Controller。那如果Controller挂掉或者网络出现问题,ZooKeeper上的临时节点就会消失。其他的Broker通过Watch监听到Controller下线的消息后,继续按照先到先得的原则竞选Controller。这个Controller就相当于选举委员会的主席。

当一个节点成为Controller之后,他就会承担以下职责:

监听Broker变化、监听Topic变化、监听Partition变化、获取和管理Broker、Topic、Partition的信息、管理Partiontion的主从信息。

2、选举规则
Controller确定以后,就可以开始做分区选主的事情。接下来就是找候选人。显然,每个Replica都想推荐自己,但不是所有的Replica都有竞选资格。只有在ISR(In-Sync Replicas)保持心跳同步的副本才有资格参与竞选。就好比是皇帝每天着急皇子们开早会,只有每天来打卡的皇子才能加入ISR。那些请假的、迟到的没有资格参与选举。

接下来,就是Leader选举,就相当于要在众多皇子中选出太子。在分布式选举中,有非常多的选举协议比如ZAB、Raft等等,他们的思想归纳起来都是:先到先得,少数服从多数。但是Kafka没有用这些方法,而是用了一种自己实现的算法。

提到Kafka官方的解释是,它的选举算法和微软的PacificA算法最相近。大致意思就是,默认是让ISR中第一个Replica变成Leader。比如ISR是1、5、9,优先让1成为Leader。这个跟中国古代皇帝传位是一样的,优先传给皇长子。

假设,我们创建一个4个分区2个副本的Topic,它的Leader分布是这样的,如图所示:

image-20230106165526696

第1个分区的副本Leader,落在B节点上。第2个分区的副本Leader落在C节点上,第3个分区的副本Leader落在A节点上,第4个分区的副本Leader落在B节点上。如果有更多副本,就以此类推。我们发现Leader的选举的规则相当于蛇形走位。

image-20230106165541777

这样设计的好处是可以提高数据副本的容灾能力。将Leader和副本完全错开,从而不至于一挂全挂。

刘小恺(Kyle) wechat
如有疑问可联系博主