在kafka中我们高水位和leader epoch机制是用来解决kafka主从之间的消息同步问题。下面分两部分进行介绍

高水位

在Kafka中,高水位(High Watermark,HW)标志着消费者能够安全读取的消息的最高偏移量。每个分区的高水位是保证数据一致性的关键机制之一,它表示所有同步副本(ISR)已经复制的消息的最小偏移量。我们可以通过下面的步骤来解释高水位如何实现同步:

  1. 消息写入

    • 生产者(Producers)向领导副本(Leader)发送消息。
    • 领导副本将消息追加到它的日志中,并更新其本地日志的最后偏移量(Last Offset)。
  2. 消息复制

    • 领导副本负责将这些新消息复制到所有的跟随副本(Followers)。
    • 跟随副本将复制的消息追加到自己的日志中,并向领导副本发送确认(ACK)。
  3. 计算高水位

    • 领导副本收到所有同步副本对特定消息的确认后,它将更新高水位。
    • 高水位被设定为所有同步副本中已复制数据的最小偏移量。换句话说,高水位是所有跟随副本都确认接收到的最小偏移量。
  4. 消费者读取

    • 消费者(Consumers)从领导副本读取消息。
    • 为了确保消费者不会读到任何可能因为故障而丢失的数据,消费者只能读到高水位以下的消息。
  5. 故障转移和高水位同步

    • 如果领导副本失败,将从ISR中选择一个新的领导副本。
    • 新的领导副本将它的高水位设置为它自身的高水位,这是因为新的领导副本保证了它至少复制了到这个偏移量的消息。
    • 新领导副本可能会降低高水位,以保证不会出现数据丢失的情况。

通过维护高水位,Kafka确保了即使在发生故障和副本更换的情况下,消费者也不会读取到不一致的数据。高水位也帮助系统在发生故障时决定需要从哪个偏移量开始复制数据,确保所有副本能够捕获所有提交的消息,从而实现一致性。

在Kafka中,高水位(High Watermark, HW)的更新是确保数据一致性和可靠性的关键机制。领导节点(Leader)和跟随节点(Follower)更新高水位的条件有所不同,主要因为它们在Kafka复制协议中扮演的角色不同。

领导节点的高水位更新条件

对于领导节点,高水位的更新依赖于消息复制到跟随节点的进度。具体而言,更新条件包括:

  1. 所有同步副本(ISR)的确认:领导节点的高水位只有在所有同步副本(ISR中的副本)已经成功复制到特定偏移量的消息后才会更新。换句话说,高水位会更新到所有ISR都已经复制的最小偏移量。

  2. 定期检查:领导节点定期检查所有跟随节点的复制进度。一旦确认所有ISR都已经复制到了新的偏移量,领导节点就会将其高水位更新到这个新的偏移量。

这意味着,领导节点的高水位表示在发生任何领导节点故障之前,消费者可以安全读取的最大偏移量,因为这部分数据已经被集群中的所有同步副本复制。

跟随节点的高水位更新条件

跟随节点的高水位更新则直接受到领导节点复制数据的影响。更新条件包括:

  1. 数据复制成功:当跟随节点从领导节点成功复制数据后,它会尝试更新自己的高水位。跟随节点的高水位更新到它已经复制和确认的最高偏移量,但这个偏移量不能超过它从领导节点接收到的高水位。

  2. 领导节点的高水位:跟随节点的高水位更新还依赖于领导节点告知的高水位。跟随节点不能将自己的高水位更新到超过领导节点当前高水位的位置。这确保了跟随节点不会向消费者提供还未被整个ISR集群确认的数据。

因此,跟随节点的高水位通常落后于或等于领导节点的高水位,确保了消费者读取的数据是一致且可靠的。

总的来说,领导节点和跟随节点的高水位更新机制反映了Kafka复制协议的设计,旨在保证数据的一致性和在故障转移场景下的可靠性。领导节点的高水位更新反映了ISR的复制进度,而跟随节点的高水位更新则确保了其与领导节点保持一致,共同维护了集群数据的一致性。

高水位的数据丢失问题

  1. 第一步:我们 BrokerA 读取到了m2部分,并且我们主节点收到了ack。但是还没来得及告诉BrokerA自己的高水位,因此此时BrokerA高水位是1,BrokerB的高水位是2。

  2. 第二步: BrokerB 还没有受到主节点的高水位,就宕机了。因此此时BrokerA高水位是1,BrokerB的高水位是2。

    image-20240407143438226

  3. 第三步:此时我们主节点 BrokerB也宕机了,但是我们BrokerA 启动了,成为了主节点,并且高水位为1,删除了m2。造成数据丢失。

    image-20240407143547858

高水位的数据不一致问题

  1. BrokerA 和 BrokerB 同时宕机,因此此时BrokerA高水位是1,BrokerB的高水位是2。。

    image-20240407143645520

  2. BrokerA先启动,选举为leader。并且写入了消息m2。这个时候高水位更新为2。

    image-20240407144815413

  3. BrokerB启动,高水位也是2。但是这个时候高水位为1的数据并不相同。

    image-20240407144901768

Leader Epoch

Leader Epoch是什么

这段话是在描述Kafka中Leader epoch在消息传递和日志复制中的应用。我将其分解来解释一下:

  1. Leader Epoch 是什么

    • Leader Epoch是一个32位的数字,用来标识特定分区领导副本(Leader replica)的“时代”。每次分区的领导副本发生变化时,这个数字都会递增。这是Kafka内部用于追踪领导副本更迭的一个机制。
  2. 如何管理

    • 这个数字是由Kafka的Controller管理的。Controller是Kafka集群中的一个组件,负责管理分区和副本的状态,并在必要时触发领导副本的选举。
    • Leader Epoch的值被存储在Zookeeper的分区状态信息中。Zookeeper是Kafka用于管理集群元数据的一种协调服务。
  3. LeaderAndIsrRequest

    • 当选举新的领导副本时,Controller会通过一个叫做LeaderAndIsrRequest的请求将Leader Epoch的值传递给新的Leader。这样,新的领导副本会知道自己是哪个时代的领导,并根据这个时代进行操作。
  4. 在生产者请求中的应用

    • 当生产者(Producer)向Kafka发送数据时,领导副本会使用当前的Leader Epoch来标记每个消息。这样,每个消息都与产生它时的Leader Epoch相关联。
  5. 在复制协议中的应用

    • 这个Leader Epoch号随后会通过Kafka的复制机制传播到所有副本。这是为了保证消息的一致性和正确的消息顺序,即使在发生故障转移时也能正确处理。
  6. 替换HW(High Watermark)

    • 在Kafka中,HW(High Watermark)标记了一个副本可以安全读取的最高偏移量。Leader Epoch编号用于更新HW,因为它反映了最新的、已被副本集中的所有副本所确认的消息偏移量。
    • 当领导副本更改时,或者发生某些故障情况时,Leader Epoch用于确定哪些消息可能需要被截断(truncate)。如果一个旧的领导副本恢复并试图成为新的领导副本,它的Leader Epoch会与Zookeeper中的记录不符,因此它会根据新的领导副本已确认的Leader Epoch来截断自己的日志,确保数据一致性。

简而言之,Leader Epoch是Kafka用来标识消息的元数据之一,使得在多副本同步和领导副本更迭的情况下,可以保持消息的一致性和系统的稳定性。

Leader Epoch Sequence文件

在Kafka中,Leader Epoch Sequence文件用于存储Leader Epoch与消息起始偏移量(Start Offset)的映射关系。这个文件是按一定的格式来组织的,通常会有以下的存储结构:

  1. Epoch号(Leader Epoch):每个Epoch对应一个4字节的整数。这个数值会随着新的Leader的选举而递增。

  2. 起始偏移量(Start Offset):与每个Epoch相关联的消息起始偏移量。这个偏移量指的是在该Epoch期间第一个消息的偏移量。

文件中的每个条目都会有这样的一个映射关系,例如:

1
2
3
4
Epoch 1: Offset 0
Epoch 2: Offset 105
Epoch 3: Offset 210
...

这意味着:

  • 从偏移量0开始到偏移量104的消息属于Epoch 1。
  • 从偏移量105开始到偏移量209的消息属于Epoch 2。
  • 从偏移量210开始到下一个Epoch的起始偏移量前的所有消息属于Epoch 3。
  • 以此类推。

Kafka使用这个文件来追踪Leader Epoch的变化,这对于确定消息的有效性和处理副本之间的数据同步非常重要。它允许系统在发生副本故障转移时正确地处理消息的截断和恢复,保证数据的一致性。

这个文件通常存储在Kafka的日志目录中,并且会被缓存在每个副本的内存中,以便快速访问。在处理读取和写入请求时,Kafka会利用这个文件来确定消息的位置和有效性。当Kafka启动或者领导副本发生变更时,这个文件会被更新。在进行数据同步或恢复时,这个文件也会被读取以确定同步的起始点。

如何工作

Kafka中的Leader Epoch机制是用来处理分区领导副本(Leader)与跟随副本(Follower)之间的交互以及管理Epoch更新的关键组件。这个机制确保了分区的数据一致性,尤其是在领导副本更换或发生网络分区时。以下是Leader和Follower之间交互的各种情况以及Epoch更新的条件的详细解释:

1. 正常操作下的数据复制

  • 情况:在正常操作中,Leader接收来自生产者的消息,将其追加到本地日志,并向所有Follower广播这些消息。Follower接收消息,将其追加到自己的本地日志,并向Leader发送确认。

  • Epoch更新条件:在正常操作期间,Leader的Epoch不会变化。只有在领导副本变更时,Epoch才会更新。

2. 领导副本更换

  • 情况:当当前的Leader因为故障或其他原因不可用时,Kafka的控制器(Controller)会从ISR(In-Sync Replicas)中选举一个新的Leader。新Leader的选举基于副本的日志长度以及它们的当前状态。

  • Epoch更新条件:新的Leader被选举出来时,Leader Epoch会递增。新Leader开始使用新的Epoch号,并在其首次与Follower通信时告知Follower这一变化。

3. Follower初始化或重新同步

  • 情况:Follower在启动时或在与Leader失去联系后尝试重新同步。Follower会请求Leader从最后已知的偏移量开始发送消息。

  • Epoch更新条件:Follower在请求中包含其最后已知的Epoch和偏移量。如果这个Epoch小于Leader的当前Epoch,Leader会通知Follower正确的Epoch和从哪个偏移量开始复制数据。Follower根据这些信息更新自己的Epoch和日志。

4. 处理数据丢失和回滚

  • 情况:如果一个Follower因网络分区等问题暂时与集群失去联系,它可能会错过一些更新。当它重新加入集群时,它的日志可能包含Leader上不存在的消息(因为这些消息可能在故障期间由旧Leader接收,但后来被回滚)。

  • Epoch更新条件:当Follower重新加入集群并尝试与当前Leader同步时,如果发现自己的Epoch落后,它可能需要根据Leader的指示截断一些消息。这确保了所有副本在同一个Epoch内保持数据的一致性。

5. 消费者读取数据

  • 情况:消费者从Leader读取数据。为了保证消费者读取的数据是一致和可靠的,它们只能读取到高水位(HW)标记之前的消息。

  • Epoch更新条件:消费者读取数据不直接涉及Epoch的更新。然而,高水位的更新确保了只有在所有同步副本都已复制的消息才对消费者可见,间接体现了Epoch管理的一致性保证。

通过这种方式,Leader Epoch机制为Kafka提供了一种强大的机制,用于在发生副本更换、故障恢复以及数据同步过程中保持数据的一致性和可靠性。这个机制确保了Kafka能够处理复杂的分布式系统中的各种故障情况,同时保持高效的数据处理能力。

leader epoch解决数据丢失问题

  1. 从节点读取了主节点的两个消息,但是没来的及更新高水位。

    image-20240407151939325

  2. 从节点宕机

    image-20240407150927948

  3. 主节点宕机,从节点回复,此时从节点成为主节点,并且客户端向主节点写入信息,因为不按照高水位截断,我们按照leader epoch进行同步。此时新主节点应该是(1,2)。

    image-20240407151054266

leader epoch解决数据不一致问题

  1. BrokerA 和 BrokerB同时宕机

    image-20240407151316188

  2. BrokerA提前启动,选成主节点,这个时候客户端写入消息。我们主节点更新epoch为(1,1)。

    image-20240407151438240

  3. BrokerB启动,需要同步当前主节点消息,向主节点发送自己的epoch(0,0),主节点返回(1,1)。从节点知道自己落后了,截断位置1,并且在后面的epoch写入m3。

    image-20240407151856516