草庐IT

Zookeeper实战——分布式锁实现以及原理

Kim_smile 2023-04-10 原文

文章目录


分布式锁是控制分布式系统之间同步访问共享资源的一种方式。分布式锁的实现方式有很多种,比如 Redis 、数据库 、zookeeper 等。这篇文章主要介绍用 Zookeeper 实现分布式锁。

Zookeeper 分布式锁实现原理

先说结论:Zookeeper 是基于临时顺序节点以及 Watcher 监听器机制实现分布式锁的

(1)ZooKeeper 的每一个节点都是一个天然的顺序发号器

在每一个节点下面创建临时顺序节点(EPHEMERAL_SEQUENTIAL)类型,新的子节点后面会加上一个次序编号,而这个生成的次序编号是上一个生成的次序编号加一。

例如,有一个用于发号的节点 “/test/lock” 为父节点,可以在这个父节点下面创建相同前缀的临时顺序子节点,假定相同的前缀为 “/test/lock/seq-”。第一个创建的子节点基本上应该为 /test/lock/seq-0000000001,下一个节点则为 /test/lock/seq-0000000002,依次类推。

(2)ZooKeeper 节点的递增有序性可以确保锁的公平

一个 ZooKeeper 分布式锁,首先需要创建一个父节点,尽量是持久节点(PERSISTENT 类型),然后每个要获得锁的线程都在这个节点下创建一个临时顺序节点,该节点是按照创建的次序依次递增的。

为了确保公平,可以简单的规定:编号最小的那个节点表示获得了锁。所以,每个线程在尝试占用锁之前,首先判断自己是序号是不是当前最小,如果是则获取锁。

(3)ZooKeeper 的节点监听机制,可以保障占有锁的传递有序而且高效

每个线程抢占锁之前,先尝试创建自己的 ZNode。同样,释放锁的时候需要删除创建的 Znode。创建成功后,如果不是序号最小的节点,就处于等待通知的状态。每一个等通知的 Znode 节点,需要监视(watch)序号在自己前面的那个 Znode,以获取其删除事件。只要上一个节点被删除了,就进行再一次判断,看看自己是不是序号最小的那个节点,如果是,自己就获得锁。就这样不断地通知后一个 ZNode 节点。

另外,ZooKeeper 的内部优越的机制,能保证由于网络异常或者其他原因,集群中占用锁的客户端失联时锁能够被有效释放。什么机制呢,就是临时顺序节点。一旦占用 Znode 锁的客户端与 ZooKeeper 集群服务器失去联系,这个临时 Znode 也将自动删除。排在它后面的那个节点,也能收到删除事件,从而获得锁。

也正是这个原因,zk 中不需要向 redis 那样考虑锁可能出现的无法释放的问题了,因为当客户端挂了,节点也挂了,锁也释放了。

(四)ZooKeeper 的节点监听机制,能避免羊群效应

ZooKeeper 这种首尾相接、后面监听前面的方式,可以避免羊群效应。所谓羊群效应就是一个节点挂掉,所有节点都去监听,然后做出反应,这样会给服务器带来巨大压力。有了临时顺序节点以及节点监听机制,当一个节点挂掉,只有它后面的那一个节点才做出反应。


具体流程

  1. 一把分布式锁通常使用一个 Znode 节点表示;如果锁对应的 Znode 节点不存在,首先创建 Znode 节点。这里假设为 /test/lock,代表了一把需要创建的分布式锁。
  2. 抢占锁的所有客户端,使用锁的 Znode 节点的子节点列表来表示;如果某个客户端需要占用锁,则在 /test/lock 下创建一个临时顺序的子节点。比如,如果子节点的前缀为 /test/lock/seq-,则第一次抢锁对应的子节点为 /test/lock/seq-000000001,第二次抢锁对应的子节点为 /test/lock/seq-000000002,以此类推。
  3. 当客户端创建子节点后,需要进行判断:自己创建的子节点,是否为当前子节点列表中序号最小的子节点。如果是,则加锁成功;如果不是,则监听前一个 Znode 子节点变更消息,等待前一个节点释放锁。
  4. 一旦队列中的后面的节点,获得前一个子节点变更通知,则开始进行判断,判断自己是否为当前子节点列表中序号最小的子节点,如果是,则认为加锁成功;如果不是,则持续监听,一直到获得锁。
  5. 获取锁后,开始处理业务流程。完成业务流程后,删除自己的对应的子节点,完成释放锁的工作,以方面后继节点能捕获到节点变更通知,获得分布式锁。

代码实现

Curator 是Netflix公司开源的一套 ZooKeeper Java客户端框架,相比于 Zookeeper 自带的客户端 zookeeper 来说,Curator 的封装更加完善,各种 API 都可以比较方便地使用。

这里使用 Curator 作为 Zookeeper 的客户端实现。需要先导入依赖:

<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-framework</artifactId>
    <version>5.2.1</version>
</dependency>
<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-recipes</artifactId>
    <version>5.2.1</version>
</dependency>
<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-client</artifactId>
    <version>5.2.1</version>
</dependency>

客户端创建工厂类

public class ClientFactory {
    //连接地址
    private static final String connectionString = "127.0.0.1:2181";
    //等待事件的基础单位,单位毫秒
    private static final int BASE_SLEEP_TIME = 1000;
    //最大重试次数
    private static final int MAX_RETRIES = 3;
    private static volatile CuratorFramework zkClient;

    public static CuratorFramework getClient() { //单例
        if (zkClient == null) {
            synchronized (ClientFactory.class) {
                if (zkClient == null) {
                    createSimple();
                }
            }
        }
        return zkClient;
    }

    public static void createSimple() {
        //重试策略: 第一次重试等待1秒,第二次重试等待2秒,第三次重试等待4秒
        ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(BASE_SLEEP_TIME, MAX_RETRIES);
        zkClient = CuratorFrameworkFactory.newClient(connectionString, retryPolicy);
        zkClient.start();
    }

    public static void createWithOptions(int connectionTimeoutMs, int sessionTimeoutMs) {
        ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(BASE_SLEEP_TIME, MAX_RETRIES);
        zkClient = CuratorFrameworkFactory.builder()
                .connectString(connectionString)
                .retryPolicy(retryPolicy)
                .connectionTimeoutMs(connectionTimeoutMs) //连接超时
                .sessionTimeoutMs(sessionTimeoutMs) //会话超时
                .build();
        zkClient.start();
    }
}

创建 Lock 锁接口

public interface Lock {
    //加锁
    boolean lock() throws Exception;
	//释放锁
    boolean unlock() throws Exception;
}

Lock 实现类(ZkLock)

public class ZkLock implements Lock{
    
    private String zkPath;  //分布式锁节点,如"/test/lock"
    private String lockPrefix;  //子节点前缀,如"/test/lock/seq-"
    private long waitTime;  //超时等待
    CuratorFramework zkClient;  //ZK客户端
    private Thread thread;  //当前线程
    private String lockPath;  //当前加锁节点
    private String waitPath;  //前一个等待节点
    final AtomicInteger lockCount = new AtomicInteger(0);  //重入计数器

    public ZkLock(String zkPath) throws Exception {
        this.zkPath = zkPath;
        this.lockPrefix = zkPath + "/seq-";
        this.waitTime = 0L;
        this.zkClient = ClientFactory.getClient();
        try {
            if (zkClient.checkExists().forPath(zkPath) == null) {
                zkClient.create().creatingParentsIfNeeded().forPath(zkPath);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    public ZkLock(String zkPath, long waitTime) {
        this.zkPath = zkPath;
        this.lockPrefix = zkPath + "/seq-";
        this.waitTime = waitTime;
        this.zkClient = ClientFactory.getClient();
        try {
            if (zkClient.checkExists().forPath(zkPath) == null) {
                zkClient.create().creatingParentsIfNeeded().forPath(zkPath);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * 加锁
     */
    @Override
    public boolean lock() throws Exception {
        //可重入
        synchronized (this) {
            if (lockCount.get() == 0) {
                thread = Thread.currentThread();
                lockCount.incrementAndGet();
            } else {
                if (!thread.equals(Thread.currentThread())) {
                    return false;
                }
                lockCount.incrementAndGet();
                return true;
            }
        }
        return tryLock();
    }

    /**
     * 尝试获取锁
     */
    private boolean tryLock() throws Exception {
        lockPath = zkClient.create().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(lockPrefix);
        List<String> childList = zkClient.getChildren().forPath(zkPath);
        if (childList.size() == 1) {
            return true;
        } else {
            Collections.sort(childList);
            String curNode = lockPath.substring(zkPath.length() + 1);
            int index = childList.indexOf(curNode);
            if (index < 0) {
                throw new Exception("加锁异常");
            } else if (index == 0) {
                //第一个节点,加锁成功
                return true;
            } else {
                //监听前一个节点
                waitPath = zkPath + "/" + childList.get(index - 1);
                final CountDownLatch waitLatch = new CountDownLatch(1);
                Watcher w = new Watcher() {
                    @Override
                    public void process(WatchedEvent watchedEvent) {
                        if (watchedEvent.getType() == Event.EventType.NodeDeleted &&
                            watchedEvent.getPath().equals(waitPath)) {
                            System.out.println("监听到节点删除事件:" + watchedEvent);
                            waitLatch.countDown();
                        }
                    }
                };
                zkClient.getData().usingWatcher(w).forPath(waitPath);
                if (waitTime == 0L) {
                    waitLatch.await();
                    return true;
                } else {
                    return waitLatch.await(waitTime, TimeUnit.SECONDS);
                }
            }
        }
    }

    /**
     * 释放锁
     */
    @Override
    public boolean unlock() throws Exception {
        if (!thread.equals(Thread.currentThread())) {
            return false;
        }
        int newLockCount = lockCount.decrementAndGet();
        if (newLockCount < 0) {
            throw new Exception("解锁异常");
        } else if (newLockCount > 0) {
            return true;
        } else {
            try {
                if (zkClient.checkExists().forPath(lockPath) != null) {
                    zkClient.delete().forPath(lockPath);
                }
            } catch (Exception e) {
                e.printStackTrace();
                return false;
            }
            return true;
        }
    }
}

自定义 ZK 分布式锁测试

public class ZkLockTest {

    public static void main(String[] args) throws Exception {
        System.out.println("开始测试ZK分布式锁...");

        new Thread(new Runnable() {
            @Override
            public void run() {
                Lock zkLock = new ZkLock("/test/lock", 3L);
                System.out.println("线程1启动");
                try {
                    boolean lock = zkLock.lock();
                    if (lock) {
                        System.out.println("线程1获取到锁");
                        Thread.sleep(2000);
                        zkLock.unlock();
                        System.out.println("线程1释放锁");
                    } else {
                        System.out.println("线程1获取锁失败");
                    }
                } catch (Exception e) {
                    throw new RuntimeException(e);
                }
            }
        }).start();

        new Thread(new Runnable() {
            @Override
            public void run() {
                Lock zkLock = new ZkLock("/test/lock", 3L);
                System.out.println("线程2启动");
                try {
                    boolean lock = zkLock.lock();
                    if (lock) {
                        System.out.println("线程2获取到锁");
                        Thread.sleep(2000);
                        zkLock.unlock();
                        System.out.println("线程2释放锁");
                    } else {
                        System.out.println("线程2获取锁失败");
                    }
                } catch (Exception e) {
                    throw new RuntimeException(e);
                }
            }
        }).start();
    }
}

测试结果:


独占锁 & 共享锁

上面讲的都是基于独占锁的,那么能否实现共享锁呢?答案是可以的。

当操作是读请求,也就是要获取共享锁,如果没有比自己更小的节点,或比自己小的节点都是读请求 ,则可以获取到读锁。若比自己小的节点中有写请求 ,则当前客户端无法获取到读锁,只能等待前面的写请求完成。

如果操作是写请求,也就是要获取独占锁,如果没有比自己更小的节点 ,则表示当前客户端可以直接获取到写锁,对数据进行修改。如果发现有比自己更小的节点,无论是读操作还是写操作,当前客户端都无法获取到写锁,等待前面所有的操作完成。


Curator 实现分布式锁

实际开发过程中,建议使用 Curator 客户端封装的 API 帮助我们实现分布式锁。

Curator 的几种锁方案:

  • InterProcessMutex:分布式可重入排它锁
  • InterProcessSemaphoreMutex:分布式排它锁
  • InterProcessReadWriteLock:分布式读写锁
public class InterProcessMutexTest {

    public static void main(String[] args) {
        CuratorFramework zkClient = ClientFactory.getClient();
        InterProcessMutex zkMutex = new InterProcessMutex(zkClient, "/test/mutex");

        new Thread(new Runnable() {
            @Override
            public void run() {
                System.out.println("线程1启动");
                try {
                    zkMutex.acquire(); //阻塞等待,也可超时等待
                    System.out.println("线程1获取到锁");
                    Thread.sleep(2000);
                    zkMutex.release();
                    System.out.println("线程1释放锁");
                } catch (Exception e) {
                    throw new RuntimeException(e);
                }
            }
        }).start();

        new Thread(new Runnable() {
            @Override
            public void run() {
                System.out.println("线程2启动");
                try {
                    zkMutex.acquire();
                    System.out.println("线程2获取到锁");
                    Thread.sleep(2000);
                    zkMutex.release();
                    System.out.println("线程2释放锁");
                } catch (Exception e) {
                    throw new RuntimeException(e);
                }
            }
        }).start();
    }
}

ZooKeeper 分布式锁的优缺点

这里把 Zookeeper 与 Redis 实现分布式锁对比一下:

  • 优点:ZooKeeper分布式锁(如 InterProcessMutex),除了独占锁、可重入锁,还能实现读写锁,并且可靠性比 Redis 更好。
  • 缺点:ZooKeeper实现的分布式锁,性能并不太高。因为每次在创建锁和释放锁的过程中,都要动态创建、销毁瞬时节点来实现锁功能。而 ZK 中创建和删除节点只能通过 Leader 服务器来执行,然后 Leader 服务器还需要将数据同不到所有的 Follower 机器上,同步之后才返回,这样频繁的网络通信,性能的短板是非常突出的;而 Redis 则是异步复制。

Redis 是 AP 架构,而 ZooKeeper 是 CP 架构。在高性能,高并发的场景下,不建议使用ZooKeeper的分布式锁,可以使用 Redis 分布式锁。而由于ZooKeeper的可靠性,所以在并发量不是太高的场景,推荐使用ZooKeeper的分布式锁。

使用 zk 临时节点会存在另一个问题:由于 zk 依靠 session 定期的心跳来维持客户端,如果客户端进入长时间的 GC,可能会导致 zk 认为客户端宕机而释放锁,让其他的客户端获取锁,但是客户端在 GC 恢复后,会认为自己还持有锁,从而可能出现多个客户端同时获取到锁的情形。

针对这种情况,可以通过 JVM 调优,尽量避免长时间 GC 的情况发生。



参考资料:《Java 高并发核心编程——卷1》

有关Zookeeper实战——分布式锁实现以及原理的更多相关文章

  1. ruby - 什么是填充的 Base64 编码字符串以及如何在 ruby​​ 中生成它们? - 2

    我正在使用的第三方API的文档状态:"[O]urAPIonlyacceptspaddedBase64encodedstrings."什么是“填充的Base64编码字符串”以及如何在Ruby中生成它们。下面的代码是我第一次尝试创建转换为Base64的JSON格式数据。xa=Base64.encode64(a.to_json) 最佳答案 他们说的padding其实就是Base64本身的一部分。它是末尾的“=”和“==”。Base64将3个字节的数据包编码为4个编码字符。所以如果你的输入数据有长度n和n%3=1=>"=="末尾用于填充n%

  2. ruby - 如何根据特征实现 FactoryGirl 的条件行为 - 2

    我有一个用户工厂。我希望默认情况下确认用户。但是鉴于unconfirmed特征,我不希望它们被确认。虽然我有一个基于实现细节而不是抽象的工作实现,但我想知道如何正确地做到这一点。factory:userdoafter(:create)do|user,evaluator|#unwantedimplementationdetailshereunlessFactoryGirl.factories[:user].defined_traits.map(&:name).include?(:unconfirmed)user.confirm!endendtrait:unconfirmeddoenden

  3. ruby - 分布式事务和队列,ruby,erlang,scala - 2

    我有一个涉及多台机器、消息队列和事务的问题。因此,例如用户点击网页,点击将消息发送到另一台机器,该机器将付款添加到用户的帐户。每秒可能有数千次点击。事务的所有方面都应该是容错的。我以前从未遇到过这样的事情,但一些阅读表明这是一个众所周知的问题。所以我的问题。我假设安全的方法是使用两阶段提交,但协议(protocol)是阻塞的,所以我不会获得所需的性能,我是否正确?我通常写Ruby,但似乎Redis之类的数据库和Rescue、RabbitMQ等消息队列系统对我的帮助不大——即使我实现某种两阶段提交,如果Redis崩溃,数据也会丢失,因为它本质上只是内存。所有这些让我开始关注erlang和

  4. 华为OD机试用Python实现 -【明明的随机数】 2023Q1A - 2

    华为OD机试题本篇题目:明明的随机数题目输入描述输出描述:示例1输入输出说明代码编写思路最近更新的博客华为od2023|什么是华为od,od薪资待遇,od机试题清单华为OD机试真题大全,用Python解华为机试题|机试宝典【华为OD机试】全流程解析+经验分享,题型分享,防作弊指南华为o

  5. 【鸿蒙应用开发系列】- 获取系统设备信息以及版本API兼容调用方式 - 2

    在应用开发中,有时候我们需要获取系统的设备信息,用于数据上报和行为分析。那在鸿蒙系统中,我们应该怎么去获取设备的系统信息呢,比如说获取手机的系统版本号、手机的制造商、手机型号等数据。1、获取方式这里分为两种情况,一种是设备信息的获取,一种是系统信息的获取。1.1、获取设备信息获取设备信息,鸿蒙的SDK包为我们提供了DeviceInfo类,通过该类的一些静态方法,可以获取设备信息,DeviceInfo类的包路径为:ohos.system.DeviceInfo.具体的方法如下:ModifierandTypeMethodDescriptionstatic StringgetAbiList​()Obt

  6. 基于C#实现简易绘图工具【100010177】 - 2

    C#实现简易绘图工具一.引言实验目的:通过制作窗体应用程序(C#画图软件),熟悉基本的窗体设计过程以及控件设计,事件处理等,熟悉使用C#的winform窗体进行绘图的基本步骤,对于面向对象编程有更加深刻的体会.Tutorial任务设计一个具有基本功能的画图软件**·包括简单的新建文件,保存,重新绘图等功能**·实现一些基本图形的绘制,包括铅笔和基本形状等,学习橡皮工具的创建**·设计一个合理舒适的UI界面**注明:你可能需要先了解一些关于winform窗体应用程序绘图的基本知识,以及关于GDI+类和结构的知识二.实验环境Windows系统下的visualstudio2017C#窗体应用程序三.

  7. MIMO-OFDM无线通信技术及MATLAB实现(1)无线信道:传播和衰落 - 2

     MIMO技术的优缺点优点通过下面三个增益来总体概括:阵列增益。阵列增益是指由于接收机通过对接收信号的相干合并而活得的平均SNR的提高。在发射机不知道信道信息的情况下,MIMO系统可以获得的阵列增益与接收天线数成正比复用增益。在采用空间复用方案的MIMO系统中,可以获得复用增益,即信道容量成倍增加。信道容量的增加与min(Nt,Nr)成正比分集增益。在采用空间分集方案的MIMO系统中,可以获得分集增益,即可靠性性能的改善。分集增益用独立衰落支路数来描述,即分集指数。在使用了空时编码的MIMO系统中,由于接收天线或发射天线之间的间距较远,可认为它们各自的大尺度衰落是相互独立的,因此分布式MIMO

  8. 微信小程序开发入门与实战(Behaviors使用) - 2

    @作者:SYFStrive @博客首页:HomePage📜:微信小程序📌:个人社区(欢迎大佬们加入)👉:社区链接🔗📌:觉得文章不错可以点点关注👉:专栏连接🔗💃:感谢支持,学累了可以先看小段由小胖给大家带来的街舞👉微信小程序(🔥)目录自定义组件-behaviors    1、什么是behaviors    2、behaviors的工作方式    3、创建behavior    4、导入并使用behavior    5、behavior中所有可用的节点    6、同名字段的覆盖和组合规则总结最后自定义组件-behaviors    1、什么是behaviorsbehaviors是小程序中,用于实现

  9. 阿里云国际版免费试用:如何注册以及注意事项 - 2

    作为新的阿里云用户,您可以50免费试用多种优惠,价值高达1,700美元(或8,500美元)。这将让您了解和体验阿里云平台上提供的一系列产品和服务。如果您以个人身份注册免费试用,您将获得价值1,700美元的优惠。但是,如果您是注册公司,您可以选择企业免费试用,提交基本信息通过企业实名注册验证,即可开始价值$8,500的免费试用!本教程介绍了如何设置您的帐户并使用您的免费试用版。​关于免费试用在我们开始此试用之前,您还必须遵守以下条款和条件才能访问您的免费试用:只有在一年内创建的账户才有资格获得阿里云免费试用。通过此免费试用优惠,用户可以免费试用免费试用活动页面上列出的每种产品一次。如果您有多个帐

  10. 【Java入门】使用Java实现文件夹的遍历 - 2

    遍历文件夹我们通常是使用递归进行操作,这种方式比较简单,也比较容易理解。本文为大家介绍另一种不使用递归的方式,由于没有使用递归,只用到了循环和集合,所以效率更高一些!一、使用递归遍历文件夹整体思路1、使用File封装初始目录,2、打印这个目录3、获取这个目录下所有的子文件和子目录的数组。4、遍历这个数组,取出每个File对象4-1、如果File是否是一个文件,打印4-2、否则就是一个目录,递归调用代码实现publicclassSearchFile{publicstaticvoidmain(String[]args){//初始目录Filedir=newFile("d:/Dev");Datebeg

随机推荐