本文共 9910 字,大约阅读时间需要 33 分钟。
这篇文章主要是讲解FairSync公平锁的源码分析,整个内容分为加锁过程、解锁过程,CLH队列等概念。
首先一直困扰我的CLH队列的CLH的缩写我终于明白,看似三个人的人名的首字符缩写"CLH" (Craig, Landin, andHagersten)。 加锁过程主要核心逻辑在于尝试获取锁,获锁失败后进入等待队列,以及进入等待队列的过程是需要进行多次循环判断的。 解锁过程相对加锁过程会简单许多,核心逻辑在释放锁、唤醒下一个等待线程两个过程。 CLH的概念在加锁过程已经提及了,可以一并看看。ReentrantLock的的锁过程如下:
tryAcquire的操作流程
1、如果锁未占用的情况下:判断当前线程是否处于CLH的首位,如果位于首位就通过原子更新操作设置锁占用。 2、如果锁被占用的情况下:判断当前线程是否是占用锁线程,如果是则实现锁的可重入功能,设置锁占用次数。static final class FairSync extends Sync { // lock的入口,内部调用acquire方法实现加锁操作 final void lock() { // lock的入口 acquire(1); } public final void acquire(int arg) { // 第一步尝试获取锁,成功则返回 // 获取锁失败后通过addWaiter添加到CLH队列的末尾 // 通过acquireQueued判断是否轮到自己唤醒了 // 可以理解为之前没获取锁但是等执行到这里的时候可能锁已经释放了 if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); } // acquires的参数值为1 protected final boolean tryAcquire(int acquires) { final Thread current = Thread.currentThread(); // 获取当前锁状态,0表示锁未占用,>0表示被占用 int c = getState(); if (c == 0) { // 首先判断是不是CLH队列的第一个元素,没有祖先则表示第一个元素 // 然后从unsafe把state设置为1,表示锁被占用 // 设置锁占用线程为当前线程 if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) { // 设置锁占用线程为当前线程 setExclusiveOwnerThread(current); // 返回锁占用成功 return true; } } // 判断锁占用线程是不是本线程,说明是可重入锁 else if (current == getExclusiveOwnerThread()) { // 重入锁增加锁定次数 int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); // 设置state会占用次数 setState(nextc); // 返回锁占用成功 return true; } // 否则返回锁占用失败 return false; } }
acquire的操作流程
public final void acquire(int arg) { // 第一步尝试获取锁,成功则返回 // 获取锁失败后通过addWaiter添加到CLH队列的末尾 // 通过acquireQueued判断是否轮到自己唤醒了 // 可以理解为之前没获取锁但是等执行到这里的时候可能锁已经释放了 if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
addWaiter的操作流程
1、将当前线程包装成Node对象。 2、先尝试通过快速失败法尝试在CLH队尾插入Node对象 3、如果快速插入失败后那么就通过enq方法在CLH队尾插入Node对象private Node addWaiter(Node mode) { // 将线程包装成为Node对象,便于添加CLH队列 Node node = new Node(Thread.currentThread(), mode); // 先尝试快速插入到CLH队尾,插入成功就返回Node对象 Node pred = tail; if (pred != null) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } // 快速插入CLH队尾失败后,通过enq方法实现 enq(node); return node; } // 将Node节点插入CLH队尾的实现 private Node enq(final Node node) { for (;;) { Node t = tail; // 如果CLH队列为空,那么设置Head和Tail都为Node if (t == null) { // Must initialize if (compareAndSetHead(new Node())) tail = head; } else { // 通过unsafe来保证Node插入队尾 node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } }
acquireQueued的操作流程
final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { // 先判断节点的祖先 final Node p = node.predecessor(); // 如果前驱是head,即该结点已成老二, // 那么便有资格去尝试获取资源, // tryAcquire成功说明head已经释放锁 // 休眠线程被唤醒的时候会继续执行这里 if (p == head && tryAcquire(arg)) { // 设置当前节点为head节点 setHead(node); // 释放原head节点用于gc回收 p.next = null; // help GC failed = false; return interrupted; } // 如果自己可以休息了,就进入waiting状态,直到被unpark() if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) //如果等待过程中被中断过,哪怕只有那么一次,就将interrupted标记为true interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
shouldParkAfterFailedAcquire的操作流程
// shouldParkAfterFailedAcquire外层for循环调用 // 第一次设置Node前置节点状态为SIGNAL // 下一次循环就前置节点庄为SIGNAL,那么线程自身就需要被阻塞了 private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { // 如果前继节点是SIGNAL状态,则意味这当前线程需要被阻塞。此时,返回true。 int ws = pred.waitStatus; if (ws == Node.SIGNAL) return true; // ws>0代表线程被取消了 // static final int CANCELLED = 1; // waitStatus value to indicate thread has cancelled if (ws > 0) { // 如果前驱处于取消状态,那就一直往前找,直到找到最近一个正常等待的状态,并排在它的后边 do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { // 如果前继节点为“0”或者“共享锁”状态,则设置前继节点为SIGNAL状态。 // 状态为0的情况只可能是初始化的时候的默认值 // 当前线程进入等待状态的时候需要设置前置状态为SIGNAL // SIGNAL状态表示后置线程需要被唤醒 compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; } private final boolean parkAndCheckInterrupt() { // parkAndCheckInterrupt()的作用是阻塞当前线程,并且返回“线程被唤醒之后”的中断状态。 LockSupport.park(this); return Thread.interrupted(); }
Node的介绍
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable { protected AbstractQueuedSynchronizer() { } /** * +------+ prev +-----+ +-----+ * head | | <---- | | <---- | | tail * +------+ +-----+ +-----+ */ static final class Node { static final Node SHARED = new Node(); static final Node EXCLUSIVE = null; // 当前线程已被取消 static final int CANCELLED = 1; // “当前线程的后继线程需要被unpark(唤醒)”。 // 一般发生情况是:当前线程的后继线程处于阻塞状态, // 而当前线程被release或cancel掉,因此需要唤醒当前线程的后继线程。 static final int SIGNAL = -1; // 当前线程(处在Condition休眠状态)在等待Condition唤醒 static final int CONDITION = -2; // (共享锁)其它线程获取到“共享锁”,状态为0表示当前线程不属于上面的任何一种状态。 static final int PROPAGATE = -3; volatile int waitStatus; volatile Node prev; volatile Node next; volatile Thread thread; Node nextWaiter; final boolean isShared() { return nextWaiter == SHARED; } final Node predecessor() throws NullPointerException { Node p = prev; if (p == null) throw new NullPointerException(); else return p; } Node() { // Used to establish initial head or SHARED marker } Node(Thread thread, Node mode) { // Used by addWaiter this.nextWaiter = mode; this.thread = thread; } Node(Thread thread, int waitStatus) { // Used by Condition this.waitStatus = waitStatus; this.thread = thread; } } private transient volatile Node head; private transient volatile Node tail; private volatile int state;}
release过程
public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; }
tryRelease过程
protected final boolean tryRelease(int releases) { int c = getState() - releases; if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; if (c == 0) { free = true; setExclusiveOwnerThread(null); } setState(c); return free; }
unparkSuccessor过程
1、设置当前Node状态为0 2、寻找下一个等待线程节点来唤醒等待线程并通过LockSupport.unpark()唤醒线程 3、寻找下一个等待线程,如果当前Node的下一个节点符合状态就直接进行唤醒,否则从队尾开始进行倒序查找,找到最优先的线程进行唤醒。private void unparkSuccessor(Node node) { int ws = node.waitStatus; if (ws < 0) compareAndSetWaitStatus(node, ws, 0); Node s = node.next; if (s == null || s.waitStatus > 0) { s = null; // 找到状态<0的线程进行唤醒 for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } if (s != null) LockSupport.unpark(s.thread); }##参考文章
转载地址:http://zcxca.baihongyu.com/