App下載

Java并發(fā)中的ReentrantLock和AQS的源碼詳解

猿友 2021-08-04 11:28:49 瀏覽數(shù) (1724)
反饋

說起 Java 的并發(fā)就不得不提到 ReentrantLock,說起 ReentrantLock 就不得不說到 AQS。下面,我將為大家簡單地聊聊 Java 并發(fā)中的 ReentrantLock和AQS,剖析一下此二者的源碼。

一.前言

首先在聊ReentrantLock之前,我們需要知道整個JUC的并發(fā)同步的基石,currrent里面所有的共享變量都是由volatile修飾的,我們知道volatile的語義有2大特點,可見性以及防止重排序(內(nèi)存屏障,volatie寫與volatile讀)
1、當(dāng)?shù)诙€操作為volatile寫操做時,不管第一個操作是什么(普通讀寫或者volatile讀寫),都不能進行重排序。這個規(guī)則確保volatile寫之前的所有操作都不會被重排序到volatile之后;

2、當(dāng)?shù)谝粋€操作為volatile讀操作時,不管第二個操作是什么,都不能進行重排序。這個規(guī)則確保volatile讀之后的所有操作都不會被重排序到volatile之前;

3、當(dāng)?shù)谝粋€操作是volatile寫操作時,第二個操作是volatile讀操作,不能進行重排序。
而cas操作同時包含了volatile寫/讀語義,這二者的完美結(jié)合就組成了current的基石

二.ReentrantLock的基礎(chǔ)用法

1.

public class ReentrantLockText {
		
	public static void main(String[] args) {
		Lock lock = new ReentrantLock();
		
		
		Thread t1 = new Thread(()->{
			try {
				lock.lock();
				System.out.println("t1 start");
				TimeUnit.SECONDS.sleep(Integer.MAX_VALUE);
				System.out.println("t1 end");
			} catch (InterruptedException e) {
				System.out.println("interrupted!");
			} finally {
				lock.unlock();
			}
		});
		t1.start();
		
		Thread t2 = new Thread(()->{
			try {
				//lock.lock();
				lock.lockInterruptibly(); //可以對interrupt()方法做出響應(yīng)
				System.out.println("t2 start");
				TimeUnit.SECONDS.sleep(5);
				System.out.println("t2 end");
			} catch (InterruptedException e) {
				System.out.println("interrupted!");
			} finally {
				lock.unlock();
			}
		});
		t2.start();
		
		try {
			TimeUnit.SECONDS.sleep(1);
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
		t2.interrupt(); //打斷線程2的等待
		
	}
}

運行結(jié)果

reentrantlock用于替代synchronized

  • 需要注意的是,必須要必須要必須要手動釋放鎖(重要的事情說三遍)
  • 使用syn鎖定的話如果遇到異常,jvm會自動釋放鎖,但是lock必須手動釋放鎖,因此經(jīng)常在finally中進行鎖的釋放
  • 使用reentrantlock可以進行“嘗試鎖定”tryLock,這樣無法鎖定,或者在指定時間內(nèi)無法鎖定,線程可以決定是否繼續(xù)等待
  • 使用ReentrantLock還可以調(diào)用lockInterruptibly方法,可以對線程interrupt方法做出響應(yīng)
  • 在一個線程等待鎖的過程中,可以被打斷

2.ReentrantLock還有一個tryLock(time),可以指定時間,如果指定時間內(nèi)沒有獲得鎖,則放棄,可以通過其返回值來決定是否繼續(xù)等待

3.還有就是Condition了(我個人覺得這是最靈活的一個地方)

public class Lock_condition {

    public static void main(String[] args) {

        char[] aI = "1234567".toCharArray();
        char[] aC = "ABCDEFG".toCharArray();

        Lock lock = new ReentrantLock();
        Condition conditionT1 = lock.newCondition();
        Condition conditionT2 = lock.newCondition();

        new Thread(()->{
            try {
                lock.lock();

                for(char c : aI) {
                    System.out.print(c);
                    conditionT2.signal();
                    conditionT1.await();
                }
                conditionT2.signal();

            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                lock.unlock();
            }

        }, "t1").start();

        new Thread(()->{
            try {
                lock.lock();

                for(char c : aC) {
                    System.out.print(c);
                    conditionT1.signal();
                    conditionT2.await();
                }

                conditionT1.signal();

            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                lock.unlock();
            }

        }, "t2").start();
    }
}

這是condition結(jié)合lock(獨占鎖)的用法
condition目前只實現(xiàn)了獨占鎖,關(guān)于condition的源碼理解,后續(xù)也會繼續(xù)更新,暫時我們只需要知道類似object的wait與notifiy

三.原理+源碼

我們現(xiàn)在知道了基本用法,那么我們就可以開始探究源碼了

20214892121854

1.AQS
我們知道JUC里面的核心類就是AQS,那么AQS究竟是個啥東西呢?
1)先上內(nèi)部類NODE源碼

static final class Node {
        static final Node SHARED = new Node();
        static final Node EXCLUSIVE = null;
        static final int CANCELLED =  1;
        static final int SIGNAL    = -1;
        static final int PROPAGATE = -3;
        volatile int waitStatus;
        volatile Node next;
        volatile Node prev;
        volatile Thread thread;
        Node nextWaiter;

不知道大家在看到這個 Node next;Node prev;的時候是啥感覺,反正我當(dāng)時是激動壞了,這不就是一個雙向鏈表嘛,
再看volatile Thread thread;這個屬性,這是一個管理線程的雙向鏈表,換句話說就是將線程打包成立節(jié)點放入AQS的鏈表中
基礎(chǔ)的結(jié)構(gòu)清楚之后。
SHARED 與EXCLUSIVE 代表是獨占節(jié)點還是共享節(jié)點
2)再上AQS屬性源碼

private transient volatile Node head;
    private transient volatile Node tail;
    private volatile int state;

Node0 head與tail不用說,這是來管理節(jié)點的,
這里我們要核心介紹一個屬性是state,這也是AQS這個類的靈魂,
1.再獨占鎖中這個state是1或者0,(如果大于1則表示鎖重入,這個稍后會有源碼分析)
2在共享鎖中代表還有多少共享鎖資源,
3.在讀寫鎖中,高16位代表寫鎖是否被占用,低16位代表有多少讀鎖,
4.在CountDownLatch中,通過構(gòu)造參數(shù)代碼門閘剩余個數(shù)
5.在Semaphore中,同樣通過構(gòu)造參數(shù)代表信號燈個數(shù)

2.ReentrantLock獲取鎖源碼(獨占鎖)
首先公平鎖與非公平鎖,分別繼承與Sync
FairSync NoFairSync ,默認(rèn)是非公平鎖,可以在構(gòu)造方法上指定

ReentrantLock lock = new ReentrantLock(true);//ture則是公平鎖

公平鎖故名思意,在AQS中管理著一個線程隊列,如果這時候 有一個線程過來搶這把鎖,如果是公平鎖,那么會判斷隊列是不是存在不同與當(dāng)前線程的等待隊列(FIFO),如果存在則去排隊,非公平鎖則是直接去排隊
(2.1.非公平鎖獲取鎖)

final void lock() {
            if (compareAndSetState(0, 1))//cas原子操作嘗試去修改值,如果修改成功說明成功獲取到了鎖
                setExclusiveOwnerThread(Thread.currentThread());
            else
                acquire(1);
        }

首先cas原子操作嘗試去修改值,如果修改成功說明成功獲取到了鎖,進入setExclusiveOwnerThread()方法

protected final void setExclusiveOwnerThread(Thread thread) {
        exclusiveOwnerThread = thread;
    }

將當(dāng)前線程記錄,實現(xiàn)偏向鎖,一行代碼便完美實現(xiàn)了偏向鎖?。?/p>

如果失敗則,調(diào)用acquire();這個方法調(diào)用的實際上是子類的nonfairTryAcquire方法

final boolean nonfairTryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                if (compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }

首先,獲取state的值,判斷是否為0,如果為0,則說明鎖沒有被占有,(可能是剛剛被釋放)那么cas操作開始嘗試獲取鎖,
**(注意注意注意)**重要的事情說三遍,這里僅僅嘗試獲取一次,沒有自旋??!這是獨占鎖與共享鎖的區(qū)別之一,因為如果state>=0(對于共享鎖來說state代表剩余數(shù)量),那么共享鎖會不斷嘗試自旋獲取鎖,之道state<0,因為只要》0那么就可能共享到鎖
接下來的else if就是重入鎖的操作了,判斷當(dāng)先線程是不是記錄的線程,如果是,每次重入state+1,如果不是就返回flase,直接拜拜
(2.2公平鎖獲取鎖)
剛剛介紹了公平鎖的意義,所有直接上源碼,公平鎖比非公平鎖多了一個公平判斷

public final boolean hasQueuedPredecessors() {
        // The correctness of this depends on head being initialized
        // before tail and on head.next being accurate if the current
        // thread is first in queue.
        Node t = tail; // Read fields in reverse initialization order
        Node h = head;
        Node s;
        return h != t &&
            ((s = h.next) == null || s.thread != Thread.currentThread());
    }

我們可以看到hasQueuedPredecessors是用來判斷隊列是否有不同于當(dāng)前線程的節(jié)點等待,這里重點討論一個情況,
h != t返回true,(s = h.next) == null返回true
首先可以知道隊列中有2個節(jié)點,但是頭節(jié)點沒有后繼結(jié)點,在這里列舉一種情況,有另一個線程已經(jīng)執(zhí)行到初始化隊列的操作了,介于compareAndSetHead(new Node())與tail = head之間,也就是之后說的enq自旋方法,請繼續(xù)往下看

繼續(xù)如果非公平鎖沒有獲取到鎖,那么會調(diào)用acquireQueued和addwaiter方法

public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

首先來看addwaiter方法

private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        enq(node);
        return node;
    }

首先將,當(dāng)前線程封裝成一個,Node,然后通過判斷尾結(jié)點是不是空的方式,判斷隊列是不是空的,如果存在尾結(jié)點,那么直接進先驅(qū)后繼的改造,放入雙向鏈表,完成鏈表結(jié)構(gòu),那么如果是空的呢?這么調(diào)用了一個enq的自旋方法

private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            if (t == null) { // Must initialize
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }

我們可以看到這個一個自旋方法,第一次循環(huán):t為null,那么cas就new出來一個新結(jié)點,頭尾都指向這個新結(jié)點,
第二次循環(huán),將傳進來的這個線程結(jié)點的前驅(qū)指向剛剛new的這個結(jié)點,然后cas操作進行,將這個線程結(jié)點替換為尾部結(jié)點,然后head后繼指向線程結(jié)點,返回head
經(jīng)過二次循環(huán),得到了一個由2個節(jié)點組成的隊列,head-》node,head是假節(jié)點(里面不包括線程為null),node才是真正的線程結(jié)點(addwrite封裝好的傳進來的線程結(jié)點)
問題1.為什么一定要用cas操作,因為防止別的線程修改了該隊列

好,現(xiàn)在我們繼續(xù),看acquireQueued

final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

這是一個最最核心的方法,從隊列中取線程
首先這是一個自旋,判斷該節(jié)點的前驅(qū)節(jié)點是不是head,因為(FIFO)先進先出隊列。
如果不是直接拜拜進入shouldParkAfterFailedAcquire,繼續(xù)上源碼

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
        if (ws == Node.SIGNAL)
            /*
             * This node has already set status asking a release
             * to signal it, so it can safely park.
             */
            return true;
        if (ws > 0) {
            /*
             * Predecessor was cancelled. Skip over predecessors and
             * indicate retry.
             */
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            /*
             * waitStatus must be 0 or PROPAGATE.  Indicate that we
             * need a signal, but don't park yet.  Caller will need to
             * retry to make sure it cannot acquire before parking.
             */
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }

static final int CANCELLED = 1;
static final int SIGNAL = -1;
static final int CONDITION = -2;
static final int PROPAGATE = -3;

  1. CANCELLED:因為超時或者中斷,結(jié)點會被設(shè)置為取消狀態(tài),被取消狀態(tài)的結(jié)點不應(yīng)該去競爭鎖,只能保持取消狀態(tài)不變,不能轉(zhuǎn)換為其他狀態(tài)。處于這種狀態(tài)的結(jié)點會被踢出隊列,被GC回收;
  2. SIGNAL:表示這個結(jié)點的繼任結(jié)點被阻塞了,到時需要通知它;
  3. CONDITION:表示這個結(jié)點在條件隊列中,因為等待某個條件而被阻塞;
  4. PROPAGATE:使用在共享模式頭結(jié)點有可能牌處于這種狀態(tài),表示鎖的下一次獲取可以無條件傳播;
  5. 0:None of the above,新結(jié)點會處于這種狀態(tài)。

首先說明一下waitStatus這個屬性,為什么之前不提呢,因為之前沒有對waitStatus進行操作,我們在new節(jié)點與封裝結(jié)點的時候沒有考慮這個屬性,所以我們現(xiàn)在當(dāng)成一個新屬性,值為0來看待,

shouldParkAfterFailedAcquire這個代碼的邏輯意義是說明呢?
如果是SIGNAL那么,直接ture,
如果大于0,則是CANCELLED,被取消了,直接剔除隊列
如果都不是,那么將其前驅(qū)結(jié)點設(shè)為SIGNAL,也就是可以安心睡覺了,定好鬧鐘了,可以被等著喚醒了,
我們現(xiàn)在很明顯是第三種,因為之前啥都沒干,就是0,
所以本來狀態(tài)

進行shouldParkAfterFailedAcquire之后,
那么現(xiàn)在鏈表中,對線程1的前驅(qū)設(shè)鬧鐘,0變成-1

20214892653069

假設(shè)這時候又來了個線程2,那么同理,對線程而的先驅(qū)設(shè)鬧鐘0變成-1

20214892720173

在調(diào)用了shouldParkAfterFailedAcquire()之后,調(diào)用parkAndCheckInterrupt方法用于阻塞,

這里提一下,關(guān)于parkAndCheckInterrupt,lock里面用于阻塞都是基于lockSupper.park()與lockSupper.unpark()完成了,而lockSupper調(diào)用的又是unsafe這個類,我們知道java是基于jvm實現(xiàn)的,并不能和c++一樣直接對os進行操作,所以jvm給我們提供了一個梯子,這個梯子就是unsafe這個類,直接將線程的的交個操作系統(tǒng)阻塞

四,釋放鎖

終于到了釋放鎖,獨占鎖的釋放鎖的邏輯相對與共享鎖來說比較簡單,后續(xù)我也會繼續(xù)更新共享鎖的源碼

public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

首先我們來看tryRelease方法,

protected final boolean tryRelease(int releases) {
            int c = getState() - releases;
            if (Thread.currentThread() != getExclusiveOwnerThread())
                throw new IllegalMonitorStateException();
            boolean free = false;
            if (c == 0) {
                free = true;
                setExclusiveOwnerThread(null);
            }
            setState(c);
            return free;
        }

首先獲取,int c = getState() - releases;這里可能c是>0的,因為獨占鎖的重入鎖(上面以及說明了獨占鎖的重入的源碼操作),所以有可能是需要進行多次解鎖的,繼續(xù)
判斷當(dāng)前線程是不是獨占線程,如果不是則報IllegalMonitorStateException異常
一直解鎖到c=0的時候,那么線程已經(jīng)解鎖,則設(shè)setExclusiveOwnerThread=null
設(shè)置當(dāng)前獨占線程為null,然后設(shè)置state為0

繼續(xù)回到release,如果頭節(jié)點不是null而且h.waitStatus != 0,說明是-1,說明設(shè)置鬧鐘了,需要喚醒aqs隊列中的阻塞結(jié)點,調(diào)用的是unparkSuccessor方法,繼續(xù)看源碼

 private void unparkSuccessor(Node node) {
        /*
         * If status is negative (i.e., possibly needing signal) try
         * to clear in anticipation of signalling.  It is OK if this
         * fails or if status is changed by waiting thread.
         */
        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);

        /*
         * Thread to unpark is held in successor, which is normally
         * just the next node.  But if cancelled or apparently null,
         * traverse backwards from tail to find the actual
         * non-cancelled successor.
         */
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            LockSupport.unpark(s.thread);
    }

這里首先明確,這里傳進來的node是啥?是頭節(jié)點!?。?!,一定要明確這個,博主就是因為剛開始沒明確這個,半天沒明白,因為喚醒一定是喚醒頭節(jié)點之后的waitStatus不為1的結(jié)點

首先判斷頭節(jié)點,如果是-1則設(shè)置為0,這個中間狀態(tài),表示有結(jié)點被喚醒了,
然后拿到,head的后繼節(jié)點,進行判斷,如何是null或者waitStatus >0,(就是1,CANCELLED,代表被取消了),這個時候從尾部開始遍歷,剔除waitStatus >0的節(jié)點,找到第一個waitStatus <=0的節(jié)點,用LockSupport.unpark(s.thread);將其喚醒,喚醒之后的線程回到acquireQueued方法中,

final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
            //被阻塞的線程,被喚醒后在進行循環(huán),
            //然后通過return interrupted;
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

首先,將該結(jié)點設(shè)置為head,然后將老head指向null,幫助gc回收,然后return返回,至此線程自由
那么此時該隊列為

20214892854471

五,總結(jié)

寫了很久,如果有啥不對的地方,歡迎大家指正

以上就是 Java 并發(fā)中的 ReentrantLock 和 AQS 源碼的詳細(xì)內(nèi)容,想要了解更多關(guān)于 Java 并發(fā)的 ReentrantLock 和 AQS 的其他資料請關(guān)注W3Cschool其它相關(guān)文章!


0 人點贊