Zookeeper介绍
-
ZooKeeper是一个分布式的,开放源码的分布式应用程序协调服务,是Google的Chubby一个开源的实现,是Hadoop和Hbase的重要组件。它是一个为分布式应用提供一致性服务的软件,提供的功能包括:配置维护、域名服务、分布式同步、组服务等。
-
ZooKeeper的目标就是封装好复杂易出错的关键服务,将简单易用的接口和性能高效、功能稳定的系统提供给用户。
-
ZooKeeper包含一个简单的原语集,提供Java和C的接口。
ZooKeeper代码版本中,提供了分布式独享锁、选举、队列的接口,代码在zookeeper-3.4.3\src\recipes。其中分布锁和队列有Java和C两个版本,选举只有Java版本。
总结:Zookeeper负责服务的协调调度.当客户端发起请求时,返回正确的服务器地址
Zookeeper 下载安装
3.5.5以后版本下载带 bin 的包
-
zookeeper 依赖jdk运行
-
安装jdk 上传解压
-
或者使用命令安装
配置环境变量
JAVA_HOME=/home/app/jdk
JAVA_BIN=/home/app/jdk/bin
PATH=$JAVA_HOME/bin:$PATH
CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar
export JAVA_HOME JAVA_BIN PATH CLASSPATH
将下载文件上传安装 或者直接weget地址
- 解压:
tar -xzvf zookeeper.tar.gz
- 创建文件夹:
cd zookeeper
mkdir data log
- 创建myid文件(集群):
vim data/myid
# 里面只需 写id即可不能重复
1
- 复制配置文件并改名:
cd conf
cp zoo_sample.cfg zoo.cfg
- 编辑配置文件:
vim zoo.cfg
# 指定data目录
dataDir=/home/app/zookeeper/data/
# 指定log目录
dataLogDir=/home/app/zookeeper/log/
# 访问端口
clientPort=2181
# 集群需要指定 server.myid=ip:集群通信端口:集群选举端口
server.1=192.168.65.200:2887:3887
server.2=192.168.65.201:2887:3887
server.3=192.168.65.202:2887:3887
-
克隆虚拟机 修改ip
-
启动zookeeper 查看信息
cd zookeeper/bin/
sh zkServer.sh start #启动
sh zkServer.sh stop #停止
sh zkServer.sh status #查看信息
Zookeeper集群中leader负责监控集群状态,follower主要负责客户端链接获取服务列表信息.同时参与投票.
Zookeeper 数据模型
在ZooKeeper中,数据信息被保存在⼀个个数据节点上,这些节点被称为znode。ZNode 是 Zookeeper 中最⼩数据单位,在 ZNode 下⾯⼜可以再挂 ZNode,这样⼀层层下去就形成了⼀个层次化 命名空间 ZNode 树,我们称为 ZNode Tree,它采⽤了类似⽂件系统的层级树状结构进⾏管理。⻅下图 示例:
在 Zookeeper 中,每⼀个数据节点都是⼀个 ZNode,上图根⽬录下有两个节点,分别是:app1 和 app2,其中 app1 下⾯⼜有三个⼦节点,所有ZNode按层次化进⾏组织,形成这么⼀颗树,ZNode的节 点路径标识⽅式和Unix⽂件系统路径⾮常相似,都是由⼀系列使⽤斜杠(/)进⾏分割的路径表示,开 发⼈员可以向这个节点写⼊数据,也可以在这个节点下⾯创建⼦节点。
ZNode 的类型
Zookeeper 节点类型可以分为三⼤类: 持久性节点(Persistent)
临时性节点(Ephemeral)
顺序性节点(Sequential)
在开发中在创建节点的时候通过组合可以⽣成以下四种节点类型:持久节点
、持久顺序节点
、临时节 点
、临时顺序节点
。不同类型的节点则会有不同的⽣命周期
持久节点:是Zookeeper中最常⻅的⼀种节点类型,所谓持久节点,就是指节点被创建后会⼀直存在服 务器,直到删除操作主动清除
持久顺序节点:就是有顺序的持久节点,节点特性和持久节点是⼀样的,只是额外特性表现在顺序上。 顺序特性实质是在创建节点的时候,会在节点名后⾯加上⼀个数字后缀,来表示其顺序。
临时节点:就是会被⾃动清理掉的节点,它的⽣命周期和客户端会话绑在⼀起,客户端会话结束,节点 会被删除掉。与持久性节点不同的是,临时节点不能创建⼦节点。
临时顺序节点:就是有顺序的临时节点,和持久顺序节点相同,在其创建的时候会在名字后⾯加上数字 后缀。
ACL--保障数据的安全
Zookeeper作为⼀个分布式协调框架,其内部存储了分布式系统运⾏时状态的元数据,这些元数据会直 接影响基于Zookeeper进⾏构造的分布式系统的运⾏状态,因此,如何保障系统中数据的安全,从⽽避 免因误操作所带来的数据随意变更⽽导致的数据库异常⼗分重要,在Zookeeper中,提供了⼀套完善的 ACL(Access Control List)权限控制机制来保障数据的安全。
可以从三个⽅⾯来理解ACL机制:权限模式(Scheme)、授权对象(ID)、权限(Permission),通常使⽤"scheme: id :permission
"来标识⼀个有效的ACL信息。
权限模式:Scheme
权限模式⽤来确定权限验证过程中使⽤的检验策略,有如下四种模式:
- IP
IP模式就是通过IP地址粒度来进⾏权限控制,如"ip:192.168.0.110"表示权限控制针对该IP地址, 同时IP模式可以⽀持按照⽹段⽅式进⾏配置,如"ip:192.168.0.1/24"表示针对192.168.0.*这个⽹段 进⾏权限控制。
- Digest
Digest是最常⽤的权限控制模式,要更符合我们对权限控制的认识,其使 ⽤"username:password"形式的权限标识来进⾏权限配置,便于区分不同应⽤来进⾏权限控制。 当我们通过“username:password”形式配置了权限标识后,Zookeeper会先后对其进⾏SHA-1加密 和BASE64编码。
- World
World是⼀种最开放的权限控制模式,这种权限控制⽅式⼏乎没有任何作⽤,数据节点的访问权限 对所有⽤户开放,即所有⽤户都可以在不进⾏任何权限校验的情况下操作ZooKeeper上的数据。 另外,World模式也可以看作是⼀种特殊的Digest模式,它只有⼀个权限标识,即“world: anyone”。
- Super
Super模式,顾名思义就是超级⽤户的意思,也是⼀种特殊的Digest模式。在Super模式下,超级 ⽤户可以对任意ZooKeeper上的数据节点进⾏任何操作。
授权对象 ID
授权对象指的是权限赋予的⽤户或⼀个指定实体,例如 IP 地址或是机器等。在不同的权限模式下,授 权对象是不同的,表中列出了各个权限模式和授权对象之间的对应关系。
权限模式 | 授权对象 |
---|---|
IP | 通常是⼀个IP地址或IP段:例如:192.168.10.110 或192.168.10.1/24 |
Digest | ⾃定义,通常是username:BASE64(SHA-1(username:password))例如: zm:sdfndsllndlksfn7c= |
Word | World模式也可以看作是⼀种特殊的Digest模式,它只有⼀个权限标识,即“world: anyone” |
Super | 超级用户 |
权限
权限就是指那些通过权限检查后可以被允许执⾏的操作。在ZooKeeper中,所有对数据的操作权限分为 以下五⼤类:
· CREATE(C):数据节点的创建权限,允许授权对象在该数据节点下创建⼦节点。
· DELETE(D): ⼦节点的删除权限,允许授权对象删除该数据节点的⼦节点。
· READ(R):数据节点的读取权限,允 许授权对象访问该数据节点并读取其数据内容或⼦节点列表等。
· WRITE(W):数据节点的更新权 限,允许授权对象对该数据节点进⾏更新操作。
· ADMIN(A):数据节点的管理权限,允许授权对象 对该数据节点进⾏ ACL 相关的设置操作。
使用zkCli连接zk
cd zookeeperPath/bin/ 进入zookeeper的bin目录
./zkCli.sh 或者 sh zkCli.sh 连接本地服务
./zkCli.sh -server ip:port 连接指定的服务器
Zookeeper 命令行使用
连接成功后可以使用 help
查看命令 quit
退出客户端
新增数据
# 格式
create [-s] [-e] [-c] [-t ttl] path [data] [acl]
create -s -e path data acl
# -s 指定为顺序节点 -e指定为临时节点 path 为路径 data为数据 acl指定权限控制
# 例:
# 创建持久节点
create /zk-permanent
# 创建顺序持久节点 数据123
create -s /zk-order 123
# 创建临时节点
create -e /zk-temp
# 创建临时顺序节点
create -s -e /zk-order-temp
查找数据
# 格式
ls [-s] [-w] [-R] path
[-s] : 查看某一节点下的子节点加当前节点的元信息,相当于之前版本的ls2命令
[-w] : 查看节点并为节点添加一个监听,当节点被修改时,该客户端会收到一个回调
之前版本是在path 后面加一个watch实现:ls path watch
[-R] : 返回当前节点路径,当前节点的子节点,当前节点的子节点的子节点(递归)
get [-s] [-w] path
[-s] : 查看节点数据加元信息
[-w] : 查看节点并为节点添加一个监听,当节点被修改时,该客户端会收到一个回调
之前版本是在path 后面加一个watch实现:get path watch
# 区别
ls 命令获取`节点`下的`直系节点信息` get获取`节点`下的`数据信息 `
-
znode节点信息
每一个对znode树的更新操作,都会被赋予一个全局唯一的ID,我们称之为
zxid
(ZooKeeper Transaction ID)。更新操作的ID按照发生的时间顺序升序排序。例如,z1大于z2,那么z1的操作就早于z2操作。czxid,创建(create)该 znode 的 zxid
mzxid,最后一次修改(modify)该 znode 的 zxid
pzxid,最后一次修改该 znode 子节点的 zxid
ctime,创建该 znode 的时间
mtime,最后一次修改该 znode 的时间
dataVersion,该节点内容的版本,每次修改内容,版本都会增加
cversion,该节点子节点的版本
aclVersion,该节点的 ACL 版本
ephemeralOwner,如果该节点是临时节点(ephemeral node),会列出该节点所在客户端的 session id;如果不是临时节点,该值为 0
dataLength,该节点存储的数据长度
numChildren,该节点子节点的个数
修改数据
# 格式
set [-s] [-v version] path data
[-s] : 返回修改后节点的元信息
[-v version] : 指定数据的版本,版本不符合时修改失败,类似关系型数据库的乐观锁
path : 修改节点路径
data : 修改的数据
删除数据
# 格式
delete [-v version] path
[-v version] : 指定数据的版本,版本不符合时删除失败,类似关系型数据库的乐观锁
path : 删除的节点路径
# 说明: 如果节点下有子节点无法删除 需要先删除子节点
# 直接删除包括子节点
deleteall path [-b batch size]
path : 删除的节点路径
[-b batch size] :
Zookeeper api的使用
导入maven依赖
<!-- 版本与zookeeper一致 v3.6.3-->
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>{zk-version}</version>
</dependency>
建立连接
package top.mengshuo;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
/**
* @author mengshuo
* @since 2021-10-11
*/
public class CreateSession implements Watcher {
/**
* 初始化计数器 设置等待1个线程
*/
private static final CountDownLatch COUNT_DOWN_LATCH = new CountDownLatch(1);
public static void main(String[] args) throws IOException, InterruptedException {
/*
* connectString: 连接信息 ip:port
* sessionTimeout: 连接超时时间 单位毫秒
* watcher: 监听器 当触发事件后ZK会通过通过watcher通知到客户端
*/
ZooKeeper zooKeeper = new ZooKeeper("192.168.59.101:2181", 1000, new CreateSession());
System.out.println(zooKeeper.getState());
// 阻塞main线程(方法)执行 当计数器等于0后会继续执行
COUNT_DOWN_LATCH.await();
System.out.println(zooKeeper.getState());
}
/**
* 监听回调方法
* 当前类实现了Watcher接⼝,重写了process⽅法,该⽅法负责处理来⾃Zookeeper服务端的 watcher通知,
* 在收到服务端发送过来的SyncConnected事件之后,解除主程序在CountDownLatch上 的等待阻塞,⾄此,会话创建完毕
*
* @param event 回调数据
*/
@Override
public void process(WatchedEvent event) {
if (event.getState() == Event.KeeperState.SyncConnected) {
// 计数器减一 main方法执行
COUNT_DOWN_LATCH.countDown();
}
}
}
添加数据
package top.mengshuo;
import org.apache.zookeeper.*;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CountDownLatch;
/**
* @author mengshuo
* @since 2021-10-12
*/
public class CreateNode implements Watcher {
/**
* 初始化计数器 设置等待1个线程
*/
private static final CountDownLatch COUNT_DOWN_LATCH = new CountDownLatch(1);
private static ZooKeeper zooKeeper;
public static void main(String[] args) throws Exception {
/*
* connectString: 连接信息 ip:port
* sessionTimeout: 连接超时时间 单位毫秒
* watcher: 监听器 当触发事件后ZK会通过通过watcher通知到客户端
*/
zooKeeper = new ZooKeeper("192.168.59.101:2181", 1000, new CreateNode());
System.out.println(zooKeeper.getState());
// 阻塞main线程(方法)执行 当计数器等于0后会继续执行
COUNT_DOWN_LATCH.await();
System.out.println(zooKeeper.getState());
CreateNode.create();
}
/**
* 监听回调方法
* 当前类实现了Watcher接⼝,重写了process⽅法,该⽅法负责处理来⾃Zookeeper服务端的 watcher通知,
* 在收到服务端发送过来的SyncConnected事件之后,解除主程序在CountDownLatch上 的等待阻塞,⾄此,会话创建完毕
*
* @param event 回调数据
*/
@Override
public void process(WatchedEvent event) {
if (event.getState() == Event.KeeperState.SyncConnected) {
// 计数器减一 main方法执行
COUNT_DOWN_LATCH.countDown();
}
}
/**
* 创建
*
* @throws InterruptedException exception
* @throws KeeperException exception
*/
public static void create() throws InterruptedException, KeeperException {
/*
path: 节点创建路径
data: 节点创建的数据 byte类型
acl: 节点创建的权限信息
ZooDefs.Ids.ANYONE_ID_UNSAFE 表示任何⼈
ZooDefs.Ids.AUTH_IDS 此ID仅可⽤于设置ACL。它将被客户机验证的ID替换
ZooDefs.Ids.OPEN_ACL_UNSAFE 这是⼀个完全开放的ACL(常⽤)--> word:anyone
ZooDefs.Ids.CREATOR_ALL_ACL 此ACL授予创建者身份验证ID的所有权限
createMode: 节点创建的类型
PERSISTENT 持久节点
PERSISTENT_SEQUENTIAL 持久顺序节点
EPHEMERAL 临时节点
EPHEMERAL_SEQUENTIAL 临时顺序节点
*/
String persistentNode = zooKeeper.create("/zk-api-test-persistent", "api创建的持久节点".getBytes(),
ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
String persistentSequentialNode = zooKeeper.create("/zk-api-test-persistent-sequential", "api创建的持久顺序节点".getBytes(),
ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL);
String ephemeralNode = zooKeeper.create("/zk-api-test-ephemeral", "api创建的临时节点".getBytes(),
ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL);
System.out.println("创建的持久节点: " + persistentNode);
System.out.println("创建的持久顺序节点: " + persistentSequentialNode);
System.out.println("创建的临时节点: " + ephemeralNode);
}
}
查询数据
package top.mengshuo;
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import java.util.List;
import java.util.concurrent.CountDownLatch;
/**
* @author mengshuo
* @since 2021-10-12
*/
public class GetNodeData implements Watcher {
/**
* 初始化计数器 设置等待1个线程
*/
private static final CountDownLatch COUNT_DOWN_LATCH = new CountDownLatch(1);
private static ZooKeeper zooKeeper;
public static void main(String[] args) throws Exception {
/*
* connectString: 连接信息 ip:port
* sessionTimeout: 连接超时时间 单位毫秒
* watcher: 监听器 当触发事件后ZK会通过通过watcher通知到客户端
*/
zooKeeper = new ZooKeeper("192.168.59.101:2181", 1000, new GetNodeData());
System.out.println("连接状态: "+zooKeeper.getState());
// 阻塞main线程(方法)执行 当计数器等于0后会继续执行
COUNT_DOWN_LATCH.await();
}
/**
* 监听回调方法
* 当前类实现了Watcher接⼝,重写了process⽅法,该⽅法负责处理来⾃Zookeeper服务端的 watcher通知,
* 在收到服务端发送过来的SyncConnected事件之后,解除主程序在CountDownLatch上 的等待阻塞,⾄此,会话创建完毕
*
* @param event 回调数据
*/
@Override
public void process(WatchedEvent event) {
/*
当子节点数据发生变化时 会触发nodeChildrenChanged事件
并且事件通知时一次性的 并不是每次变更都会通知
*/
if (event.getType() == Event.EventType.NodeChildrenChanged) {
System.out.println("子节点数据变更...");
// 重新获取子节点信息并注册通知
getChildren();
}
//当连接创建了,服务端发送给客户端SyncConnected事件
if (event.getState() == Event.KeeperState.SyncConnected) {
System.out.println("连接状态: "+zooKeeper.getState());
// 计数器减一 main方法执行
// 不再减一持续阻塞等待服务端的再次通知
// COUNT_DOWN_LATCH.countDown();
// 获取数据
getData();
// 获取子节点数据
getChildren();
}
}
/**
* 获取节点数据
*/
public static void getData() {
/*
path 获取数据的路径
watch 是否注册监听器
stat 节点状态信息 null表示获取最新数据
*/
byte[] data = new byte[0];
try {
data = zooKeeper.getData("/zk-api-test-persistent", false, null);
} catch (KeeperException | InterruptedException e) {
e.printStackTrace();
}
System.out.println("节点数据: "+new String(data));
}
/**
* 获取子节点信息
*/
public static void getChildren() {
List<String> childrenData = null;
try {
childrenData = zooKeeper.getChildren("/zk-api-test-persistent", true);
} catch (KeeperException | InterruptedException e) {
e.printStackTrace();
}
System.out.println("子节点信息: "+childrenData);
}
}
- 查询结果信息 & 监听
修改数据
package top.mengshuo;
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Date;
import java.util.concurrent.CountDownLatch;
/**
* @author mengshuo
* @since 2021-10-12
*/
public class UpdateNodeData implements Watcher {
/**
* 初始化计数器 设置等待1个线程
*/
private static final CountDownLatch COUNT_DOWN_LATCH = new CountDownLatch(1);
private static ZooKeeper zooKeeper;
public static void main(String[] args) throws Exception {
/*
* connectString: 连接信息 ip:port
* sessionTimeout: 连接超时时间 单位毫秒
* watcher: 监听器 当触发事件后ZK会通过通过watcher通知到客户端
*/
zooKeeper = new ZooKeeper("192.168.59.101:2181", 1000, new UpdateNodeData());
System.out.println(zooKeeper.getState());
// 阻塞main线程(方法)执行 当计数器等于0后会继续执行
COUNT_DOWN_LATCH.await();
System.out.println(zooKeeper.getState());
UpdateNodeData.update();
}
/**
* 监听回调方法
* 当前类实现了Watcher接⼝,重写了process⽅法,该⽅法负责处理来⾃Zookeeper服务端的 watcher通知,
* 在收到服务端发送过来的SyncConnected事件之后,解除主程序在CountDownLatch上 的等待阻塞,⾄此,会话创建完毕
*
* @param event 回调数据
*/
@Override
public void process(WatchedEvent event) {
if (event.getState() == Event.KeeperState.SyncConnected) {
// 计数器减一 main方法执行
COUNT_DOWN_LATCH.countDown();
}
}
/**
* 更新
*
* @throws InterruptedException exception
* @throws KeeperException exception
*/
public static void update() throws InterruptedException, KeeperException {
byte[] data = zooKeeper.getData("/zk-api-test-persistent", false, null);
System.out.println("修改之前的数据: " + new String(data));
/*
path 所修改的数据节点路径
data 新数据内容
version 所要修改的数据版本 -1表示修改最新版本
返回值 stat 为节点信息
*/
Stat stat = zooKeeper.setData("/zk-api-test-persistent",
("修改时间: "+LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"))).getBytes(),
-1);
byte[] newData = zooKeeper.getData("/zk-api-test-persistent", false, null);
System.out.println("修改之后的数据: " + new String(newData));
}
}
- 结果
删除节点数据
package top.mengshuo;
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import java.util.concurrent.CountDownLatch;
/**
* @author mengshuo
* @since 2021-10-12
*/
public class DeleteNode implements Watcher {
/**
* 初始化计数器 设置等待1个线程
*/
private static final CountDownLatch COUNT_DOWN_LATCH = new CountDownLatch(1);
private static ZooKeeper zooKeeper;
public static void main(String[] args) throws Exception {
/*
* connectString: 连接信息 ip:port
* sessionTimeout: 连接超时时间 单位毫秒
* watcher: 监听器 当触发事件后ZK会通过通过watcher通知到客户端
*/
zooKeeper = new ZooKeeper("192.168.59.101:2181", 1000, new DeleteNode());
System.out.println(zooKeeper.getState());
// 阻塞main线程(方法)执行 当计数器等于0后会继续执行
COUNT_DOWN_LATCH.await();
System.out.println(zooKeeper.getState());
DeleteNode.delete();
}
/**
* 监听回调方法
* 当前类实现了Watcher接⼝,重写了process⽅法,该⽅法负责处理来⾃Zookeeper服务端的 watcher通知,
* 在收到服务端发送过来的SyncConnected事件之后,解除主程序在CountDownLatch上 的等待阻塞,⾄此,会话创建完毕
*
* @param event 回调数据
*/
@Override
public void process(WatchedEvent event) {
if (event.getState() == Event.KeeperState.SyncConnected) {
// 计数器减一 main方法执行
COUNT_DOWN_LATCH.countDown();
}
}
/**
* 删除
*
* @throws InterruptedException exception
* @throws KeeperException exception
*/
public static void delete() throws InterruptedException, KeeperException {
// 判断节点是否存在
Stat stat = zooKeeper.exists("/zk-api-test-persistent/children-03", false);
boolean exists = stat != null;
if (exists) {
System.out.println("节点存在 进行删除...");
// 删除节点数据
zooKeeper.delete("/zk-api-test-persistent/children-03", -1);
System.out.println("删除后的节点状态: " + zooKeeper.exists("/zk-api-test-persistent/child02", false).toString());
} else {
System.out.println("节点不存在");
}
}
}
- 执行结果
- 删除之前节点信息
-
删除之后节点信息
ps: children-02 之前删除过了
ZK客户端-Curator的使用
Curator介绍
curator是Netflix公司开源的⼀套Zookeeper客户端框架,和ZKClient⼀样,Curator解决了很多 Zookeeper客户端⾮常底层的细节开发⼯作,包括连接重连,反复注册Watcher和 NodeExistsException异常等,是最流⾏的Zookeeper客户端之⼀。从编码⻛格上来讲,它提供了基于 Fluent的编程⻛格⽀持
Curator使用
导入pom依赖
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>5.2.0</version>
</dependency>
创建会话
- 创建会话的两个方法源码
/**
* 创建连接 指定默认的会话&连接超时时间
* 此方法调用下面的四个参数的方法 同时指定了默认参数
*
* @param connectString 连接信息 格式: ip:port,ip:port...
* @param retryPolicy 重试策略
* @return client
*/
public static CuratorFramework newClient(String connectString, RetryPolicy retryPolicy)
{
return newClient(connectString, DEFAULT_SESSION_TIMEOUT_MS, DEFAULT_CONNECTION_TIMEOUT_MS, retryPolicy);
}
/**
* 创建连接 使用了 建造者(Builder)模式
* 此方法调用{@link CuratorFrameworkFactory#builder()}方法返回内部Builder类对象
* 同时设置Builder对象中的属性最后调用{@link CuratorFrameworkFactory.Builder#build()} 方法
* 由build方法构建一个 CuratorFramework 的子类 CuratorFrameworkImpl
*
* @param connectString 连接信息 格式: ip:port,ip:port...
* @param sessionTimeoutMs 会话超时时间
* @param connectionTimeoutMs 连接超时时间
* @param retryPolicy 重试策略
* @return client
*/
public static CuratorFramework newClient(String connectString, int sessionTimeoutMs, int connectionTimeoutMs, RetryPolicy retryPolicy)
{
return builder().
connectString(connectString).
sessionTimeoutMs(sessionTimeoutMs).
connectionTimeoutMs(connectionTimeoutMs).
retryPolicy(retryPolicy).
build();
}
其中参数RetryPolicy提供重试策略的接⼝,可以让⽤户实现⾃定义的重试策略 默认提供了以下实现,分别为:
- ExponentialBackoffRetry(基于backoff的重连策略)构造器含有三个参数 :
-
ExponentialBackoffRetry(int baseSleepTimeMs, int maxRetries, int maxSleepMs)
-
baseSleepTimeMs:初始的sleep时间,⽤于计算之后的每次重试的sleep时间, 计算公式:当前sleep时间=baseSleepTimeMs*Math.max(1, -random.nextInt(1<<(retryCount+1)))
-
maxRetries:最⼤重试次数
-
maxSleepMs:最⼤sleep时间,如果上述的当前sleep计算出来⽐这个⼤,那么sleep⽤ 这个时间,默认的最⼤时间是Integer.MAX_VALUE毫秒
-
-
RetryNTimes(重连N次策略)
-
RetryForever(永远重试策略)
由以上可得知我们可以直接使用Builder模式进行构建CuratorFramework对象
- 创建连接示例
package top.mengshuo.curator;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
/**
* @author mengshuo
* @since 2021-10-14
*/
public class CreateSession {
public static void main(String[] args) {
/*
baseSleepTimeMs 初始的sleep时间,⽤于计算之后的每次重试的sleep时间
maxRetries 最大重试次数
*/
RetryPolicy retry = new ExponentialBackoffRetry(1000, 5);
/*
建立连接 两个参数
connectString 连接信息 格式: ip:port,ip:port...
retryPolicy 重试策略
*/
CuratorFramework client = CuratorFrameworkFactory.newClient("192.168.59.101:2181", retry);
client.start();
/*
使用 fluent 方式
*/
CuratorFramework curator = CuratorFrameworkFactory.builder()
.connectString("192.168.59.101:2181")
.sessionTimeoutMs(5000)
.connectionTimeoutMs(3000)
.retryPolicy(retry)
.namespace("curator") // 命名空间 所有的操作都会在`/curator`节点下进行
.build();
curator.start();
}
}
创建节点
- api说明
- 创建一个空节点
client.create().forPath(path);
- 创建一个带数据的节点
client.create().forPath(path,data);
- 递归创建父节点并指定节点类型
`.creatingParentsIfNeeded()可以自动的创建不存在的节点`
`.withMode()指定创建的节点类型`
client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(path);
- api创建测试
package top.mengshuo.curator;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import java.nio.charset.StandardCharsets;
/**
* @author mengshuo
* @since 2021-10-14
*/
public class CreateNode {
public static void main(String[] args) throws Exception {
/*
baseSleepTimeMs 初始的sleep时间,⽤于计算之后的每次重试的sleep时间
maxRetries 最大重试次数
*/
RetryPolicy retry = new ExponentialBackoffRetry(1000, 5);
/*
使用 fluent 方式
*/
CuratorFramework client = CuratorFrameworkFactory.builder()
.connectString("192.168.59.101:2181")
.sessionTimeoutMs(30000)
.connectionTimeoutMs(10000)
.retryPolicy(retry)
.namespace("curator")
.build();
client.start();
System.out.println("连接已创建...");
String path = client.create()
.creatingParentsIfNeeded()
.withMode(CreateMode.PERSISTENT)
.forPath("/create/test", "创建测试".getBytes(StandardCharsets.UTF_8));
System.out.println("创建完成: nameSpace = " + client.getNamespace() + " 路径 = " + path);
}
}
结果:
[zk: localhost:2181(CONNECTED) 81] ls /
[zookeeper]
[zk: localhost:2181(CONNECTED) 82] get /curator/create/test
创建测试
删除节点
- api说明
- 删除指定节点
client.delete().forPath(path)
- 指定版本
client.delete().withVersion(version).forPath(path)
- 强制保证删除
client.delete().guaranteed().forPath()
- `递归删除 ( 删除子节点 )并指定版本
client.delete().deletingChildrenIfNeeded().withVersion(version).forPath(path)
- api删除测试
package top.mengshuo.curator;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import java.nio.charset.StandardCharsets;
/**
* @author mengshuo
* @since 2021-10-14
*/
public class DeleteNode {
public static void main(String[] args) throws Exception {
/*
baseSleepTimeMs 初始的sleep时间,⽤于计算之后的每次重试的sleep时间
maxRetries 最大重试次数
*/
RetryPolicy retry = new ExponentialBackoffRetry(1000, 5);
/*
使用 fluent 方式
*/
CuratorFramework client = CuratorFrameworkFactory.builder()
.connectString("192.168.59.101:2181")
.sessionTimeoutMs(30000)
.connectionTimeoutMs(10000)
.retryPolicy(retry)
.namespace("curator")
.build();
client.start();
System.out.println("连接已创建...");
// 递归删除
client.delete().deletingChildrenIfNeeded().forPath("/create");
System.out.println("删除完成...");
}
}
结果:
[zk: localhost:2181(CONNECTED) 28] ls /
[curator, zookeeper]
[zk: localhost:2181(CONNECTED) 29] ls /curator
[create]
[zk: localhost:2181(CONNECTED) 30] ls /curator
[]
[zk: localhost:2181(CONNECTED) 31] ls /
[curator, zookeeper]
修改节点
-
api说明
- 查询节点数据
client.getData().forPath(path)
1. 查询节点信息
data = client.getData().storingStatIn(stat).forPath(path);
- api测试
package top.mengshuo.curator;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.data.Stat;
import java.nio.charset.StandardCharsets;
/**
* @author mengshuo
* @since 2021-10-14
*/
public class GetNodeData {
public static void main(String[] args) throws Exception {
/*
baseSleepTimeMs 初始的sleep时间,⽤于计算之后的每次重试的sleep时间
maxRetries 最大重试次数
*/
RetryPolicy retry = new ExponentialBackoffRetry(1000, 5);
/*
使用 fluent 方式
*/
CuratorFramework client = CuratorFrameworkFactory.builder()
.connectString("192.168.59.101:2181")
.sessionTimeoutMs(30000)
.connectionTimeoutMs(10000)
.retryPolicy(retry)
.namespace("curator")
.build();
client.start();
System.out.println("连接已创建...");
byte[] data = client.getData().forPath("/create/test");
System.out.println("获取成功完成: " + new String(data));
Stat stat = new Stat();
data = client.getData().storingStatIn(stat).forPath("/create/test");
System.out.println("获取成功完成: " + new String(data));
System.out.println("节点状态信息: "+stat);
}
}
结果:
连接已创建...
获取成功完成: 创建测试
获取成功完成: 创建测试
节点状态信息: 181,181,1634462939342,1634462939342,0,0,0,0,12,0,181
更新数据
-
api说明
- 更新数据
client.setData().forPath(path,data)
1. 更新指定版本的数据
client.setData().withVersion(version).forPath(path,data)
- api测试
package top.mengshuo.curator;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.data.Stat;
import java.nio.charset.StandardCharsets;
/**
* @author mengshuo
* @since 2021-10-14
*/
public class UpdateNodeData {
public static void main(String[] args) throws Exception {
/*
baseSleepTimeMs 初始的sleep时间,⽤于计算之后的每次重试的sleep时间
maxRetries 最大重试次数
*/
RetryPolicy retry = new ExponentialBackoffRetry(1000, 5);
/*
使用 fluent 方式
*/
CuratorFramework client = CuratorFrameworkFactory.builder()
.connectString("192.168.59.101:2181")
.sessionTimeoutMs(30000)
.connectionTimeoutMs(10000)
.retryPolicy(retry)
.namespace("curator")
.build();
client.start();
System.out.println("连接已创建...");
byte[] data = client.getData().forPath("/create/test");
System.out.println("旧数据: "+new String(data));
client.setData().forPath("/create/test","更新数据".getBytes(StandardCharsets.UTF_8));
System.out.println("更新完成");
data = client.getData().forPath("/create/test");
System.out.println("新数据: "+new String(data));
}
}
结果:
连接已创建...
旧数据: 创建测试
更新完成
新数据: 更新数据