ZooKeeper 实现负载均衡

1. 前言

在分布式的环境中,我们常常使用集群部署的方式来提高某个服务的可用性,为了让高并发的请求能够平均的分配到集群中的每一个服务,避免有些服务压力过大,而有些服务处于空闲状态这样的情况,我们需要制定一些规则来把请求进行路由,这种分配请求的做法就叫做负载均衡,路由请求的规则就是负载均衡的策略。

那么负载均衡的策略有哪些呢?如何使用 Zookeeper 实现负载均衡呢?接下来我们就带着这些问题开始本节的内容。

2. 负载均衡的策略

当我们使用集群的方式部署的服务在不同的机器上时,根据机器的性能以及网络环境,我们可能需要使用负载均衡策略来分配请求到不同的机器,这里我们就开始讲解负载均衡的策略。

  • Round Robin 轮询策略

    轮询策略,按照集群的服务列表的顺序,依次进行请求的分配,直到列表中所有的服务都分配了一次请求,就完成了一轮的请求分配,然后再从第一个服务开始分配请求。

    轮询策略是很多负载均衡技术的默认策略,这样的方式保证了的每个服务所承受的请求压力是平均的,我们可以把服务列表按照顺序放到一个数组来循环分配请求。

    /**
     * 轮询策略 Demo
     */
    public class RoundRobinStrategy {
        public static void main(String[] args) {
            // 模拟 Server 地址列表
            String[] serverList = {"192.168.0.77","192.168.0.88","192.168.0.99"};
            // 模拟 5 次请求
            for (int i = 0; i < 5; i++) {
                // 根据数组长度取模,顺序获取地址索引
                int i1 = i % serverList.length;
                // 根据索引获取服务器地址
                System.out.println(serverList[i1]);
            }
        }
    }
    

    执行 main 方法,查看控制台输出:

    192.168.0.77
    192.168.0.88
    192.168.0.99
    192.168.0.77
    192.168.0.88
    

    我们可以观察到控制台输出的服务地址是顺序的。

  • Random 随机策略

    随机策略,顾名思义就是根据随机算法把请求随机的分配给服务列表中的任意一个服务。

    随机策略的实现方式:我们可以把服务列表放到一个数组,然后根据数组的长度来获取随机数,取到的随机数就是服务在数组中的索引,根据这个索引,我们就可以拿到服务地址来发送请求了。

    /**
     * 随机策略 Demo
     */
    public class RandomStrategy {
        public static void main(String[] args) {
            // 服务地址数组
            String[] serverList = {"192.168.0.77","192.168.0.88","192.168.0.99"};
            // 模拟发送 5 次请求
            for (int j = 0; j < 5; j++) {
                // 随机获取数组的索引
                int i = new Random().nextInt(serverList.length);
                // 根据索引获取服务器地址
                System.out.println(serverList[i]);
            }
        }
    }
    
    

    执行 main 方法,查看控制台输出:

    192.168.0.88
    192.168.0.88
    192.168.0.99
    192.168.0.77
    192.168.0.77
    

    我们可以观察到控制台输出的服务地址是随机的,还有可能会出现多次请求连续随机到同一个服务的情况。

  • Consistent Hashing 一致性哈希策略

    一致性哈希策略的实现方式:我们先把服务列表中的地址进行哈希计算,把计算后的值放到哈希环上,接收到请求后,根据请求的固定属性值来进行哈希计算,然后根据请求的哈希值在哈希环上顺时针寻找服务地址的哈希值,寻找到哪个服务地址的哈希值,就把请求分配给哪个服务。

一致性哈希

Tips: 哈希环的范围,从 0 开始,到 2 的32 次方减 1 结束,也就是到 Integer 的最大取值范围。

在示例的图中,哈希环上有 3 个 Server 的 Hash 值,每个请求的 Hash 值都顺时针去寻找 Server 的 Hash 值,找到哪个就将请求分配给哪个服务。接下来我们用 Java 实现一致性哈希策略,使用 IP 地址进行 Hash 计算:

/**
 * 一致性哈希策略 Demo
 */
public class ConsistentHashingStrategy {
    public static void main(String[] args) {
        // 模拟 Server 地址列表
        String[] serverList = {"192.168.0.15", "192.168.0.30", "192.168.0.45"};
        // 新建 TreeMap 集合 ,以 Key,Value 的方式绑定 Hash 值与地址
        SortedMap<Integer, String> serverHashMap = new TreeMap<>();
        // 计算 Server 地址的 Hash 值
        for (String address : serverList) {
            int serverHash = Math.abs(address.hashCode());
            // 绑定 Hash 值与地址
            serverHashMap.put(serverHash, address);
        }
        // 模拟 Request 地址
        String[] requestList = {"192.168.0.10", "192.168.0.20", "192.168.0.40", "192.168.0.50"};
        // 计算 Request 地址的 Hash 值
        for (String request : requestList) {
            int requestHash = Math.abs(request.hashCode());
            // 在 serverHashMap 中寻找所有大于 requestHash 的 key
            SortedMap<Integer, String> tailMap = serverHashMap.tailMap(requestHash);
            //如果有大于 requestHash 的 key, 第一个 key 就是离 requestHash 最近的 serverHash
            if (!tailMap.isEmpty()) {
                Integer key = tailMap.firstKey();
                // 根据 key 获取 Server address
                String address = serverHashMap.get(key);
                System.out.println("请求 " + request + " 被分配给服务 " + address);
            } else {
                // 如果 serverHashMap 中没有比 requestHash 大的 key
                // 则直接在 serverHashMap 取第一个服务
                Integer key = serverHashMap.firstKey();
                // 根据 key 获取 Server address
                String address = serverHashMap.get(key);
                System.out.println("请求 " + request + " 被分配给服务 " + address);
            }
        }
    }
}

执行 main 方法,查看控制台输出:

请求 192.168.0.10 被分配给服务 192.168.0.15
请求 192.168.0.20 被分配给服务 192.168.0.30
请求 192.168.0.40 被分配给服务 192.168.0.45
请求 192.168.0.50 被分配给服务 192.168.0.15
  • 加权轮询策略

    加权轮询策略就是在轮询策略的基础上,对 Server 地址进行加权处理,除了按照服务地址列表的顺序来分配请求外,还要按照权重大小来决定请求的分配次数。加权的目的是为了让性能和网络较好的服务多承担请求分配的压力。

    比如 Server_1 的权重是 3,Server_2 的权重是 2,Server_3 的权重是 1,那么在进行请求分配时,Server_1 会被分配 3 次请求,Server_2 会被分配 2 次请求,Server_3 会被分配 1 次请求,就这样完成一轮请求的分配,然后再从 Server_1 开始进行分配。

  • 加权随机策略

    加权随机策略就是在随机策略的基础上,对 Server 地址进行加权处理,Server 地址的加权有多少,那么 Server 地址的数组中的地址就会有几个,然后再从这个数组中进行随机选址。

  • Least Connection 最小连接数策略

    最小连接数策略,就是根据客户端与服务端会话数量来决定请求的分配情况,它会把请求分配到会话数量小的服务,会话的数量越少,也能说明服务的性能和网络较好。

学习完负载均衡的策略,接下来我们使用 Zookeeper 实现负载均衡。

3. Zookeeper 实现负载均衡

Zookeeper 实现负载均衡,我们可以使用 Zookeeper 的临时节点来维护 Server 的地址列表,然后选择负载均衡策略来对请求进行分配。

我们回顾一下临时节点的特性:当创建该节点的 Zookeeper 客户端与 Zookeeper 服务端断开连接时,该节点会被 Zookeeper 服务端移除。使用临时节点来维护 Server 的地址列表就保证了请求不会被分配到已经停机的服务上。

在上面的讲解中,轮询策略,随机策略和一致性哈希策略都使用 Java 简单的实现了 Demo,那么接下来我们就使用最小连接数策略来实现请求的分配。

3.1 临时节点和最小连接数策略实现负载均衡

首先我们需要在集群的每一个 Server 中都使用 Zookeeper 客户端 Curator 来连接 Zookeeper 服务端,当 Server 启动时,使用 Curator 连接 Zookeeper 服务端,并用自身的地址信息创建临时节点到 Zookeeper 服务端。

我们还可以提供手动下线 Server 的方法,需要 Server 下线时可以手动调用删除节点的方法,需要 Server 上线时再次使用自身的地址信息来创建临时节点。

除了维护 Server 的地址信息外,我们还需要维护请求的会话连接数,我们可以使用节点的 data 来保存请求会话的连接数。

我们使用在 Zookeeper Curator 一节创建的 Spring Boot 测试项目来实现:

/**
 * 最小连接数策略 Demo
 * Server 服务端注册地址
 */
@Component
public class MinimumConnectionsStrategyServer implements ApplicationRunner {

    @Autowired
    private CuratorService curatorService;

    // Curator 客户端
    public CuratorFramework client;
    // 当前服务地址的临时节点
    public static String SERVER_IP;
    // 当前服务地址临时节点的父节点,节点类型为持久节点
    public static final String IMOOC_SERVER = "/imooc-server";

    /**
     * 服务启动后自动执行
     *
     * @param args args
     * @throws Exception Exception
     */
    @Override
    public void run(ApplicationArguments args) throws Exception {
        // Curator 客户端开启会话
        client = curatorService.getCuratorClient();
        client.start();
        // 注册地址信息到 Zookeeper
        registerAddressToZookeeper();
    }

    /**
     * 注册地址信息到 Zookeeper
     * 服务启动时和服务手动上线时调用此方法
     *
     * @throws Exception Exception
     */
    public void registerAddressToZookeeper() throws Exception {
        // 判断父节点是否存在,不存在则创建持久节点
        Stat stat = client.checkExists().forPath(IMOOC_SERVER);
        if (stat == null) {
            client.create().creatingParentsIfNeeded().forPath(IMOOC_SERVER);
        }
        // 获取本机地址
        String address = InetAddress.getLocalHost().getHostAddress();
        // 创建临时节点,节点路径为 /IMOOC_SERVER/address,节点 data 为 请求会话数,初始化时为 0.
        // /imooc-server/192.168.0.77
        SERVER_IP = client.create()
                .withMode(CreateMode.EPHEMERAL)
                .forPath(IMOOC_SERVER + "/" + address, "0".getBytes());
    }

    /**
     * 注销在 Zookeeper 上的注册的地址
     * 服务手动下线时调用此方法
     *
     * @throws Exception Exception
     */
    public void deregistrationAddress() throws Exception {
        // 检查该节点是否存在
        Stat stat = client.checkExists().forPath(SERVER_IP);
        // 存在则删除
        if (stat != null) {
            client.delete().forPath(SERVER_IP);
        }
    }
}

在客户端的请求调用集群服务之前,先使用 Curator 获取 IMOOC_SERVER 下所有的临时节点,并寻找出 data 最小的临时节点,也就是最小连接数的服务。

在客户端发送请求时,我们可以让当前 Server 的请求会话数加 1,并更新到临时节点的 data,完成请求时,我们可以让当前 Server 的请求会话数减 1,并更新到临时节点的 data 。

/**
 * 最小连接数策略 Demo
 * Client 客户端发送请求
 */
@Component
public class MinimumConnectionsStrategyClient implements ApplicationRunner {

    @Autowired
    private CuratorService curatorService;

    // Curator 客户端
    public CuratorFramework client;
    // 服务列表节点的 父节点
    public static final String IMOOC_SERVER = "/imooc-server";

    @Override
    public void run(ApplicationArguments args) throws Exception {
        // Curator 客户端开启会话
        client = curatorService.getCuratorClient();
        client.start();
    }

    /**
     * 获取最小连接数的服务
     * 发送请求前调用此方法,获取服务地址
     *
     * @return String
     * @throws Exception Exception
     */
    public String getTheMinimumNumberOfConnectionsService() throws Exception {
        // 获取所有子节点
        List<String> list = client.getChildren().forPath(IMOOC_SERVER);
        // 新建 Map
        Map<String, Integer> map = new HashMap<>();
        // 遍历服务列表,保存服务地址与请求会话数的映射关系
        for (String s : list) {
            byte[] bytes = client.getData().forPath(IMOOC_SERVER + "/" + s);
            int i = Integer.parseInt(new String(bytes));
            map.put(s, i);
        }
        // 寻找 map 中会话数最小的值
        Optional<Map.Entry<String, Integer>> min = map.entrySet().stream().min(Map.Entry.comparingByValue());
        // 不为空的话
        if (min.isPresent()) {
            // 返回 服务地址 ip
            Map.Entry<String, Integer> entry = min.get();
            return entry.getKey();
        } else {
            // 没有则返回服务列表第一个服务地址 ip
            return list.get(0);
        }
    }

    /**
     * 增加该服务的请求会话数量
     * 使用服务地址处理业务前调用此方法
     *
     * @param ip 服务地址
     * @throws Exception Exception
     */
    public void increaseTheNumberOfRequestedSessions(String ip) throws Exception {
        byte[] bytes = client.getData().forPath(IMOOC_SERVER + "/" + ip);
        int i = Integer.parseInt(new String(bytes));
        i++;
        client.setData().forPath(IMOOC_SERVER + "/" + ip, String.valueOf(i).getBytes());
    }

    /**
     * 减少该服务的请求会话数量
     * 请求结束时调用此方法减少会话数量
     *
     * @param ip 服务地址
     * @throws Exception Exception
     */
    public void reduceTheNumberOfRequestedSessions(String ip) throws Exception {
        byte[] bytes = client.getData().forPath(IMOOC_SERVER + "/" + ip);
        int i = Integer.parseInt(new String(bytes));
        i--;
        client.setData().forPath(IMOOC_SERVER + "/" + ip, String.valueOf(i).getBytes());
    }
}

这样我们就使用 Zookeeper 的临时节点完成了一个简单的最小连接数策略的负载均衡。

4. 总结

在本节的内容中,我们学习了为什么要使用负载均衡,负载均衡的策略,以及使用 Zookeeper 的临时节点来实现负载均衡。以下是本节内容的总结:

  1. 分布式环境下为什么要使用负载均衡。
  2. 负载均衡的策略有哪些。
  3. 使用 Zookeeper 的临时节点实现负载均衡。