首先创建一个工具类，用于配置并启动 Curator 客户端：
/**
 * Factory for {@link CuratorFramework} clients connected to the ZooKeeper ensemble
 * configured below.
 *
 * <p>Every call to {@link #getZkClient()} builds and starts a new client; the caller
 * owns the returned instance and is responsible for closing it.
 */
public class CuratorUtil {

    /** Comma-separated {@code host:port} list handed to the builder's {@code connectString}. */
    private static final String ZK_CONNECT_STRING =
            "192.168.9.100:2181,192.168.9.101:2181,192.168.9.102:2181";

    /** Session timeout passed to the builder, in milliseconds. */
    private static final int SESSION_TIMEOUT_MS = 25000;

    /** Connection timeout passed to the builder, in milliseconds. */
    private static final int CONNECTION_TIMEOUT_MS = 25000;

    /**
     * Base sleep between retries, in milliseconds. This is the first argument of
     * {@link ExponentialBackoffRetry}; it is a back-off base, not a timeout, so it is
     * kept small instead of reusing the 25 s timeout value.
     */
    private static final int RETRY_BASE_SLEEP_MS = 1000;

    /** Maximum number of retries performed by the retry policy. */
    private static final int RETRY_MAX_ATTEMPTS = 3;

    /** Namespace passed to the builder's {@code namespace(...)}. */
    private static final String NAMESPACE = "base";

    private CuratorUtil() {
        // Static utility class; not meant to be instantiated.
    }

    /**
     * Builds a client for the configured ensemble, starts it and returns it.
     *
     * @return a newly created client on which {@code start()} has already been called
     */
    public static CuratorFramework getZkClient() {
        RetryPolicy retryPolicy =
                new ExponentialBackoffRetry(RETRY_BASE_SLEEP_MS, RETRY_MAX_ATTEMPTS);

        // A local variable instead of a shared static field: the previous field was
        // overwritten on every call, so it never acted as a cache and was written
        // without any synchronization.
        CuratorFramework zkClient = CuratorFrameworkFactory.builder()
                .connectString(ZK_CONNECT_STRING)
                .sessionTimeoutMs(SESSION_TIMEOUT_MS)
                .connectionTimeoutMs(CONNECTION_TIMEOUT_MS)
                .retryPolicy(retryPolicy)
                .namespace(NAMESPACE)
                .build();
        zkClient.start();
        return zkClient;
    }
}
然后编写操作 ZooKeeper 的示例代码：
/**
 * Walk-through of common Curator operations against ZooKeeper: node create / read /
 * update / delete, a background create with a callback, {@link NodeCache} and
 * {@link PathChildrenCache} listeners, and an {@link InterProcessMutex} contended by
 * several threads.
 *
 * <p>Each section lives in its own private helper; {@link #main(String[])} runs them
 * in order and closes the client when the demo is over.
 */
public class TestApplication {

    /**
     * Charset used for every {@code byte[]} / {@code String} conversion of node data.
     * Fixed to UTF-8 so the non-ASCII payloads do not depend on the platform default.
     */
    private static final java.nio.charset.Charset UTF8 = java.nio.charset.StandardCharsets.UTF_8;

    /** Number of threads that compete for the mutex in the lock demo. */
    private static final int LOCK_WORKER_COUNT = 20;

    /** Pause, in milliseconds, that gives cache listeners time to receive a notification. */
    private static final long LISTENER_PAUSE_MS = 1000;

    /**
     * Runs all demo sections in sequence.
     *
     * @param args unused
     * @throws Exception if any ZooKeeper operation fails
     */
    public static void main(String[] args) throws Exception {
        // try-with-resources: the client is closed once the demo finishes, instead of
        // parking the main thread forever and never releasing the connection.
        try (CuratorFramework client = CuratorUtil.getZkClient()) {
            System.out.println("创建客户端链接zookeeper");
            runCrudDemo(client);
            runBackgroundCreateDemo(client);
            runNodeCacheDemo(client);
            runPathChildrenCacheDemo(client);
            runLockDemo(client);
        }
    }

    /** Creates, reads, updates (with and without a version check) and deletes nodes. */
    private static void runCrudDemo(CuratorFramework client) throws Exception {
        // Create: no data, with data, ephemeral, and ephemeral with missing parents created.
        client.create().forPath("/test");
        client.create().forPath("/test1", "data".getBytes(UTF8));
        client.create().withMode(CreateMode.EPHEMERAL).forPath("/test2");
        client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath("/test3/hai/zi");

        // Read the data, then read again while capturing the node's Stat.
        System.out.println(new String(client.getData().forPath("/test1"), UTF8));
        Stat stat = new Stat();
        client.getData().storingStatIn(stat).forPath("/test1");
        System.out.println("test1节点的版本号:" + stat.getVersion());

        // Unconditional update; remember the version reported by the returned Stat.
        int version = client.setData().forPath("/test1", "data修改了".getBytes(UTF8)).getVersion();
        String afterFirstUpdate = new String(client.getData().storingStatIn(stat).forPath("/test1"), UTF8);
        System.out.println("当前数据为:" + afterFirstUpdate + ". 节点的版本号:" + stat.getVersion());

        // Conditional update that passes the version obtained above.
        client.setData().withVersion(version).forPath("/test1", "根据办好号更新".getBytes(UTF8));
        String afterSecondUpdate = new String(client.getData().storingStatIn(stat).forPath("/test1"), UTF8);
        System.out.println("当前数据为:" + afterSecondUpdate + ". 节点的版本号:" + stat.getVersion());

        // Delete: plain, guaranteed, version-checked, and recursive.
        client.delete().forPath("/test");
        client.delete().guaranteed().forPath("/test1");
        Stat test2Stat = new Stat();
        client.getData().storingStatIn(test2Stat).forPath("/test2");
        client.delete().withVersion(test2Stat.getVersion()).forPath("/test2");
        client.delete().deletingChildrenIfNeeded().forPath("/test3");
    }

    /**
     * Creates {@code /test4} in the background; the callback prints the result code and
     * event type and deletes the node again when the result code is 0.
     */
    private static void runBackgroundCreateDemo(CuratorFramework client) throws Exception {
        client.create()
                .creatingParentsIfNeeded()
                .withMode(CreateMode.EPHEMERAL)
                .inBackground((callbackClient, event) -> {
                    System.out.println("code:" + event.getResultCode() + ",type:" + event.getType());
                    if (event.getResultCode() == 0) {
                        callbackClient.delete().deletingChildrenIfNeeded().forPath("/test4");
                    }
                })
                .forPath("/test4");
    }

    /** Watches {@code /test5} with a {@link NodeCache} while the node is created, updated and deleted. */
    private static void runNodeCacheDemo(CuratorFramework client) throws Exception {
        try (NodeCache nodeCache = new NodeCache(client, "/test5", false)) {
            nodeCache.start(true);
            nodeCache.getListenable().addListener(() -> System.out.println("节点发生变化"));

            client.create().forPath("/test5");
            Thread.sleep(LISTENER_PAUSE_MS);
            client.setData().forPath("/test5", "变身".getBytes(UTF8));
            Thread.sleep(LISTENER_PAUSE_MS);
            client.delete().forPath("/test5");
            // The cache is now closed at the end of this block, so wait once more to
            // let the delete notification reach the listener first.
            Thread.sleep(LISTENER_PAUSE_MS);
        }
    }

    /**
     * Watches the children of {@code /test6} with a {@link PathChildrenCache} and prints
     * added / updated / removed events while nodes under it are created, changed and deleted.
     */
    private static void runPathChildrenCacheDemo(CuratorFramework client) throws Exception {
        try (PathChildrenCache childrenCache = new PathChildrenCache(client, "/test6", true)) {
            childrenCache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);
            childrenCache.getListenable().addListener((callbackClient, event) -> {
                switch (event.getType()) {
                    case CHILD_ADDED:
                        System.out.println("新增事件:" + event.getData().getPath());
                        break;
                    case CHILD_UPDATED:
                        System.out.println("修改事件:" + event.getData().getPath());
                        break;
                    case CHILD_REMOVED:
                        System.out.println("删除事件:" + event.getData().getPath());
                        break;
                    default:
                        // Other event types are not part of this demo.
                        break;
                }
            });

            client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath("/test6/6.1");
            Thread.sleep(LISTENER_PAUSE_MS);
            client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath("/test6/6.2/6.2.1");
            Thread.sleep(LISTENER_PAUSE_MS);
            // Updates the watched parent itself, then one of its direct children.
            client.setData().forPath("/test6", "6.1".getBytes(UTF8));
            Thread.sleep(LISTENER_PAUSE_MS);
            client.setData().forPath("/test6/6.1", "6.1".getBytes(UTF8));
            Thread.sleep(LISTENER_PAUSE_MS);
            client.delete().deletingChildrenIfNeeded().forPath("/test6");
            // Give the removal events time to arrive before the cache is closed.
            Thread.sleep(LISTENER_PAUSE_MS);
        }
    }

    /**
     * Starts {@link #LOCK_WORKER_COUNT} threads that are all released at once by a latch
     * and then compete for the {@code /lock} mutex; returns after every worker finished.
     */
    private static void runLockDemo(CuratorFramework client) throws InterruptedException {
        InterProcessMutex lock = new InterProcessMutex(client, "/lock");
        CountDownLatch startSignal = new CountDownLatch(1);

        Thread[] workers = new Thread[LOCK_WORKER_COUNT];
        for (int i = 0; i < workers.length; i++) {
            workers[i] = new Thread(() -> printTimeUnderLock(lock, startSignal));
            workers[i].start();
        }
        startSignal.countDown();

        // Wait for the workers so the client is not closed while they still use it.
        for (Thread worker : workers) {
            worker.join();
        }
    }

    /**
     * Body of one lock-demo worker: waits for the start signal, acquires the mutex,
     * prints the current time in milliseconds and releases the mutex.
     */
    private static void printTimeUnderLock(InterProcessMutex lock, CountDownLatch startSignal) {
        try {
            startSignal.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // keep the interrupt visible to the thread
            e.printStackTrace();
            return;
        }

        try {
            lock.acquire();
        } catch (Exception e) {
            // The mutex was not obtained, so it must not be released below.
            e.printStackTrace();
            return;
        }

        try {
            System.out.println("看看是否会生成同样的时间" + System.currentTimeMillis());
        } finally {
            try {
                lock.release();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}