# 一.基础介绍

# 1.什么是 Zookeeper

ZooKeeper 是一个分布式的,开放源码的分布式应用程序协调服务,是 Google 的 Chubby 一个开源的实现,它是集群的管理者,监视着集群中各个节点的状态根据节点提交的反馈进行下一步合理操作。最终,将简单易用的接口和性能高效、功能稳定的系统提供给用户。

  • Zookeeper 是 Apache Hadoop 项目下的一个子项目,是一个树形目录服务。
  • Zookeeper 翻译过来就是 动物园管理员,他是用来管 Hadoop(大象)、Hive(蜜蜂)、Pig(小 猪)的管理员。简称 zk
  • Zookeeper 是一个分布式的、开源的分布式应用程序的协调服务。

# 2.Zookeeper 主要功能

Zookeeper 提供的主要功能包括:

  • 命名服务
  • 配置管理
  • 分布式锁
  • 集群管理

分布式锁:

image-20230515113258419

配置中心:

image-20230515113248456

# 3.Zookeeper 数据模型

  • ZooKeeper 是一个树形目录服务,其数据模型和 Unix 的文件系统目录树很类似,拥有一个层次化结构
  • 这里面的每一个节点都被称为: ZNode,每个节点上都会保存自己的数据和节点信息。
  • 节点可以拥有子节点,同时也允许少量(1MB)数据存储在该节点之下。
  • 节点可以分为四大类

image-20230515113516031

# 4.节点类型

节点可以分为四大类:

  • PERSISTENT 持久化节点

  • EPHEMERAL 临时节点 :-e

  • PERSISTENT_SEQUENTIAL 持久化顺序节点 :-s

  • EPHEMERAL_SEQUENTIAL 临时顺序节点 :-es

image-20230515113526789

# 5.端口号

如何 8080 被占用了,需要修改zoo.cfg配置文件

#admin.serverPort 默认占8080端口
admin.serverPort=8888

#客户端连接 Zookeeper 集群使用的监听端口号
2181

#集群内机器通讯使用(Leader 和 Follower 之间数据同步使用的端口号,Leader 监听此端口)
2888

#选举 leader 使用
3888
1
2
3
4
5
6
7
8
9
10
11

# 6.三种工作模式

  • 单机模式:存在单点故障
  • 集群模式:在多台机器上部署 Zookeeper 集群,适合线上环境使用。
  • 伪集群模式:在一台机器同时运行多个 Zookeeper 实例,仍然有单点故障问题,当然,其中配置的端口号要错开的,适合实验环境模拟集群使用。

# 二.简单使用

# 1.服务端命令

#启动 ZooKeeper 服务: 
./zkServer.sh start

#查看 ZooKeeper 服务状态: 
./zkServer.sh status

#停止 ZooKeeper 服务: 
./zkServer.sh stop 

#重启 ZooKeeper 服务: 
./zkServer.sh restart 
1
2
3
4
5
6
7
8
9
10
11

image-20230515113626988

# 2.客户端命令

ZooKeeper GUI 客户端 PrettyZoo

#连接服务端
./zkCli.sh –server ip:port

#断开连接
quit

#查看帮助
help

#显示指定目录下的节点
ls 目录

#创建节点
create /节点path value

#获取节点值
get /节点path

#设置节点值
set /节点path value

#删除节点
delete /节点path

#删除带有子节点的节点
deleteall /节点path
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26

image-20230515113526789

# 3.常用节点命令

#创建临时节点
create -e /节点path value

#创建有序节点
create -s /节点path value

#创建临时有序节点
create -es /节点path value

#查询节点详细信息
ls –s /节点path
1
2
3
4
5
6
7
8
9
10
11

# 4.节点属性说明

ls –s /节点path
1
  • czxid:节点被创建的事务 ID
  • ctime: 创建时间
  • mzxid: 最后一次被更新的事务 ID
  • mtime: 修改时间
  • pzxid:子节点列表最后一次被更新的事务 ID
  • cversion:子节点的版本号
  • dataversion:数据版本号
  • aclversion:权限版本号
  • ephemeralOwner:用于临时节点,代表临时节点的事务 ID,如果为持久节点则为 0
  • dataLength:节点存储的数据的长度
  • numChildren:当前节点的子节点个数
#执行命令 ls -s /node

cZxid = 0x7c #节点被创建的事务ID
ctime = Sun Nov 14 00:04:48 CST 2021 #创建时间
mZxid = 0x7c #最后一次被更新的事务ID
mtime = Sun Nov 14 00:04:48 CST 2021 #修改时间
pZxid = 0x115e6 #子节点列表最后一次被更新的事务ID
cversion = 8 #子节点的版本号
dataVersion = 0 #数据版本号
aclVersion = 0 #权限版本号
ephemeralOwner = 0x0 #用于临时节点,代表临时节点的事务ID,如果为持久节点则为0
dataLength = 0 #节点存储的数据的长度
numChildren = 2 #当前节点的子节点个数
1
2
3
4
5
6
7
8
9
10
11
12
13

# 5.zookeeper 的持久化存储

ZooKeeperZAB 协议对每一个写请求,会在每个 ZooKeeper 节点上保持写一个事务日志,同时再加上定期的将内存数据镜像(Snapshot)到磁盘来保证数据的一致性和持久性,以及宕机之后的数据可恢复,这是非常好的特性,但是我们要问,在服务发现场景中,其最核心的数据 - 实时的健康的服务的地址列表是不需要数据持久化的。

需要持久化存储的地方在于一个完整的生产可用的注册中心,除了服务的实时地址列表以及实时的健康状态之外,还会存储一些服务的元数据信息,例如服务的版本,分组,所在的数据中心,权重,鉴权策略信息,service label等元信息,这些数据需要持久化存储,并且注册中心应该提供对这些元信息的检索的能力。

# 6.Container 容器节点

是在 3.5.3 版本新增的节点。当我们创建完 Container 容器节点后,如果该节点下没有任何子节点,那么 60 秒后,该容器节点就会被 zk 删除。

image-20230517195243768

# 7.TTL 节点

  • 可以指定节点的到期时间,到期后会被 zk 删除,需要通过系统配置extendedTypesEnabled=true开启

  • 操作演示:在 zoo.cfg 配置文件中,加入 extendedTypesEnabled=true

  • 重启 Zookeeper

image-20230517195313909

# 8.监听节点内容变化

  • 监听节点内容的变化,我们可以使用get -w [节点]

  • 操作演示:

  • 首先,开启 SessionA,创建一个 watchNode 节点,然后对这个节点进行 watch 监听

image-20230517195625296

修改节点数据,可以看到监听变化

image-20230517195643390

# 9.监听下一级子目录变化

  • 监听节点目录的变化,我们可以使用ls -w [节点]
  • 操作演示:首先,开启 SessionA,对节点 watchNode 进行监听

image-20230517195826744

再打开一个会话,在监听节点下创建一个新节点,可以看到如下变化

image-20230517195906479

# 10.监听所有子目录节点

  • 监听所有级别子目录变化,我么可以使用ls -w -R [节点]
  • 操作演示:首先,开启 SessionA,对节点 watchNode 进行监听

image-20230517200006713

  • 再开启 SessionB,创建节点/watchNode/sub1/sub3

image-20230517200023680

image-20230517200033168

# 三.底层原理

# 1.Curator 介绍

Curator 是 Apache ZooKeeper 的 Java 客户端库。Curator 项目的目标是简化 ZooKeeper 客户端的使用。

Curator 最初是 Netfix 研发的,后来捐献了 Apache 基金会,目前是 Apache 的顶级项目。

常见的 ZooKeeper Java API :

  • 原生 Java API
  • ZkClient
  • Curator

Curator 官网 (opens new window)

# 2.Curator API 常用操作

  • 建立连接
  • 添加节点
  • 删除节点
  • 修改节点
  • 查询节点
  • Watch 事件监听
  • 分布式锁实现

# 3.Watch 事件监听

  • ZooKeeper 允许用户在指定节点上注册一些 Watcher,并且在一些特定事件触发的时候,ZooKeeper 服务端会将事件通知到感兴趣的客户端上去,该机制是 ZooKeeper 实现分布式协调服务的重要特性。
  • ZooKeeper 中引入了 Watcher 机制来实现了发布/订阅功能能,能够让多个订阅者同时监听某一个对象,当一个对象自身状态变化时,会通知所有订阅者。
  • ZooKeeper 原生支持通过注册 Watcher 来进行事件监听,但是其使用并不是特别方便,需要开发人员自己反复注册 Watcher,比较繁琐。
  • Curator 引入了 Cache 来实现对 ZooKeeper 服务端事件的监听。

ZooKeeper 提供了三种 Watcher:

客户端监听某个节点的变化,也就是调用了 create、delete、setdata 方法的时候,就会触发 znode 上注册的对应时间,请求 watch 的客户端会收到 zk 的异步通知,

  • NodeCache : 只是监听某一个特定的节点
  • PathChildrenCache : 监控一个 ZNode 的子节点.
  • TreeCache : 可以监控整个树上的所有节点,类似于 PathChildrenCache 和 NodeCache 的组合

# 4.分布式锁

  • 在我们进行单机应用开发,涉及并发同步的时候,我们往往采用 synchronized 或者 Lock 的方式来解决多线程间的代码同步问题,这时多线程的运行都是在同一个 JVM 之下,没有任何问题。
  • 但当我们的应用是分布式集群工作的情况下,属于多 JVM 下的工作环境,跨 JVM 之间已经无法通过多线程的锁解决同步问题。
  • 那么就需要一种更加高级的锁机制,来处理种跨机器的进程之间的数据同步问题——这就是分布式锁。

image-20230515114859909

# 5.分布式锁实现

在目前分布式锁实现方案中,比较成熟、主流的方案有两种:

  • 基于 Redis 的分布式锁

  • 基于 ZooKeeper 的分布式锁

两种锁,分别适用的场景为:

  • 基于 ZooKeeper 的分布式锁,适用于高可靠(高可用)而并发量不是太大的场景;

  • 基于 Redis 的分布式锁,适用于并发量很大、性能要求很高的、而可靠性问题可以通过其他方案去弥补的场景。

image-20230515114916377

# 6.ZooKeeper 分布式锁原理

核心思想:当客户端要获取锁,则创建节点,使用完锁,则删除该节点。

  1. 客户端获取锁时,在 lock 节点下创建临时顺序节点
  2. 然后获取 lock 下面的所有子节点,客户端获取到所有的子节点之后,如果发现自己创建的子节点序号最小,那么就认为该客户端获取到了锁。使用完锁后,将该节点删除。
  3. 如果发现自己创建的节点并非 lock 所有子节点中最小的,说明自己还没有获取到锁,此时客户端需要找到比自己小的那个节点,同时对其注册事件监听器,监听删除事件。
  4. 如果发现比自己小的那个节点被删除,则客户端的 Watcher 会收到相应通知,此时再次判断自己创建的节点是否是 lock 子节点中序号最小的,如果是则获取到了锁,如果不是则重复以上步骤继续获取到比自己小的一个节点并注册监听。

image-20230515115052339

# 7.Curator 实现分布式锁 API

在 Curator 中有五种锁方案:

  • InterProcessSemaphoreMutex:分布式排它锁(非可重入锁)
  • InterProcessMutex:分布式可重入排它锁
  • InterProcessReadWriteLock:分布式读写锁
  • InterProcessMultiLock:将多个锁作为单个实体管理的容器
  • InterProcessSemaphoreV2:共享信号量

# 8.zk 实现分布式锁

zk中锁的种类:

  • 读锁(共享锁):都可以读,上读锁的前提是没有上写锁。
  • 写锁:只有得到写锁的才能写,上写锁的前提是没有上任何锁。

举一个栗子:

  • 读锁:小王没有结婚,大家都可以找她约会,
  • 写锁:小王要跟小明结婚了,就不能跟其他人约会了,只能跟小明约会了

zk上读锁:

  • 创建一个临时序号节点,节点的数据是 read,表示读锁
  • 获取当前 zk 中序号比自己小的所有节点,判断最小节点是否是读锁:
    • 如果是读锁,则上锁成功。
    • 如果是写锁,则上锁失败,为最小节点设置监听,阻塞等待,zk 的 watch 机制会当最小节点发生变化时通知当前节点,再执行上诉的判断

zk上写锁:

  • 创建一个临时序号节点,节点的数据是 write,表示是写锁
  • 获取 zk 中所有的子节点,判断自己是否是最小的节点:如果是最小节点,则上锁成功
    • 如果不是,说明前面还有锁,则上锁失败,监听最小的节点
    • 如果最小节点有变化,则再执行

# 9.Zookeeper 集群角色

ZAB协议定义的四种节点状态:

  • Looking:选举状态
  • Following:Follower 节点所处的状态
  • Leading:Leader 节点所处的状态
  • Observing:观察者节点所处的状态

在 ZooKeeper 集群服中务中有三个角色:

角色 作用
Leader 事务请求的唯一调度者和处理者 (事务请求为除查询之外的请求)
Follower 处理非事务请求,参与 Leader 选举投票
Observer 处理非事务请求,不参与选举投票
  • Leader 领导者
    • 处理事务请求
    • 集群内部各服务器的调度者
  • Follower 跟随者
    • 处理客户端非事务请求,
    • 转发事务请求给 Leader 服务器
    • 参与 Leader 选举投票
  • Observer 观察者
    • 处理客户端非事务请求
    • 转发事务请求给 Leader 服务器
    • 不参与 Leader 选举投票

image-20230515123044939

# 10.ZAB 协议

ZAB(Zookeeper Atomic Broadcast)协议是 Zookeeper 中使用的一种分布式一致性协议,它是 Zookeeper 实现高可用性和一致性的核心基础。ZAB 协议主要用于维护分布式系统中多个副本之间的一致性,确保数据的可靠性和正确性。

ZAB 协议的主要功能包括:

  1. 原子广播(Atomic Broadcast): ZAB 协议的核心功能是实现原子广播,这意味着在 Zookeeper 集群中的所有节点都会按照相同的顺序接收相同的消息。这确保了数据在不同节点之间的一致性。ZAB 协议通过一个递增的事务 ID 来标识消息的顺序,每个消息都会被广播到所有节点并按照相同的顺序应用。

  2. Leader 选举: ZAB 协议支持在 Zookeeper 集群中选举一个 Leader 节点,Leader 负责处理所有的写操作,并将写操作广播给其他节点。当一个新的 Leader 选举出来时,ZAB 协议确保所有节点都能达成一致,避免数据损坏或不一致的情况。

  3. 状态同步: 当一个节点加入 Zookeeper 集群时,它需要从其他节点同步最新的状态数据。ZAB 协议确保新节点能够追赶上已有节点的状态,以便保持一致性。

  4. 故障恢复: 当集群中的节点出现故障或网络分区时,ZAB 协议帮助恢复数据的一致性。它确保在节点恢复后,新的消息不会被应用到过时的数据上,以避免不一致情况的发生。

ZAB 协议在 Zookeeper 中起到了维护数据一致性、支持高可用性、处理 Leader 选举和故障恢复等关键作用。通过这些功能,Zookeeper 能够提供可靠的分布式协调和管理服务,适用于构建分布式系统和应用。值得注意的是,随着 Kafka 的演进,KRaft 协议逐渐取代了 ZAB 作为 Kafka 的协调协议。

# 11.zk 的选举机制中的概念

选举中的概念:

  • SID:服务器 ID。用来唯一标识一台 ZooKeeper 集群中的机器,每台机器不能重复,和 myid 一致。
  • ZXID:事务 ID。ZXID 是一个事务 ID,用来标识一次服务器状态的变更。在某一时刻,集群中的每台机器的 ZXID 值不一定完全一致,这和 ZooKeeper 服务器对于客户端“更新请求”的处理逻辑有关。
  • Epoch:每个 Leader 任期的代号。没有 Leader 时同一轮投票过程中的逻辑时钟值是相同的。每投完一次票这个数据就会增加。
  • LOOKING:选举中,正在寻找 Leader
  • FOLLOWING:随从状态,同步 leader 状态,参与投票
  • Leader:领导者,差不多是 master,在 zookeeper 中只有 leader 才有写的权限,following 只有读的权限
  • OBSERVING:观察者状态,不同 leader 状态,不参与投票

zookeeper提供了三种选举策略:

  • LeaderElection
  • FastLeaderElection
  • AuthFastLeaderElection

# 12.Leader 选举过程

成为Leader的条件:

  1. epoch 最大的
  2. epoch 相等,选 zxid 最大的
  3. epochzxid 都相等,选 server_id 最大的(也就是 zoo.cfg 中配置的 myid 服务器 id)

Leader选举:

  • Serverid:服务器 ID,比如有三台服务器,编号分别是 1,2,3。编号越大在选择算法中的权重越大。
  • Zxid:数据 ID.服务器中存放的最大数据 ID.值越大说明数据越新,在选举算法中数据越新权重越大。
  • Leader 选举的过程中,如果某台 ZooKeeper 获得了超过半数的选票,则此 ZooKeeper 就可以成为 Leader 了。

集群上线的选举过程

4 台集群配置为例(一般出去 observer 奇数个数较好,容易满足过半的要求)

  1. 启动第一台它的状态为 Looking
  2. 当第二台启动的时候就开始进行选举。选票的格式(myid,zxid),zxid 为事务 id,这个服务器上发生的增删改都会使 zxid+1。选票大小:优先 zxid 大的,然后才是 myid 大的
  3. 选举:每个节点生成一张自己的选票,将选票投给其他节点,举个列子,第一台的选票为(1,0)第二台的选票为(2,0),然后第一台将选票投给第二台,第二台投给第一台,
  4. 第一台里就有(1,0)(2,0)第二台(2,0)(1,0),将各自最大的选票投给自己的投票箱,这时集群中有 4 台机器,除开 observer 有 3 台,而投票箱里只有一票,不满足半数以上,
  5. 开始第二轮选票:将上一轮最大的选票更新为自己的选票,并投给其他节点,所以都是(2,0) (2,0),将(2,0)投到自己的投票箱,这时候选票箱就有 2 张(2,0)了,2 号机票数过半,Leader 为第二台,选举结束,所以说按顺序启动,始终都是第二台是 Leader。
  6. 第三台启动发现集群已经选举出了 Leader,于是把自己作为 Follower。

image-20230515122856931

# 13.崩溃恢复

崩溃恢复时的 Leader 选举:

在整个服务框架启动过程中,如果 Leader 服务器出现网络中断、崩溃退出或重启等异常情况,ZAB 协议就会进入崩溃恢复模式。同时选举出新的 Leader 服务器。

  1. 集群建立连接后,Leader 会发送 ping 格式的空数据维持心跳(也是 BIO),集群中 Follower 会周期性的去和 Leader 建立的 socket 连接里面去读取 ping 格式的空数据,读取不到数据时 Leader 就挂掉了,就会从 Follower 状态重新进入 Looking 状态
  2. 其他节点也如此,然后重新进入选举状态。此时 Leader 还没选举出来,不能对外提供服务。

# 14.主从服务器之间的数据同步

客户端向主节点写数据的情况:

  1. 主节点先把数据先到自己的数据文件中,并给自己返回一个 ACK

  2. Leader 把数据广播给 Follower,Follower 将数据写到本地的数据文件中

  3. 从节点返回 ACK 给 Leader

  4. Leader 收到超过集群半数的 ACK 就广播 commit 给 Follower

  5. 从节点收到 commit 后将数据文件中的数据写到内存中(二阶段提交,先到数据文件再到内存中)

# 15.消息广播过程

主从同步就是使用到了消息广播的机制

  1. 客户端发起写请求
  2. Leader 将客户端请求信息转化为事务 Proposal 提议,同时为每个 Proposal 分配一个事务 ID(Zxid)
  3. Leader 为每个 Follower 单独分配一个 FIFO 的队列,将需要广播的 Proposal 依次放入到队列中
  4. Follower 接收到 Proposal 后,首先将其以事务日志的方式写入到本地磁盘中,写入成功后给 Leader 反馈一个 ACK 响应
  5. Leader 接收到半数以上 Follower 的 ACK 响应后,即认为消息发送成功,可以发送 Commit 消息
  6. Leader 向所有 Follower 广播 Commit 消息,同时自身也会完成事务提交。Follower 接收到 Commit 消息后也会完成事务的提交

# 16.ZAB 数据一致性

ZAB 协议规定了 如果⼀个事务 Proposal 在⼀台机器上被处理成功,那么应该在所有的机器上都被处理成功,哪怕机器出现故障崩溃。 针对这些情况 ZAB 协议需要保证以下条件:

  • 已经在 Leader 服务器上提交的事务最终被所有服务器都提交。

    假设⼀个事务在 Leader 服务器上被提交了,并且已经得到过半 Folower 服务器的 Ack 反馈,但是在它将 Commit 消息发送给所有 Follower 机器之前,Leader 服务器挂了

  • 丢弃只在 Leader 服务器上被提出(未提交)的事务。

    假设初始的 Leader 服务器 Server1 在提出了⼀个事务 Proposal3 之后就崩溃退出 了,从⽽导致集群中的其他服务器都没有收到这个事务 Proposal3。于是,当 Server1 恢复过来再次加 ⼊到集群中的时候,ZAB 协议需要确保丢弃 Proposal3 这个事务。

综上所述,ZAB 的选举出来的 Leader 必须满足以下条件:

能够确保提交已经被 Leader 提交的事务 Proposal,同时丢弃已经被跳过的事务 Proposal。即:

  1. 新选举出来的 Leader 不能包含未提交的 Proposal。
  2. 新选举的 Leader 节点中含有最大的 zxid

# 17.Zookeeper 中的 NIO 与 BIO

  • NIO

    • 用于被客户端连接的 2181 端口,使用的是 NIO 模式与客户端连接。

    • 客户端开启 Watch 时,也使用 NIO,等待 Zookeeper 服务器的回调。

  • BIO

    • 集群在选举时,多个节点之间的投票通道端口,使用 BIO 进行通信。

# 四.搭建单机 ZK

下载地址 (opens new window)

历史版本 (opens new window)

环境准备:

ZooKeeper 服务器是用 Java 创建的,它运行在 JVM 之上。需要安装 JDK 7 或更高版本。

# 1.安装步骤

#打开 opt目录
cd /opt

#创建zooKeeper目录
mkdir zookeeper

#在mac的终端上传zookeeper到服务器,不是服务器的终端
scp /Users/qinyingjie/Downloads/apache-zookeeper-3.5.6-bin.tar.gz root@47.119.171.59:/opt/zookeeper

#搭建集群使用
scp /opt/zookeeper/apache-zookeeper-3.5.6-bin.tar.gz root@43.139.90.182:/opt/zookeeper

#进入目录
cd /opt/zookeeper

#解压文件
tar -zxvf apache-zookeeper-3.5.6-bin.tar.gz
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17

# 2.修改配置

配置 zoo.cfg:

进入到 conf 目录拷贝一个 zoo_sample.cfg 并完成配置

#进入到conf目录
cd /opt/zookeeper/apache-zookeeper-3.5.6-bin/conf/
#拷贝
cp  zoo_sample.cfg  zoo.cfg
1
2
3
4

修改 zoo.cfg:

#打开目录
cd /opt/zookeeper/

#创建zooKeeper存储目录
mkdir  zkdata

#修改zoo.cfg
vim /opt/zookeeper/apache-zookeeper-3.5.6-bin/conf/zoo.cfg

#修改存储目录
dataDir=/opt/zookeeper/zkdata

#admin.serverPort 默认占8080端口
admin.serverPort=8888
1
2
3
4
5
6
7
8
9
10
11
12
13
14

image-20231215010819178

# 3.启动 ZooKeeper

#进入启动目录
cd /opt/zookeeper/apache-zookeeper-3.5.6-bin/bin/

#启动
./zkServer.sh  start
sh zkServer.sh start

#停止
./zkServer.sh stop
sh zkServer.sh stop

#重启
./zkServer.sh restart
sh zkServer.sh restart

#查看状态
./zkServer.sh status
sh zkServer.sh status
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18

image-20231215010734206

看到上图表示 ZooKeeper 成功启动

# 4.查看状态

./zkServer.sh status
sh zkServer.sh status
1
2

zookeeper 启动成功。standalone 代表 zk 没有搭建集群,现在是单节点

image-20230514180546751

zookeeper 没有启动

image-20231215010755080

# 5.查问题

image-20230514182700028

cat zookeeper-root-server-iZwz9hmmq7muwhv31su0jeZ.out
1

# 6.启动客户端

#连接本机
./zkCli.sh

#连接指定机器
./zkCli.sh -server [ip地址]:[端口号-默认为2181]
1
2
3
4
5

进入客户端后可以执行 zookeeper 命令,创建节点,查看节点,更新节点等

# 五.搭建 Zookeeper 集群

# 1.搭建要求

真实的集群是需要部署在不同的服务器上的,但是在我们测试时同时启动很多个虚拟机内存会吃不消,所以我们通常会搭建伪集群,也就是把所有的服务都搭建在一台虚拟机上,用端口进行区分。

我们这里要求搭建一个三个节点的 Zookeeper 集群(伪集群)。

# 2.准备工作

重新部署一台服务器作为我们搭建集群的测试服务器。

Zookeeper 压缩包上传到服务器

将 Zookeeper 解压 ,建立/usr/local/zookeeper-cluster 目录,将解压后的 Zookeeper 复制到以下三个目录

/usr/local/zookeeper-cluster/zookeeper-1

/usr/local/zookeeper-cluster/zookeeper-2

/usr/local/zookeeper-cluster/zookeeper-3

#创建目录
mkdir /usr/local/zookeeper-cluster

#将zk解压到3个目录
cd /opt/zookeeper/apache-zookeeper-3.5.6-bin
cp -r  apache-zookeeper-3.5.6-bin /usr/local/zookeeper-cluster/zookeeper-1
cp -r  apache-zookeeper-3.5.6-bin /usr/local/zookeeper-cluster/zookeeper-2
cp -r  apache-zookeeper-3.5.6-bin /usr/local/zookeeper-cluster/zookeeper-3

#创建data目录
mkdir /usr/local/zookeeper-cluster/zookeeper-1/data
mkdir /usr/local/zookeeper-cluster/zookeeper-2/data
mkdir /usr/local/zookeeper-cluster/zookeeper-3/data

#zoo_sample.cfg改名为zoo.cfg
mv  /usr/local/zookeeper-cluster/zookeeper-1/conf/zoo_sample.cfg  /usr/local/zookeeper-cluster/zookeeper-1/conf/zoo.cfg
mv  /usr/local/zookeeper-cluster/zookeeper-2/conf/zoo_sample.cfg  /usr/local/zookeeper-cluster/zookeeper-2/conf/zoo.cfg
mv  /usr/local/zookeeper-cluster/zookeeper-3/conf/zoo_sample.cfg  /usr/local/zookeeper-cluster/zookeeper-3/conf/zoo.cfg
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
#修改端口号和数据目录
vim /usr/local/zookeeper-cluster/zookeeper-1/conf/zoo.cfg
clientPort=2181
admin.serverPort=8881
dataDir=/usr/local/zookeeper-cluster/zookeeper-1/data

#修改端口号和数据目录
vim /usr/local/zookeeper-cluster/zookeeper-2/conf/zoo.cfg
clientPort=2182
admin.serverPort=8882
dataDir=/usr/local/zookeeper-cluster/zookeeper-2/data

#修改端口号和数据目录
vim /usr/local/zookeeper-cluster/zookeeper-3/conf/zoo.cfg
clientPort=2183
admin.serverPort=8883
dataDir=/usr/local/zookeeper-cluster/zookeeper-3/data
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17

# 3.配置集群

在每个 zookeeper 的 data 目录下创建一个 myid 文件,内容分别是 1、2、3 。这个文件就是记录每个服务器的 ID

echo 1 >/usr/local/zookeeper-cluster/zookeeper-1/data/myid
echo 2 >/usr/local/zookeeper-cluster/zookeeper-2/data/myid
echo 3 >/usr/local/zookeeper-cluster/zookeeper-3/data/myid
1
2
3

在每一个 zookeeper 的 zoo.cfg 配置客户端访问端口(clientPort)和集群服务器 IP 列表。

集群服务器 IP 列表如下

#节点1
vim /usr/local/zookeeper-cluster/zookeeper-1/conf/zoo.cfg
server.1=172.22.30.207:2881:3881
server.2=172.22.30.207:2882:3882
server.3=172.22.30.207:2883:3883

#节点2
vim /usr/local/zookeeper-cluster/zookeeper-2/conf/zoo.cfg
server.1=172.22.30.207:2881:3881
server.2=172.22.30.207:2882:3882
server.3=172.22.30.207:2883:3883

#节点3
vim /usr/local/zookeeper-cluster/zookeeper-3/conf/zoo.cfg
server.1=172.22.30.207:2881:3881
server.2=172.22.30.207:2882:3882
server.3=172.22.30.207:2883:3883
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17

解释:server.服务器 ID=服务器 IP 地址:服务器之间通信端口:服务器之间投票选举端口

# 4.启动集群

启动集群就是分别启动每个实例。

#启动
/usr/local/zookeeper-cluster/zookeeper-1/bin/zkServer.sh start
/usr/local/zookeeper-cluster/zookeeper-2/bin/zkServer.sh start
/usr/local/zookeeper-cluster/zookeeper-3/bin/zkServer.sh start


#停止
/usr/local/zookeeper-cluster/zookeeper-1/bin/zkServer.sh stop
/usr/local/zookeeper-cluster/zookeeper-2/bin/zkServer.sh stop
/usr/local/zookeeper-cluster/zookeeper-3/bin/zkServer.sh stop
1
2
3
4
5
6
7
8
9
10

img

启动后我们查询一下每个实例的运行状态

/usr/local/zookeeper-cluster/zookeeper-1/bin/zkServer.sh status
/usr/local/zookeeper-cluster/zookeeper-2/bin/zkServer.sh status
/usr/local/zookeeper-cluster/zookeeper-3/bin/zkServer.sh status
1
2
3

先查询第一个服务

img

Mode 为 follower 表示是跟随者(从)

再查询第二个服务 Mod 为 leader 表示是领导者(主)

img

查询第三个为跟随者(从)

img

# 5.模拟集群异常

从服务器挂掉:

首先我们先测试如果是从服务器挂掉,会怎么样

把 3 号服务器停掉,观察 1 号和 2 号,发现状态并没有变化

/usr/local/zookeeper-cluster/zookeeper-3/bin/zkServer.sh stop

/usr/local/zookeeper-cluster/zookeeper-1/bin/zkServer.sh status
/usr/local/zookeeper-cluster/zookeeper-2/bin/zkServer.sh status
1
2
3
4

img

由此得出结论,3 个节点的集群,从服务器挂掉,集群正常

从服务器全部挂掉:

我们再把 1 号服务器(从服务器)也停掉,查看 2 号(主服务器)的状态,发现已经停止运行了。

/usr/local/zookeeper-cluster/zookeeper-1/bin/zkServer.sh stop

/usr/local/zookeeper-cluster/zookeeper-2/bin/zkServer.sh status
1
2
3

img

由此得出结论,3 个节点的集群,2 个从服务器都挂掉,主服务器也无法运行。因为可运行的机器没有超过集群总数量的半数。

启动一个从服务器:

我们再次把 1 号服务器启动起来,发现 2 号服务器又开始正常工作了。而且依然是领导者。

/usr/local/zookeeper-cluster/zookeeper-1/bin/zkServer.sh start

/usr/local/zookeeper-cluster/zookeeper-2/bin/zkServer.sh status
1
2
3

img

主服务器挂掉:

我们把 3 号服务器也启动起来,把 2 号服务器停掉,停掉后观察 1 号和 3 号的状态。

/usr/local/zookeeper-cluster/zookeeper-3/bin/zkServer.sh start
/usr/local/zookeeper-cluster/zookeeper-2/bin/zkServer.sh stop

/usr/local/zookeeper-cluster/zookeeper-1/bin/zkServer.sh status
/usr/local/zookeeper-cluster/zookeeper-3/bin/zkServer.sh status
1
2
3
4
5

img

发现新的 leader 产生了~

由此我们得出结论,当集群中的主服务器挂了,集群中的其他服务器会自动进行选举状态,然后产生新得 leader

重启原来的主服务器:

我们再次测试,当我们把 2 号服务器重新启动起来启动后,会发生什么?2 号服务器会再次成为新的领导吗?我们看结果

/usr/local/zookeeper-cluster/zookeeper-2/bin/zkServer.sh start

/usr/local/zookeeper-cluster/zookeeper-2/bin/zkServer.sh status
/usr/local/zookeeper-cluster/zookeeper-3/bin/zkServer.sh status
1
2
3
4

img

我们会发现,2 号服务器启动后依然是跟随者(从服务器),3 号服务器依然是领导者(主服务器),没有撼动 3 号服务器的领导地位。

由此我们得出结论,当领导者产生后,再次有新服务器加入集群,不会影响到现任领导者。

# 六.Java Api

代码地址 (opens new window)

# 1.配置信息

pom.xml

<dependencies>
    <dependency>
        <groupId>org.apache.commons</groupId>
        <artifactId>commons-lang3</artifactId>
        <version>3.1</version>
    </dependency>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.83</version>
    </dependency>
    <dependency>
        <groupId>org.jsoup</groupId>
        <artifactId>jsoup</artifactId>
        <version>1.15.3</version>
    </dependency>
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.10</version>
        <scope>test</scope>
    </dependency>
    <!--curator-->
    <dependency>
        <groupId>org.apache.curator</groupId>
        <artifactId>curator-framework</artifactId>
        <version>4.0.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.curator</groupId>
        <artifactId>curator-recipes</artifactId>
        <version>4.0.0</version>
    </dependency>
    <!--日志-->
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-api</artifactId>
        <version>1.7.21</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
        <version>1.7.21</version>
    </dependency>
</dependencies>
<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>3.1</version>
            <configuration>
                <source>1.8</source>
                <target>1.8</target>
            </configuration>
        </plugin>
    </plugins>
</build>
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58

# 2.增删改查

# 1.创建节点

  • @Before 和 @After 的使用
  • 建立 zookeeper 连接
//创建有数据的节点
public class CuratorTest_01_Create {

    private CuratorFramework client;

    /**
     * 建立连接
     */
    @Before
    public void testConnect() {
        /*
         *
         * @param connectString       连接字符串。zk server 地址和端口 "43.139.90.182:2181,43.139.90.182:2182,43.139.90.182:2183"
         * @param sessionTimeoutMs    会话超时时间 单位ms
         * @param connectionTimeoutMs 连接超时时间 单位ms
         * @param retryPolicy         重试策略
         */
       /* //重试策略
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000,10);
        //1.第一种方式
        CuratorFramework client = CuratorFrameworkFactory.newClient("43.139.90.182:2181,43.139.90.182:2182,43.139.90.182:2183",
                60 * 1000, 15 * 1000, retryPolicy);*/
        //重试策略
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000, 10);
        //2.第二种方式
        //CuratorFrameworkFactory.builder();
        client = CuratorFrameworkFactory.builder()
                .connectString("43.139.90.182:2181,43.139.90.182:2182,43.139.90.182:2183")
                .sessionTimeoutMs(60 * 1000)
                .connectionTimeoutMs(15 * 1000)
                .retryPolicy(retryPolicy)
                .namespace("itheima")
                .build();
        //开启连接
        client.start();
    }

    @After
    public void close() {
        if (client != null) {
            client.close();
        }
    }

    /**
     * 创建节点:create 持久 临时 顺序 数据
     * 1. 基本创建 :create().forPath("")
     * 2. 创建节点 带有数据:create().forPath("",data)
     * 3. 设置节点的类型:create().withMode().forPath("",data)
     * 4. 创建多级节点  /app1/p1 :create().creatingParentsIfNeeded().forPath("",data)
     */
    @Test
    public void testCreate() throws Exception {
        //默认是持久无序节点
        //如果创建节点,没有指定数据,则默认将当前客户端的ip作为数据存储
        String path = client.create().forPath("/app2", "hehe".getBytes());
        System.out.println(path);
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
//创建无数据的节点
@Test
public void testCreate2() throws Exception {
    //基本创建
    //如果创建节点,没有指定数据,则默认将当前客户端的ip作为数据存储
    String path = client.create().forPath("/app1");
    System.out.println(path);
}
1
2
3
4
5
6
7
8
//创建临时节点
@Test
public void testCreate3() throws Exception {
    //设置节点的类型
    //默认类型:持久化
    String path = client.create().withMode(CreateMode.EPHEMERAL).forPath("/app3");
    System.out.println(path);
}
1
2
3
4
5
6
7
8
//创建多级节点
@Test
public void testCreate4() throws Exception {
  //4. 创建多级节点  /app1/p1
  //creatingParentsIfNeeded():如果父节点不存在,则创建父节点
  String path = client.create().creatingParentsIfNeeded().forPath("/app4/p1");
  System.out.println(path);
}
1
2
3
4
5
6
7
8

# 2.获取节点数据

//查询节点数据
public class CuratorTest_05_Get {

    @Autowired
    private CuratorFramework client;

    /**
     * 查询节点:
     * 1. 查询数据:get: getData().forPath()
     * 2. 查询子节点: ls: getChildren().forPath()
     * 3. 查询节点状态信息:ls -s:getData().storingStatIn(状态对象).forPath()
     */
    @Test
    public void testGet1() throws Exception {
        //1. 查询数据:get
        byte[] data = client.getData().forPath("/app1");
        System.out.println(new String(data));
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
//查询子节点数据
@Test
public void testGet2() throws Exception {
  // 2. 查询子节点: ls
  List<String> path = client.getChildren().forPath("/");
  System.out.println(path);
}
1
2
3
4
5
6
7
//查询节点状态
@Test
public void testGet3() throws Exception {
    Stat status = new Stat();
    System.out.println(status);
    //3. 查询节点状态信息:ls -s
    client.getData().storingStatIn(status).forPath("/app1");
    System.out.println(status);
}
1
2
3
4
5
6
7
8
9

# 3.设置节点数据

//修改数据
public class CuratorTest_08_Set {

  	@Autowired
    private CuratorFramework client;

    /**
     * 修改数据
     * 1. 基本修改数据:setData().forPath()
     * 2. 根据版本修改: setData().withVersion().forPath()
     * * version 是通过查询出来的。目的就是为了让其他客户端或者线程不干扰我。
     *
     * @throws Exception
     */
    @Test
    public void testSet() throws Exception {
        client.setData().forPath("/app1", "itcast".getBytes());
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
//根据版本号设置数据
@Test
public void testSetForVersion() throws Exception {
    Stat status = new Stat();
    //3. 查询节点状态信息:ls -s
    client.getData().storingStatIn(status).forPath("/app1");
    int version = status.getVersion();//查询出来的 3
    System.out.println(version);
    client.setData().withVersion(version).forPath("/app1", "hehe".getBytes());
}
1
2
3
4
5
6
7
8
9
10

# 4.删除节点

//删除节点
public class CuratorTest_10_Delete {

    @Autowired
    private CuratorFramework client;

    /**
     * 删除节点: delete deleteall
     * 1. 删除单个节点:delete().forPath("/app1");
     * 2. 删除带有子节点的节点:delete().deletingChildrenIfNeeded().forPath("/app1");
     * 3. 必须成功的删除:为了防止网络抖动。本质就是重试。  client.delete().guaranteed().forPath("/app2");
     * 4. 回调:inBackground
     *
     * @throws Exception
     */
    @Test
    public void testDelete() throws Exception {
        // 1. 删除单个节点
        client.delete().forPath("/app1");
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
//删除带有子节点的节点
@Test
public void testDelete2() throws Exception {
  //2. 删除带有子节点的节点
  client.delete().deletingChildrenIfNeeded().forPath("/app4");
}
1
2
3
4
5
6
//必须成功的删除
@Test
public void testDelete3() throws Exception {
    //3. 必须成功的删除
    client.delete().guaranteed().forPath("/app2");
}
1
2
3
4
5
6
//回调
@Test
public void testDelete4() throws Exception {
  //4. 回调
  client.delete().guaranteed().inBackground(new BackgroundCallback() {
      @Override
      public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {
          System.out.println("我被删除了~");
          System.out.println(event);
      }
  }).forPath("/app1");
}
1
2
3
4
5
6
7
8
9
10
11
12

# 3.Watcher

# 1.NodeCache

public class Watcher_01_NodeCache {

    @Autowired
    private CuratorFramework client;

    /**
     * 演示 NodeCache:给指定一个节点注册监听器
     */
    @Test
    public void testNodeCache() throws Exception {
        //1. 创建NodeCache对象
        final NodeCache nodeCache = new NodeCache(client, "/app1");
        //2. 注册监听
        nodeCache.getListenable().addListener(new NodeCacheListener() {
            @Override
            public void nodeChanged() throws Exception {
                System.out.println("节点变化了~");
                //获取修改节点后的数据
                byte[] data = nodeCache.getCurrentData().getData();
                System.out.println(new String(data));
            }
        });
        //3. 开启监听.如果设置为true,则开启监听是,加载缓冲数据
        nodeCache.start(true);
        while (true) {
        }
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28

# 2.PathChildrenCache

public class Watcher_02_PathChildrenCache {

    @Autowired
    private CuratorFramework client;

    /**
     * 演示 PathChildrenCache:监听某个节点的所有子节点们
     */
    @Test
    public void testPathChildrenCache() throws Exception {
        //1.创建监听对象
        PathChildrenCache pathChildrenCache = new PathChildrenCache(client, "/app2", true);
        //2. 绑定监听器
        pathChildrenCache.getListenable().addListener(new PathChildrenCacheListener() {
            @Override
            public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
                System.out.println("子节点变化了~");
                System.out.println(event);
                //监听子节点的数据变更,并且拿到变更后的数据
                //1.获取类型
                PathChildrenCacheEvent.Type type = event.getType();
                //2.判断类型是否是update
                if (type.equals(PathChildrenCacheEvent.Type.CHILD_UPDATED)) {
                    System.out.println("数据变了!!!");
                    byte[] data = event.getData().getData();
                    System.out.println(new String(data));
                }
            }
        });
        //3. 开启
        pathChildrenCache.start();
        while (true) {
        }
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35

# 3.TreeCache

public class Watcher_03_TreeCache {

    @Autowired
    private CuratorFramework client;

    /**
     * 演示 TreeCache:监听某个节点自己和所有子节点们
     */
    @Test
    public void testTreeCache() throws Exception {
        //1. 创建监听器
        TreeCache treeCache = new TreeCache(client, "/app2");
        //2. 注册监听
        treeCache.getListenable().addListener(new TreeCacheListener() {
            @Override
            public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception {
                System.out.println("节点变化了");
                System.out.println(event);
            }
        });
        //3. 开启
        treeCache.start();
        while (true) {
        }
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26

# 4.分布式锁

# 1.买票

public class Ticket12306 implements Runnable {

    /**
     * 数据库的票数
     */
    private int tickets = 10;
    private InterProcessMutex lock;

    public Ticket12306() {
        //重试策略
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000, 10);
        //2.第二种方式
        CuratorFramework client = CuratorFrameworkFactory.builder()
                //集群节点
                .connectString("43.139.90.182:2181,43.139.90.182:2182,43.139.90.182:2183")
                .sessionTimeoutMs(60 * 1000)
                .connectionTimeoutMs(15 * 1000)
                .retryPolicy(retryPolicy)
                .build();
        //开启连接
        client.start();
        lock = new InterProcessMutex(client, "/lock");
    }

    @Override
    public void run() {
        while (true) {
            //获取锁
            try {
                lock.acquire(3, TimeUnit.SECONDS);
                if (tickets > 0) {
                    System.out.println(Thread.currentThread() + ":" + tickets);
                    Thread.sleep(100);
                    tickets--;
                }
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                //释放锁
                try {
                    lock.release();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48

# 2.测试类

public class Ticket12306LockTest {
    public static void main(String[] args) {
        Ticket12306 ticket12306 = new Ticket12306();
        //创建客户端
        Thread t1 = new Thread(ticket12306, "携程");
        Thread t2 = new Thread(ticket12306, "飞猪");
        t1.start();
        t2.start();
    }
}
1
2
3
4
5
6
7
8
9
10

# 七.常见问题

# 1.端口占用

在使用 zookeeper 3.6 之后的版本,开启服务器,zk 会自动占用 8080 端口,而后端服务器大部分都需要使用 8080 端口,因此需要 zk 的配置文件即可。

在 zk conf 目录里面,修改 zoo.cfg,在其中加上如下配置,重启 zk 服务器,即可。

# admin.serverPort 默认占8080端口
admin.serverPort=8888
1
2

image-20230514182725836

# 2. CAP 定理

CAP 定理,简单来说就是分布式系统不可能同时满足三个要素

  • Consistency 一致性
  • Availability 可用性
  • Partition Tolerance 分区容错性

image-20230515202612860

CAP场景:

类型 场景
CA(不现实) 单点集群,满足—致性,可用性的系统,通常在可扩展性上不太强大。
CP 满足一致性,分区容忍必的系统,通常性能不是特别高。
AP 满足可用性,分区容忍性的系统,通常可能对一致性要求低一些。

# 3.BASE 理论

eBay 的架构师 Dan Pritchett 源于对⼤规模分布式系统的实践总结,在 ACM 上发表⽂章提出 BASE 理论,BASE 理论是对 CAP 理论的延伸,核⼼思想是即使⽆法做到强⼀致性(Strong Consistency,CAP 的⼀致性就是强⼀致性),但应⽤可以采⽤适合的⽅式达到最终⼀致性(Eventual Consitency)。

  • 基本可⽤(Basically Available):基本可⽤是指分布式系统在出现故障的时候,允许损失部分可⽤性,即保证核⼼可⽤。电商⼤促时,为了应对访问量激增,部分⽤户可能会被引导到降级⻚⾯,服务层也可能只提供降级服务。这就是损失部分可⽤性的体现。
  • 软状态(Soft State):软状态是指允许系统存在中间状态,⽽该中间状态不会影响系统整体可⽤性。分布式存储中⼀般⼀份数据⾄少会有三个副本,允许不同节点间副本同步的延时就是软状态的体现。mysql replication 的异步复制也是⼀种体现。
  • 最终⼀致性(Eventual Consistency):最终⼀致性是指系统中的所有数据副本经过⼀定时间后,最终能够达到⼀致的状态。弱⼀致性和强⼀致性相反,最终⼀致性是弱⼀致性的⼀种特殊情况。

# 4.注册中心比较

image-20230517194103993

zookeeper 和 eureka 分别是注册中心 CP AP 的两种的实践。他们都提供服务注册中心的功能。建议使用 AP。不强求数据的强一致性,达成数据的最终一致性。

eureka AP:

eureka 保证了可用性,实现最终一致性。

Eureka 各个节点都是平等的,几个节点挂掉不会影响正常节点的工作,剩余的节点依然可以提供注册和查询服务。而 Eureka 的客户端在向某个 Eureka 注册或时如果发现连接失败,则会自动切换至其它节点,只要有一台 Eureka 还在,就能保证注册服务可用(保证可用性),只不过查到的信息可能不是最新的(不保证强一致性),其中说明了,eureka 是不满足强一致性,但还是会保证最终一致性

zookeeper CP:

zookeeper 在选举 leader 时,会停止服务,直到选举成功之后才会再次对外提供服务,这个时候就说明了服务不可用,但是在选举成功之后,因为一主多从的结构,zookeeper 在这时还是一个高可用注册中心,只是在优先保证一致性的前提下,zookeeper 才会顾及到可用性

最后总结一下两者的区别:

比较点 Zookeeper Eureka
设计原则 CP AP
优点 数据最终一致 服务高可用
缺点 选举 leader 过程中集群不可用 服务节点间的数据可能不一致
适用场景 对数据一致性要求较高 对注册中心服务可用性要求较高

三个注册中心比较:

组件名 语言实现 CAP 对外暴露接口 SpringCloud 集成
Eureka Java AP(绝情与否可配置) 可配支持 HTTP
zookeeper Go CP(绝情) 支持 HTTP/DNS
consul Java CP(绝情) 支持客户端 已集成

# 5.羊群效应

根据 zookeeper 的分布式锁的理论,只要有一个节点发生变化,就会触发其他节点的 watch 监听,这样的话对 zk 的压力非常大。可以调成链式监听,并发是顺位的,只需要监听前一位即可。(curator 源码帮我们解决掉了这个问题)

# 6.zab 和 paxos

两者关系:

  1. 都存在一个类似 Leader 进程的角色,由其负责协调多个 Follower 进程的运行
  2. Leader 进程都会等待超过半数的 Follower 作出正确的反馈后,才会将一个提议进行提交(过半原则
  3. 在 ZAB 中,每个 Proposal 中都包含了一个 epoch 值,用来代表当前 Leader 周期,在 Paxos 中同样存在这样的一个表示,名字为 Ballot。

Paxos 算法:通过投票来对写操作进行全局编号,同一时刻,只有一个写操作被批准,同时并发的写操作要去争取选票,只有获得过半数选票的写操作才会被批准(所以永远只会有一个写操作得到批准),其他的写操作竞争失败只好再发起一轮投票,就这样,所有写操作都被严格编号排序。编号严格递增,当一个节点接受了一个编号为 100 的写操作,之后又接受到编号为 99 的写操作(因为网络延迟等很多不可预见原因),它马上能意识到自己数据不一致了,自动停止对外服务并重启同步过程。任何一个节点挂掉都不会影响整个集群的数据一致性(总 2n+1 台,除非挂掉大于 n 台)

区别:

  1. Paxos 算法中,新选举产生的主进程会进行两个阶段的工作;第一阶段称为读阶段:新的主进程和其他进程通信来收集主进程提出的提议,并将它们提交。第二阶段称为写阶段:当前主进程开始提出自己的提议。
  2. ZAB 协议在 Paxos 基础上添加了同步阶段,此时,新的 Leader 会确保存在过半的 Follower 已经提交了之前 Leader 周期中的所有事物 Proposal。这一同步阶段的引入,能够有效保证,Leader 在新的周期中提出事务 Proposal 之前,所有的进程都已经完成了对之前所有事务 Proposal 的提交。

总的来说,ZAB 协议和 Paxos 算法的本质区别在于两者的设计目的不一样:ZAB 协议主要用于构建一个高可用的分布式数据主备系统,而 Paxos 算法则用于构建一个分布式的一致性状态机系统。

# 7.Raft 算法

raft 官网 (opens new window)

raft 动画 (opens new window)

raft 视频讲解 (opens new window)

raft 经典场景分析 (opens new window)

在过去十年中,Paxos 是一致性算法的主流,大多数一致性算法的实现都是基于 Paxos 或受其影响,Paxos 已成为用于教授学生一致性相关知识的主要工具。但不幸的是,Paxos 实在是太难以理解,着手寻找一个新的一致性算法,可以为系统开发和教学提供更好的帮助。 Raft 的一致性算法

raft中的概念:

  • 客户端只能从主节点写数据,从节点里读数据。
  • 一个 Raft 集群包含若干个服务器节点,一般为 5 个,可以容忍 2 个节点失效
  • leader、follower 或者 candidate 三种状态
  • 集群中只有一个 leader ,并且其他的节点全部都是 follower
  • Follower 都是被动的:他们不会发送任何请求,只是简单的响应来自 leader 和 candidate 的请求。
  • Leader 处理所有的客户端请求
  • 日志复制(Log Replication):Leader 接收来自客户端的请求并将其以日志条目的形式复制到集群中的其它节点,并且强制要求其它节点的日志和自己保持一致;
  • Raft 算法将时间划分成为任意不同长度的任期(term)

三个角色之间的转换关系,大概如下:

  • Follower—> Candidate : 时间片用完,进入选举

  • Candidate—>Leader : 获得集群过半的投票

  • Candidate—>Follower : 发现集群里面已经有 Leader 了(因为集群在选举出 Leader 之后,Leader 会向集群群发消息),或者进入新的任期

  • Leader—>Follower : 发现具有更高任期(term)的服务器,辞去领导者的职务

Leader选举流程: 所有服务器节点初始状态都是 Follwer 状态,一个服务器节点只要能从 leader 或 candidate 处接收到有效的 RPC 就一直保持 follower 状态。

有两种超时时间 timeout 来控制选举,

  • 一种是 election timeout,是节点从 followers 状态转换到 candidate 状态的等待时间,随机初始化值为 150ms ~ 300ms

  • 一种是 heartbeat timeout,用于 Leader 和 follower 之间的保活。

各种存在的情况:

初始化时,所有 follower 都在等待成为 candidate 的场景

  • 当某个 follower 的 election timeout 先到达后,follower 先增加自己的当前任期号并且转换到 candidate 状态
  • 然后投票给自己并且并行地向集群中的其他服务器节点发送 RequestVote RPC(让其他服务器节点投票给它)

获得多数派投票成为 leader

  • 当一个 candidate 节点 获得多数派的 followers 的投票,它就赢得了这次选举并成为 leader 。然后它会 follower 节点发送心跳消息来确定自己的地位并阻止新的选举。

接收到 leader 的 Append Entries 消息(心跳包)

  • 在等待投票期间,candidate 可能会收到另一个声称自己是 leader 的服务器节点发来的 AppendEntries RPC 。如果这个 leader 的任期号(包含在 RPC 中)不小于 candidate 当前的任期号,那么 candidate 会承认该 leader 的合法地位并回到 follower 状态。 如果 RPC 中的任期号比自己的小,那么 candidate 就会拒绝这次的 RPC 并且继续保持 candidate 状态。

同时存在两个 candidate,并且获得选票相同

  • 如果有两个 follower 同时成为 candidate ,那么选票可能会被瓜分以至于没有 candidate 赢得过半的投票。
  • 当这种情况发生时,每一个候选人都会超时,然后通过增加当前任期号来开始一轮新的选举。然而,如果没有其他机制的话,该情况可能会无限重复。
  • Raft 算法使用随机选举超时时间的方法来解决这个问题,每个 candidate 在开始一次选举的时候会重置一个随机的选举超时时间,然后一直等待直到选举超时;这样减小了在新的选举中再次发生选票瓜分情况的可能性。

# 8.说说 zxid?

在 Apache ZooKeeper 中,zxid 是一种称为 "ZooKeeper Transaction Id" 的概念,用于标识每个写操作(数据变更)的唯一事务编号。zxid 在 ZooKeeper 集群中起着重要作用,用于实现数据的一致性顺序性

具体来说,zxid 由两部分组成:

  1. Epoch(时期):高 32 位用于表示 Epoch,表示 ZooKeeper 集群的逻辑时期。Epoch 的作用是在 ZooKeeper 重启后重新生成 zxid,从而避免使用相同的 zxid,确保一致性。

  2. Counter(计数器):低 32 位用于表示 Counter,表示在同一个 Epoch 内生成的事务的计数器。每个写操作(如创建、更新、删除节点)都会递增计数器,从而保证每个事务具有唯一的 zxid

zxid 在 ZooKeeper 中具有以下重要的作用:

  1. 数据一致性:ZooKeeper 使用 zxid 来维护数据的一致性。在数据变更时,ZooKeeper 使用 zxid 来确保数据的变更按照正确的顺序在所有节点上复制和应用,从而保证数据的一致性。

  2. Leader 选举:在 ZooKeeper 集群中,Leader 的选举也会使用 zxid 来确定谁应该成为新的 Leader。具有更大 zxid 的节点有更高的优先级成为 Leader。

  3. 节点版本控制zxid 还被用于跟踪节点的版本变化。每次节点的数据发生变更,都会引起 zxid 的变化,从而用于节点版本的比较。

zxid 在 ZooKeeper 中是一种重要的标识,用于确保数据的一致性、节点版本控制以及 Leader 选举等关键功能。

上次更新: 10/29/2024, 10:27:50 AM