-->
侧边栏壁纸
博主头像
断钩鱼 博主等级

行动起来,活在当下

  • 累计撰写 28 篇文章
  • 累计创建 34 个标签
  • 累计收到 0 条评论

目 录CONTENT

文章目录

Zookeeper 安装与基础

halt
2020-10-19 / 0 评论 / 8 点赞 / 2297 阅读 / 0 字

Zookeeper介绍

  • ZooKeeper是一个分布式的,开放源码的分布式应用程序协调服务,是Google的Chubby一个开源的实现,是Hadoop和Hbase的重要组件。它是一个为分布式应用提供一致性服务的软件,提供的功能包括:配置维护、域名服务、分布式同步、组服务等。

  • ZooKeeper的目标就是封装好复杂易出错的关键服务,将简单易用的接口和性能高效、功能稳定的系统提供给用户。

  • ZooKeeper包含一个简单的原语集,提供Java和C的接口。

ZooKeeper代码版本中,提供了分布式独享锁、选举、队列的接口,代码在zookeeper-3.4.3\src\recipes。其中分布锁和队列有Java和C两个版本,选举只有Java版本。

总结:Zookeeper负责服务的协调调度.当客户端发起请求时,返回正确的服务器地址

Zookeeper 下载安装

3.5.5以后版本下载带 bin 的包

  • zookeeper 依赖jdk运行

  • 安装jdk 上传解压

  • 或者使用命令安装

    配置环境变量

JAVA_HOME=/home/app/jdk
JAVA_BIN=/home/app/jdk/bin
PATH=$JAVA_HOME/bin:$PATH
CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar
export JAVA_HOME JAVA_BIN PATH CLASSPATH

将下载文件上传安装 或者直接weget地址

  • 解压:
tar -xzvf zookeeper.tar.gz
  • 创建文件夹:
cd zookeeper
mkdir data log
  • 创建myid文件(集群):
vim data/myid

# 里面只需 写id即可不能重复
1
  • 复制配置文件并改名:
cd conf 
cp zoo_sample.cfg zoo.cfg
  • 编辑配置文件:
vim zoo.cfg

# 指定data目录
dataDir=/home/app/zookeeper/data/
# 指定log目录
dataLogDir=/home/app/zookeeper/log/
# 访问端口
clientPort=2181
# 集群需要指定 server.myid=ip:集群通信端口:集群选举端口
server.1=192.168.65.200:2887:3887
server.2=192.168.65.201:2887:3887
server.3=192.168.65.202:2887:3887

  • 克隆虚拟机 修改ip

  • 启动zookeeper 查看信息

cd zookeeper/bin/
sh zkServer.sh start #启动
sh zkServer.sh stop #停止
sh zkServer.sh status #查看信息

Zookeeper集群中leader负责监控集群状态,follower主要负责客户端链接获取服务列表信息.同时参与投票.

leader

follower

Zookeeper 数据模型

在ZooKeeper中,数据信息被保存在⼀个个数据节点上,这些节点被称为znode。ZNode 是 Zookeeper 中最⼩数据单位,在 ZNode 下⾯⼜可以再挂 ZNode,这样⼀层层下去就形成了⼀个层次化 命名空间 ZNode 树,我们称为 ZNode Tree,它采⽤了类似⽂件系统的层级树状结构进⾏管理。⻅下图 示例:

在 Zookeeper 中,每⼀个数据节点都是⼀个 ZNode,上图根⽬录下有两个节点,分别是:app1 和 app2,其中 app1 下⾯⼜有三个⼦节点,所有ZNode按层次化进⾏组织,形成这么⼀颗树,ZNode的节 点路径标识⽅式和Unix⽂件系统路径⾮常相似,都是由⼀系列使⽤斜杠(/)进⾏分割的路径表示,开 发⼈员可以向这个节点写⼊数据,也可以在这个节点下⾯创建⼦节点。

ZNode 的类型

Zookeeper 节点类型可以分为三⼤类: 持久性节点(Persistent) 临时性节点(Ephemeral) 顺序性节点(Sequential)
在开发中在创建节点的时候通过组合可以⽣成以下四种节点类型:持久节点持久顺序节点临时节 点临时顺序节点。不同类型的节点则会有不同的⽣命周期
持久节点:是Zookeeper中最常⻅的⼀种节点类型,所谓持久节点,就是指节点被创建后会⼀直存在服 务器,直到删除操作主动清除
持久顺序节点:就是有顺序的持久节点,节点特性和持久节点是⼀样的,只是额外特性表现在顺序上。 顺序特性实质是在创建节点的时候,会在节点名后⾯加上⼀个数字后缀,来表示其顺序。
临时节点:就是会被⾃动清理掉的节点,它的⽣命周期和客户端会话绑在⼀起,客户端会话结束,节点 会被删除掉。与持久性节点不同的是,临时节点不能创建⼦节点。
临时顺序节点:就是有顺序的临时节点,和持久顺序节点相同,在其创建的时候会在名字后⾯加上数字 后缀。

ACL--保障数据的安全

Zookeeper作为⼀个分布式协调框架,其内部存储了分布式系统运⾏时状态的元数据,这些元数据会直 接影响基于Zookeeper进⾏构造的分布式系统的运⾏状态,因此,如何保障系统中数据的安全,从⽽避 免因误操作所带来的数据随意变更⽽导致的数据库异常⼗分重要,在Zookeeper中,提供了⼀套完善的 ACL(Access Control List)权限控制机制来保障数据的安全。

可以从三个⽅⾯来理解ACL机制:权限模式(Scheme)授权对象(ID)权限(Permission),通常使⽤"scheme: id :permission"来标识⼀个有效的ACL信息。

权限模式:Scheme

权限模式⽤来确定权限验证过程中使⽤的检验策略,有如下四种模式:

  1. IP

IP模式就是通过IP地址粒度来进⾏权限控制,如"ip:192.168.0.110"表示权限控制针对该IP地址, 同时IP模式可以⽀持按照⽹段⽅式进⾏配置,如"ip:192.168.0.1/24"表示针对192.168.0.*这个⽹段 进⾏权限控制。

  1. Digest

Digest是最常⽤的权限控制模式,要更符合我们对权限控制的认识,其使 ⽤"username:password"形式的权限标识来进⾏权限配置,便于区分不同应⽤来进⾏权限控制。 当我们通过“username:password”形式配置了权限标识后,Zookeeper会先后对其进⾏SHA-1加密 和BASE64编码。

  1. World

World是⼀种最开放的权限控制模式,这种权限控制⽅式⼏乎没有任何作⽤,数据节点的访问权限 对所有⽤户开放,即所有⽤户都可以在不进⾏任何权限校验的情况下操作ZooKeeper上的数据。 另外,World模式也可以看作是⼀种特殊的Digest模式,它只有⼀个权限标识,即“world: anyone”。

  1. Super

Super模式,顾名思义就是超级⽤户的意思,也是⼀种特殊的Digest模式。在Super模式下,超级 ⽤户可以对任意ZooKeeper上的数据节点进⾏任何操作。

授权对象 ID

授权对象指的是权限赋予的⽤户或⼀个指定实体,例如 IP 地址或是机器等。在不同的权限模式下,授 权对象是不同的,表中列出了各个权限模式和授权对象之间的对应关系。

权限模式授权对象
IP通常是⼀个IP地址或IP段:例如:192.168.10.110 或192.168.10.1/24
Digest⾃定义,通常是username:BASE64(SHA-1(username:password))例如: zm:sdfndsllndlksfn7c=
WordWorld模式也可以看作是⼀种特殊的Digest模式,它只有⼀个权限标识,即“world: anyone”
Super超级用户

权限

权限就是指那些通过权限检查后可以被允许执⾏的操作。在ZooKeeper中,所有对数据的操作权限分为 以下五⼤类:

· CREATE(C):数据节点的创建权限,允许授权对象在该数据节点下创建⼦节点。

· DELETE(D): ⼦节点的删除权限,允许授权对象删除该数据节点的⼦节点。

· READ(R):数据节点的读取权限,允 许授权对象访问该数据节点并读取其数据内容或⼦节点列表等。

· WRITE(W):数据节点的更新权 限,允许授权对象对该数据节点进⾏更新操作。

· ADMIN(A):数据节点的管理权限,允许授权对象 对该数据节点进⾏ ACL 相关的设置操作。

使用zkCli连接zk

cd zookeeperPath/bin/ 进入zookeeper的bin目录
./zkCli.sh 或者 sh zkCli.sh 连接本地服务
./zkCli.sh -server ip:port 连接指定的服务器

Zookeeper 命令行使用

连接成功后可以使用 help查看命令 quit退出客户端

新增数据

# 格式
create [-s] [-e] [-c] [-t ttl] path [data] [acl]
create -s -e path data acl
# -s 指定为顺序节点 -e指定为临时节点 path 为路径 data为数据 acl指定权限控制
# 例:
# 创建持久节点 
create /zk-permanent
# 创建顺序持久节点 数据123
create -s /zk-order 123
# 创建临时节点
create -e /zk-temp
# 创建临时顺序节点
create -s -e /zk-order-temp 

查找数据

# 格式
ls [-s] [-w] [-R] path 
[-s] : 查看某一节点下的子节点加当前节点的元信息,相当于之前版本的ls2命令
[-w] : 查看节点并为节点添加一个监听,当节点被修改时,该客户端会收到一个回调
       之前版本是在path 后面加一个watch实现:ls path watch 
[-R] : 返回当前节点路径,当前节点的子节点,当前节点的子节点的子节点(递归)

get [-s] [-w] path 
[-s] : 查看节点数据加元信息
[-w] : 查看节点并为节点添加一个监听,当节点被修改时,该客户端会收到一个回调
       之前版本是在path 后面加一个watch实现:get path watch 

# 区别
ls 命令获取`节点`下的`直系节点信息` get获取`节点`下的`数据信息 `

  • znode节点信息

    每一个对znode树的更新操作,都会被赋予一个全局唯一的ID,我们称之为zxid(ZooKeeper Transaction ID)。更新操作的ID按照发生的时间顺序升序排序。例如,z1大于z2,那么z1的操作就早于z2操作。

    czxid,创建(create)该 znode 的 zxid
    mzxid,最后一次修改(modify)该 znode 的 zxid
    pzxid,最后一次修改该 znode 子节点的 zxid
    ctime,创建该 znode 的时间
    mtime,最后一次修改该 znode 的时间
    dataVersion,该节点内容的版本,每次修改内容,版本都会增加
    cversion,该节点子节点的版本
    aclVersion,该节点的 ACL 版本
    ephemeralOwner,如果该节点是临时节点(ephemeral node),会列出该节点所在客户端的 session id;如果不是临时节点,该值为 0
    dataLength,该节点存储的数据长度
    numChildren,该节点子节点的个数

修改数据

# 格式
set [-s] [-v version] path data 
[-s] : 返回修改后节点的元信息
[-v version] : 指定数据的版本,版本不符合时修改失败,类似关系型数据库的乐观锁
path : 修改节点路径
data : 修改的数据

删除数据

# 格式
delete [-v version] path
[-v version] : 指定数据的版本,版本不符合时删除失败,类似关系型数据库的乐观锁
path : 删除的节点路径
# 说明: 如果节点下有子节点无法删除 需要先删除子节点

# 直接删除包括子节点
deleteall path [-b batch size]
path : 删除的节点路径
[-b batch size] : 

Zookeeper api的使用

导入maven依赖

        <!-- 版本与zookeeper一致 v3.6.3-->
        <dependency>
            <groupId>org.apache.zookeeper</groupId>
            <artifactId>zookeeper</artifactId>
            <version>{zk-version}</version>
        </dependency>

建立连接

package top.mengshuo;

import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;

import java.io.IOException;
import java.util.concurrent.CountDownLatch;

/**
 * @author mengshuo
 * @since 2021-10-11
 */
public class CreateSession implements Watcher {

    /**
     * 初始化计数器 设置等待1个线程
     */
    private static final CountDownLatch COUNT_DOWN_LATCH = new CountDownLatch(1);


    public static void main(String[] args) throws IOException, InterruptedException {

        /*
         * connectString: 连接信息 ip:port
         * sessionTimeout: 连接超时时间 单位毫秒
         * watcher: 监听器 当触发事件后ZK会通过通过watcher通知到客户端
         */
        ZooKeeper zooKeeper = new ZooKeeper("192.168.59.101:2181", 1000, new CreateSession());

        System.out.println(zooKeeper.getState());
        // 阻塞main线程(方法)执行 当计数器等于0后会继续执行
        COUNT_DOWN_LATCH.await();
        System.out.println(zooKeeper.getState());

    }

    /**
     * 监听回调方法
     * 当前类实现了Watcher接⼝,重写了process⽅法,该⽅法负责处理来⾃Zookeeper服务端的 watcher通知,
     * 在收到服务端发送过来的SyncConnected事件之后,解除主程序在CountDownLatch上 的等待阻塞,⾄此,会话创建完毕
     *
     * @param event 回调数据
     */
    @Override
    public void process(WatchedEvent event) {

        if (event.getState() == Event.KeeperState.SyncConnected) {
            // 计数器减一 main方法执行
            COUNT_DOWN_LATCH.countDown();
        }
    }
}

添加数据

package top.mengshuo;

import org.apache.zookeeper.*;

import java.nio.charset.StandardCharsets;
import java.util.concurrent.CountDownLatch;

/**
 * @author mengshuo
 * @since 2021-10-12
 */
public class CreateNode implements Watcher {

    /**
     * 初始化计数器 设置等待1个线程
     */
    private static final CountDownLatch COUNT_DOWN_LATCH = new CountDownLatch(1);
    private static ZooKeeper zooKeeper;


    public static void main(String[] args) throws Exception {

        /*
         * connectString: 连接信息 ip:port
         * sessionTimeout: 连接超时时间 单位毫秒
         * watcher: 监听器 当触发事件后ZK会通过通过watcher通知到客户端
         */
        zooKeeper = new ZooKeeper("192.168.59.101:2181", 1000, new CreateNode());

        System.out.println(zooKeeper.getState());
        // 阻塞main线程(方法)执行 当计数器等于0后会继续执行
        COUNT_DOWN_LATCH.await();
        System.out.println(zooKeeper.getState());

        CreateNode.create();
    }

    /**
     * 监听回调方法
     * 当前类实现了Watcher接⼝,重写了process⽅法,该⽅法负责处理来⾃Zookeeper服务端的 watcher通知,
     * 在收到服务端发送过来的SyncConnected事件之后,解除主程序在CountDownLatch上 的等待阻塞,⾄此,会话创建完毕
     *
     * @param event 回调数据
     */
    @Override
    public void process(WatchedEvent event) {

        if (event.getState() == Event.KeeperState.SyncConnected) {
            // 计数器减一 main方法执行
            COUNT_DOWN_LATCH.countDown();
        }
    }

    /**
     * 创建
     *
     * @throws InterruptedException exception
     * @throws KeeperException      exception
     */
    public static void create() throws InterruptedException, KeeperException {

        /*
          path: 节点创建路径
          data: 节点创建的数据 byte类型
          acl: 节点创建的权限信息
               ZooDefs.Ids.ANYONE_ID_UNSAFE 表示任何⼈
               ZooDefs.Ids.AUTH_IDS 此ID仅可⽤于设置ACL。它将被客户机验证的ID替换
               ZooDefs.Ids.OPEN_ACL_UNSAFE 这是⼀个完全开放的ACL(常⽤)--> word:anyone
               ZooDefs.Ids.CREATOR_ALL_ACL 此ACL授予创建者身份验证ID的所有权限
         
          createMode: 节点创建的类型
               PERSISTENT 持久节点
               PERSISTENT_SEQUENTIAL 持久顺序节点
               EPHEMERAL 临时节点
               EPHEMERAL_SEQUENTIAL 临时顺序节点
         */
        String persistentNode = zooKeeper.create("/zk-api-test-persistent", "api创建的持久节点".getBytes(),
                ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        String persistentSequentialNode = zooKeeper.create("/zk-api-test-persistent-sequential", "api创建的持久顺序节点".getBytes(),
                ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL);
        String ephemeralNode = zooKeeper.create("/zk-api-test-ephemeral", "api创建的临时节点".getBytes(),
                ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL);
        System.out.println("创建的持久节点: " + persistentNode);
        System.out.println("创建的持久顺序节点: " + persistentSequentialNode);
        System.out.println("创建的临时节点: " + ephemeralNode);
    }
}

查询数据

package top.mengshuo;

import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;

import java.util.List;
import java.util.concurrent.CountDownLatch;

/**
 * @author mengshuo
 * @since 2021-10-12
 */
public class GetNodeData implements Watcher {

    /**
     * 初始化计数器 设置等待1个线程
     */
    private static final CountDownLatch COUNT_DOWN_LATCH = new CountDownLatch(1);
    private static ZooKeeper zooKeeper;


    public static void main(String[] args) throws Exception {

        /*
         * connectString: 连接信息 ip:port
         * sessionTimeout: 连接超时时间 单位毫秒
         * watcher: 监听器 当触发事件后ZK会通过通过watcher通知到客户端
         */
        zooKeeper = new ZooKeeper("192.168.59.101:2181", 1000, new GetNodeData());

        System.out.println("连接状态: "+zooKeeper.getState());
        // 阻塞main线程(方法)执行 当计数器等于0后会继续执行
        COUNT_DOWN_LATCH.await();


    }

    /**
     * 监听回调方法
     * 当前类实现了Watcher接⼝,重写了process⽅法,该⽅法负责处理来⾃Zookeeper服务端的 watcher通知,
     * 在收到服务端发送过来的SyncConnected事件之后,解除主程序在CountDownLatch上 的等待阻塞,⾄此,会话创建完毕
     *
     * @param event 回调数据
     */
    @Override
    public void process(WatchedEvent event) {
        /*
            当子节点数据发生变化时 会触发nodeChildrenChanged事件
            并且事件通知时一次性的 并不是每次变更都会通知
         */
        if (event.getType() == Event.EventType.NodeChildrenChanged) {
            System.out.println("子节点数据变更...");
            // 重新获取子节点信息并注册通知
            getChildren();
        }

        //当连接创建了,服务端发送给客户端SyncConnected事件
        if (event.getState() == Event.KeeperState.SyncConnected) {
            System.out.println("连接状态: "+zooKeeper.getState());
            // 计数器减一 main方法执行
            // 不再减一持续阻塞等待服务端的再次通知
            // COUNT_DOWN_LATCH.countDown();

            // 获取数据
            getData();

            // 获取子节点数据
            getChildren();
        }
    }


    /**
     * 获取节点数据
     */
    public static void getData() {
        /*
           path 获取数据的路径
           watch 是否注册监听器
           stat 节点状态信息 null表示获取最新数据
         */
        byte[] data = new byte[0];
        try {
            data = zooKeeper.getData("/zk-api-test-persistent", false, null);
        } catch (KeeperException | InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("节点数据: "+new String(data));
    }


    /**
     * 获取子节点信息
     */
    public static void getChildren() {
        List<String> childrenData = null;
        try {
            childrenData = zooKeeper.getChildren("/zk-api-test-persistent", true);
        } catch (KeeperException | InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("子节点信息: "+childrenData);
    }

}

  • 查询结果信息 & 监听

修改数据

package top.mengshuo;

import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;

import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Date;
import java.util.concurrent.CountDownLatch;

/**
 * @author mengshuo
 * @since 2021-10-12
 */
public class UpdateNodeData implements Watcher {

    /**
     * 初始化计数器 设置等待1个线程
     */
    private static final CountDownLatch COUNT_DOWN_LATCH = new CountDownLatch(1);
    private static ZooKeeper zooKeeper;


    public static void main(String[] args) throws Exception {

        /*
         * connectString: 连接信息 ip:port
         * sessionTimeout: 连接超时时间 单位毫秒
         * watcher: 监听器 当触发事件后ZK会通过通过watcher通知到客户端
         */
        zooKeeper = new ZooKeeper("192.168.59.101:2181", 1000, new UpdateNodeData());

        System.out.println(zooKeeper.getState());
        // 阻塞main线程(方法)执行 当计数器等于0后会继续执行
        COUNT_DOWN_LATCH.await();
        System.out.println(zooKeeper.getState());

        UpdateNodeData.update();
    }

    /**
     * 监听回调方法
     * 当前类实现了Watcher接⼝,重写了process⽅法,该⽅法负责处理来⾃Zookeeper服务端的 watcher通知,
     * 在收到服务端发送过来的SyncConnected事件之后,解除主程序在CountDownLatch上 的等待阻塞,⾄此,会话创建完毕
     *
     * @param event 回调数据
     */
    @Override
    public void process(WatchedEvent event) {

        if (event.getState() == Event.KeeperState.SyncConnected) {
            // 计数器减一 main方法执行
            COUNT_DOWN_LATCH.countDown();
        }
    }

    /**
     * 更新
     *
     * @throws InterruptedException exception
     * @throws KeeperException      exception
     */
    public static void update() throws InterruptedException, KeeperException {

        byte[] data = zooKeeper.getData("/zk-api-test-persistent", false, null);
        System.out.println("修改之前的数据: " + new String(data));

        /*
            path 所修改的数据节点路径
            data 新数据内容
            version 所要修改的数据版本 -1表示修改最新版本

            返回值 stat 为节点信息
         */
        Stat stat = zooKeeper.setData("/zk-api-test-persistent",
                ("修改时间: "+LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"))).getBytes(),
                -1);
        
        byte[] newData = zooKeeper.getData("/zk-api-test-persistent", false, null);
        System.out.println("修改之后的数据: " + new String(newData));

    }
}

  • 结果

删除节点数据

package top.mengshuo;

import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;

import java.util.concurrent.CountDownLatch;

/**
 * @author mengshuo
 * @since 2021-10-12
 */
public class DeleteNode implements Watcher {

    /**
     * 初始化计数器 设置等待1个线程
     */
    private static final CountDownLatch COUNT_DOWN_LATCH = new CountDownLatch(1);
    private static ZooKeeper zooKeeper;


    public static void main(String[] args) throws Exception {

        /*
         * connectString: 连接信息 ip:port
         * sessionTimeout: 连接超时时间 单位毫秒
         * watcher: 监听器 当触发事件后ZK会通过通过watcher通知到客户端
         */
        zooKeeper = new ZooKeeper("192.168.59.101:2181", 1000, new DeleteNode());

        System.out.println(zooKeeper.getState());
        // 阻塞main线程(方法)执行 当计数器等于0后会继续执行
        COUNT_DOWN_LATCH.await();
        System.out.println(zooKeeper.getState());

        DeleteNode.delete();
    }

    /**
     * 监听回调方法
     * 当前类实现了Watcher接⼝,重写了process⽅法,该⽅法负责处理来⾃Zookeeper服务端的 watcher通知,
     * 在收到服务端发送过来的SyncConnected事件之后,解除主程序在CountDownLatch上 的等待阻塞,⾄此,会话创建完毕
     *
     * @param event 回调数据
     */
    @Override
    public void process(WatchedEvent event) {

        if (event.getState() == Event.KeeperState.SyncConnected) {
            // 计数器减一 main方法执行
            COUNT_DOWN_LATCH.countDown();
        }
    }

    /**
     * 删除
     *
     * @throws InterruptedException exception
     * @throws KeeperException      exception
     */
    public static void delete() throws InterruptedException, KeeperException {
        // 判断节点是否存在
        Stat stat = zooKeeper.exists("/zk-api-test-persistent/children-03", false);
        boolean exists = stat != null;
        if (exists) {
            System.out.println("节点存在 进行删除...");
            // 删除节点数据
            zooKeeper.delete("/zk-api-test-persistent/children-03", -1);
            System.out.println("删除后的节点状态: " + zooKeeper.exists("/zk-api-test-persistent/child02", false).toString());
        } else {
            System.out.println("节点不存在");
        }

    }
}

  • 执行结果

  • 删除之前节点信息

  • 删除之后节点信息

    ps: children-02 之前删除过了

ZK客户端-Curator的使用

Curator介绍

curator是Netflix公司开源的⼀套Zookeeper客户端框架,和ZKClient⼀样,Curator解决了很多 Zookeeper客户端⾮常底层的细节开发⼯作,包括连接重连,反复注册Watcher和 NodeExistsException异常等,是最流⾏的Zookeeper客户端之⼀。从编码⻛格上来讲,它提供了基于 Fluent的编程⻛格⽀持

Curator使用

导入pom依赖

<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-framework</artifactId>
    <version>5.2.0</version>
</dependency>

创建会话

  • 创建会话的两个方法源码
/**
 * 创建连接 指定默认的会话&连接超时时间
 * 此方法调用下面的四个参数的方法 同时指定了默认参数
 * 
 * @param connectString       连接信息 格式: ip:port,ip:port...
 * @param retryPolicy         重试策略
 * @return client
 */
public static CuratorFramework newClient(String connectString, RetryPolicy retryPolicy)
{
    return newClient(connectString, DEFAULT_SESSION_TIMEOUT_MS, DEFAULT_CONNECTION_TIMEOUT_MS, retryPolicy);
}

/**
 * 创建连接 使用了 建造者(Builder)模式
 * 此方法调用{@link CuratorFrameworkFactory#builder()}方法返回内部Builder类对象
 * 同时设置Builder对象中的属性最后调用{@link CuratorFrameworkFactory.Builder#build()} 方法
 * 由build方法构建一个 CuratorFramework 的子类 CuratorFrameworkImpl 
 * 
 * @param connectString       连接信息 格式: ip:port,ip:port...
 * @param sessionTimeoutMs    会话超时时间
 * @param connectionTimeoutMs 连接超时时间
 * @param retryPolicy         重试策略
 * @return client
 */
public static CuratorFramework newClient(String connectString, int sessionTimeoutMs, int connectionTimeoutMs, RetryPolicy retryPolicy)
{
    return builder().
        connectString(connectString).
        sessionTimeoutMs(sessionTimeoutMs).
        connectionTimeoutMs(connectionTimeoutMs).
        retryPolicy(retryPolicy).
        build();
}

其中参数RetryPolicy提供重试策略的接⼝,可以让⽤户实现⾃定义的重试策略 默认提供了以下实现,分别为:

  1. ExponentialBackoffRetry(基于backoff的重连策略)构造器含有三个参数 :
  • ExponentialBackoffRetry(int baseSleepTimeMs, int maxRetries, int maxSleepMs)

    • baseSleepTimeMs:初始的sleep时间,⽤于计算之后的每次重试的sleep时间, 计算公式:当前sleep时间=baseSleepTimeMs*Math.max(1, -random.nextInt(1<<(retryCount+1)))

    • maxRetries:最⼤重试次数

    • maxSleepMs:最⼤sleep时间,如果上述的当前sleep计算出来⽐这个⼤,那么sleep⽤ 这个时间,默认的最⼤时间是Integer.MAX_VALUE毫秒

  1. RetryNTimes(重连N次策略)

  2. RetryForever(永远重试策略)

由以上可得知我们可以直接使用Builder模式进行构建CuratorFramework对象

  • 创建连接示例
package top.mengshuo.curator;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;

/**
 * @author mengshuo
 * @since 2021-10-14
 */
public class CreateSession {


    public static void main(String[] args) {
        /*
            baseSleepTimeMs 初始的sleep时间,⽤于计算之后的每次重试的sleep时间
            maxRetries 最大重试次数
         */
        RetryPolicy retry = new ExponentialBackoffRetry(1000, 5);
        /*
            建立连接 两个参数
            connectString 连接信息 格式: ip:port,ip:port...
            retryPolicy 重试策略
         */
        CuratorFramework client = CuratorFrameworkFactory.newClient("192.168.59.101:2181", retry);
        client.start();

        /*
            使用 fluent 方式
         */
        CuratorFramework curator = CuratorFrameworkFactory.builder()
                .connectString("192.168.59.101:2181")
                .sessionTimeoutMs(5000)
                .connectionTimeoutMs(3000)
                .retryPolicy(retry)
                .namespace("curator") // 命名空间 所有的操作都会在`/curator`节点下进行
                .build();

        curator.start();
    }

}

创建节点

  • api说明
  1. 创建一个空节点
client.create().forPath(path);
  1. 创建一个带数据的节点
client.create().forPath(path,data);
  1. 递归创建父节点并指定节点类型
`.creatingParentsIfNeeded()可以自动的创建不存在的节点`

`.withMode()指定创建的节点类型`
client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(path);
  • api创建测试
package top.mengshuo.curator;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;

import java.nio.charset.StandardCharsets;

/**
 * @author mengshuo
 * @since 2021-10-14
 */
public class CreateNode {


    public static void main(String[] args) throws Exception {
        /*
            baseSleepTimeMs 初始的sleep时间,⽤于计算之后的每次重试的sleep时间
            maxRetries 最大重试次数
         */
        RetryPolicy retry = new ExponentialBackoffRetry(1000, 5);

        /*
            使用 fluent 方式
         */
        CuratorFramework client = CuratorFrameworkFactory.builder()
                .connectString("192.168.59.101:2181")
                .sessionTimeoutMs(30000)
                .connectionTimeoutMs(10000)
                .retryPolicy(retry)
                .namespace("curator")
                .build();
        client.start();
        System.out.println("连接已创建...");
        String path = client.create()
                .creatingParentsIfNeeded()
                .withMode(CreateMode.PERSISTENT)
                .forPath("/create/test", "创建测试".getBytes(StandardCharsets.UTF_8));
        System.out.println("创建完成: nameSpace = " + client.getNamespace() + "  路径 = " + path);
    }

}

结果:
[zk: localhost:2181(CONNECTED) 81] ls /
[zookeeper]
[zk: localhost:2181(CONNECTED) 82] get /curator/create/test
创建测试

删除节点

  • api说明
  1. 删除指定节点
client.delete().forPath(path)
  1. 指定版本
client.delete().withVersion(version).forPath(path)
  1. 强制保证删除
client.delete().guaranteed().forPath()
  1. `递归删除 ( 删除子节点 )并指定版本
client.delete().deletingChildrenIfNeeded().withVersion(version).forPath(path)
  • api删除测试
package top.mengshuo.curator;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;

import java.nio.charset.StandardCharsets;

/**
 * @author mengshuo
 * @since 2021-10-14
 */
public class DeleteNode {


    public static void main(String[] args) throws Exception {
        /*
            baseSleepTimeMs 初始的sleep时间,⽤于计算之后的每次重试的sleep时间
            maxRetries 最大重试次数
         */
        RetryPolicy retry = new ExponentialBackoffRetry(1000, 5);

        /*
            使用 fluent 方式
         */
        CuratorFramework client = CuratorFrameworkFactory.builder()
                .connectString("192.168.59.101:2181")
                .sessionTimeoutMs(30000)
                .connectionTimeoutMs(10000)
                .retryPolicy(retry)
                .namespace("curator")
                .build();
        client.start();
        System.out.println("连接已创建...");
        // 递归删除
        client.delete().deletingChildrenIfNeeded().forPath("/create");
        System.out.println("删除完成...");

    }

}

结果:
[zk: localhost:2181(CONNECTED) 28] ls /
[curator, zookeeper]
[zk: localhost:2181(CONNECTED) 29] ls /curator
[create]
[zk: localhost:2181(CONNECTED) 30] ls /curator
[]
[zk: localhost:2181(CONNECTED) 31] ls /
[curator, zookeeper]

修改节点

  • api说明

    1. 查询节点数据
client.getData().forPath(path)
1. 查询节点信息
data = client.getData().storingStatIn(stat).forPath(path);
  • api测试
package top.mengshuo.curator;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.data.Stat;

import java.nio.charset.StandardCharsets;

/**
 * @author mengshuo
 * @since 2021-10-14
 */
public class GetNodeData {


    public static void main(String[] args) throws Exception {
        /*
            baseSleepTimeMs 初始的sleep时间,⽤于计算之后的每次重试的sleep时间
            maxRetries 最大重试次数
         */
        RetryPolicy retry = new ExponentialBackoffRetry(1000, 5);

        /*
            使用 fluent 方式
         */
        CuratorFramework client = CuratorFrameworkFactory.builder()
                .connectString("192.168.59.101:2181")
                .sessionTimeoutMs(30000)
                .connectionTimeoutMs(10000)
                .retryPolicy(retry)
                .namespace("curator")
                .build();
        client.start();
        System.out.println("连接已创建...");
        byte[] data = client.getData().forPath("/create/test");
        System.out.println("获取成功完成: " + new String(data));
        Stat stat = new Stat();
        data = client.getData().storingStatIn(stat).forPath("/create/test");
        System.out.println("获取成功完成: " + new String(data));
        System.out.println("节点状态信息: "+stat);

    }

}

结果:
连接已创建...
获取成功完成: 创建测试
获取成功完成: 创建测试
节点状态信息: 181,181,1634462939342,1634462939342,0,0,0,0,12,0,181

更新数据

  • api说明

    1. 更新数据
client.setData().forPath(path,data)
1. 更新指定版本的数据
client.setData().withVersion(version).forPath(path,data)
  • api测试
package top.mengshuo.curator;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.data.Stat;

import java.nio.charset.StandardCharsets;

/**
 * @author mengshuo
 * @since 2021-10-14
 */
public class UpdateNodeData {


    public static void main(String[] args) throws Exception {
        /*
            baseSleepTimeMs 初始的sleep时间,⽤于计算之后的每次重试的sleep时间
            maxRetries 最大重试次数
         */
        RetryPolicy retry = new ExponentialBackoffRetry(1000, 5);

        /*
            使用 fluent 方式
         */
        CuratorFramework client = CuratorFrameworkFactory.builder()
                .connectString("192.168.59.101:2181")
                .sessionTimeoutMs(30000)
                .connectionTimeoutMs(10000)
                .retryPolicy(retry)
                .namespace("curator")
                .build();
        client.start();
        System.out.println("连接已创建...");
        byte[] data = client.getData().forPath("/create/test");
        System.out.println("旧数据: "+new String(data));
        client.setData().forPath("/create/test","更新数据".getBytes(StandardCharsets.UTF_8));
        System.out.println("更新完成");
        data = client.getData().forPath("/create/test");
        System.out.println("新数据: "+new String(data));

    }

}

结果:
连接已创建...
旧数据: 创建测试
更新完成
新数据: 更新数据

8
博主关闭了所有页面的评论