ZooKeeper:服务器动态上下线监听案例

需求

某分布式系统中,主节点可以有多台,可以动态上下线,任意一台客户端都能实时感知到主节点服务器的上下线。

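需求分析

服务端启动后在 /servers 下创建一个临时顺序节点(EPHEMERAL_SEQUENTIAL),节点数据为自己的主机名;服务端会话结束后,该节点由 ZooKeeper 自动删除。客户端对 /servers 的子节点注册监听,子节点发生增减时重新获取服务器列表并打印。运行前需要先在集群中创建好 /servers 节点。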

具体实现

服务端:

package com.tinstu.zk;

import org.apache.zookeeper.*;

import java.io.IOException;

/**
 * Demo server that announces itself to a ZooKeeper ensemble.
 *
 * <p>On startup it connects to the ensemble, creates an ephemeral sequential
 * znode under {@code /servers} whose data is the host name passed on the
 * command line, and then blocks forever. Because the znode is ephemeral,
 * ZooKeeper removes it when this process's session ends, which is how
 * clients watching {@code /servers} learn that the server went offline.
 *
 * <p>The parent znode {@code /servers} must already exist; this class does
 * not create it.
 *
 * <p>Usage: {@code java com.tinstu.zk.DistributeServer <hostname>}
 */
public class DistributeServer {
    /** Comma-separated {@code host:port} list of the ZooKeeper ensemble. */
    private static final String CONNECT_STRING = "180.76.106.99:2181,120.48.39.100:2181,152.136.194.161:2181";
    /** Session timeout, in milliseconds, handed to the ZooKeeper client. */
    private static final int SESSION_TIMEOUT_MS = 200000;
    /** Parent znode under which every server registers itself. */
    private static final String SERVERS_PATH = "/servers";

    private ZooKeeper zk = null;

    /**
     * Entry point: connect, register, then idle.
     *
     * @param args {@code args[0]} is the host name to register; required
     * @throws IllegalArgumentException if no host name is supplied
     * @throws IOException if the ZooKeeper client cannot be created
     * @throws InterruptedException if the thread is interrupted while registering or idling
     * @throws KeeperException if the ensemble rejects the registration
     */
    public static void main(String[] args) throws IOException, InterruptedException, KeeperException {
        // Fail with a readable message instead of ArrayIndexOutOfBoundsException.
        if (args == null || args.length == 0 || args[0].isEmpty()) {
            throw new IllegalArgumentException("Usage: DistributeServer <hostname>");
        }

        DistributeServer server = new DistributeServer();
        // 1. Open the ZooKeeper connection.
        server.getConnect();
        // 2. Register this server with the ensemble.
        server.regist(args[0]);
        // 3. Run the (placeholder) business logic.
        server.business();
    }

    /**
     * Placeholder for real work: keeps the process, and therefore the
     * ZooKeeper session and its ephemeral znode, alive.
     */
    private void business() throws InterruptedException {
        Thread.sleep(Long.MAX_VALUE);
    }

    /**
     * Creates the ephemeral sequential znode {@code /servers/<hostname><seq>}
     * with the host name as its data.
     *
     * @param hostname name to publish; also used as the znode name prefix
     */
    private void regist(String hostname) throws InterruptedException, KeeperException {
        // Encode explicitly as UTF-8 so clients on other machines decode the same text
        // regardless of each JVM's platform default charset.
        byte[] payload = hostname.getBytes(java.nio.charset.StandardCharsets.UTF_8);
        zk.create(SERVERS_PATH + "/" + hostname, payload, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
    }

    /** Creates the ZooKeeper client handle used by {@link #regist(String)}. */
    private void getConnect() throws IOException {
        zk = new ZooKeeper(CONNECT_STRING, SESSION_TIMEOUT_MS, new Watcher() {
            @Override
            public void process(WatchedEvent watchedEvent) {
                // The server only publishes its own znode; it does not react to events.
            }
        });
    }
}

 客户端:

package com.tinstu.zk;

import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

public class DistributeCli {
    private static String connectString = "180.76.106.99:2181,120.48.39.100:2181,152.136.194.161:2181";
    private static int sessionTimeout = 200000;
    private ZooKeeper zk = null;
    public static void main(String[] args) throws IOException, InterruptedException, KeeperException {
        DistributeCli cli = new DistributeCli();
        //1.获取ck链接
        cli.getConnect();

        //2.监听/servers下面子节点的增加和删除
        cli.getServerList();

        //3.业务逻辑
        cli.business();
    }

    private void business() throws InterruptedException {
        Thread.sleep(Long.MAX_VALUE);
    }

    private void getServerList() throws InterruptedException, KeeperException {
        List<String> children = zk.getChildren("/servers", true);
        ArrayList<Object> servers = new ArrayList<>();

        for (String child : children) {
            // watch:false stat:null
            byte[] data = zk.getData("/servers/" + child, false, null);
            servers.add(new String(data));
        }
        System.out.println(servers);
    }

    private void getConnect() throws IOException {
        zk = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
            @Override
            public void process(WatchedEvent watchedEvent) {
               //保证时刻监控
                try {
                    getServerList();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (KeeperException e) {
                    e.printStackTrace();
                }

            }
        });
    }
}
客户端手动测试图

客户端与服务端同时启动测试图

阅读剩余
THE END