各人好,我是wave。这次我们继承接着讲锁,来给各人聊一聊Lock的一些底层原理。
根本使用
Lock的根本使用案例
- public class Solution { static int n = 0; public static void main(String[] args)throws Exception { //创建一个Lock对象 Lock lock = new ReentrantLock(); //创建10个线程对n自加10000 for (int i = 0; i < 10; i++) { new Thread(()->{ //加锁 lock.lock(); try { for (int j = 0; j < 10000; j++) { n++; } }catch (Exception e){ e.printStackTrace(); }finally { //解锁 lock.unlock(); } }).start(); } Thread.sleep(3000); System.out.println(n);//100000 }}
复制代码
- 这个代码案例就是创建了10个线程对一个变量n进行自加10000的操纵,然后使用了Lock进行加锁,最后结果是正确的100000。
- 可以看到这里Lock加锁的逻辑代码加了一个try-catch块,这里并不是必须的一个异常捕获,但是Lock比力标准的写法就是最好使用try-catch块写入业务逻辑,最后在finally中进行unlock(解锁)。这样做的优点是制止某个线程突然发生异常,导致反面的unlock代码没有执行,就会造成死锁。
- 本篇文章主要讨论的是Lock的实现类ReentrantLock。
加锁操纵
进入到lock()方法中,并找到ReentrantLock的实现方法
- public void lock() { sync.lock(); }
复制代码 继承进入lock()
- abstract static class Sync extends AbstractQueuedSynchronizer { private static final long serialVersionUID = -5179523762034025860L; /** * Performs {@link Lock#lock}. The main reason for subclassing * is to allow fast path for nonfair version. */ abstract void lock();
复制代码 这里我们发现Sync是一个继承了AbstractQueuedSynchronizer的类,AbstractQueuedSynchronizer就是我们常说的AQS,所以说Lock的底层使用的是AQS框架。AQS的细节我们反面继承说。
继承看lock()抽象方法的实现类,我们先选择看公平锁。
- final void lock() { acquire(1); }
复制代码 接着看 acquire(1),这个方法的tryAcquire、acquireQueued、addWaiter都会详细分析
- public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
复制代码
- 这个acquire就是Lock加锁的关键了
- tryAcquire(arg)这个方法就是在实验加锁,如果加锁乐成就返回true,然后根据if内里的判定!tryAcquire(arg)的值就是false,所以&&反面的代码就不会执行了。
- acquireQueued这个方法就是在把Node节点进行入队,也就是当点线程加锁失败了,所以需要把这个线程进行阻塞入队,之后比及共享资源被释放了之后再实验加锁。
- addWaiter(Node.EXCLUSIVE)会返回一个Node对象,这个对象就包罗了当前线程Id等信息,而且如果Lock内里还没有队列的话,这个方法会先创建一个队列。
深入看看tryAcquire内里代码
- /** * Fair version of tryAcquire. Don't grant access unless * recursive call or no waiters or is first. */ protected final boolean tryAcquire(int acquires) { //获取当前线程 final Thread current = Thread.currentThread(); //获取state变量值 int c = getState(); //当state变量为0体现当前对象未加锁,对其实验加锁 if (c == 0) { if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } //如果是可重入锁就进入else if else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; }}
复制代码 这个类的逻辑照旧比力悦目懂的,首先获取了一下当前的线程,然后再获取state变量的值,这个state变量如果为1,体现已经有别的线程持有这个锁,如果为0体现当前还没有线程持有这个锁。所以如果state为
0就进入if内里。如果state为1,而且是可重入锁,就进入到else if代码块内里。如果都不满意,就返回false,然后当前线程进入队列排队。
进入到hasQueuedPredecessors
- public final boolean hasQueuedPredecessors() { // The correctness of this depends on head being initialized // before tail and on head.next being accurate if the current // thread is first in queue. Node t = tail; // Read fields in reverse initialization order Node h = head; Node s; return h != t && ((s = h.next) == null || s.thread != Thread.currentThread()); }
复制代码
- hasQueuedPredecessors这个方法需要返回false才能真正进入到下面的cas进行加锁。
- 第一种是头节点与尾结点是同一个,这种情况只大概是队列还没初始化,所以h == t,返回false
- 第二种是头节点反面有下一个节点,而且这个节点是当前线程。也就是说尾结点前面的和头节点的下一个线程应该是正确的线程。一般前一个线程释放锁,后一个线程进行加锁走的就是这个方法。
- 走完hasQueuedPredecessors方法之后就使用compareAndSetState也就是一个cas操纵对state的值进行修改,如果修改成了1则加锁乐成,再 setExclusiveOwnerThread(current)设置可重入锁线程为当前这个线程,然后返回true就可以了。
- else if内里的可重入锁的代码就比力好理解了,判定当前这个线程是不是第一次设置的可重入锁线程(就是第一次设置过的线程和后序进入的线程是不是同一个),如果是,就对nextc这个变量进行 + 1,然后再修改state的值为nextc。加锁乐成返回true,否则返回false。
acquire
- public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
复制代码
- 我们上面分析的是tryAcquire方法,只有tryAcquire加锁失败,!tryAcquire(arg)为true才会继承执行反面的代码。既然是加锁失败,那么肯定是这把锁已经被其他线程锁持有了,所以这里我们可以想到肯定是会有一个入队的操纵。
- 首先lock内里的这个队列并不是Java聚集中的队列,而且AQS中的虚拟双向队列,虚拟的意思就是并没有用Java的API中的队列,而且自己使用了一个Node节点,内里定义有pre指针与next指针,自实现的这么一个队列。
- 这个队列并不是Lock对象一创建就会有的,上面我们也提到过队列未初始化的一种加锁情况。所以入队操纵首先要判定队列是否被创建,如果还没有被创建的话就需要先创建队列。
调用acquireQueued会先执行addWaiter(Node.EXCLUSIVE)这个函数,所以先进入到addWaiter
- private Node addWaiter(Node mode) { Node node = new Node(Thread.currentThread(), mode); // Try the fast path of enq; backup to full enq on failure Node pred = tail; if (pred != null) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } enq(node); return node; }
复制代码 再先看看Node节点
- static final class Node { volatile Node prev;//队列前一个 volatile Node next;//队列下一个 volatile Thread thread;//线程 private transient volatile Node head;//头节点 private transient volatile Node tail;//尾结点 private volatile int state;//体现锁状态的变量
复制代码 这里可以看到Node节点就是用来组成队列的元素,这里Node我只截取了关键的几个属性。
- 我们继承分析addWaiter,第一步创建一个Node,也就是把当前线程变成一个Node,然后当尾结点不会null的时候进入if代码块,尾结点不为null意味着这个队列已经被初始化了。所以如果当前队列还为初始化就进入到enq去初始化队列。
- 如果是队列已经被初始化进入到了if代码块内里,就是把node插入到队列的尾部。
如果未初始化,进入enq
- private Node enq(final Node node) { for (;;) { //未初始化的话tail肯定为null,进入到if内里。 Node t = tail; if (t == null) { // Must initialize //cas操纵设置头节点 if (compareAndSetHead(new Node())) //这里让tail不为null了,所以下次循环会进入到else内里 tail = head; } else { //node的前一个指向t,如果是初始化进入的话 //t是head也是tail,如果不是初始化进入t就是tail node.prev = t; //cas把Node节点中的tail指向node if (compareAndSetTail(t, node)) { //t此时反面加了一个node,所以t的下一个为node t.next = node; return t; } } } }
复制代码 这段代码有点难看懂,我画了一个图资助各人理解
[外链图片转存失败,源站大概有防盗链机制,发起将图片生存下来直接上传(img-b11krvJZ-1609316095610)( http://gtwave.gitee.io/image/images/wechart/2020-12/lock队列enq图解.png)]
最后再进入到acquireQueued
- final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { //获取节点前一个 final Node p = node.predecessor(); //如果前一个节点是头节点而且对当前节点解锁乐成进入if代码块 if (p == head && tryAcquire(arg)) { //当前节点变为头节点,说明它持有了锁,前一个节点指向null setHead(node); p.next = null; // help GC failed = false; return interrupted; } //如果加锁失败需要park线程会进入到这里继承阻塞线程 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
复制代码
- acquireQueued这个方法主要功能就是循环不断的让队列中的第一个线程去获取锁。
- 这里的大抵流程就是让头节点的下一个节点变成头节点,然后当前的头结点的Thread属性会变成null,因为当前线程就是这个Thread了,所以就不消生存这个线程了。旧的头节点会变为null,方便gc。
整个加锁的流程大抵就是这个样子的了,其实我们回味一下lock的代码,写的真的是非常非常的轻便和有趣,很巧妙的用循环和一些逻辑判定简化了整个代码,不得不说Doug Lea实在太锋利了。
AQS
- 这里对AQS做一个简单的形貌:AQS是一个用来自定义锁的框架,AQS的底层就是使用了一个虚拟双向队列和一个State变量来完成加锁操纵的。虚拟双向队列的含义就是这个队列不是聚集内里的队列,而是用一个Node节点,内里包罗pre指针与next指针实现的一个队列。如果队列中的某个节点把state变量进行了修改,就可以视为这个节点持有了锁。
- 显而易见的lock的底层就是AQS了,我们上面分析lock的源码的思想就和AQS的思想是一样的,而且我们也看到了lock其实继承了AQS。
- 我们其实也可以自己继承AQS的类,然后自己实现一个自定义的锁,这里各人可以自行实验。
解锁操纵
同样进入到unlock内里
- public void unlock() { sync.release(1); }
复制代码- public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; }
复制代码
- tryRelease(arg)就是在对当前线程进行解锁,如果解锁乐成,则判定队列内里是否有线程了,如果有线程则唤起下一个线程
- waitStatus这个属性其实如果为0则体现线程处于生动状态,其他值都体现阻塞、取消等状态。
我们看看tryRelease(arg)
- protected final boolean tryRelease(int releases) { //取出当前线程的state变量,减去releases int c = getState() - releases; //如果当前线程不是持有锁的线程,抛出异常解锁失败 if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; //c == 0体现这个可重入锁已经把全部锁都解掉了 if (c == 0) { free = true; //把持有锁的线程设置为null setExclusiveOwnerThread(null); } //把c写回到state内里 setState(c); //如果全部锁都解完了,即c == 0,返回true,反之返回false return free; }
复制代码 这个类的方法照旧很好理解的,各人仔细看我写的注释就能明白这个类的意思了。
继承看一下unparkSuccessor(h)
[code]private void unparkSuccessor(Node node) { //获取Node中的waitStatus,就是在判定node中的线程的状态 int ws = node.waitStatus; //小于0就用cas操纵改为0,因为现在就在操纵这个线程,所以状态肯定是生动的 if (ws < 0) compareAndSetWaitStatus(node, ws, 0); //获取下一node中下一个节点 Node s = node.next; //waitStatus > 0体现这个线程被取消了 if (s == null || s.waitStatus > 0) { s = null; //因为s == null 与线程被取消都体现这个线程已经没了 //所以从尾节点到头结点重新遍历找出一个是期待状态大概生动状态的线程 for (Node t = tail; t != null && t != node; t = t.prev) //waitStatus < 0 体现在期待,waitStatus == 0体现生动 if (t.waitStatus |