相关推荐recommended
Zookeeper之快速入门
作者:mmseoamin日期:2024-02-03

前言

本篇文章主要还是让人快速上手入门,想要深入的话可以通过书籍系统的学习。

简介

是什么

可用于协调、构建分布式应用。

本质上是一个分布式的小文件存储系统。提供基于类似于文件系统的目录树方式的数据存储,并且可以对树中的节点进行有效管理。从而用来维护和监控你存储的数据的状态变化。通过监控这些数据状态的变化,从而可以达到基于数据的集群管理。

ZooKeeper 适用于存储和协同相关的关键数据,不适合用于大数据量存储。

应用场景

  • 配置中心
  • 注册中心
  • 分布式锁
  • 分布式队列
  • 负载均衡器
  • DNS服务
  • Master选举

    安装

    • 下载地址:https://zookeeper.apache.org/releases.html

    • 创建数据存储目录

      Zookeeper之快速入门,image.png,第1张

      • 拷贝zoo_sample.cfg并重命名为zoo_.cfg,打开并配置数据存储目录

        Zookeeper之快速入门,image.png,第2张

        • 启动服务器和客户端

          Zookeeper之快速入门,image.png,第3张

          基础概念

          存储模型

          类似于文件系统的结构。

          Zookeeper 的层次模型称作DataTree。DataTree 的每个节点叫作znode。不同于文件系统,每个节点都可以保存数据。每个节点都有一个版本(version),版本从0 开始计数。

          Zookeeper之快速入门,DataTree,第4张

          节点的分类

          是否持久?

          • 持久性的:持久化到磁盘。
          • 临时性的:保存在内容中,Server宕机、Client超时未跟Server连接都会丢失(客户端关闭节点也消失)。

            是否有序?

            • 持久有序的。
            • 临时有序的。

              每个顺序节点,都分配一个唯一且是在之前基础上递增的整数。

              客户端命令

              帮助help

              查询路径下的节点ls

              查看根节点:ls /

              查看子节点:ls /app1

              创建节点

              • 普通节点

                create /app1

                • 顺序节点

                  create -s /app2

                  • 临时节点

                    create -e /app3

                    • 临时有序节点

                      create -e -s /app4

                      查看节点

                      查询节点数据

                      get /app1

                      节点状态

                      stat /app1

                      ## -------------------------节点的状态信息,也称为stat结构体---------------------
                      cZxid = 0x17f ## 该数据节点被创建时的事务id
                      #其中zxid表示的是zookeeper的事务ID,由64位数字组成,分为高32位和低32位
                      ctime = Sat Dec 21 19:47:36 CST 2019 ## 该数据节点创建时间
                      mZxid = 0x17f ## 该数据节点被修改时最新的事物id
                      mtime = Sat Dec 21 19:47:36 CST 2019 ## 该数据节点最后更新时间
                      pZxid = 0x183 ## 当前节点的父级节点事务ID
                      cversion = 4 ## znode子节点变化号,znode子节点修改次数
                      dataVersion = 0 ## znode数据变化号
                      aclVersion = 0 ## 访问控制列表的变化号 access control
                      ephemeralOwner = 0x0 ## 如果临时节点,表示当前节点的拥有者的sessionId。如果不是临时节点,则值为0
                      dataLength = 6 ## 数据长度
                      numChildren = 4 ## 子节点数据
                      

                      修改、删除节点

                      • 修改

                        set /app1 12345

                        • 删除

                          delete /app20000000002

                          • 删除多层节点

                            deleteall /app1

                            拥有子节点的父节点,无法使用delete删除

                            案例:使用JAVA API操作ZK

                            依赖
                            
                                        org.apache.zookeeper
                                        zookeeper
                                        3.4.8
                                    
                                    
                                        org.apache.curator
                                        curator-client
                                        4.0.0
                                        
                                            
                                                org.apache.zookeeper
                                                zookeeper
                                            
                                        
                                    
                                    
                                        org.apache.curator
                                        curator-framework
                                        4.0.0
                                        
                                            
                                                org.apache.zookeeper
                                                zookeeper
                                            
                                        
                                    
                                    
                                        org.apache.curator
                                        curator-recipes
                                        4.0.0
                                    
                            
                            创建节点
                            • 创建连接对象
                                  @Before
                                  public void initClient() {
                                      //初始化Zookeeper的客户端对象client
                                      String connectString = "127.0.0.1:2181";
                                      RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000, 100);
                                      int sessionTimeoutMs = 60 * 1000;//当前客户端会话超时时间
                                      int connectionTimeoutMs = 15 * 1000;//连接超时时间
                                      client = CuratorFrameworkFactory
                                              .newClient(connectString, sessionTimeoutMs,
                                                      connectionTimeoutMs, retryPolicy);
                                  }
                                  @After
                                  public void destroy() {
                                      client.close();
                                  }
                              
                              • 创建节点
                                @After
                                    public void destroy() {
                                        client.close();
                                    }
                                    @Test
                                    public void createNode() throws Exception {
                                        //3种方式,四种节点类型
                                        client.start();
                                        //方式1: 创建空节点
                                //        client.create().forPath("/app3");
                                        //方式2: 创建有内容节点
                                //        client.create().forPath("/app4", "app3Node".getBytes());
                                        //方式3: 创建多层节点
                                //        client.create().creatingParentsIfNeeded().forPath("/app5/a", "aa".getBytes());
                                        //节点节点类型1 : 持久节点                   CreateMode.PERSISTENT
                                        //节点节点类型2 : 临时节点【客户端关闭则节点消失】CreateMode.EPHEMERAL
                                //        client.create().withMode(CreateMode.EPHEMERAL).forPath("/app6", "app5Node".getBytes());
                                        //节点节点类型3 : 持久节点+自带序号 CreateMode.PERSISTENT_SEQUENTIAL
                                //        client.create().withMode(CreateMode.PERSISTENT_SEQUENTIAL).forPath("/app7", "app6Node".getBytes());
                                        //节点节点类型4 : 临时节点+自带序号【客户端关闭则节点消失】CreateMode.EPHEMERAL_SEQUENTIAL
                                //        client.create().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath("/app8", "app6Node".getBytes());
                                        //Thread.sleep(9000);//休眠9秒,观察效果
                                        //3.关闭客户端,释放资源
                                        client.close();
                                    }
                                
                                修改节点数据
                                    @Test
                                    public void updateNode() throws Exception {
                                        client.start();
                                        client.setData().forPath("/app1", "app1Node".getBytes());
                                        client.close();
                                    }
                                
                                查询节点数据
                                    @Test
                                    public void getNode() throws Exception {
                                        //1.开启客户端
                                        client.start();
                                        //2.获取节点数据
                                        byte[] nodeDateBytes = client.getData().forPath("/app1");
                                        System.out.println("节点数据 = " + new String(nodeDateBytes));
                                        //3.关闭客户端,释放资源
                                        client.close();
                                    }
                                
                                删除节点
                                    @Test
                                    public void deleteNode() throws Exception {
                                        //1.开启客户端
                                        client.start();
                                        //2.获取节点数据
                                        //方式1: 删除一个节点
                                        //client.delete().forPath("/app3");
                                        //方式2: 递归删除多个节点
                                        //client.delete().deletingChildrenIfNeeded().forPath("/app3");
                                        //方式3: 强制删除【避免一些因为网络传输导致的删除不成功】
                                        client.delete().guaranteed().forPath("/app3");
                                        //3.关闭客户端,释放资源
                                        client.close();
                                    }
                                

                                ZK监听机制详解

                                watch机制如何运作?

                                watch机制本质就是订阅发布!采用了推拉结合的模式。

                                • **推 **: 服务端感知到内容变化了,会发送事件信息给关注watch节点的客户端。事件本身是轻量的不含变更内容,这是“推”。
                                • 拉 : 客户端收到变更通知事件,客户端需要自己去拉变更数据,这是“拉”。

                                  Watch机制允许客户端在ZooKeeper上的数据节点发生变化时获得通知。它提供了一种事件驱动的编程模型,使得应用程序可以实时地监控和响应数据的变化,而无需持续地查询。

                                  当客户端在某个数据节点上注册了一个Watch,它会在以下三种情况下被触发:

                                  • 数据节点的创建:当指定节点被创建时,与该节点相关联的Watch会被触发。
                                  • 数据节点的删除:当指定节点被删除时,与该节点相关联的Watch会被触发。
                                  • 数据节点的数据更新:当指定节点的数据发生变化时,与该节点相关联的Watch会被触发。

                                    监听原理详解

                                    Zookeeper观察者Watcher由三个部分组成,涉及消息通信及数据存储。

                                    • 客户端Client
                                    • 客户端监听管理器WatchManager
                                    • Zookeeper服务器Server

                                      监听过程:

                                      1. Client向Server注册监听事件。
                                      2. Client将观察者Watcher存储到客户端的WatchManager里。
                                      3. Server触发对应事件之后,向Client推送(Push)消息通知。
                                      4. Client线程从WatchManager拿取回调Watcher执行业务逻辑,拉取节点数据(Pull)。

                                      Zookeeper之快速入门,image.png,第5张

                                      假如我们监听B节点,那么也可以监听B1、B2,但是孙子节点B11无法监听到。

                                      Zookeeper之快速入门,image.png,第6张

                                      注意:watcher设置后一旦触发一次后就会失效,如果要想一直监听,需要在process回调函数里重新注册相同的 watcher。

                                      Apache Curator框架的监听实现:

                                      Curator对watcher机制做了优化,Curator引入了Cache的概念用来实现对Zookeeper服务器端进行事件监听。Cache是Curator对事件监听的包装,其对事件的监听可以近似看做是一个本地缓存视图和远程Zookeeper视图的对比过程。而且Curator会自动的再次监听,我们就不需要自己手动的重复监听了。

                                      **Curator客户端的Cache共有三种模式: **

                                      • 监听某节点的变化。
                                      • 监听某节点的子节点变化。
                                      • 监听整个树变化。

                                        监听的三种模式

                                        监听某节点的变化
                                        public class Demo02Watch {
                                            CuratorFramework client;
                                            @Before
                                            public void initClient() {
                                                //初始化Zookeeper的客户端对象client
                                                String connectString = "127.0.0.1:2181";
                                                RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000, 100);
                                                int sessionTimeoutMs = 60 * 1000;//当前客户端会话超时时间
                                                int connectionTimeoutMs = 15 * 1000;//连接超时时间
                                                client =  CuratorFrameworkFactory.newClient(connectString, sessionTimeoutMs, connectionTimeoutMs, retryPolicy);
                                            }
                                            @After
                                            public void destroy(){
                                                client.close();
                                            }
                                            /**
                                             * 1.监听节点数据变化
                                             */
                                            @Test
                                            public void listenNode() throws Exception {
                                                //1.启动客户端
                                                client.start();
                                                System.out.println("连接Zookeeper成功~~~~~~~~~~~~~~~~~~~");
                                                //2.创建节点监听对象NodeCache : 设置监听节点、监听回调方法
                                                NodeCache nodeCache = new NodeCache(client, "/hero");
                                                //设置监听节点
                                                ChildData currentData = nodeCache.getCurrentData();
                                                System.out.println("当前节点数据 = " + currentData);
                                                //开启监听
                                                nodeCache.start(true);
                                                //设置监听回调方法
                                                nodeCache.getListenable().addListener(new NodeCacheListener() {
                                                    /**
                                                     * 如果节点数据有变化,回调当前方法
                                                     */
                                                    @Override
                                                    public void nodeChanged() throws Exception {
                                                        //获取当前节点的数据
                                                        ChildData currentData = nodeCache.getCurrentData();
                                                        //获取最新节点名称
                                                        String path = currentData.getPath();
                                                        System.out.println("节点名称 = " + path);
                                                        //获取最新节点数据
                                                        byte[] currentDataByte = currentData.getData();
                                                        System.out.println("修改后节点数据" + new String(currentDataByte));
                                                        System.out.println("--------------->>");
                                                    }
                                                });
                                                //从输入流中读取数据的下一个字节。
                                                //阻塞主线程
                                                //System.in.read();
                                                Thread.sleep(100000);
                                            }
                                        }
                                        
                                        监听节点的子节点

                                        PathChildrenCache是用来监听指定节点的子节点变化情况【新增、修改、删除】。

                                        启动模式:

                                        1. NORMAL:这是默认的启动模式。在NORMAL模式下,PathChildrenCache会从指定的路径开始监听子节点的变化,并将子节点的当前状态缓存在本地。当有子节点增加、删除或更新时,PathChildrenCache会触发相应的事件通知,并更新本地缓存。
                                        2. BUILD_INITIAL_CACHE:在使用该模式启动PathChildrenCache之前,需要先调用rebuild()方法来构建初始的缓存。在BUILD_INITIAL_CACHE模式下,PathChildrenCache会首先从ZooKeeper服务器获取指定路径下的所有子节点,并将它们的状态缓存在本地。然后,PathChildrenCache开始监听子节点的变化,并在本地缓存的基础上进行更新。
                                        3. POST_INITIALIZED_EVENT:在使用该模式启动PathChildrenCache之前,需要先调用rebuild()方法来构建初始的缓存,类似于BUILD_INITIAL_CACHE模式。不同之处在于,使用POST_INITIALIZED_EVENT模式启动PathChildrenCache后,它会在完成初始缓存构建后,发送一个PathChildrenCacheEvent.Type.INITIALIZED类型的事件通知。

                                        如何选用合适的模式?

                                        • 如果只关心子节点的当前状态,并希望在子节点变化时及时得到通知,可以使用NORMAL模式。
                                        • 如果需要在启动时获取所有子节点的初始状态,并维护一个本地缓存,可以选择BUILD_INITIAL_CACHE模式。
                                        • 如果需要在初始缓存构建完成后得到一个初始化完成的事件通知,可以选择POST_INITIALIZED_EVENT模式,其它方面跟BUILD_INITIAL_CACHE差不多。

                                          触发回调的事件类型有哪些?

                                          1. Type.CONNECTION_RECONNECTED 重新连接
                                          2. Type.connection_lost 连接丢失
                                          3. Type.connection_suspended 连接暂停
                                          4. Type.INITIALIZED 初始化
                                          5. Type.CHILD_REMOVED 子节点移除
                                          6. Type.CHILD_ADDED 子节点添加
                                          7. Type.CHILD_UPDATED 子节点修改
                                              /**
                                               * 2.监听当前节点的子节点变化,不含节点数据
                                               */
                                              @Test
                                              public void listenSubNode2() throws Exception {
                                                  //1.启动客户端
                                                  client.start();
                                                  System.out.println("连接Zookeeper成功~~~~~~~~~~~~~~~~~~~");
                                                  //2.创建节点监听对象PathChildrenCache :
                                                  //参数3,是否缓存数据
                                                  PathChildrenCache childrenCache = new PathChildrenCache(client, "/hero", true);
                                                  // 开启监听
                                                  childrenCache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);
                                                  // 设置监听回调方法
                                                  childrenCache.getListenable().addListener((client, event) -> {
                                                      //获取修改的数据
                                                      byte[] bytes = event.getData().getData();
                                                      System.out.println("节点内数据 = " + new String(bytes));
                                                      //获取被修改的子节点
                                                      System.out.println("节点名称 = " + event.getData().getPath());
                                                      //获取事件类型
                                                      //PathChildrenCacheEvent.Type.CONNECTION_RECONNECTED 重新连接
                                                      //PathChildrenCacheEvent.Type.connection_lost 连接丢失
                                                      //PathChildrenCacheEvent.Type.connection_suspended 连接暂停
                                                      //PathChildrenCacheEvent.Type.INITIALIZED 初始化
                                                      //PathChildrenCacheEvent.Type.CHILD_REMOVED 子节点移除
                                                      //PathChildrenCacheEvent.Type.CHILD_ADDED 子节点添加
                                                      //PathChildrenCacheEvent.Type.CHILD_UPDATED 子节点修改
                                                      PathChildrenCacheEvent.Type type = event.getType();
                                                      System.out.println("事件触发类型 = " + type);
                                                      System.out.println("--------------------------------------------------->>");
                                                  });
                                                  //3.阻塞程序
                                                  System.in.read();
                                              }
                                          
                                          监听所有节点
                                          /**
                                           * 3.树形监听所有下级节点变化【模式1+模式2】,含节点数据变更
                                           */
                                          @Test
                                          public void treeCache() throws Exception {
                                              //1.启动客户端
                                              client.start();
                                              System.out.println("连接Zookeeper成功~~~~~~~~~~~~~~~~~~~");
                                              //2.创建节点监听对象TreeCache :
                                              TreeCache treeCache = new TreeCache(client, "/hero");
                                              //启动缓存
                                              treeCache.start();
                                              //添加监听回调方法
                                              treeCache.getListenable().addListener(new TreeCacheListener() {
                                                  @Override
                                                  public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception {
                                                      //获取修改的数据
                                                      byte[] bytes = event.getData().getData();
                                                      System.out.println("节点内数据 = " + new String(bytes));
                                                      //获取被修改的子节点
                                                      System.out.println("节点名称 = " + event.getData().getPath());
                                                      //获取事件类型
                                                      //TreeCacheEvent.Type.CONNECTION_RECONNECTED 重新连接
                                                      //TreeCacheEvent.Type.connection_lost 连接丢失
                                                      //TreeCacheEvent.Type.connection_suspended 连接暂停
                                                      //TreeCacheEvent.Type.INITIALIZED 初始化
                                                      //TreeCacheEvent.Type.NODE_REMOVED 子节点移除
                                                      //TreeCacheEvent.Type.NODE_ADDED 子节点添加
                                                      //TreeCacheEvent.Type.NODE_UPDATED 子节点修改
                                                      TreeCacheEvent.Type type = event.getType();
                                                      System.out.println("事件触发类型 = " + type);
                                                      System.out.println("--------------------------------------------------->>");
                                                  }
                                              });
                                              //3.阻塞程序
                                              System.in.read();
                                          }
                                          

                                          结尾

                                          假如你认真读完本篇,那么你已经超越很多人了。

                                          如果你对其感兴趣,可以看看下一篇,我们基于ZK手写一个简单的分布式锁,加强实践,巩固知识。https://blog.csdn.net/qq_38974073/article/details/135293504