Java API操作ZK node

本文涉及的产品
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
简介: 创建会话建立简单连接/** * 测试创建Zk会话 * Created by liuhuichao on 2017/7/25. */public class ZooKeeper_Constructor_Usage_Simple implements Watche...

创建会话

  • 建立简单连接
/**
 * 测试创建Zk会话
 * Created by liuhuichao on 2017/7/25.
 */
public class ZooKeeper_Constructor_Usage_Simple implements Watcher {
    private static CountDownLatch connectedSemaphore=new CountDownLatch(1);

    public static void main(String[] args) throws Exception{
        ZooKeeper zk=new ZooKeeper("192.168.99.215:2181",5000,new ZooKeeper_Constructor_Usage_Simple());
        System.out.println(zk.getState());
        connectedSemaphore.await();
        System.out.println("zk session established");
    }

    /**
     * 处理来自ZK服务端的watcher通知
     * @param watchedEvent
     */
    @Override
    public void process(WatchedEvent watchedEvent) {
        System.out.println("receive watched event:"+watchedEvent);
        if(Event.KeeperState.SyncConnected==watchedEvent.getState()){
            connectedSemaphore.countDown();//解除等待阻塞
        }
    }
}
  • 复用会话
/**
 * 复用sessionId和sessionPassword的会话
 * Created by liuhuichao on 2017/7/25.
 */
public class ZooKeeper_Constructor_Usage_With_sid_password implements Watcher {

    private static CountDownLatch connectedSemaphore=new CountDownLatch(1);
    @Override
    public void process(WatchedEvent watchedEvent) {
        System.out.println("receive watched event:"+watchedEvent);
        if(Event.KeeperState.SyncConnected==watchedEvent.getState()){
            connectedSemaphore.countDown();
        }


    }

    public static void main(String[] args) throws Exception{
        ZooKeeper zooKeeper=new ZooKeeper("lhc-centos0:2181",5000,new ZooKeeper_Constructor_Usage_With_sid_password());
        connectedSemaphore.await();
        long sessionId=zooKeeper.getSessionId();
        byte[] password=zooKeeper.getSessionPasswd();

        /**使用错误的sessionID跟sessionPwd连连接测试[192.168.99.215 lhc-centos0]**/
        ZooKeeper zkWrong=new ZooKeeper("lhc-centos0:2181",5000,new ZooKeeper_Constructor_Usage_With_sid_password(),1l,"lhc".getBytes());
        /**使用正确的来进行连接**/
        ZooKeeper zkTrue=new ZooKeeper("lhc-centos0:2181",5000,new ZooKeeper_Constructor_Usage_With_sid_password(),sessionId,password);
        Thread.sleep(Integer.MAX_VALUE);
    }
}

创建节点

  • 使用同步API创建节点
/**
 * 使用同步API创建一个节点
 * Created by liuhuichao on 2017/7/25.
 */
public class ZooKeeper_Create_API_Sync_Usage implements Watcher {
    private static CountDownLatch connectedSemaphore=new CountDownLatch(1);
    @Override
    public void process(WatchedEvent watchedEvent) {
        if(Event.KeeperState.SyncConnected==watchedEvent.getState()){
            connectedSemaphore.countDown();
        }
    }

    public static void main(String[] args) throws Exception{
        ZooKeeper zooKeeper=new ZooKeeper("192.168.99.215:2181",5000,new ZooKeeper_Create_API_Sync_Usage());
        connectedSemaphore.await();
        String path1=zooKeeper.create("/zk-test1","lhc".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);//临时结点
        System.out.println(path1+" 创建成功!");

        String path2=zooKeeper.create("/zk-test2","lllhhhhhhhhhhhhhhhhc".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.EPHEMERAL_SEQUENTIAL);
        System.out.println(path2+"  创建成功!");

    }
}
  • 使用异步API创建一个节点
/**
 * 使用异步API创建一个节点
 * Created by liuhuichao on 2017/7/25.
 */
public class ZooKeeper_Create_API_ASync_Usage implements Watcher{
    private static CountDownLatch connectedSamphore=new CountDownLatch(1);

    @Override
    public void process(WatchedEvent watchedEvent) {
        if(watchedEvent.getState()== Event.KeeperState.SyncConnected){
            connectedSamphore.countDown();
        }
    }

    public static void main(String[] args) throws Exception{
        ZooKeeper zk1=new ZooKeeper("lhc-centos0:2181",5000,new ZooKeeper_Create_API_ASync_Usage());
        connectedSamphore.await();
        zk1.create("/zk-test-1","".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT,new IStringCallBack(),"i am a context");
        zk1.create("/zk-test-2","".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT,new IStringCallBack(),"i am a context");
        zk1.create("/zk-test-3","".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT_SEQUENTIAL,new IStringCallBack(),"i am a context");
        Thread.sleep(Integer.MAX_VALUE);
    }
}


/**
 * Created by liuhuichao on 2017/7/26.
 */
public class IStringCallBack implements AsyncCallback.StringCallback{

        @Override
        public void processResult(int rc, String path, Object ctx, String name) {
            System.out.println("result:"+rc+"; path="+path+" ctx="+ctx+" name = "+name);
        }
}

删除节点

/**
 * 删除zk的持久结点
 * Created by liuhuichao on 2017/7/26.
 */
public class ZooKeeperDeleteNode implements Watcher {

    private  static CountDownLatch conntedSamphore=new CountDownLatch(1);
    @Override
    public void process(WatchedEvent watchedEvent) {
        if(Event.KeeperState.SyncConnected==watchedEvent.getState()){
            conntedSamphore.countDown();
        }

    }

    public static void main(String[] args) throws Exception{
        /**同步删除节点**/
        ZooKeeper zooKeeper=new ZooKeeper("lhc-centos0:2181",5000,new ZooKeeperDeleteNode());
        conntedSamphore.await();
        zooKeeper.delete("/zk-test-30000000014",0);
    }


}

读取数据

  • 使用同步API获取子节点列表
/**
 *获取结点-同步
 * Created by liuhuichao on 2017/7/26.
 */
public class ZooKeeper_GetChildren_API_Sync_Usage implements Watcher {
    private  static CountDownLatch conntedSamphore=new CountDownLatch(1);
    private static ZooKeeper zooKeeper=null;

    @Override
    public void process(WatchedEvent watchedEvent) {
        if(Event.KeeperState.SyncConnected==watchedEvent.getState()){
            conntedSamphore.countDown();
        }else if(watchedEvent.getType()== Event.EventType.NodeChildrenChanged){
            try {
                System.out.println("--------------------------------------reget children:"+zooKeeper.getChildren(watchedEvent.getPath(),true));
            } catch (KeeperException e) {
                e.printStackTrace();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

    }


    public static void main(String[] args) throws Exception{
        String path="/zk-test-1";
        zooKeeper=new ZooKeeper("lhc-centos0:2181",5000,new ZooKeeper_GetChildren_API_Sync_Usage());
        conntedSamphore.await();
        zooKeeper.create(path+"/test1","".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.EPHEMERAL);
        List<String> childrenList=zooKeeper.getChildren(path,true);
        System.out.println(childrenList);
        zooKeeper.create(path+"/test2","".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.EPHEMERAL);
        Thread.sleep(Integer.MAX_VALUE);
    }

}


  • 使用异步API获取子节点列表
**
 * 异步获取结点
 * Created by liuhuichao on 2017/7/26.
 */
public class ZooKeeper_GetChildren_API_ASync_Usage implements Watcher {

    private static CountDownLatch connectedSemphore=new CountDownLatch(1);
    private static ZooKeeper zk=null;

    @Override
    public void process(WatchedEvent watchedEvent) {
        if(Event.KeeperState.SyncConnected==watchedEvent.getState()){
            connectedSemphore.countDown();
        }else if(watchedEvent.getType()== Event.EventType.NodeChildrenChanged){
            try {
                System.out.println("node changed===="+zk.getChildren(watchedEvent.getPath(),true));
            } catch (KeeperException e) {
                e.printStackTrace();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }


    }

    public static void main(String[] args) throws Exception{
        String path="/zk-test-1";
        zk=new ZooKeeper("lhc-centos0:2181",5000,new ZooKeeper_GetChildren_API_ASync_Usage());
        connectedSemphore.await();
        zk.create(path+"/test3","".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        zk.getChildren(path,true,new ICChild2Callback(),null);
        Thread.sleep(Integer.MAX_VALUE);
    }


}


/**
 * 异步获取结点回调接口
 * Created by liuhuichao on 2017/7/26.
 */
public class ICChild2Callback implements AsyncCallback.Children2Callback{

    @Override
    public void processResult(int rc, String path, Object ctx, List<String> children, Stat stat) {
        System.out.println("get children zonde result:[reponse code:"+rc+" path="+path+" ctx="+ctx+"  childrenlist="+children+" stat="+stat);
    }
}
  • 使用同步API获取结点数据
/**
 *
 * 同步获取数据
 * Created by liuhuichao on 2017/7/27.
 */
public class GetData_API_Sync_Usage  implements Watcher{

    private static CountDownLatch conntedSamphore=new CountDownLatch(1);
    private static ZooKeeper zk=null;
    private static Stat stat=new Stat();
    @Override
    public void process(WatchedEvent watchedEvent) {
        if(Event.KeeperState.SyncConnected==watchedEvent.getState()){
            conntedSamphore.countDown();
        }else if(watchedEvent.getType()== Event.EventType.NodeCreated){
            System.out.println("node changed:"+watchedEvent.getPath());
        }

    }

    public static void main(String[] args) throws Exception{
        String path="/test-1";
       zk =new ZooKeeper("rc-zkp-datn-rse-nmg-ooz-woasis:2181",5000,new GetData_API_Sync_Usage());
        conntedSamphore.await();
        System.out.println("zk-19 连接成功!");
        //zk.create(path+"/lhc", "".getBytes(),ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        List<String> children=zk.getChildren(path,new GetData_API_Sync_Usage());
        System.out.println("children node:"+children);
        zk.setData(path+"/lhc","memeda".getBytes(),-1);
        byte[] nodeValue=zk.getData(path+"/lhc",true,stat);
        System.out.println(new String(nodeValue));


    }
}
  • 使用异步API获取结点数据
/**
 *
 * 同步/异步获取数据
 * Created by liuhuichao on 2017/7/27.
 */
public class GetData_API_Sync_Usage  implements Watcher{

    private static CountDownLatch conntedSamphore=new CountDownLatch(1);
    private static ZooKeeper zk=null;
    private static Stat stat=new Stat();
    @Override
    public void process(WatchedEvent watchedEvent) {
        if(Event.KeeperState.SyncConnected==watchedEvent.getState()){
            conntedSamphore.countDown();
        }else if(watchedEvent.getType()== Event.EventType.NodeCreated){
            System.out.println("node changed:"+watchedEvent.getPath());
        }

    }

    public static void main(String[] args) throws Exception{
        String path="/test-1";
       zk =new ZooKeeper("rc-zkp-datn-rse-nmg-ooz-woasis:2181",5000,new GetData_API_Sync_Usage());
        conntedSamphore.await();
        System.out.println("zk-19 连接成功!");
        //zk.create(path+"/lhc", "".getBytes(),ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        List<String> children=zk.getChildren(path,new GetData_API_Sync_Usage());
        System.out.println("children node:"+children);
        zk.setData(path+"/lhc","lllhc".getBytes(),-1);
        zk.getData(path+"/lhc",true,new IDataCallback(),null);//异步获取数据
        Thread.sleep(Integer.MAX_VALUE);
    }
}

/**
 * 异步获取node数据回调
 * Created by liuhuichao on 2017/7/27.
 */
public class IDataCallback implements AsyncCallback.DataCallback {
    @Override
    public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) {
        System.out.println("rc="+rc+" ;path="+path+" ;ctx="+ctx+" ;data="+data+" ;stat="+stat);
        System.out.println("string data="+new String(data));
        System.out.println("max version="+stat.getVersion());
    }
}

更新数据

  • 同步设置数据
  zk.setData(path+"/lhc","lllhc".getBytes(),-1);//同步设置数据
  • 异步设置数据
/**
 *
 * 同步/异步获取数据
 * Created by liuhuichao on 2017/7/27.
 */
public class GetData_API_Sync_Usage  implements Watcher{

    private static CountDownLatch conntedSamphore=new CountDownLatch(1);
    private static ZooKeeper zk=null;
    private static Stat stat=new Stat();
    @Override
    public void process(WatchedEvent watchedEvent) {
        if(Event.KeeperState.SyncConnected==watchedEvent.getState()){
            conntedSamphore.countDown();
        }else if(watchedEvent.getType()== Event.EventType.NodeCreated){
            System.out.println("node changed:"+watchedEvent.getPath());
        }

    }

    public static void main(String[] args) throws Exception{
        String path="/test-1";
       zk =new ZooKeeper("rc-zkp-datn-rse-nmg-ooz-woasis:2181",5000,new GetData_API_Sync_Usage());
        conntedSamphore.await();
        System.out.println("zk-19 连接成功!");
        //zk.create(path+"/lhc", "".getBytes(),ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        List<String> children=zk.getChildren(path,new GetData_API_Sync_Usage());
        System.out.println("children node:"+children);
        //zk.setData(path+"/lhc","lllhc".getBytes(),-1);//同步设置数据
        zk.setData(path+"/lhc","lhc".getBytes(),-1,new IStatCallback(),null);
        zk.getData(path+"/lhc",true,new IDataCallback(),null);//异步获取数据
        Thread.sleep(Integer.MAX_VALUE);
    }
}



/**
 * 异步设置数据回调接口
 * Created by liuhuichao on 2017/7/27.
 */
public class IStatCallback implements AsyncCallback.StatCallback{
    @Override
    public void processResult(int rc, String path, Object ctx, Stat stat) {
        System.out.println("rc="+rc+" ;path="+path+" ;ctx="+ctx+" ;stat="+stat);
        if(rc==0){
            System.out.println("数据设置成功!");
        }
    }
}

检测节点是否存在


/**
 * 检测zk node
 * Created by liuhuichao on 2017/7/27.
 */
public class Exist_API_Sync_Usage implements Watcher{
    private static CountDownLatch connetedSamphore=new CountDownLatch(1);
    private static ZooKeeper zk=null;

    @Override
    public void process(WatchedEvent watchedEvent) {
        if(Event.KeeperState.SyncConnected==watchedEvent.getState()){
            connetedSamphore.countDown();
        }else if(Event.EventType.NodeCreated==watchedEvent.getType()){
            System.out.println("node created=="+watchedEvent.getPath());
        }else if(Event.EventType.NodeDataChanged==watchedEvent.getType()){
            System.out.println("node changed=="+watchedEvent.getPath());
        }else if(Event.EventType.NodeDeleted==watchedEvent.getType()){
            System.out.println("node deleted=="+watchedEvent.getPath());
        }
    }

    public static void main(String[] args)throws Exception {
        String path="/test-1";
        zk =new ZooKeeper("rc-zkp-datn-rse-nmg-ooz-woasis:2181",5000,new Exist_API_Sync_Usage());
        connetedSamphore.await();
        System.out.println("zk-19 连接成功!");
        Stat stat=zk.exists(path,new Exist_API_Sync_Usage());
        System.out.println("stat="+stat==null?"为空":"不为空");
        zk.setData(path,"".getBytes(),-1);
        Thread.sleep(Integer.MAX_VALUE);

    }
}
相关实践学习
基于MSE实现微服务的全链路灰度
通过本场景的实验操作,您将了解并实现在线业务的微服务全链路灰度能力。
目录
相关文章
|
25天前
|
Java API Maven
HDFS的API操作
HDFS的API操作
25 0
|
1月前
|
Java API 数据处理
Java 8新特性之Stream API详解
【2月更文挑战第22天】本文将深入探讨Java 8中引入的Stream API,这是一种基于函数式编程的新特性,用于处理集合数据。我们将详细介绍Stream的基本概念、操作方法以及在实际开发中的应用,帮助读者更好地理解和使用这一强大的工具。
|
1月前
|
Shell
Flume【问题记录 01】【at org.apache.flume.node.Application.main(Application.java:xxx) 类问题整理+其他类型问题总结】【避坑指南】
【2月更文挑战第17天】Flume【问题记录 01】【at org.apache.flume.node.Application.main(Application.java:xxx) 类问题整理+其他类型问题总结】【避坑指南】
47 2
|
1月前
|
Java API
Java 8新特性之Lambda表达式与Stream API
【2月更文挑战第21天】本文将介绍Java 8中的两个重要特性:Lambda表达式和Stream API。Lambda表达式是Java 8中引入的一种新的编程语法,它允许我们将函数作为参数传递给方法,从而使代码更加简洁、易读。Stream API是一种用于处理集合的新API,它提供了一种高效且易于使用的处理数据的方式。本文将通过实例讲解这两个特性的基本用法以及它们如何帮助我们编写更简洁、高效的Java代码。
|
1天前
|
安全 Java API
RESTful API设计与实现:Java后台开发指南
【4月更文挑战第15天】本文介绍了如何使用Java开发RESTful API,重点是Spring Boot框架和Spring MVC。遵循无状态、统一接口、资源标识和JSON数据格式的设计原则,通过创建控制器处理HTTP请求,如示例中的用户管理操作。此外,文章还提及数据绑定、验证、异常处理和跨域支持。最后,提出了版本控制、安全性、文档测试以及限流和缓存的最佳实践,以确保API的稳定、安全和高效。
|
4天前
|
存储 Java 关系型数据库
掌握Java 8 Stream API的艺术:详解流式编程(一)
掌握Java 8 Stream API的艺术:详解流式编程
24 1
|
13天前
|
前端开发 Java API
构建RESTful API:Java中的RESTful服务开发
【4月更文挑战第3天】本文介绍了在Java环境中构建RESTful API的重要性及方法。遵循REST原则,利用HTTP方法处理资源,实现CRUD操作。在Java中,常用框架如Spring MVC简化了RESTful服务开发,包括定义资源、设计表示层、实现CRUD、考虑安全性、文档和测试。通过Spring MVC示例展示了创建RESTful服务的步骤,强调了其在现代Web服务开发中的关键角色,有助于提升互操作性和用户体验。
构建RESTful API:Java中的RESTful服务开发
|
23天前
|
Java 数据库连接 API
Java 学习路线:基础知识、数据类型、条件语句、函数、循环、异常处理、数据结构、面向对象编程、包、文件和 API
Java 是一种广泛使用的、面向对象的编程语言,始于1995年,以其跨平台性、安全性和可靠性著称,应用于从移动设备到数据中心的各种场景。基础概念包括变量(如局部、实例和静态变量)、数据类型(原始和非原始)、条件语句(if、else、switch等)、函数、循环、异常处理、数据结构(如数组、链表)和面向对象编程(类、接口、继承等)。深入学习还包括包、内存管理、集合框架、序列化、网络套接字、泛型、流、JVM、垃圾回收和线程。构建工具如Gradle、Maven和Ant简化了开发流程,Web框架如Spring和Spring Boot支持Web应用开发。ORM工具如JPA、Hibernate处理对象与数
88 3
|
24天前
|
分布式计算 Java 程序员
Java 8新特性之Lambda表达式与Stream API
本文将详细介绍Java 8中的两个重要新特性:Lambda表达式和Stream API。Lambda表达式是Java 8中引入的一种简洁、匿名的函数表示方法,它允许我们将函数作为参数传递给其他方法。而Stream API则是一种新的数据处理方式,它允许我们以声明式的方式处理数据,从而提高代码的可读性和可维护性。通过本文的学习,你将能够掌握Lambda表达式和Stream API的基本用法,以及如何在项目中应用这两个新特性。
28 10
|
24天前
|
Java API 数据处理
Java 8新特性之Lambda表达式与Stream API
本文将介绍Java 8中的两个重要特性:Lambda表达式和Stream API。Lambda表达式是一种新的语法结构,允许我们将函数作为参数传递给方法。而Stream API则是一种处理数据的新方式,它允许我们对数据进行更简洁、更高效的操作。通过学习这两个特性,我们可以编写出更简洁、更易读的Java代码。