草庐IT

ZooKeeper应用---分布式锁

Deepdoo 2023-04-03 原文

实现要点

redis分布式锁的问题

redis通常也用来实现分布式锁,但是有一些问题

  • 进程需要主动请求redis判断锁是否被释放,会造成服务端的压力以及客户端循环的开销
  • 获得锁的进程需要设置过期时间来容错,有产生延迟的风险
  • redis主从切换有可能导致锁失效

zookeeper的优势

使用zookeeper实现分布式锁的优势

  • 临时节点,如果客户端失活节点被删除,可以通过设置session过期时间来控制删除时间
  • 有序节点,严格的单调递增顺序,可以控制并发时的顺序,类似排队
  • watch机制,客户端可以监听某个节点的任何事件
  • zookeeper集群是几乎高可用的,快速的选主,官方号称200ms内
  • zookeeper能够做到对外的统一视图,可线性化,leader节点失效会停止服务

实现思路

客户端创建临时有序节点 /testLock/lock ,然后拿到父目录 /testLock 下的所有节点并排序,判断自己是否是第一个节点,如果是获取锁成功,如果不是,就监听它的前一个节点的删除事件,当监听节点被删除时,获取锁成功。

代码实现

zookeeper集群的搭建和项目搭建细节请参考上一篇文章

ZooKeeper应用---分布式配置更新_Deepdoo的博客-CSDN博客zookeeper API的使用,基于回调和监听的响应式编程https://blog.csdn.net/weixin_44489428/article/details/123637468

ZkUtils文件 

package org.example.lock;

import org.apache.zookeeper.ZooKeeper;

import java.io.IOException;
import java.util.concurrent.CountDownLatch;


public class ZkUtils {
    public static String root = "/testLock";
    public static String path = "/lock";
    private static ZooKeeper zk;
    private static final String address = "zk01:2181,zk02:2181,zk03:2181,zk04:2181" + root;
    private static final CountDownLatch latch = new CountDownLatch(1);

    public static ZooKeeper getZk() {
        try {
            zk = new ZooKeeper(address, 3000, new DefaultWatcher(latch));
            latch.await();
        } catch (Exception e) {
            e.printStackTrace();
        }

        return zk;
    }
}

ZkUtils 作为一个工具类,主要是通过阻塞的方式获取 zookeeper 连接的实例。

address是zookeeper集群的连接地址,指定了本实验的根目录 /testLock,锁节点名称前缀 /lock。

DefaultWatcher是默认的监听类,当连接成功时把 CountDownLatch 减1,返回zk对象。

DefaultWatcher类

package org.example.lock;

import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;

import java.util.concurrent.CountDownLatch;


public class DefaultWatcher implements Watcher {
    private CountDownLatch latch;
    public DefaultWatcher(CountDownLatch latch) {
        this.latch = latch;
    }
    @Override
    public void process(WatchedEvent event) {
        switch (event.getState()) {
            case Unknown:
                break;
            case Disconnected:
                break;
            case NoSyncConnected:
                break;
            case SyncConnected:
                System.out.println("sync connected!!!");
                latch.countDown();
                break;
            case AuthFailed:
                break;
            case ConnectedReadOnly:
                break;
            case SaslAuthenticated:
                break;
            case Expired:
                break;
            case Closed:
                break;
        }
    }
}

连接成功的时候打印 “sync connnected!!!"

TestLock类

package org.example.lock;

import org.apache.zookeeper.ZooKeeper;

/**
 * @date 2022/3/22 11:00
 */
public class TestLock {
    private static ZooKeeper zk;

    public static void main(String[] args) {
        zk = ZkUtils.getZk();
        System.out.println(zk.toString());

        // 用子线程模拟分布式中的多个节点
        int childrenSize = 10;
        Child[] children = new Child[childrenSize];
        for (int i = 0; i < childrenSize; i++) {
            children[i] = new Child();
            children[i].start();
        }

        for (int i = 0; i < childrenSize; i++) {
            try {
                children[i].join();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        System.out.println("I am father!");
    }

    static class Child extends Thread {
        @Override
        public void run() {
            String name = Thread.currentThread().getName();
            WatcherCallback watcherCallback = new WatcherCallback(zk, name);
            // 加锁
            watcherCallback.tryLock();
            // 业务逻辑
            System.out.println(name + " is working...");
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            // 解锁
            watcherCallback.unLock();
        }
    }
}

main方法是测试入口,首先通过 ZkUtils 拿到 zookeeper 连接,这里会阻塞一下,连接成功后会打印zk的连接信息。

创建了10个线程来模拟10个分布式环境中的节点,主线程等待所有子线程结束后打印自己结束前的信息。

子线程通过 WatcherCallback 拿到锁,执行业务逻辑,然后再释放锁。

WatcherCallback类

package org.example.lock;

import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CountDownLatch;

public class WatcherCallback implements Watcher, AsyncCallback.StringCallback, AsyncCallback.Children2Callback, AsyncCallback.StatCallback {
    private ZooKeeper zk;
    private String threadName;
    private String nodeName;
    // 加锁时必须要等待锁释放
    private CountDownLatch latch = new CountDownLatch(1);

    public WatcherCallback(ZooKeeper zk, String threadName) {
        this.zk = zk;
        this.threadName = threadName;
    }

    @Override
    public void process(WatchedEvent event) {
        switch (event.getType()) {
            case None:
                break;
            case NodeCreated:
                break;
            case NodeDeleted:
                // 前一个节点被删除了 当前节点直接获得锁
                latch.countDown();
                System.out.println(threadName + " found prev node deleted: " + event.getPath());
                break;
            case NodeDataChanged:
                break;
            case NodeChildrenChanged:
                break;
            case DataWatchRemoved:
                break;
            case ChildWatchRemoved:
                break;
            case PersistentWatchRemoved:
                break;
        }
    }

    public void tryLock() {
        // 创建临时有序节点
        zk.create(
                ZkUtils.path,
                threadName.getBytes(),
                ZooDefs.Ids.OPEN_ACL_UNSAFE,
                CreateMode.EPHEMERAL_SEQUENTIAL,
                this, "ABC");
        // 等待 当前节点成功拿到锁 或者监听节点被删除
        try {
            latch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    public void unLock() {
        // 解锁 - 直接删除当前节点
        try {
            zk.delete("/" + nodeName, -1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (KeeperException e) {
            e.printStackTrace();
        }
    }

    /**
     * create 异步方法的回调
     * @param rc
     * @param path
     * @param ctx
     * @param name
     */
    @Override
    public void processResult(int rc, String path, Object ctx, String name) {
        // create 回调了 如果创建成功 去拿父节点下的所有有序节点信息
        if(name != null) {
            // 去掉 /
            nodeName = name.substring(1);
            System.out.println(threadName + " nodeName: " + nodeName);
            // 这里一定要注意不要对父节点设置 watcher 因为监听父节点每个节点的删除都会通知其他节点 增加不必要的通信负担
            // 只需要监听当前节点前面那个节点
            zk.getChildren("/", false, this, "ABC");
        }
    }

    /**
     * getChildren()异步方法回调
     * @param rc
     * @param path
     * @param ctx
     * @param children
     * @param stat
     */
    @Override
    public void processResult(int rc, String path, Object ctx, List<String> children, Stat stat) {
        // 拿到了父节点下所有的节点信息 先排个序
        if(stat != null) {
            children.sort(String::compareTo);
            // 判断当前节点是否是第一个
            if(nodeName.compareTo(children.get(0)) == 0) {
                // 当前节点拿到锁成功
                latch.countDown();
            } else {
                // 需要对前一个节点设置监听
                String prevNodeName = "/" + children.get(children.indexOf(nodeName) - 1);
                // 通过exists设置监听并且处理回调
                zk.exists(prevNodeName, this, this, "ABC");
            }
        }
    }

    @Override
    public void processResult(int rc, String path, Object ctx, Stat stat) {
        if(stat == null) {
            System.out.println(threadName + " found prev node missing: " + path);
        }
    }
}

先看 tryLock 方法:

    public void tryLock() {
        // 创建临时有序节点
        zk.create(
                ZkUtils.path,
                threadName.getBytes(),
                ZooDefs.Ids.OPEN_ACL_UNSAFE,
                CreateMode.EPHEMERAL_SEQUENTIAL,
                this, "ABC");
        // 等待 当前节点成功拿到锁 或者监听节点被删除
        try {
            latch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

当要加锁时,请求 zookeeper 创建临时有序节点并且设置了回调对象为 this,然后阻塞住当前线程等待获取锁成功。create() 方法使用了异步方式,当方法请求结束时会调用回调对象的 processResult()方法,如下:

    /**
     * create 异步方法的回调
     * @param rc
     * @param path
     * @param ctx
     * @param name
     */
    @Override
    public void processResult(int rc, String path, Object ctx, String name) {
        // create 回调了 如果创建成功 去拿父节点下的所有有序节点信息
        if(name != null) {
            // 去掉 /
            nodeName = name.substring(1);
            System.out.println(threadName + " nodeName: " + nodeName);
            // 这里一定要注意不要对父节点设置 watcher 因为监听父节点每个节点的删除都会通知其他节点 增加不必要的通信负担
            // 只需要监听当前节点前面那个节点
            zk.getChildren("/", false, this, "ABC");
        }
    }

创建节点成功后 name 不为空,然后通过 getChildren() 去拿父节点下面的所有节点,同样也是异步方式,这里要注意第二个参数要设置false,否则就对父节点设置监听了,不但没用而且会增加通信负担,第三个参数是回调对象,方法执行完成后会调用下面这个方法:

   /**
     * getChildren()异步方法回调
     * @param rc
     * @param path
     * @param ctx
     * @param children
     * @param stat
     */
    @Override
    public void processResult(int rc, String path, Object ctx, List<String> children, Stat stat) {
        // 拿到了父节点下所有的节点信息 先排个序
        if(stat != null) {
            children.sort(String::compareTo);
            // 判断当前节点是否是第一个
            if(nodeName.compareTo(children.get(0)) == 0) {
                // 当前节点拿到锁成功
                latch.countDown();
            } else {
                // 需要对前一个节点设置监听
                String prevNodeName = "/" + children.get(children.indexOf(nodeName) - 1);
                // 通过exists设置监听并且处理回调
                zk.exists(prevNodeName, this, this, "ABC");
            }
        }
    }

这里就开始对父节点下面所有的节点进行排序(从小到大),然后就有两种情况

情况一:当前节点是第一个,那么当前节点就顺理成章的获取到锁了,直接结束阻塞并返回即可

情况二:当前节点前面还有节点,那么就通过 exists() 方法对前一个节点设置监听,等它被删除的时候结束阻塞

exists() 方法的第二参数是监听对象,当监听的节点状态发生变化会调用这个对象的 process()方法

    @Override
    public void process(WatchedEvent event) {
        switch (event.getType()) {
            case None:
                break;
            case NodeCreated:
                break;
            case NodeDeleted:
                // 前一个节点被删除了 当前节点直接获得锁
                latch.countDown();
                System.out.println(threadName + " found prev node deleted: " + event.getPath());
                break;
            case NodeDataChanged:
                break;
            case NodeChildrenChanged:
                break;
            case DataWatchRemoved:
                break;
            case ChildWatchRemoved:
                break;
            case PersistentWatchRemoved:
                break;
        }
    }

当监听的节点被删除,会获得 NodeDeleted 的事件类型,直接对latch执行 countDown(),打印一条信息。

exists() 第三个参数是回调对象,当exists()异步执行完成会调用回调对象的下面这个方法:

    @Override
    public void processResult(int rc, String path, Object ctx, Stat stat) {
        if(stat == null) {
            System.out.println(threadName + " found prev node missing: " + path);
        }
    }

这里没有什么操作,实验过程中也没走到这个if,不过如果当前节点要监听前一个节点然后突然发现前面这个节点不存在,可能是前面一个节点正好把锁释放了,所以如果要处理的话,这里可以直接让当前节点获取锁。

到这里,tryLock() 方法就执行完成了,反正就是节点要么获取到锁要么等待前序节点的删除然后获取到锁,因此还需要一个 unlock() 方法来删除节点:

    public void unLock() {
        // 解锁 - 直接删除当前节点
        try {
            zk.delete("/" + nodeName, -1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (KeeperException e) {
            e.printStackTrace();
        }
    }

unlock 很简单,就直接删除当前节点即可。

当然,如果当前线程因为某些情况挂掉了,那么zookeeper会在session的超时事件结束后替它把临时节点删除,那么后序节点就可以获得锁了。

验证实验

 10个线程分别拿到自己的节点然后按顺序执行。

总结

zookeeper本身提供的临时有序节点规避了其他分布式锁的问题,结合监听和回调机制可以方便的实现分布式锁。实验的重点其实就是 WatcherCallback 类,它实现了所有需要用的 Watcher 和 Callback 接口,进而把所有内部实现封装在一个类里,所有运行过程都通过回调方法以及监听方法串联起来,暴露给外部的 tryLock/unlock 方法即可以对使用方屏蔽内部细节。这种类似响应式编程的方法写起来虽然很烧脑但是很有趣。

有关ZooKeeper应用---分布式锁的更多相关文章

  1. ruby - 将差异补丁应用于字符串/文件 - 2

    对于具有离线功能的智能手机应用程序,我正在为Xml文件创建单向文本同步。我希望我的服务器将增量/差异(例如GNU差异补丁)发送到目标设备。这是计划:Time=0Server:hasversion_1ofXmlfile(~800kiB)Client:hasversion_1ofXmlfile(~800kiB)Time=1Server:hasversion_1andversion_2ofXmlfile(each~800kiB)computesdeltaoftheseversions(=patch)(~10kiB)sendspatchtoClient(~10kiBtransferred)Cl

  2. ruby-on-rails - Rails 应用程序之间的通信 - 2

    我构建了两个需要相互通信和发送文件的Rails应用程序。例如,一个Rails应用程序会发送请求以查看其他应用程序数据库中的表。然后另一个应用程序将呈现该表的json并将其发回。我还希望一个应用程序将存储在其公共(public)目录中的文本文件发送到另一个应用程序的公共(public)目录。我从来没有做过这样的事情,所以我什至不知道从哪里开始。任何帮助,将不胜感激。谢谢! 最佳答案 无论Rails是什么,几乎所有Web应用程序都有您的要求,大多数现代Web应用程序都需要相互通信。但是有一个小小的理解需要你坚持下去,网站不应直接访问彼此

  3. ruby - 无法运行 Rails 2.x 应用程序 - 2

    我尝试运行2.x应用程序。我使用rvm并为此应用程序设置其他版本的ruby​​:$rvmuseree-1.8.7-head我尝试运行服务器,然后出现很多错误:$script/serverNOTE:Gem.source_indexisdeprecated,useSpecification.Itwillberemovedonorafter2011-11-01.Gem.source_indexcalledfrom/Users/serg/rails_projects_terminal/work_proj/spohelp/config/../vendor/rails/railties/lib/r

  4. ruby-on-rails - Rails 应用程序中的 Rails : How are you using application_controller. rb 是新手吗? - 2

    刚入门rails,开始慢慢理解。有人可以解释或给我一些关于在application_controller中编码的好处或时间和原因的想法吗?有哪些用例。您如何为Rails应用程序使用应用程序Controller?我不想在那里放太多代码,因为据我了解,每个请求都会调用此Controller。这是真的? 最佳答案 ApplicationController实际上是您应用程序中的每个其他Controller都将从中继承的类(尽管这不是强制性的)。我同意不要用太多代码弄乱它并保持干净整洁的态度,尽管在某些情况下ApplicationContr

  5. ruby-on-rails - 如何在我的 Rails 应用程序 View 中打印 ruby​​ 变量的内容? - 2

    我是一个Rails初学者,但我想从我的RailsView(html.haml文件)中查看Ruby变量的内容。我试图在ruby​​中打印出变量(认为它会在终端中出现),但没有得到任何结果。有什么建议吗?我知道Rails调试器,但更喜欢使用inspect来打印我的变量。 最佳答案 您可以在View中使用puts方法将信息输出到服务器控制台。您应该能够在View中的任何位置使用Haml执行以下操作:-puts@my_variable.inspect 关于ruby-on-rails-如何在我的R

  6. ruby - 分布式事务和队列,ruby,erlang,scala - 2

    我有一个涉及多台机器、消息队列和事务的问题。因此,例如用户点击网页,点击将消息发送到另一台机器,该机器将付款添加到用户的帐户。每秒可能有数千次点击。事务的所有方面都应该是容错的。我以前从未遇到过这样的事情,但一些阅读表明这是一个众所周知的问题。所以我的问题。我假设安全的方法是使用两阶段提交,但协议(protocol)是阻塞的,所以我不会获得所需的性能,我是否正确?我通常写Ruby,但似乎Redis之类的数据库和Rescue、RabbitMQ等消息队列系统对我的帮助不大——即使我实现某种两阶段提交,如果Redis崩溃,数据也会丢失,因为它本质上只是内存。所有这些让我开始关注erlang和

  7. ruby-on-rails - 如何在 Gem 中获取 Rails 应用程序的根目录 - 2

    是否可以在应用程序中包含的gem代码中知道应用程序的Rails文件系统根目录?这是gem来源的示例:moduleMyGemdefself.included(base)putsRails.root#returnnilendendActionController::Base.send:include,MyGem谢谢,抱歉我的英语不好 最佳答案 我发现解决类似问题的解决方案是使用railtie初始化程序包含我的模块。所以,在你的/lib/mygem/railtie.rbmoduleMyGemclassRailtie使用此代码,您的模块将在

  8. 世界前沿3D开发引擎HOOPS全面讲解——集3D数据读取、3D图形渲染、3D数据发布于一体的全新3D应用开发工具 - 2

    无论您是想搭建桌面端、WEB端或者移动端APP应用,HOOPSPlatform组件都可以为您提供弹性的3D集成架构,同时,由工业领域3D技术专家组成的HOOPS技术团队也能为您提供技术支持服务。如果您的客户期望有一种在多个平台(桌面/WEB/APP,而且某些客户端是“瘦”客户端)快速、方便地将数据接入到3D应用系统的解决方案,并且当访问数据时,在各个平台上的性能和用户体验保持一致,HOOPSPlatform将帮助您完成。利用HOOPSPlatform,您可以开发在任何环境下的3D基础应用架构。HOOPSPlatform可以帮您打造3D创新型产品,HOOPSSDK包含的技术有:快速且准确的CAD

  9. 叮咚买菜基于 Apache Doris 统一 OLAP 引擎的应用实践 - 2

    导读:随着叮咚买菜业务的发展,不同的业务场景对数据分析提出了不同的需求,他们希望引入一款实时OLAP数据库,构建一个灵活的多维实时查询和分析的平台,统一数据的接入和查询方案,解决各业务线对数据高效实时查询和精细化运营的需求。经过调研选型,最终引入ApacheDoris作为最终的OLAP分析引擎,Doris作为核心的OLAP引擎支持复杂地分析操作、提供多维的数据视图,在叮咚买菜数十个业务场景中广泛应用。作者|叮咚买菜资深数据工程师韩青叮咚买菜创立于2017年5月,是一家专注美好食物的创业公司。叮咚买菜专注吃的事业,为满足更多人“想吃什么”而努力,通过美好食材的供应、美好滋味的开发以及美食品牌的孵

  10. 【鸿蒙应用开发系列】- 获取系统设备信息以及版本API兼容调用方式 - 2

    在应用开发中,有时候我们需要获取系统的设备信息,用于数据上报和行为分析。那在鸿蒙系统中,我们应该怎么去获取设备的系统信息呢,比如说获取手机的系统版本号、手机的制造商、手机型号等数据。1、获取方式这里分为两种情况,一种是设备信息的获取,一种是系统信息的获取。1.1、获取设备信息获取设备信息,鸿蒙的SDK包为我们提供了DeviceInfo类,通过该类的一些静态方法,可以获取设备信息,DeviceInfo类的包路径为:ohos.system.DeviceInfo.具体的方法如下:ModifierandTypeMethodDescriptionstatic StringgetAbiList​()Obt

随机推荐