博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
zookeeper — 实现分布式锁
阅读量:7017 次
发布时间:2019-06-28

本文共 6011 字,大约阅读时间需要 20 分钟。

一.前言

在之前的文章中介绍过分布式锁的特点和利用Redis实现简单的分布式锁。但是分布式锁的实现还有很多其他方式,但是万变不离其宗,始终遵循一个特点:同一时刻只能有一个操作获取。这篇文章主要介绍如何基于zookeeper实现分布式锁。

  • zookeeper能够作为分布式锁实现的基础
  • 算法流程
  • 实现

关于分布式锁的相关特性,这里不再赘述,请参考。

二.zookeeper能够作为分布式锁实现的基础

这里回顾下分布式锁的特点:

  • 每次只能一个占用锁;
  • 可以重复进入锁;
  • 只有占用者才可以解锁;
  • 获取锁和释放锁都需要原子
  • 不能产生死锁
  • 尽量满足性能

zookeeper中有一种临时顺序节点,它具有以下特征:

  • 时效性,当会话结束,节点将自动被删除
  • 顺序性,当多个应用向其注册顺序节点时,每个顺序号将只能被一个应用获取

利用以上的特点可以满足分布式锁实现的基本要求:

  1. 因为顺序性,可以让最小顺序号的应用获取到锁,从而满足分布式锁的每次只能一个占用锁,因为只有它一个获取到,所以可以实现重复进入,只要设置标识即可。锁的释放,即删除应用在zookeeper上注册的节点,因为每个节点只被自己注册拥有,所以只有自己才能删除,这样就满足只有占用者才可以解锁

  2. zookeeper的序号分配是原子的,分配后即不会再改变,让最小序号者获取锁,所以获取锁是原子的

  3. 因为注册的是临时节点,在会话期间内有效,所以不会产生死锁

  4. zookeeper注册节点的性能能满足几千,而且支持集群,能够满足大部分情况下的性能

三.算法流程

1.获取锁

需要获取分布式锁的应用都向zookeeper的/lock/{resouce}目录下注册sequence-前缀的节点,序号最小者获取到操作资源的权限:

Note:

这里的resource需要依据竞争的具体资源确定,如竞争账户则可以使用账户号作为resource。

从图中可以看出,clientA的顺序号最小,由它获取到锁,操作资源。

算法步骤

  1. client判断/lock目录是否存在,如果不存在则向其注册/lock的持久节点
  2. client判断/lock目录下是否存在竞争的资源resouce目录,如果不存在则向其注册/lock/resource的持久节点
  3. client向/lock/resource目录下注册/lock/resource/sequence-前缀的临时顺序节点,并得到顺序号
  4. client获取/lock/resource目录下的所有临时顺序子节点
  5. client判断临时子节点序号中是否存在比自身的序号小的节点。如果不存在,则获取到锁;如果存在,则对象该临时节点做watch监控
  6. 如果收到监控的临时节点被删除的通知,则再重复4、5步骤,直到获取到锁

流程图

2.释放锁

因为最小的节点只被获取到锁的client持有,所以该锁不可能被其他client释放。同时释放锁只需要将临时顺序节点删除,也是原子性操作。

三.实现

/** * 基于Zookeeper实现分布式锁 * * @author huaijin */public class DistributedLockBaseZookeeper implements DistributedLock {    private static final Logger log = LoggerFactory.getLogger(DistributedLockBaseZookeeper.class);    /**     * 利用空串作为各个节点存储的数据     */    private static final String EMPTY_DATA = "";    /**     * 分布式锁的根目录     */    private static final String LOCK_ROOT = "/lock";    /**     * zookeeper目录分隔符     */    private static final String PATH_SEPARATOR = "/";    /**     * 临时顺序节点前缀     */    private static final String LOCK_NODE_PREFIX = "sequence-";    /**     * 利用Lock和Condition实现等待通知     */    private Lock waitNotifierLock = new ReentrantLock();    private Condition waitNotifier = waitNotifierLock.newCondition();    /**     * 操作zookeeper的client     */    private ZkClient zkClient;    /**     * 分布式资源的路径     */    private String resourcePath;    /**     * 锁节点完整前缀     */    private String lockNodePrefix;    /**     * 当前注册的临时顺序节点路径     */    private String currentLockNodePath;    public DistributedLockBaseZookeeper(String resource, ZkClient zkClient) {        Objects.requireNonNull(zkClient, "zkClient must not be null!");        if (resource == null || resource.isEmpty()) {            throw new IllegalArgumentException("resource must not be null!");        }        this.zkClient = zkClient;        this.resourcePath = LOCK_ROOT + PATH_SEPARATOR + resource;        this.lockNodePrefix = resourcePath + PATH_SEPARATOR + LOCK_NODE_PREFIX;        // 创建分布式锁根目录        if (!this.zkClient.exists(LOCK_ROOT)) {            try {                this.zkClient.create(LOCK_ROOT, EMPTY_DATA, CreateMode.PERSISTENT);            } catch (ZkNodeExistsException e) {                // ignore, logging                log.warn("The root path for lock already exists.");            }        }        // 创建资源目录        if (!this.zkClient.exists(resourcePath)) {            try {                this.zkClient.create(resourcePath, EMPTY_DATA, CreateMode.PERSISTENT);            } catch (ZkNodeExistsException e) {                // ignore, logging                log.warn("The resource path for [" + resourcePath + "] already exists.");            }        }    }    @Override    public void lock() throws DistributedLockException {        if (!acquireLock()) {            // 如果获取锁不成功,则等待            waitNotifierLock.lock();            try {                waitNotifier.await();            } catch (Exception e) {                throw new DistributedLockException("Interrupt when waiting notification.");            } finally {                waitNotifierLock.unlock();            }        }    }    @Override    public void unlock() {        // 删除自身节点,释放锁        zkClient.delete(currentLockNodePath);    }    private boolean acquireLock() throws DistributedLockException {        // 如果当前未注册临时顺序节点,则注册        if (this.currentLockNodePath == null) {            this.currentLockNodePath = zkClient.create(lockNodePrefix, EMPTY_DATA, CreateMode.EPHEMERAL_SEQUENTIAL);        }        // 获取顺序号        long lockNodeSeq = fetchSeqFromNodePath(currentLockNodePath);        // 获取所有子节点        List
childNodePaths = zkClient.getChildren(resourcePath); if (childNodePaths == null || childNodePaths.isEmpty()) { throw new DistributedLockException("Not exists child nodes."); } // 从所有子节点中获取最小子节点的顺序号 long minSeq = 1000000L; int minIndex = -1; for (int i = 0; i < childNodePaths.size(); i++) { long nodeSeq = fetchSeqFromNodePath(resourcePath + childNodePaths.get(i)); if (nodeSeq < minSeq) { minSeq = nodeSeq; minIndex = i; } } // 比较自身顺序号与最小序号 if (lockNodeSeq > minSeq) { // 如果存在更小序号,则监控最小序号的子节点 String minLockNodePath = childNodePaths.get(minIndex); zkClient.subscribeDataChanges(resourcePath + PATH_SEPARATOR + minLockNodePath, new ListenerForLockRelease()); return false; } // 成功获取锁,返回 return true; } private long fetchSeqFromNodePath(String nodePath) { String seq = nodePath.substring(lockNodePrefix.length()); return Long.valueOf(seq); } private class ListenerForLockRelease implements IZkDataListener { @Override public void handleDataChange(String dataPath, Object data) throws Exception { } @Override public void handleDataDeleted(String dataPath) throws Exception { // 如果成功获取锁,则通知,让主线程返回 if (acquireLock()) { waitNotifierLock.lock(); try { waitNotifier.signal(); } finally { waitNotifierLock.unlock(); } } } }}

转载于:https://www.cnblogs.com/lxyit/p/10387123.html

你可能感兴趣的文章
git 使用教程,常用命令
查看>>
使用SVI实现Vlan间路由
查看>>
Linux学习笔记5月28日任务
查看>>
解决Td内容为空时不显示边框的问题-兼容IE、firefox、chrome
查看>>
SylixOS x86中断探测(二)
查看>>
HDFS总结
查看>>
scala 中导出excel
查看>>
http长轮询&短轮询
查看>>
Android 应用换肤功能(白天黑夜主题切换)
查看>>
Linux编程操作知识整理(continued)
查看>>
2012.8.13 onEnter与触摸事件
查看>>
基于 HTML5 WebGL 的 3D 棉花加工监控系统
查看>>
[redis] 获得 database, key, value
查看>>
swift之mutating关键字
查看>>
Nginx 0.8.x + PHP 5.2.13(FastCGI)搭建胜过Apache十倍的W...
查看>>
eclipse连接android设备的问题
查看>>
.pem引发的血案
查看>>
如何在rootscope 获取angular ui的tab子域 scope 也叫子域暴露
查看>>
HashMap,HashTable,HashSet区别
查看>>
如何学习iOS开发——对新手的几句废话
查看>>