Curator 是 ZooKeeper 的一个客户端框架,其中封装了分布式互斥锁的实现,最为常用的是 InterProcessMutex,本文将对其进行代码剖析。
简介
InterProcessMutex 基于 ZooKeeper 实现了**分布式的公平可重入互斥锁**,类似于单个 JVM 进程内的 ReentrantLock(fair=true)。
构造函数
/**
 * Creates a mutex on {@code path} using a freshly constructed
 * {@link StandardLockInternalsDriver}.
 */
public InterProcessMutex(CuratorFramework client, String path) {
    this(client, path, new StandardLockInternalsDriver());
}

/**
 * Creates a mutex on {@code path} with a caller-supplied driver. Delegates with
 * {@code LOCK_NAME} as the lock name and {@code maxLeases = 1}.
 */
public InterProcessMutex(CuratorFramework client, String path, LockInternalsDriver driver) {
    this(client, path, LOCK_NAME, 1, driver);
}

/**
 * Package-private constructor that both public constructors funnel into.
 * Validates {@code path}, stores it as {@code basePath}, and builds the
 * {@link LockInternals} instance this mutex delegates to.
 */
InterProcessMutex(CuratorFramework client, String path, String lockName, int maxLeases,
        LockInternalsDriver driver) {
    basePath = PathUtils.validatePath(path);
    internals = new LockInternals(client, driver, path, lockName, maxLeases);
}
获取锁
InterProcessMutex.acquire
/**
 * Acquires the lock without a time limit by calling {@code internalLock(-1, null)}.
 *
 * @throws IOException if {@code internalLock} reports that the lock was not obtained
 * @throws Exception   anything propagated from {@code internalLock}
 */
public void acquire() throws Exception {
    if (!internalLock(-1, null)) {
        throw new IOException("Lost connection while trying to acquire lock: " + basePath);
    }
}

/**
 * Attempts to acquire the lock, passing the given time limit to {@code internalLock}.
 *
 * @param time maximum time to wait
 * @param unit unit of {@code time}
 * @return the result of {@code internalLock}: {@code true} if the lock was obtained
 * @throws Exception anything propagated from {@code internalLock}
 */
public boolean acquire(long time, TimeUnit unit) throws Exception {
    return internalLock(time, unit);
}
InterProcessMutex.internalLock
/**
 * Shared implementation behind both {@code acquire} overloads.
 *
 * @param time maximum time to wait, forwarded to {@code internals.attemptLock}
 * @param unit unit of {@code time}; forwarded as-is (may be {@code null})
 * @return {@code true} if the calling thread now holds the lock, {@code false} if
 *         {@code attemptLock} returned no lock path
 * @throws Exception anything propagated from {@code attemptLock}
 */
private boolean internalLock(long time, TimeUnit unit) throws Exception {
    Thread currentThread = Thread.currentThread();

    LockData lockData = threadData.get(currentThread);
    if (lockData != null) {
        // Re-entrant path: this thread already has an entry, so only bump its hold
        // count; attemptLock is not called again.
        lockData.lockCount.incrementAndGet();
        return true;
    }

    String lockPath = internals.attemptLock(time, unit, getLockNodeBytes());
    if (lockPath != null) {
        // First acquisition by this thread: remember the lock path it was given.
        LockData newLockData = new LockData(currentThread, lockPath);
        threadData.put(currentThread, newLockData);
        return true;
    }

    return false;
}
1 2 3 private final ConcurrentMap<Thread, LockData> threadData = Maps.newConcurrentMap();
1 2 3 4 5 6 7 8 9 10 11 12 private static class LockData { final Thread owningThread; final String lockPath; final AtomicInteger lockCount = new AtomicInteger (1 ); private LockData (Thread owningThread, String lockPath) { this .owningThread = owningThread; this .lockPath = lockPath; } }
LockInternals.attemptLock
/**
 * Creates this client's lock node through the driver and then runs
 * {@code internalLockLoop} on it, retrying the whole sequence when a
 * {@code NoNodeException} is thrown and the client's retry policy allows it.
 *
 * @param time          maximum time to wait
 * @param unit          unit of {@code time}; {@code null} means no time limit is computed
 * @param lockNodeBytes data to store in the lock node
 * @return the path of the created lock node if the lock was obtained, otherwise {@code null}
 * @throws Exception the {@code NoNodeException} once retries are refused, or anything else
 *                   propagated from the driver or the lock loop
 */
String attemptLock(long time, TimeUnit unit, byte[] lockNodeBytes) throws Exception {
    final long startMillis = System.currentTimeMillis();
    // null here is the "wait indefinitely" marker consumed by internalLockLoop.
    final Long millisToWait = (unit != null) ? unit.toMillis(time) : null;
    // When a revocation listener is registered, an empty payload is used instead of the
    // caller's bytes.
    final byte[] localLockNodeBytes = (revocable.get() != null) ? new byte[0] : lockNodeBytes;
    int retryCount = 0;

    String ourPath = null;
    boolean hasTheLock = false;
    boolean isDone = false;
    while (!isDone) {
        // Assume this pass is the last one; only a permitted retry resets the flag.
        isDone = true;
        try {
            ourPath = driver.createsTheLock(client, path, localLockNodeBytes);
            hasTheLock = internalLockLoop(startMillis, millisToWait, ourPath);
        } catch (KeeperException.NoNodeException e) {
            // Ask the retry policy whether to start over with a new node; if it
            // refuses, surface the exception to the caller.
            if (client.getZookeeperClient().getRetryPolicy().allowRetry(
                    retryCount++,
                    System.currentTimeMillis() - startMillis,
                    RetryLoop.getDefaultRetrySleeper())) {
                isDone = false;
            } else {
                throw e;
            }
        }
    }

    if (hasTheLock) {
        return ourPath;
    }
    return null;
}
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 public String createsTheLock (CuratorFramework client, String path, byte [] lockNodeBytes) throws Exception{ String ourPath; if ( lockNodeBytes != null ){ ourPath = client.create().creatingParentContainersIfNeeded() .withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path, lockNodeBytes); } else { ourPath = client.create().creatingParentContainersIfNeeded() .withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path); } return ourPath; }
LockInternals.internalLockLoop
/**
 * Blocks until the driver says our node holds the lock, the wait budget runs out,
 * or the client is no longer in the {@code STARTED} state.
 *
 * @param startMillis  timestamp from which elapsed waiting time is measured
 * @param millisToWait remaining wait budget in milliseconds, or {@code null} to wait
 *                     without a limit
 * @param ourPath      full path of the lock node created for this attempt
 * @return {@code true} if the lock was obtained; {@code false} on timeout or if the
 *         loop ended because the client left the {@code STARTED} state
 * @throws Exception any failure inside the loop; our node is deleted before rethrowing
 */
private boolean internalLockLoop(long startMillis, Long millisToWait, String ourPath)
        throws Exception {
    boolean haveTheLock = false;
    boolean doDelete = false;
    try {
        if (revocable.get() != null) {
            // A revocation listener is registered: put revocableWatcher on our own node.
            client.getData().usingWatcher(revocableWatcher).forPath(ourPath);
        }

        while ((client.getState() == CuratorFrameworkState.STARTED) && !haveTheLock) {
            List<String> children = getSortedChildren();
            // Strip "<basePath>/" to get the bare node name.
            String sequenceNodeName = ourPath.substring(basePath.length() + 1);

            PredicateResults predicateResults =
                    driver.getsTheLock(client, children, sequenceNodeName, maxLeases);
            if (predicateResults.getsTheLock()) {
                haveTheLock = true;
            } else {
                String previousSequencePath = basePath + "/" + predicateResults.getPathToWatch();

                // The watch is registered and the wait started under the same monitor
                // that notifyFromWatcher() synchronizes on before calling notifyAll().
                synchronized (this) {
                    try {
                        client.getData().usingWatcher(watcher).forPath(previousSequencePath);
                        if (millisToWait != null) {
                            // Charge the time spent so far against the budget and restart
                            // the clock for the next pass.
                            millisToWait -= (System.currentTimeMillis() - startMillis);
                            startMillis = System.currentTimeMillis();
                            if (millisToWait <= 0) {
                                // Timed out: flag our node for deletion in the finally block.
                                doDelete = true;
                                break;
                            }
                            wait(millisToWait);
                        } else {
                            wait();
                        }
                    } catch (KeeperException.NoNodeException e) {
                        // Deliberately ignored: the node we wanted to watch no longer
                        // exists, so skip waiting and let the loop re-check the children.
                    }
                }
            }
        }
    } catch (Exception e) {
        ThreadUtils.checkInterrupted(e);
        // Any failure means this attempt is abandoned, so our node must be removed.
        doDelete = true;
        throw e;
    } finally {
        if (doDelete) {
            deleteOurPath(ourPath);
        }
    }
    return haveTheLock;
}
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 public PredicateResults getsTheLock (CuratorFramework client, List<String> children, String sequenceNodeName, int maxLeases) throws Exception{ int ourIndex = children.indexOf(sequenceNodeName); validateOurIndex(sequenceNodeName, ourIndex); boolean getsTheLock = ourIndex < maxLeases; String pathToWatch = getsTheLock ? null : children.get(ourIndex - maxLeases); return new PredicateResults (pathToWatch, getsTheLock); } static void validateOurIndex (String sequenceNodeName, int ourIndex) throws KeeperException{ if ( ourIndex < 0 ){ throw new KeeperException .NoNodeException("Sequential path not found: " + sequenceNodeName); } }
1 2 3 4 5 6 7 8 9 10 private final Watcher watcher = new Watcher (){ @Override public void process (WatchedEvent event) { notifyFromWatcher(); } }; private synchronized void notifyFromWatcher () { notifyAll(); }
1 2 3 4 5 6 7 8 9 10 private void deleteOurPath (String ourPath) throws Exception{ try { client.delete().guaranteed().forPath(ourPath); } catch ( KeeperException.NoNodeException e ) { } }
释放锁
弄明白了获取锁的原理,释放锁的逻辑就很清晰了。
InterProcessMutex.release
/**
 * Releases one hold on the lock for the calling thread. The underlying lock node is
 * released only when the thread's hold count reaches zero.
 *
 * @throws IllegalMonitorStateException if the calling thread has no lock entry, or if
 *                                      its hold count drops below zero
 * @throws Exception                    anything propagated from {@code internals.releaseLock}
 */
public void release() throws Exception {
    Thread currentThread = Thread.currentThread();
    LockData lockData = threadData.get(currentThread);
    if (lockData == null) {
        throw new IllegalMonitorStateException("You do not own the lock: " + basePath);
    }

    int newLockCount = lockData.lockCount.decrementAndGet();
    if (newLockCount > 0) {
        // Still held re-entrantly by this thread; nothing to release yet.
        return;
    }
    if (newLockCount < 0) {
        throw new IllegalMonitorStateException("Lock count has gone negative for lock: " + basePath);
    }

    try {
        internals.releaseLock(lockData.lockPath);
    } finally {
        // Drop the thread's entry even if releasing the node failed.
        threadData.remove(currentThread);
    }
}
LockInternals.releaseLock
/**
 * Clears the revocation reference and deletes the lock node at {@code lockPath}.
 *
 * @param lockPath full path of the lock node to delete
 * @throws Exception anything propagated from {@code deleteOurPath}
 */
void releaseLock(String lockPath) throws Exception {
    revocable.set(null);
    deleteOurPath(lockPath);
}
1 2 3 4 5 6 7 8 9 10 private void deleteOurPath (String ourPath) throws Exception{ try { client.delete().guaranteed().forPath(ourPath); } catch ( KeeperException.NoNodeException e ) { } }
总结
InterProcessMutex 的特性:
- 分布式锁(基于 ZooKeeper)
- 互斥锁
- 公平锁(监听上一临时顺序节点 + wait() / notifyAll())
- 可重入