zookeeper之Curator使用

先起一个工具类配置好客户端

public class CuratorUtil {

    private final static String zkIp = "192.168.9.100:2181,192.168.9.101:2181,192.168.9.102:2181";

    private static CuratorFramework client; // 客户端

    /* 重连策略:初始时间,最大重试次数 */
    private static RetryPolicy retryPolicy = new ExponentialBackoffRetry(25000, 3);

    public static CuratorFramework getZkClient(){

        /* 创建会话:服务器列表,会话超时时间,链接创建超时时间,重试策略 */
//        client = CuratorFrameworkFactory.newClient(zkIp, 5000 , 5000, retryPolicy);
        /* Fluent风格便于阅读 */
        client = CuratorFrameworkFactory.builder()
                    .connectString(zkIp)
                    .sessionTimeoutMs(25000)
                    .connectionTimeoutMs(25000) // 创建连接超时时间过小的话,可能会导致client没连接上,操作报未连接错.
                    .retryPolicy(retryPolicy)
                    .namespace("base") // 独立隔离命名空间,任何操作都是基于该相对目录
                    .build();

        client.start();

        return client;
    }

}

再来操作代码

public class TestApplication {

	public static void main(String[] args) throws Exception {
	
		CuratorFramework client = CuratorUtil.getZkClient();

		System.out.println("创建客户端链接zookeeper");

		/**
		 * 创建节点
		 */
		/* 创建一个持久节点.默认持久节点,内容为空 */
		client.create().forPath("/test");

		/*持久节点,并写入数据*/
		client.create().forPath("/test1","data".getBytes());

		/* 零时节点. withMode:节点属性. CreateMode.EPHEMERAL:零时*/
		client.create().withMode(CreateMode.EPHEMERAL).forPath("/test2");

		/* 零时节点,并递归创建父节点.注意只有叶子是零时节点,其递归创建的父节点是持久节点. */
		client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath("/test3/hai/zi");


		/**
		 * 读取数据
		 */
		System.out.println(new String(client.getData().forPath("/test1")));
		Stat stat = new Stat(); // 节点版本信息容器
		new String(client.getData().storingStatIn(stat).forPath("/test1"));
		System.out.println("test1节点的版本号:" + stat.getVersion());


		/**
		 * 更新数据
		 */
		/* 直接更新 */
		int version = client.setData().forPath("/test1", "data修改了".getBytes()).getVersion(); // 更新数据并拿到版本号
		String data1 = new String(client.getData().storingStatIn(stat).forPath("/test1")); // 查看数据是否更新
		System.out.println("当前数据为:" + data1 + ". 节点的版本号:" + stat.getVersion());

		/* 根据版本号更新 */
		client.setData().withVersion(version).forPath("/test1","根据办好号更新".getBytes());
		String data2 = new String(client.getData().storingStatIn(stat).forPath("/test1"));
		System.out.println("当前数据为:" + data2 + ". 节点的版本号:" + stat.getVersion());

		
		/**
		 * 删除节点
		 */
		/* 删除叶子节点 */
		client.delete().forPath("/test");

		/* 删除节点,强制保证删除(只要客户端会话有效,后台会一直尝试删除,直到成功删除) */
		client.delete().guaranteed().forPath("/test1");

		/* 按版本号删除(有时候你想删除的版本 可能已经不是那个版本喽) */
		Stat stat1 = new Stat(); // 节点版本信息容器
		client.getData().storingStatIn(stat1).forPath("/test2"); // 读取节点,将资源版本信息填入stat里
		client.delete().withVersion(stat1.getVersion()).forPath("/test2");

		/* 删除节点,并递归删除子节点 */
		client.delete().deletingChildrenIfNeeded().forPath("/test3");


		/**
		 * 异步接口:inBackground,回调类:BackgroundCallback
		 */
		client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).inBackground(new BackgroundCallback(){
			@Override
			public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {
				System.out.println("code:"+event.getResultCode()+",type:"+event.getType()); // code = 0 代表成功
				/* 异步成功以后可以做的事 */
				if (0 == event.getResultCode()){
					client.delete().deletingChildrenIfNeeded().forPath("/test4"); // 我这里给删除了
				}
			}
		}).forPath("/test4");


		/**
		 * 事件监听
		 */
		/* 自身节点监听 */
		/* 节点监听对象.参数-1:客户端,2:监听节点,3:是否进行数据压缩 */
		NodeCache nodeCache = new NodeCache(client, "/test5", false);
		nodeCache.start(true); // 创建监听对象就立马读取节点数据内容并保存在cache中
		nodeCache.getListenable().addListener(new NodeCacheListener() { // 回调对象
			@Override
			public void nodeChanged() throws Exception {
				System.out.println("节点发生变化");
//				nodeCache.getCurrentData().getData(); // 节点数据这么拿
			}
		});
		client.create().forPath("/test5"); // 创建听节点
		Thread.sleep(1000);
		client.setData().forPath("/test5","变身".getBytes()); // 修改节点数据
		Thread.sleep(1000);
		client.delete().forPath("/test5"); // 删除节点

		/* 子节点监听 */
		/* 只会对一级子节点监听,不会越级对孙子节点监听.也不会对自己本身监听 */
		/* 子节点监听对象.参数-1:客户端,2:监听节点,3:数据变更时是否获取节点数据内容 */
		PathChildrenCache childrenCache = new PathChildrenCache(client, "/test6", true);
		childrenCache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT); // 啥时候启动缓存:POST INITIALIZED EVENT:初始化后的事件
		childrenCache.getListenable().addListener(new PathChildrenCacheListener() {
			@Override
			public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
				switch (event.getType()){
					case CHILD_ADDED:
						System.out.println("新增事件:"+event.getData().getPath());
						break;
					case CHILD_UPDATED:
						System.out.println("修改事件:"+event.getData().getPath());
						break;
					case CHILD_REMOVED:
						System.out.println("删除事件:"+event.getData().getPath());
						break;
					default:
						break;
				}
			}
		});
		client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath("/test6/6.1");
		Thread.sleep(1000);
		client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath("/test6/6.2/6.2.1"); // 这里只会触发创建/test6/6.2节点事件,并不会触发创建/test6/6.2/6.2.1节点事件.
		Thread.sleep(1000);
		client.setData().forPath("/test6","6.1".getBytes()); // 监控的节点本身操作,并不会触发监听子节点事件
		Thread.sleep(1000);
		client.setData().forPath("/test6/6.1","6.1".getBytes());
		Thread.sleep(1000);
		client.delete().deletingChildrenIfNeeded().forPath("/test6"); // 这里会触发删除6.1与6.2两个删除事件.因为是递归删除/test6节点

		/*
		* 大家学习了以上操作 是不是可以利用监听子节点创建成功事件来做个简单的分布式锁呢?
		* 创建成功就是获得了锁,可以一顿操作.操作完了以后记得释放锁(删除锁节点).
		* 没创建成功的先机器也不要灰心,监控这个锁节点的删除事件,当这个节点被删除了,再去抢着创建锁节点获得锁哈.
		*/


		/*
		* 也有实现好的分布式锁功能.
		* 模拟一下
		*/
		InterProcessMutex lock = new InterProcessMutex(client, "/lock"); // 锁对象
		CountDownLatch latch = new CountDownLatch(1); // 多线程的门闩,用来让多个线程同时跑
		for (int i = 0; i < 20; i++) {
			new Thread(new Runnable(){
				@Override
				public void run() {
					try {
						latch.await(); // 门闩拦住线程等待执行
						lock.acquire(); // 上锁
						System.out.println("看看是否会生成同样的时间" + new Date().getTime()); // 这里替换成你的代码逻辑
						
					} catch (Exception e) {
						e.printStackTrace();
					}
					
					try {
						lock.release(); // 解锁
					} catch (Exception e) {
						e.printStackTrace();
					}
				}
			}).start();
		}
		latch.countDown(); // 门闩打开,线程们一起执行



		Thread.sleep(Integer.MAX_VALUE); // 阻塞,不然代码跑完了异步和线程方法不一定能执行到
	}

}