Zookeeper详解
1、集群选举机制
第一次启动
①服务器1启动,发起一次选举。服务器1投自己一票。此时服务器1票数一票,不够半数以上(3票),选举无法完成,服务器1状态保持为LOOKING。
②服务器2启动,再发起一次选举。服务器1和2分别投自己一票并交换选票信息:此时服务器1发现服务器2的myid比自己目前投票推举的(服务器1)大,更改选票为推举服务器2。此时服务器1票数0票,服务器2票数2票,没有半数以上结果,选举无法完成,服务器1,2状态保持LOOKING。
③服务器3启动,发起一次选举。此时服务器1和2都会更改选票为服务器3。此次投票结果:服务器1为0票,服务器2为0票,服务器3为3票。此时服务器3的票数已经超过半数,服务器3当选Leader。服务器1,2更改状态为FOLLOWING,服务器3更改状态为LEADING。
④服务器4启动,发起一次选举。此时服务器1,2,3已经不是LOOKING状态,不会更改选票信息。交换选票信息结果:服务器3为3票,服务器4为1票。此时服务器4服从多数,更改选票信息为服务器3,并更改状态为FOLLOWING。
⑤服务器5启动,同4一样当小弟。
非第一次启动
当ZooKeeper集群中的一台服务器出现以下两种情况之一时,就会开始进入Leader选举:
- 服务器初始化启动。
- 服务器运行期间无法和Leader保持连接。
而当一台机器进入Leader选举流程时,当前集群也可能会处于以下两种状态:
集群中本来就已经存在一个Leader。此时机器试图去选举Leader时,会被告知当前服务器的Leader信息,对于该机器来说,仅仅需要和Leader机器建立连接,并进行状态同步即可。
集群中确实不存在Leader。
假设ZooKeeper由5台服务器组成,SID分别为1、2、3、4、5,ZXID分别为8、8、8、7、7,并且此时SID为3的服务器是Leader。某一时刻, 3和5服务器出现故障,因此开始进行Leader选举。
此时SID为1、2、4的机器投票情况:
1 2 4 (EPOCH,ZXID,SID) (1,8,1) (1,8,2) (1,7,4) - ①EPOCH大的直接胜出。
- ②EPOCH相同,事务id大的胜出。
- ③事务id相同,服务器id大的胜出。
2、命令行操作
命令行语法如下:
命令基本语法 功能描述 help 显示所有操作命令 ls path 使用ls命令来查看当前znode的子节点[可监听]
-w:监听子节点变化
-s:附加次级信息create 普通创建
-s:含有序列
-e:临时(重启或者超时消失)get path 获得节点的值 [可监听]
-w:监听节点内容变化
-s:附加次级信息set 设置节点的具体值 stat 查看节点状态 delete 删除节点 deleteall 递归删除节点 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53[root@iz2zecc4mo4hoc75p0frn8z ~]# docker exec -it zookeeper_node1 /bin/bash
root@a9c8be2c968a:/apache-zookeeper-3.5.7-bin# bin/zkCli.sh # 连接本地2181端口
……………………
WatchedEvent state:SyncConnected type:None path:null
[zk: localhost:2181(CONNECTED) 0]
root@a9c8be2c968a:/apache-zookeeper-3.5.7-bin# bin/zkCli.sh -server zookeeper_node2:2181 # -server可指定连接哪台服务器
……………………
WatchedEvent state:SyncConnected type:None path:null
[zk: zookeeper_node2:2181(CONNECTED) 0] help # 查看帮助命令
ZooKeeper -server host:port cmd args
addauth scheme auth
close
config [-c] [-w] [-s]
connect host:port
create [-s] [-e] [-c] [-t ttl] path [data] [acl]
delete [-v version] path
deleteall path
delquota [-n|-b] path
get [-s] [-w] path
getAcl [-s] path
history
listquota path
ls [-s] [-w] [-R] path
ls2 path [watch]
printwatches on|off
quit
reconfig [-s] [-v version] [[-file path] | [-members serverID=host:port1:port2;port3[,...]*]] | [-add serverId=host:port1:port2;port3[,...]]* [-remove serverId[,...]*]
redo cmdno
removewatches path [-c|-d|-a] [-l]
rmr path
set [-s] [-v version] path data
setAcl [-s] [-v version] [-R] path acl
setquota -n|-b val path
stat [-w] path
sync path
Command not found: Command not found help
[zk: zookeeper_node2:2181(CONNECTED) 1] ls / # 查看当前znode中所包含的内容
[zookeeper]
[zk: zookeeper_node2:2181(CONNECTED) 2] ls -s / # 查看当前节点详细数据
[zookeeper]cZxid = 0x0 # czxid:创建节点的事务zxid,每次修改ZooKeeper状态都会产生一个ZooKeeper事务ID。事务ID是ZooKeeper中所有修改总的次序。每次修改都有唯一的zxid,如果zxid1小于zxid2,那么zxid1在zxid2之前发生。
ctime = Thu Jan 01 00:00:00 UTC 1970 # ctime:znode被创建的毫秒数(从1970年开始)
mZxid = 0x0 # mzxid:znode最后更新的事务zxid
mtime = Thu Jan 01 00:00:00 UTC 1970 # mtime:znode最后修改的毫秒数(从1970年开始)
pZxid = 0x0 # pZxid:znode最后更新的子节点zxid
cversion = -1 # cversion:znode子节点变化号,znode子节点修改次数
dataVersion = 0 # dataversion:znode数据变化号
aclVersion = 0 # aclVersion:znode访问控制列表的变化号
ephemeralOwner = 0x0 # ephemeralOwner:如果是临时节点,这个是znode拥有者的session id。如果不是临时节点则是0。
dataLength = 0 # dataLength:znode的数据长度
numChildren = 1 # numChildren:znode子节点数量
[zk: zookeeper_node2:2181(CONNECTED) 3]
3、节点类型
- 分为两种类型:
持久(Persistent):客户端和服务器端断开连接后,创建的节点不删除。(有序或者无序)
短暂(Ephemeral):客户端和服务器端断开连接后,创建的节点自己删除。(有序或者无序)
①持久化目录节点(例如上图的znode1):
客户端与Zookeeper断开连接后,该节点依旧存在。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35WatchedEvent state:SyncConnected type:None path:null
[zk: localhost:2181(CONNECTED) 0] create /sanguo "diaochan" # 创建不带序号的永久节点
Created /sanguo
[zk: localhost:2181(CONNECTED) 1] create /sanguo/shuguo "liubei"
Created /sanguo/shuguo
[zk: localhost:2181(CONNECTED) 2] ls /
[sanguo, zookeeper]
[zk: localhost:2181(CONNECTED) 3] ls /sanguo
[shuguo]
[zk: localhost:2181(CONNECTED) 4] get -s /sanguo # 获得节点的值
diaochan
cZxid = 0x300000008
ctime = Sun Sep 26 06:29:45 UTC 2021
mZxid = 0x300000008
mtime = Sun Sep 26 06:29:45 UTC 2021
pZxid = 0x300000009
cversion = 1
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 8
numChildren = 1
[zk: localhost:2181(CONNECTED) 5] get -s /sanguo/shuguo
liubei
cZxid = 0x300000009
ctime = Sun Sep 26 06:30:22 UTC 2021
mZxid = 0x300000009
mtime = Sun Sep 26 06:30:22 UTC 2021
pZxid = 0x300000009
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 6
numChildren = 0
②持久化顺序编号目录节点(例如上图的znode2_001):
创建znode时设置顺序标识,znode名称后会附加一个值,顺序号是一个单调递增的计数器,由父节点维护。
在分布式系统中,顺序号可以被用于为所有的事件进行全局排序,这样客户端可以通过顺序号推断事件的顺序。
客户端与Zookeeper断开连接后,该节点依旧存在,只是Zookeeper给该节点名称进行顺序编号。
1
2
3
4
5
6
7
8[zk: localhost:2181(CONNECTED) 6] create /sanguo/weiguo "caocao"
Created /sanguo/weiguo
[zk: localhost:2181(CONNECTED) 7] create -s /sanguo/weiguo/zhangliao "zhangliao" # 创建带序号的永久节点
Created /sanguo/weiguo/zhangliao0000000000
[zk: localhost:2181(CONNECTED) 8] ls /sanguo/weiguo
[zhangliao0000000000]
[zk: localhost:2181(CONNECTED) 9] create -s /sanguo/weiguo/zhangliao "zhangliao" # 可以重复创建
Created /sanguo/weiguo/zhangliao0000000001
③临时目录节点(例如上图的znode3):
客户端与Zookeeper断开连接后,该节点被删除。
1
2
3
4
5[zk: localhost:2181(CONNECTED) 0] create -e /sanguo/wuguo "zhouyu"
Created /sanguo/wuguo
[zk: localhost:2181(CONNECTED) 2] ls /sanguo
[shuguo, weiguo, wuguo]
# 退出当前客户端然后再重启客户端后所创建的临时节点消失
④临时顺序编号目录节点(例如上图的znode4_001):
客户端与Zookeeper断开连接后,该节点被删除,只是Zookeeper给该节点名称进行顺序编号。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33[zk: localhost:2181(CONNECTED) 1] create -e -s /sanguo/wuguo "zhouyu"
Created /sanguo/wuguo0000000003
[zk: localhost:2181(CONNECTED) 2] ls /sanguo
[shuguo, weiguo, wuguo, wuguo0000000003]
# 退出当前客户端然后再重启客户端后所创建的临时节点消失
[zk: localhost:2181(CONNECTED) 3] get -s /sanguo/weiguo
caocao
cZxid = 0x30000000a
ctime = Sun Sep 26 06:34:23 UTC 2021
mZxid = 0x30000000a
mtime = Sun Sep 26 06:34:23 UTC 2021
pZxid = 0x30000000e
cversion = 2
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 6
numChildren = 2
[zk: localhost:2181(CONNECTED) 4] set /sanguo/weiguo "simayi" # 修改节点数据值
[zk: localhost:2181(CONNECTED) 5] get -s /sanguo/weiguo
simayi
cZxid = 0x30000000a
ctime = Sun Sep 26 06:34:23 UTC 2021
mZxid = 0x300000016
mtime = Sun Sep 26 06:53:02 UTC 2021
pZxid = 0x30000000e
cversion = 2
dataVersion = 1
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 6
numChildren = 2
4、监听器
客户端注册监听它关心的目录节点,当目录节点发生变化(数据改变、节点删除、子目 录节点增加删除)时,ZooKeeper会通知客户端。监听机制保证ZooKeeper保存的任何的数据的任何改变都能快速的响应到监听了该节点的应用程序。
原理详解:
①首先要有一个main()线程。
②在main线程中创建Zookeeper客户端,这时就会创建两个线程,一个负责网络连接通信(connet),一个负责监听(listener)。
③通过connect线程将注册的监听事件发送给Zookeeper。
④在Zookeeper的注册监听器列表中将注册的监听事件添加到列表中。
⑤Zookeeper监听到有数据或路径变化,就会将这个消息发送给listener线程。
⑥listener线程内部调用了process()方法。
监听案例:
①节点的值变化监听:
①在zookeeper_node1主机上注册监听/sanguo节点数据变化。
1
2[zk: localhost:2181(CONNECTED) 0] get -w /sanguo # 监听节点数据的变化
diaochan②在zookeeper_node2主机上修改/sanguo节点的数据。
1
[zk: localhost:2181(CONNECTED) 0] set /sanguo "xisi"
③观察zookeeper_node1主机收到数据变化的监听。
1
2WATCHER::
WatchedEvent state:SyncConnected type:NodeDataChanged path:/sanguo- 注意:在zookeeper_node2再多次修改/sanguo的值,zookeeper_node1上不会再收到监听。因为注册一次,只能监听一次。想再次监听,需要再次注册。
②节点的子节点变化监听(路径变化):
①在zookeeper_node1主机上注册监听/sanguo节点的子节点变化。
1
2[zk: localhost:2181(CONNECTED) 1] ls -w /sanguo # 监听子节点增减的变化
[shuguo, weiguo]②在zookeeper_node2主机的/sanguo节点上创建子节点。
1
2[zk: localhost:2181(CONNECTED) 2] create /sanguo/jin "simayi"
Created /sanguo/jin③观察zookeeper_node1主机收到子节点变化的监听。
1
2WATCHER::
WatchedEvent state:SyncConnected type:NodeChildrenChanged path:/sanguo- 注意:节点的路径变化,也是注册一次,生效一次。想多次生效,就需要多次注册。
5、节点删除与查看
1 | [zk: localhost:2181(CONNECTED) 0] ls /sanguo |
6、客户端API操作
①创建一个新的maven工程并引入以下依赖:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23<dependencies>
<!-- https://mvnrepository.com/artifact/junit/junit -->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
<scope>test</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/log4j/log4j -->
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.17</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.zookeeper/zookeeper -->
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.5.7</version>
</dependency>
</dependencies>②在项目的src/main/resources目录下,新建一个log4j.properties日志配置文件。
1
2
3
4
5
6
7
8log4j.rootLogger=INFO, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n
log4j.appender.logfile=org.apache.log4j.FileAppender
log4j.appender.logfile.File=target/spring.log
log4j.appender.logfile.layout=org.apache.log4j.PatternLayout
log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n③测试连接zookeeper并创建节点/zhu。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31public class ZkClient {
private static Logger log = Logger.getLogger(ZkClient.class);
private static String connectString = "ip:2181,ip:2182,ip:2183";
private static int sessionTimeout = 3000;
private ZooKeeper zkClient = null;
public void init() throws IOException {
zkClient = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
//客户端和服务器建立连接时也会触发此监听器
public void process(WatchedEvent watchedEvent) {
}
});
}
/**
* 创建节点
* @throws KeeperException
* @throws InterruptedException
*/
public void create() throws KeeperException, InterruptedException {
//参数1:要创建的节点的路径
//参数2:节点数据
//参数3:节点权限
//参数4:节点的类型
String nodeCreated = zkClient.create("/zhu", "hongguang".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
}运行后能查看到节点信息:
1
2
3
4
5
6
7
8
9
10
11
12
13[zk: localhost:2181(CONNECTED) 0] get -s /zhu
hongguang
cZxid = 0x300000029
ctime = Sat Oct 02 07:32:40 UTC 2021
mZxid = 0x300000029
mtime = Sat Oct 02 07:32:40 UTC 2021
pZxid = 0x300000029
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 9
numChildren = 0
④测试节点监听。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36public class ZkClient {
private static Logger log = Logger.getLogger(ZkClient.class);
private static String connectString = "ip:2181,ip:2182,ip:2183";
private static int sessionTimeout = 3000;
private ZooKeeper zkClient = null;
public void init() throws IOException {
zkClient = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
//客户端和服务器建立连接时也会触发此监听器
public void process(WatchedEvent watchedEvent) {
System.out.println("-------------------------------");
List<String> children = null;
try {
children = zkClient.getChildren("/", true);
for (String child : children) {
System.out.println(child);
}
System.out.println("-------------------------------");
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
}
public void getChildren() throws KeeperException, InterruptedException {
// 延时
Thread.sleep(Long.MAX_VALUE);
}
}运行后控制台打印如下:
1
2
3-------------------------------
zookeeper
-------------------------------创建新节点zhu。
1
2[zk: localhost:2181(CONNECTED) 0] create /zhu "hongguang"
Created /zhu控制台增加打印如下:
1
2
3
4-------------------------------
zookeeper
zhu
-------------------------------
⑤测试判断节点是否存在。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23public class ZkClient {
private static Logger log = Logger.getLogger(ZkClient.class);
private static String connectString = "ip:2181,ip:2182,ip:2183";
private static int sessionTimeout = 3000;
private ZooKeeper zkClient = null;
public void init() throws IOException {
zkClient = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
//客户端和服务器建立连接时也会触发此监听器
public void process(WatchedEvent watchedEvent) {
}
});
}
public void exist() throws KeeperException, InterruptedException {
Stat stat = zkClient.exists("/zhu", false);
System.out.println(stat == null ? "not exist " : "exist");
}
}
7、写流程
写流程之写入请求直接发送给Leader节点。
写流程之写入请求发送给follower节点。
8、实际案例
8.1 服务器动态上下线监听案例
某分布式系统中,主节点可以有多台,可以动态上下线,任意一台客户端都能实时感知 到主节点服务器的上下线。
①在集群上创建/servers节点。
1
2[zk: localhost:2181(CONNECTED) 11] create /servers "servers"
Created /servers②服务器端向Zookeeper注册代码。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37public class DistributeServer {
private String connectString = "ip:2181,ip:2182,ip:2183";
private int sessionTimeout = 2000;
private ZooKeeper zk;
public static void main(String[] args) throws IOException, KeeperException, InterruptedException {
DistributeServer server = new DistributeServer();
//获取zk连接
server.getConnect();
//注册服务器到zk集群
server.register(args[0]);
//启动业务逻辑(睡觉)
server.business();
}
private void business() throws InterruptedException {
Thread.sleep(Long.MAX_VALUE);
}
private void register(String hostname) throws KeeperException, InterruptedException {
//创建临时的带序号的节点
String create = zk.create("/servers/" + hostname, hostname.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
System.out.println(hostname + " is online");
}
private void getConnect() throws IOException {
zk = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
public void process(WatchedEvent watchedEvent) {
}
});
}
}③编写客户端代码。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48public class DistributeClient {
private String connectString = "ip:2181,ip:2182,ip:2183";
private int sessionTimeout = 2000;
private ZooKeeper zk;
public static void main(String[] args) throws IOException, KeeperException, InterruptedException {
DistributeClient client = new DistributeClient();
//获取zk连接
client.getConnect();
//监听/servers下面子节点的增加和删除
client.getServerList();
//业务逻辑(睡觉)
client.business();
}
private void business() throws InterruptedException {
Thread.sleep(Long.MAX_VALUE);
}
private void getServerList() throws KeeperException, InterruptedException {
List<String> children = zk.getChildren("/servers", true);
ArrayList<String> servers = new ArrayList<>();
for (String child : children) {
byte[] data = zk.getData("/servers/" + child, false, null);
servers.add(new String(data));
}
//打印
System.out.println(servers);
}
private void getConnect() throws IOException {
zk = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
public void process(WatchedEvent watchedEvent) {
try {
getServerList();
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
}
}④启动DistributeClient客户端。
⑤在IDEA中设置服务端的传入变量值(即主机名)后启动服务端。
⑥观察到DistributeServer控制台,提示”server001 is online”;以及DistributeClient控制台,提示”[server001]”。
8.2 分布式锁案例
分布式锁定义:
- 例如”进程 1”在使用某资源的时候,会先去获得锁,”进程 1”获得锁以后会对该资源保持独占,这样其他进程就无法访问该资源,”进程 1”用完该资源以后就将锁释放掉,让其他进程来获得锁,那么通过这个锁机制,我们就能保证了分布式系统中多个进程能够有序的访问该临界资源。那么我们把这个分布式环境下的这个锁叫作分布式锁。
使用zookeeper充当分布式锁的过程如下:
①接收到请求后,在/locks节点下创建一个临时顺序节点。
②判断自己是不是当前节点下最小的节点:是,获取到锁;不是,对前一个节点进行监听。
③获取到锁,处理完业务后,delete节点释放锁,然后下面的节点将收到通知,重复第二步判断。
代码案例:
①实现分布式锁代码如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91public class DistributedLock {
private final String connectString = "ip:2181,ip:2182,ip:2183";
private final int sessionTimeout = 2000;
private final ZooKeeper zk;
private CountDownLatch connectLatch = new CountDownLatch(1);
private CountDownLatch waitLatch = new CountDownLatch(1);
private String waitPath;
private String currentMode;
public DistributedLock() throws IOException, InterruptedException, KeeperException {
// 获取连接
zk = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
public void process(WatchedEvent watchedEvent) {
// 连接建立时, 打开latch, 唤醒wait在该latch上的线程
if (watchedEvent.getState() == Event.KeeperState.SyncConnected) {
connectLatch.countDown();
}
// 发生了waitPath的删除事件
if (watchedEvent.getType() == Event.EventType.NodeDeleted && watchedEvent.getPath().equals(waitPath)) {
waitLatch.countDown();
}
}
});
// 等待zk正常连接后,往下走程序
connectLatch.await();
// 判断根节点/locks是否存在
Stat stat = zk.exists("/locks", false);
if (stat == null) {
// 创建根节点
zk.create("/locks", "locks".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
}
// 对zk加锁
public void zkLock() {
// 创建对应的临时带序号节点
try {
currentMode = zk.create("/locks/" + "seq-", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
// wait一小会, 让结果更清晰一些
Thread.sleep(10);
// 判断创建的节点是否是最小的序号节点,如果是获取到锁;如果不是,监听他序号前一个节点
List<String> children = zk.getChildren("/locks", false);
// 如果children 只有一个值,那就直接获取锁; 如果有多个节点,需要判断,谁最小
if (children.size() == 1) {
// 就一个节点,可以获取锁了
} else {
Collections.sort(children);
// 获取节点名称 seq-00000000
String thisNode = currentMode.substring("/locks/".length());
// 通过seq-00000000获取该节点在children集合的位置
int index = children.indexOf(thisNode);
// 判断
if (index == -1) {
System.out.println("数据异常");
} else if (index == 0) {
// index == 0, 说明thisNode在列表中最小, 当前client获得锁
} else {
// 需要监听前一个节点变化
waitPath = "/locks/" + children.get(index - 1);
zk.getData(waitPath, true, new Stat());
// 等待监听
waitLatch.await();
}
}
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// 解锁
public void unZkLock() {
// 删除节点
try {
zk.delete(this.currentMode, -1);
} catch (InterruptedException e) {
e.printStackTrace();
} catch (KeeperException e) {
e.printStackTrace();
}
}
}②测试代码如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38public class DistributedLockTest {
public static void main(String[] args) throws InterruptedException, IOException, KeeperException {
final DistributedLock lock1 = new DistributedLock();
final DistributedLock lock2 = new DistributedLock();
new Thread(new Runnable() {
public void run() {
try {
lock1.zkLock();
System.out.println("线程1 启动,获取到锁");
Thread.sleep(5 * 1000);
lock1.unZkLock();
System.out.println("线程1 释放锁");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
new Thread(new Runnable() {
public void run() {
try {
lock2.zkLock();
System.out.println("线程2 启动,获取到锁");
Thread.sleep(5 * 1000);
lock2.unZkLock();
System.out.println("线程2 释放锁");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
}
}运行结果如下:
1
2
3
4线程1 启动,获取到锁
线程1 释放锁
线程2 启动,获取到锁
线程2 释放锁
8.3 Curator框架实现分布式锁案例
原生的Java API开发存在的问题:
- 会话连接是异步的,需要自己去处理。比如使用CountDownLatch。
- Watch需要重复注册,不然就不能生效。
- 开发的复杂性还是比较高的。
- 不支持多节点删除和创建,需要自己去递归。
Curator是Netflix公司开源的一套zookeeper客户端框架,解决了很多Zookeeper客户端非常底层的细节开发工作,包括连接重连、反复注册Watcher和NodeExistsException异常等等。
实际案例:
①添加如下依赖:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>4.3.0</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>4.3.0</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-client</artifactId>
<version>4.3.0</version>
</dependency>②编写测试代码如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72public class CuratorLockTest {
public static void main(String[] args) {
// 创建分布式锁1
InterProcessMutex lock1 = new InterProcessMutex(getCuratorFramework(), "/locks");
// 创建分布式锁2
InterProcessMutex lock2 = new InterProcessMutex(getCuratorFramework(), "/locks");
new Thread(new Runnable() {
public void run() {
try {
lock1.acquire();
System.out.println("线程1 获取到锁");
lock1.acquire();
System.out.println("线程1 再次获取到锁");
Thread.sleep(5 * 1000);
lock1.release();
System.out.println("线程1 释放锁");
lock1.release();
System.out.println("线程1 再次释放锁");
} catch (Exception e) {
e.printStackTrace();
}
}
}).start();
new Thread(new Runnable() {
public void run() {
try {
lock2.acquire();
System.out.println("线程2 获取到锁");
lock2.acquire();
System.out.println("线程2 再次获取到锁");
Thread.sleep(5 * 1000);
lock2.release();
System.out.println("线程2 释放锁");
lock2.release();
System.out.println("线程2 再次释放锁");
} catch (Exception e) {
e.printStackTrace();
}
}
}).start();
}
private static CuratorFramework getCuratorFramework() {
ExponentialBackoffRetry policy = new ExponentialBackoffRetry(3000, 3);
CuratorFramework client = CuratorFrameworkFactory.builder().connectString("ip:2181,ip:2182,ip:2183")
.connectionTimeoutMs(2000)
.sessionTimeoutMs(2000)
.retryPolicy(policy).build();
// 启动客户端
client.start();
System.out.println("zookeeper 启动成功");
return client;
}
}执行结果如下:
1
2
3
4
5
6
7
8线程1 获取到锁
线程1 再次获取到锁
线程1 释放锁
线程1 再次释放锁
线程2 获取到锁
线程2 再次获取到锁
线程2 释放锁
线程2 再次释放锁
9、总结
选举机制:半数机制,超过半数的投票通过,即通过。
- 第一次启动选举规则:投票过半数时,服务器id大的胜出。
- 第二次启动选举规则:
- ①EPOCH大的直接胜出。
- ②EPOCH相同,事务id大的胜出。
- ③事务id相同,服务器id大的胜出。
生产集群应该安装奇数台zk合适。
Zookeeper的大部分操作都是通过选举产生的。比如,标记一个写是否成功是要在超过一半节点发送写请求成功时才认为有效。同样,Zookeeper选择领导者节点也是在超过一半节点同意时才有效。最后,Zookeeper是否正常是要根据是否超过一半的节点正常才算正常。这是基于CAP的一致性原理。
zookeeper有这样一个特性:集群中只要有过半的机器是正常工作的,那么整个集群对外就是可用的。也就是说如果有2个zookeeper,那么只要有1个死了zookeeper就不能用了,因为1没有过半,所以2个zookeeper的死亡容忍度为0;同理,要是有3个zookeeper,一个死了,还剩下2个正常的,过半了,所以3个zookeeper的容忍度为1。同理多列举几个:2->0;3->1;4->1;5->2;6->2会发现一个规律,2n和2n-1的容忍度是一样的,都是n-1,所以为了更加高效,会选择安装奇数台zookeeper。