今天 V 哥聊聊并發(fā)數(shù)據(jù)結(jié)構(gòu)的問題,我們知道,并發(fā)編程中,保障數(shù)據(jù)的安全訪問是第一要?jiǎng)?wù),JDK 提供了一系列JUC并發(fā)數(shù)據(jù)結(jié)構(gòu),這些數(shù)據(jù)結(jié)構(gòu)是線程安全的,可以在多線程環(huán)境中使用而無需額外的同步措施。以下是一些主要的并發(fā)數(shù)據(jù)結(jié)構(gòu):
一個(gè)線程安全的哈希表,用于存儲(chǔ)鍵值對(duì)。它在內(nèi)部使用了分段鎖(Segment Locking)或其他形式的并發(fā)控制機(jī)制,允許多個(gè)線程并發(fā)讀寫,同時(shí)保持較高的性能。
ConcurrentHashMap 是 Java 并發(fā)編程中非常重要的一個(gè)線程安全的哈希表實(shí)現(xiàn),它在 java.util.concurrent 包中。ConcurrentHashMap 允許并發(fā)讀和并發(fā)寫,旨在提供比同步的 HashMap 更高的并發(fā)性能。
實(shí)現(xiàn)原理:
在 JDK 1.7 及之前的版本中,ConcurrentHashMap 使用了分段鎖(Segment Locking)機(jī)制。整個(gè)哈希表被分割成多個(gè)段(Segment),每個(gè)段是一個(gè)小的哈希表,它們有自己的鎖。當(dāng)多個(gè)線程訪問不同段的數(shù)據(jù)時(shí),它們可以并發(fā)執(zhí)行,因?yàn)槊總€(gè)段都有自己的鎖。
ConcurrentHashMap 使用了無鎖的 compare-and-swap(CAS)操作來更新數(shù)據(jù),這進(jìn)一步提高了并發(fā)性能。
讀取操作通常不需要加鎖,因?yàn)?ConcurrentHashMap 的設(shè)計(jì)保證了讀取數(shù)據(jù)的可見性和一致性。
在 JDK 1.8 中,ConcurrentHashMap 的實(shí)現(xiàn)發(fā)生了變化,它取消了分段鎖,轉(zhuǎn)而使用了 synchronized 關(guān)鍵字來保護(hù)哈希表的節(jié)點(diǎn)(Node)。同時(shí),它也引入了紅黑樹來處理哈希碰撞導(dǎo)致的鏈表過長的問題,提高了最壞情況下的性能。
作用:
ConcurrentHashMap 的主要作用是在多線程環(huán)境中提供高效的并發(fā)訪問。它適用于以下場景:
示例代碼:
以下是一個(gè)簡單的 ConcurrentHashMap 使用示例:
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
public class ConcurrentHashMapExample {
public static void main(String[] args) throws InterruptedException {
// 創(chuàng)建一個(gè)ConcurrentHashMap
ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>();
// 創(chuàng)建一個(gè)線程池
ExecutorService executor = Executors.newFixedThreadPool(10);
// 提交10個(gè)任務(wù)到線程池,每個(gè)任務(wù)都會(huì)更新ConcurrentHashMap
for (int i = 0; i < 10; i++) {
final int taskNumber = i;
executor.submit(() -> {
map.put("key" + taskNumber, taskNumber);
System.out.println("Task " + taskNumber + " put value: " + map.get("key" + taskNumber));
});
}
// 關(guān)閉線程池
executor.shutdown();
executor.awaitTermination(1, TimeUnit.MINUTES);
}
}
在這個(gè)示例中,我們創(chuàng)建了一個(gè) ConcurrentHashMap 并使用一個(gè)線程池來并發(fā)地更新它。每個(gè)任務(wù)都會(huì)向哈希表中插入一個(gè)鍵值對(duì),并打印出對(duì)應(yīng)的值。由于 ConcurrentHashMap 是線程安全的,所以這個(gè)程序可以正確地運(yùn)行而不會(huì)出現(xiàn)并發(fā)問題。
解釋:
這個(gè)示例展示了如何在多線程環(huán)境中安全地使用 ConcurrentHashMap。
實(shí)現(xiàn)原理的代碼分析:
ConcurrentHashMap 是 Java 中的一個(gè)線程安全的哈希表實(shí)現(xiàn),用于存儲(chǔ)鍵值對(duì)。在 Java 1.8 之前,ConcurrentHashMap 使用了分段鎖機(jī)制,而在 Java 1.8 之后,它采用了更為高效的鎖分離技術(shù)。
Java 1.8 之前的實(shí)現(xiàn)原理:
在 Java 1.8 之前,ConcurrentHashMap 使用分段鎖(Segment Locking)機(jī)制。每個(gè) Segment 是一個(gè)可重入的 ReentrantLock,它用于鎖定整個(gè)哈希表的一個(gè)部分。哈希表被分割成多個(gè)段,每個(gè)段有自己的鎖,因此可以同時(shí)進(jìn)行讀寫操作。
ConcurrentHashMap 使用分段鎖來保護(hù)多個(gè)哈希表段。每個(gè)段有一個(gè)自己的鎖,這使得在多線程環(huán)境中可以并發(fā)地讀寫不同的段。
在 Java 1.8 之前,ConcurrentHashMap 在進(jìn)行寫操作時(shí),會(huì)復(fù)制整個(gè)段,而不是整個(gè)哈希表。這減少了加鎖的范圍,提高了并發(fā)性能。
Java 1.8 之后的實(shí)現(xiàn)原理:
在 Java 1.8 中,ConcurrentHashMap 的實(shí)現(xiàn)發(fā)生了變化,它取消了分段鎖,轉(zhuǎn)而使用了 synchronized 關(guān)鍵字來保護(hù)哈希表的節(jié)點(diǎn)(Node)。同時(shí),它也引入了紅黑樹來處理哈希碰撞導(dǎo)致的鏈表過長的問題,提高了最壞情況下的性能。
在 Java 1.8 中,ConcurrentHashMap 使用了一種稱為“鎖分離”的技術(shù)。它將鎖的范圍縮小到鏈表的頭部節(jié)點(diǎn),而不是整個(gè)哈希表或整個(gè)段。這減少了鎖競爭,提高了并發(fā)性能。
為了提高哈希表的性能,ConcurrentHashMap 引入了紅黑樹。當(dāng)鏈表的長度超過某個(gè)閾值時(shí),鏈表會(huì)被轉(zhuǎn)換為紅黑樹,這樣可以減少搜索時(shí)間,提高最壞情況下的性能。
代碼分析:
以下是 ConcurrentHashMap 類的一些關(guān)鍵方法的代碼分析:
put(K key, V value)
:這個(gè)方法用于向 ConcurrentHashMap 中添加一個(gè)鍵值對(duì)。get(Object key)
:這個(gè)方法用于從 ConcurrentHashMap 中獲取與指定鍵關(guān)聯(lián)的值。remove(Object key)
:這個(gè)方法用于從 ConcurrentHashMap 中移除與指定鍵關(guān)聯(lián)的鍵值對(duì)。這些方法都使用了 synchronized 關(guān)鍵字來保護(hù)哈希表的節(jié)點(diǎn)。在 Java 1.8 之前,這些方法會(huì)使用分段鎖來保護(hù)整個(gè)段。而在 Java 1.8 之后,這些方法會(huì)使用鎖分離技術(shù)來保護(hù)鏈表的頭部節(jié)點(diǎn)。
這個(gè)示例展示了如何在多線程環(huán)境中使用 ConcurrentHashMap 來安全地進(jìn)行鍵值對(duì)的添加、獲取和移除操作。由于 ConcurrentHashMap 是線程安全的,所以這個(gè)程序可以正確地運(yùn)行而不會(huì)出現(xiàn)并發(fā)問題。
CopyOnWriteArrayList是一個(gè)線程安全的列表,它在進(jìn)行修改操作(如添加、刪除元素)時(shí)會(huì)創(chuàng)建底層數(shù)組的一個(gè)新副本,從而實(shí)現(xiàn)讀寫分離。適用于讀多寫少的場景。
實(shí)現(xiàn)原理:
CopyOnWriteArrayList 是一個(gè)線程安全的變體,其中所有對(duì)列表的修改(添加、刪除、設(shè)置元素等)都是在底層數(shù)組的一個(gè)副加上進(jìn)行的。這意味著在修改操作發(fā)生時(shí),會(huì)創(chuàng)建一個(gè)新的數(shù)組,并將現(xiàn)有的所有元素復(fù)制到新數(shù)組中,然后在新數(shù)組上進(jìn)行修改。完成修改后,再將內(nèi)部引用指向新數(shù)組。由于寫操作是在新數(shù)組上進(jìn)行的,讀操作可以安全地訪問舊數(shù)組,而不會(huì)受到寫操作的干擾。
這是 CopyOnWriteArrayList 的核心原理。在發(fā)生寫操作時(shí),不直接修改原有數(shù)組,而是復(fù)制出一個(gè)新數(shù)組,修改完成后,再將內(nèi)部引用指向新數(shù)組。
由于讀操作不需要修改數(shù)組,它們可以安全地讀取當(dāng)前的數(shù)組,而不需要任何鎖。這提高了讀取操作的并發(fā)性能。
為了確保寫操作的原子性和一致性,寫操作需要加鎖。這是通過在修改方法(如 add, remove, set)中使用 ReentrantLock 實(shí)現(xiàn)的。
作用:
CopyOnWriteArrayList 的主要作用是在讀多寫少的場景中提供線程安全的列表操作,同時(shí)盡量減少讀操作的鎖競爭。它適用于以下場景:
示例代碼:
以下是一個(gè)簡單的 CopyOnWriteArrayList 使用示例:
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class CopyOnWriteArrayListExample {
public static void main(String[] args) {
// 創(chuàng)建一個(gè)CopyOnWriteArrayList
CopyOnWriteArrayList<String> list = new CopyOnWriteArrayList<>();
// 添加元素
list.add("A");
list.add("B");
list.add("C");
// 創(chuàng)建一個(gè)線程池
ExecutorService executor = Executors.newFixedThreadPool(2);
// 提交一個(gè)任務(wù)到線程池,該任務(wù)會(huì)修改CopyOnWriteArrayList
executor.submit(() -> {
list.add("D");
list.remove("A");
});
// 提交另一個(gè)任務(wù)到線程池,該任務(wù)會(huì)讀取CopyOnWriteArrayList
executor.submit(() -> {
for (String element : list) {
System.out.println(element);
}
});
// 關(guān)閉線程池
executor.shutdown();
}
}
在這個(gè)示例中,我們創(chuàng)建了一個(gè) CopyOnWriteArrayList 并使用一個(gè)線程池來并發(fā)地修改和讀取它。一個(gè)任務(wù)嘗試添加和刪除元素,而另一個(gè)任務(wù)遍歷列表并打印所有元素。由于 CopyOnWriteArrayList 是線程安全的,所以這個(gè)程序可以正確地運(yùn)行而不會(huì)出現(xiàn)并發(fā)問題。
解釋:
CopyOnWriteArrayList<String> list = new CopyOnWriteArrayList<>();
:創(chuàng)建了一個(gè)新的 CopyOnWriteArrayList 實(shí)例。list.add("A");
和 list.remove("A");
:添加和刪除元素的操作。ExecutorService executor = Executors.newFixedThreadPool(2);
:創(chuàng)建了一個(gè)大小為2的線程池。executor.submit(() -> {...});
:提交了一個(gè) Runnable 任務(wù)到線程池,任務(wù)中修改了 CopyOnWriteArrayList。for (String element : list) {...}
:遍歷列表并打印元素。executor.shutdown();
:關(guān)閉線程池。這個(gè)示例展示了如何在多線程環(huán)境中使用 CopyOnWriteArrayList 來安全地進(jìn)行讀寫操作。由于寫操作相對(duì)昂貴(因?yàn)樾枰獜?fù)制數(shù)組),所以 CopyOnWriteArrayList 適用于讀多寫少的場景。
代碼分析:
以下是 CopyOnWriteArrayList 類的一些關(guān)鍵方法的代碼分析:
add(E e)
:這個(gè)方法用于向 CopyOnWriteArrayList 中添加一個(gè)元素。get(int index)
:這個(gè)方法用于從 CopyOnWriteArrayList 中獲取指定索引的元素。set(int index, E element)
:這個(gè)方法用于將指定索引的元素設(shè)置為新元素。remove(int index)
:這個(gè)方法用于從 CopyOnWriteArrayList 中移除指定索引的元素。這些方法都使用了原子操作來更新數(shù)據(jù),并確保了線程安全。在 add、set 和 remove 方法中,會(huì)創(chuàng)建一個(gè)新數(shù)組,并將現(xiàn)有元素復(fù)制到新數(shù)組中,然后修改新數(shù)組。完成修改后,再將內(nèi)部引用指向新數(shù)組。
與 CopyOnWriteArrayList 類似,但它存儲(chǔ)的是不包含重復(fù)元素的集合。
實(shí)現(xiàn)原理:
CopyOnWriteArraySet 是一個(gè)線程安全的變體,其中所有對(duì)集合的修改(添加、刪除元素等)都是在底層數(shù)組的一個(gè)副加上進(jìn)行的。這意味著在修改操作發(fā)生時(shí),會(huì)創(chuàng)建一個(gè)新的數(shù)組,然后將現(xiàn)有的所有元素復(fù)制到新數(shù)組中,并在新數(shù)組上進(jìn)行修改。完成修改后,再將內(nèi)部引用指向新數(shù)組。由于寫操作是在新數(shù)組上進(jìn)行的,讀操作可以安全地訪問舊數(shù)組,而不會(huì)受到寫操作的干擾。
這是 CopyOnWriteArraySet 的核心原理。在發(fā)生寫操作時(shí),不直接修改原有數(shù)組,而是復(fù)制出一個(gè)新數(shù)組,修改完成后,再將內(nèi)部引用指向新數(shù)組。
由于讀操作不需要修改數(shù)組,它們可以安全地讀取當(dāng)前的數(shù)組,而不需要任何鎖。這提高了讀取操作的并發(fā)性能。
為了確保寫操作的原子性和一致性,寫操作需要加鎖。這是通過在修改方法(如 add, remove)中使用 ReentrantLock 實(shí)現(xiàn)的。
作用:
CopyOnWriteArraySet 的主要作用是在讀多寫少的場景中提供線程安全的集合操作,同時(shí)盡量減少讀操作的鎖競爭。它適用于以下場景:
示例代碼:
以下是一個(gè)簡單的 CopyOnWriteArraySet 使用示例:
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class CopyOnWriteArraySetExample {
public static void main(String[] args) {
// 創(chuàng)建一個(gè)CopyOnWriteArraySet
CopyOnWriteArraySet<String> set = new CopyOnWriteArraySet<>();
// 添加元素
set.add("A");
set.add("B");
set.add("C");
// 創(chuàng)建一個(gè)線程池
ExecutorService executor = Executors.newFixedThreadPool(2);
// 提交一個(gè)任務(wù)到線程池,該任務(wù)會(huì)修改CopyOnWriteArraySet
executor.submit(() -> {
set.add("D");
set.remove("A");
});
// 提交另一個(gè)任務(wù)到線程池,該任務(wù)會(huì)讀取CopyOnWriteArraySet
executor.submit(() -> {
for (String element : set) {
System.out.println(element);
}
});
// 關(guān)閉線程池
executor.shutdown();
}
}
在這個(gè)示例中,我們創(chuàng)建了一個(gè) CopyOnWriteArraySet 并使用一個(gè)線程池來并發(fā)地修改和讀取它。一個(gè)任務(wù)嘗試添加和刪除元素,而另一個(gè)任務(wù)遍歷集合并打印所有元素。由于 CopyOnWriteArraySet 是線程安全的,所以這個(gè)程序可以正確地運(yùn)行而不會(huì)出現(xiàn)并發(fā)問題。
解釋:
CopyOnWriteArraySet<String> set = new CopyOnWriteArraySet<>();
:創(chuàng)建了一個(gè)新的 CopyOnWriteArraySet 實(shí)例。set.add("A");
和 set.remove("A");
:添加和刪除元素的操作。ExecutorService executor = Executors.newFixedThreadPool(2);
:創(chuàng)建了一個(gè)大小為2的線程池。executor.submit(() -> {...});
:提交了一個(gè) Runnable 任務(wù)到線程池,任務(wù)中修改了 CopyOnWriteArraySet。for (String element : set) {...}
:遍歷集合并打印元素。executor.shutdown();
:關(guān)閉線程池。這個(gè)示例展示了如何在多線程環(huán)境中使用 CopyOnWriteArraySet 來安全地進(jìn)行讀寫操作。由于寫操作相對(duì)昂貴(因?yàn)樾枰獜?fù)制數(shù)組),所以 CopyOnWriteArraySet 適用于讀多寫少的場景,并且需要集合中元素不重復(fù)的特性。
代碼分析:
以下是 CopyOnWriteArraySet 類的一些關(guān)鍵方法的代碼分析:
add(E e)
:這個(gè)方法用于向 CopyOnWriteArraySet 中添加一個(gè)元素。contains(Object o)
:這個(gè)方法用于檢查集合中是否包含指定元素。size()
:這個(gè)方法用于獲取集合中元素的數(shù)量。clear()
:這個(gè)方法用于清空集合中的所有元素。這些方法都使用了原子操作來更新數(shù)據(jù),并確保了線程安全。在 add 方法中,會(huì)創(chuàng)建一個(gè)新數(shù)組,并將現(xiàn)有元素復(fù)制到新數(shù)組中。如果新數(shù)組中已經(jīng)存在相同的元素,則不會(huì)添加該元素。完成修改后,再將內(nèi)部引用指向新數(shù)組。
一個(gè)線程安全的無界非阻塞隊(duì)列,基于鏈表實(shí)現(xiàn)。它使用原子操作來保證線程安全,適合在高并發(fā)環(huán)境下使用。
實(shí)現(xiàn)原理:
ConcurrentLinkedQueue 是一個(gè)基于鏈表實(shí)現(xiàn)的線程安全的無界非阻塞隊(duì)列。它使用原子操作來保證線程安全,適合在高并發(fā)環(huán)境下使用。
ConcurrentLinkedQueue 實(shí)現(xiàn)了 Queue 接口,提供了一組原子操作來支持隊(duì)列的基本功能,如入隊(duì)(offer)、出隊(duì)(poll)等,這些操作都是非阻塞的。
ConcurrentLinkedQueue 沒有容量限制,理論上可以無限增長,直到耗盡內(nèi)存。
ConcurrentLinkedQueue 使用了 compare-and-swap(CAS)操作來更新鏈表節(jié)點(diǎn),這保證了在多線程環(huán)境下的線程安全。
ConcurrentLinkedQueue 使用了無鎖算法,避免了鎖競爭帶來的性能開銷。
作用:
ConcurrentLinkedQueue 的主要作用是在多線程環(huán)境中提供一個(gè)高效且線程安全的隊(duì)列。它適用于以下場景:
示例代碼:
以下是一個(gè)簡單的 ConcurrentLinkedQueue 使用示例:
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class ConcurrentLinkedQueueExample {
public static void main(String[] args) {
// 創(chuàng)建一個(gè)ConcurrentLinkedQueue
ConcurrentLinkedQueue<Integer> queue = new ConcurrentLinkedQueue<>();
// 創(chuàng)建一個(gè)線程池
ExecutorService executor = Executors.newFixedThreadPool(10);
// 提交任務(wù)到線程池,生產(chǎn)者線程
for (int i = 0; i < 5; i++) {
final int taskNumber = i;
executor.submit(() -> {
queue.offer(taskNumber);
System.out.println("Task " + taskNumber + " added to queue");
});
}
// 提交任務(wù)到線程池,消費(fèi)者線程
for (int i = 0; i < 5; i++) {
executor.submit(() -> {
Integer element = queue.poll();
if (element != null) {
System.out.println("Task " + element + " removed from queue");
}
});
}
// 關(guān)閉線程池
executor.shutdown();
}
}
在這個(gè)示例中,我們創(chuàng)建了一個(gè) ConcurrentLinkedQueue 并使用一個(gè)線程池來模擬生產(chǎn)者和消費(fèi)者。生產(chǎn)者線程向隊(duì)列中添加元素,而消費(fèi)者線程從隊(duì)列中移除元素。由于 ConcurrentLinkedQueue 是線程安全的,所以這個(gè)程序可以正確地運(yùn)行而不會(huì)出現(xiàn)并發(fā)問題。
解釋:
ConcurrentLinkedQueue<Integer> queue = new ConcurrentLinkedQueue<>();
:創(chuàng)建了一個(gè)新的 ConcurrentLinkedQueue 實(shí)例。queue.offer(taskNumber);
:生產(chǎn)者線程將元素添加到隊(duì)列尾部。Integer element = queue.poll();
:消費(fèi)者線程從隊(duì)列頭部移除元素。ExecutorService executor = Executors.newFixedThreadPool(10);
:創(chuàng)建了一個(gè)大小為10的線程池。executor.submit(() -> {...});
:提交了生產(chǎn)者和消費(fèi)者任務(wù)到線程池。executor.shutdown();
:關(guān)閉線程池。這個(gè)示例展示了如何在多線程環(huán)境中使用 ConcurrentLinkedQueue 來安全地進(jìn)行生產(chǎn)者和消費(fèi)者操作。由于 ConcurrentLinkedQueue 是無界的且非阻塞的,它適合用于生產(chǎn)者和消費(fèi)者數(shù)量不固定,或者需要高并發(fā)處理的場景。
代碼分析:
以下是 ConcurrentLinkedQueue 類的一些關(guān)鍵方法的代碼分析:
offer(E e)
:這個(gè)方法用于向 ConcurrentLinkedQueue 中添加一個(gè)元素。如果隊(duì)列已滿,該方法將返回 false。poll()
:這個(gè)方法用于從 ConcurrentLinkedQueue 中移除并返回第一個(gè)元素。如果隊(duì)列為空,該方法將返回 null。peek()
:這個(gè)方法用于返回 ConcurrentLinkedQueue 中第一個(gè)元素,但不從隊(duì)列中移除它。如果隊(duì)列為空,該方法將返回 null。size()
:這個(gè)方法用于返回 ConcurrentLinkedQueue 中元素的數(shù)量。這些方法都使用了原子操作來更新鏈表節(jié)點(diǎn),并確保了線程安全。在 offer 方法中,會(huì)使用 CAS 操作將新元素添加到鏈表的尾部。在 poll 方法中,會(huì)使用 CAS 操作從鏈表的頭部移除元素。
一個(gè)線程安全的雙端隊(duì)列,也是基于鏈表實(shí)現(xiàn),適用于需要從兩端插入和刪除元素的場景。
實(shí)現(xiàn)原理:
ConcurrentLinkedDeque 是一個(gè)基于鏈表實(shí)現(xiàn)的線程安全的雙端隊(duì)列。它支持在隊(duì)列的首尾進(jìn)行插入和刪除操作,并且是線程安全的。
ConcurrentLinkedDeque 實(shí)現(xiàn)了 Deque 接口,提供了一組原子操作來支持雙端隊(duì)列的基本功能,如從頭部插入(addFirst)、從尾部插入(addLast)、從頭部移除(removeFirst)、從尾部移除(removeLast)等。
ConcurrentLinkedDeque 沒有容量限制,理論上可以無限增長,直到耗盡內(nèi)存。
ConcurrentLinkedDeque 使用了 compare-and-swap(CAS)操作來更新鏈表節(jié)點(diǎn),這保證了在多線程環(huán)境下的線程安全。
ConcurrentLinkedDeque 使用了無鎖算法,避免了鎖競爭帶來的性能開銷。
作用:
ConcurrentLinkedDeque 的主要作用是在多線程環(huán)境中提供一個(gè)高效且線程安全的雙端隊(duì)列。它適用于以下場景:
示例代碼:
以下是一個(gè)簡單的 ConcurrentLinkedDeque 使用示例:
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class ConcurrentLinkedDequeExample {
public static void main(String[] args) {
// 創(chuàng)建一個(gè)ConcurrentLinkedDeque
ConcurrentLinkedDeque<String> deque = new ConcurrentLinkedDeque<>();
// 創(chuàng)建一個(gè)線程池
ExecutorService executor = Executors.newFixedThreadPool(10);
// 提交任務(wù)到線程池,生產(chǎn)者線程
for (int i = 0; i < 5; i++) {
final int taskNumber = i;
executor.submit(() -> {
deque.addFirst("Task " + taskNumber);
System.out.println("Task " + taskNumber + " added to the front of the deque");
});
}
// 提交任務(wù)到線程池,消費(fèi)者線程
for (int i = 0; i < 5; i++) {
executor.submit(() -> {
String element = deque.removeLast();
if (element != null) {
System.out.println(element + " removed from the end of the deque");
}
});
}
// 關(guān)閉線程池
executor.shutdown();
}
}
在這個(gè)示例中,我們創(chuàng)建了一個(gè) ConcurrentLinkedDeque 并使用一個(gè)線程池來模擬生產(chǎn)者和消費(fèi)者。生產(chǎn)者線程向隊(duì)列的頭部添加元素,而消費(fèi)者線程從隊(duì)列的尾部移除元素。由于 ConcurrentLinkedDeque 是線程安全的,所以這個(gè)程序可以正確地運(yùn)行而不會(huì)出現(xiàn)并發(fā)問題。
解釋:
ConcurrentLinkedDeque<String> deque = new ConcurrentLinkedDeque<>();
:創(chuàng)建了一個(gè)新的 ConcurrentLinkedDeque 實(shí)例。deque.addFirst("Task " + taskNumber);
:生產(chǎn)者線程將元素添加到隊(duì)列的頭部。String element = deque.removeLast();
:消費(fèi)者線程從隊(duì)列的尾部移除元素。ExecutorService executor = Executors.newFixedThreadPool(10);
:創(chuàng)建了一個(gè)大小為10的線程池。executor.submit(() -> {...});
:提交了生產(chǎn)者和消費(fèi)者任務(wù)到線程池。executor.shutdown();
:關(guān)閉線程池。這個(gè)示例展示了如何在多線程環(huán)境中使用 ConcurrentLinkedDeque 來安全地進(jìn)行生產(chǎn)者和消費(fèi)者操作。由于 ConcurrentLinkedDeque 是無界的且非阻塞的,它適合用于生產(chǎn)者和消費(fèi)者數(shù)量不固定,或者需要高并發(fā)處理的場景,并且需要雙端隊(duì)列的特性。
代碼分析:
以下是 ConcurrentLinkedDeque 類的一些關(guān)鍵方法的代碼分析:
addFirst(E e)
:這個(gè)方法用于在 ConcurrentLinkedDeque 的頭部添加一個(gè)元素。addLast(E e)
:這個(gè)方法用于在 ConcurrentLinkedDeque 的尾部添加一個(gè)元素。removeFirst()
:這個(gè)方法用于從 ConcurrentLinkedDeque 的頭部移除并返回第一個(gè)元素。如果隊(duì)列為空,該方法將返回 null。removeLast()
:這個(gè)方法用于從 ConcurrentLinkedDeque 的尾部移除并返回最后一個(gè)元素。如果隊(duì)列為空,該方法將返回 null。這些方法都使用了原子操作來更新鏈表節(jié)點(diǎn),并確保了線程安全。在 addFirst 和 addLast 方法中,會(huì)使用 CAS 操作將新元素添加到鏈表的頭部或尾部。在 removeFirst 和 removeLast 方法中,會(huì)使用 CAS 操作從鏈表的頭部或尾部移除元素。
(如 ArrayBlockingQueue, LinkedBlockingQueue, PriorityBlockingQueue 等)提供了線程安全的隊(duì)列操作,支持阻塞的插入和獲取操作。當(dāng)隊(duì)列滿時(shí),插入操作會(huì)阻塞;當(dāng)隊(duì)列空時(shí),獲取操作會(huì)阻塞。
實(shí)現(xiàn)原理:
BlockingQueue 是一個(gè)支持阻塞操作的隊(duì)列。當(dāng)隊(duì)列滿時(shí),插入操作會(huì)阻塞;當(dāng)隊(duì)列空時(shí),獲取操作會(huì)阻塞。它實(shí)現(xiàn)了生產(chǎn)者-消費(fèi)者模式,用于線程間的數(shù)據(jù)共享。
BlockingQueue 提供了阻塞的 put 和 take 方法,這些方法在隊(duì)列滿或空時(shí)會(huì)使線程進(jìn)入等待狀態(tài),直到隊(duì)列有空閑空間或數(shù)據(jù)可用。
BlockingQueue 的實(shí)現(xiàn)類通常使用鎖(如 ReentrantLock)和條件變量(如 Condition)來實(shí)現(xiàn)線程同步。
BlockingQueue 通常有固定的容量限制,但也有一些實(shí)現(xiàn)(如 LinkedBlockingQueue)允許指定最大容量,如果沒有指定,則默認(rèn)為 Integer.MAX_VALUE。
作用:
BlockingQueue 的主要作用是在多線程環(huán)境中提供一個(gè)線程安全的隊(duì)列,用于生產(chǎn)者和消費(fèi)者之間的數(shù)據(jù)傳遞。它適用于以下場景:
示例代碼:
以下是一個(gè)簡單的 BlockingQueue 使用示例,使用 ArrayBlockingQueue 作為實(shí)現(xiàn):
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class BlockingQueueExample {
public static void main(String[] args) {
// 創(chuàng)建一個(gè)容量為10的ArrayBlockingQueue
BlockingQueue<String> queue = new ArrayBlockingQueue<>(10);
// 創(chuàng)建一個(gè)線程池
ExecutorService executor = Executors.newFixedThreadPool(2);
// 提交生產(chǎn)者任務(wù)到線程池
executor.submit(() -> {
try {
for (int i = 0; i < 20; i++) {
queue.put("Task " + i);
System.out.println("Task " + i + " added to the queue");
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
// 提交消費(fèi)者任務(wù)到線程池
executor.submit(() -> {
try {
for (int i = 0; i < 20; i++) {
String task = queue.take();
System.out.println(task + " removed from the queue");
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
// 關(guān)閉線程池
executor.shutdown();
}
}
在這個(gè)示例中,我們創(chuàng)建了一個(gè) ArrayBlockingQueue 并使用一個(gè)線程池來模擬生產(chǎn)者和消費(fèi)者。生產(chǎn)者線程向隊(duì)列中添加元素,而消費(fèi)者線程從隊(duì)列中移除元素。由于 ArrayBlockingQueue 是線程安全的,所以這個(gè)程序可以正確地運(yùn)行而不會(huì)出現(xiàn)并發(fā)問題。
解釋:
BlockingQueue<String> queue = new ArrayBlockingQueue<>(10);
:創(chuàng)建了一個(gè)容量為10的 ArrayBlockingQueue 實(shí)例。queue.put("Task " + i);
:生產(chǎn)者線程將元素添加到隊(duì)列中,如果隊(duì)列已滿,則線程會(huì)阻塞。String task = queue.take();
:消費(fèi)者線程從隊(duì)列中移除元素,如果隊(duì)列空,則線程會(huì)阻塞。ExecutorService executor = Executors.newFixedThreadPool(2);
:創(chuàng)建了一個(gè)大小為2的線程池。executor.submit(() -> {...});
:提交了生產(chǎn)者和消費(fèi)者任務(wù)到線程池。executor.shutdown();
:關(guān)閉線程池。這個(gè)示例展示了如何在多線程環(huán)境中使用 BlockingQueue 來安全地進(jìn)行生產(chǎn)者和消費(fèi)者操作。由于 ArrayBlockingQueue 是有界的,它在隊(duì)列滿時(shí)會(huì)阻塞生產(chǎn)者,在隊(duì)列空時(shí)會(huì)阻塞消費(fèi)者,適合用于需要阻塞隊(duì)列的場景。
代碼分析:
以下是 BlockingQueue 接口的一些關(guān)鍵方法的代碼分析:
put(E e)
:這個(gè)方法用于向 BlockingQueue 中添加一個(gè)元素。如果隊(duì)列已滿,該方法會(huì)阻塞,直到隊(duì)列有空閑空間。take()
:這個(gè)方法用于從 BlockingQueue 中移除并返回第一個(gè)元素。如果隊(duì)列為空,該方法會(huì)阻塞,直到隊(duì)列有數(shù)據(jù)可用。offer(E e)
:這個(gè)方法用于向 BlockingQueue 中添加一個(gè)元素。如果隊(duì)列已滿,該方法將返回 false。poll(long timeout, TimeUnit unit)
:這個(gè)方法用于從 BlockingQueue 中移除并返回第一個(gè)元素。如果隊(duì)列為空,該方法將在指定的時(shí)間內(nèi)阻塞,如果超時(shí)則返回 null。這些方法都使用了鎖和條件變量來實(shí)現(xiàn)線程同步。在 put 和 offer 方法中,會(huì)使用鎖來保護(hù)隊(duì)列,并使用條件變量來阻塞線程。在 take 和 poll 方法中,會(huì)使用鎖來保護(hù)隊(duì)列,并使用條件變量來喚醒等待的線程。
分別是線程安全的有序映射和有序集,基于跳表(Skip List)實(shí)現(xiàn),提供了高效的查找、插入和刪除操作。
實(shí)現(xiàn)原理:
ConcurrentSkipListMap 是一個(gè)線程安全的有序映射,它基于跳表(Skip List)數(shù)據(jù)結(jié)構(gòu)實(shí)現(xiàn)。跳表是一種平衡樹結(jié)構(gòu),它結(jié)合了紅黑樹和有序鏈表的特點(diǎn),提供了一種高效的數(shù)據(jù)結(jié)構(gòu),可以進(jìn)行快速的查找、插入和刪除操作。
跳表包含多層索引,每一層索引都是有序的鏈表。最底層是最簡單的有序鏈表,高層索引包含指向下一層索引的指針。通過這些指針,可以快速跳過大量節(jié)點(diǎn),從而提高查找、插入和刪除操作的效率。
ConcurrentSkipListMap 使用 ReentrantLock 來保證線程安全。當(dāng)多個(gè)線程同時(shí)進(jìn)行修改操作時(shí),它們會(huì)競爭鎖。
讀取操作(如 get)通常不需要加鎖,因?yàn)樘淼慕Y(jié)構(gòu)保證了讀取操作的可見性和一致性。 作用:
ConcurrentSkipListMap 的主要作用是在多線程環(huán)境中提供高效的并發(fā)訪問,同時(shí)保持元素的自然順序。它適用于以下場景:
示例代碼:
以下是一個(gè)簡單的 ConcurrentSkipListMap 使用示例:
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class ConcurrentSkipListMapExample {
public static void main(String[] args) {
// 創(chuàng)建一個(gè)ConcurrentSkipListMap
ConcurrentSkipListMap<String, Integer> map = new ConcurrentSkipListMap<>();
// 創(chuàng)建一個(gè)線程池
ExecutorService executor = Executors.newFixedThreadPool(2);
// 提交任務(wù)到線程池,生產(chǎn)者線程
executor.submit(() -> {
for (int i = 0; i < 20; i++) {
map.put("Key " + i, i);
System.out.println("Task " + i + " added to the map");
}
});
// 提交任務(wù)到線程池,消費(fèi)者線程
executor.submit(() -> {
for (int i = 0; i < 20; i++) {
Integer value = map.get("Key " + i);
if (value != null) {
System.out.println("Task " + value + " found in the map");
}
}
});
// 關(guān)閉線程池
executor.shutdown();
}
}
在這個(gè)示例中,我們創(chuàng)建了一個(gè) ConcurrentSkipListMap 并使用一個(gè)線程池來模擬生產(chǎn)者和消費(fèi)者。生產(chǎn)者線程向映射中添加元素,而消費(fèi)者線程從映射中查找元素。由于 ConcurrentSkipListMap 是線程安全的,所以這個(gè)程序可以正確地運(yùn)行而不會(huì)出現(xiàn)并發(fā)問題。
解釋:
ConcurrentSkipListMap<String, Integer> map = new ConcurrentSkipListMap<>();
:創(chuàng)建了一個(gè)新的 ConcurrentSkipListMap 實(shí)例。map.put("Key " + i, i);
:生產(chǎn)者線程將鍵值對(duì)添加到映射中。Integer value = map.get("Key " + i);
:消費(fèi)者線程從映射中查找特定鍵對(duì)應(yīng)的值。ExecutorService executor = Executors.newFixedThreadPool(2);
:創(chuàng)建了一個(gè)大小為2的線程池。executor.submit(() -> {...});
:提交了生產(chǎn)者和消費(fèi)者任務(wù)到線程池。executor.shutdown();
:關(guān)閉
代碼分析:
以下是 ConcurrentSkipListMap 和 ConcurrentSkipListSet 類的一些關(guān)鍵方法的代碼分析:
put(K key, V value)
:這個(gè)方法用于向 ConcurrentSkipListMap 中添加一個(gè)鍵值對(duì)。get(Object key)
:這個(gè)方法用于從 ConcurrentSkipListMap 中獲取與指定鍵關(guān)聯(lián)的值。remove(Object key)
:這個(gè)方法用于從 ConcurrentSkipListMap 中移除與指定鍵關(guān)聯(lián)的鍵值對(duì)。這些方法都使用了 ReentrantLock 來保護(hù)跳表的修改操作,并確保線程安全。在 put、get 和 remove 方法中,會(huì)使用跳表的數(shù)據(jù)結(jié)構(gòu)進(jìn)行查找、插入和刪除操作。
這些方法同樣使用了 ReentrantLock 來保護(hù)跳表的修改操作,并確保線程安全。在 add、contains 和 remove 方法中,會(huì)使用跳表的數(shù)據(jù)結(jié)構(gòu)進(jìn)行查找、插入和刪除操作。
一個(gè)同步輔助類,允許一個(gè)或多個(gè)線程等待其他線程完成操作,可用于實(shí)現(xiàn)并發(fā)同步。
實(shí)現(xiàn)原理:
CountDownLatch 是一個(gè)同步輔助類,用于實(shí)現(xiàn)線程之間的等待/通知模式。它允許一個(gè)或多個(gè)線程等待直到一系列操作在其他線程中完成。
CountDownLatch 使用一個(gè)計(jì)數(shù)器來跟蹤完成操作的線程數(shù)量。初始時(shí),計(jì)數(shù)器的值等于線程的數(shù)量。
當(dāng)調(diào)用 CountDownLatch 的 await 方法時(shí),當(dāng)前線程會(huì)阻塞,直到計(jì)數(shù)器值為零。
其他線程通過調(diào)用 CountDownLatch 的 countDown 方法來遞減計(jì)數(shù)器的值。每個(gè)線程在完成自己的操作后調(diào)用此方法。
作用:
CountDownLatch 的主要作用是在多線程環(huán)境中提供一個(gè)同步點(diǎn),使得主線程可以等待其他線程完成各自的任務(wù)后再繼續(xù)執(zhí)行。它適用于以下場景:
示例代碼:
以下是一個(gè)簡單的 CountDownLatch 使用示例:
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class CountDownLatchExample {
public static void main(String[] args) {
// 創(chuàng)建一個(gè)CountDownLatch,初始值為5
CountDownLatch latch = new CountDownLatch(5);
// 創(chuàng)建一個(gè)線程池
ExecutorService executor = Executors.newFixedThreadPool(5);
// 提交任務(wù)到線程池,每個(gè)任務(wù)都會(huì)遞減latch的計(jì)數(shù)器
for (int i = 0; i < 5; i++) {
final int taskNumber = i;
executor.submit(() -> {
System.out.println("Task " + taskNumber + " is running");
latch.countDown();
System.out.println("Task " + taskNumber + " is completed");
});
}
// 主線程等待latch的計(jì)數(shù)器歸零后繼續(xù)執(zhí)行
try {
latch.await();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
System.out.println("All tasks are completed, main thread continues");
// 關(guān)閉線程池
executor.shutdown();
}
}
在這個(gè)示例中,我們創(chuàng)建了一個(gè) CountDownLatch 并使用一個(gè)線程池來模擬5個(gè)并發(fā)任務(wù)。每個(gè)任務(wù)在完成自己的操作后都會(huì)遞減 CountDownLatch 的計(jì)數(shù)器。主線程在調(diào)用 latch.await() 方法時(shí)會(huì)阻塞,直到計(jì)數(shù)器歸零。當(dāng)所有任務(wù)完成后,主線程繼續(xù)執(zhí)行。
解釋:
CountDownLatch latch = new CountDownLatch(5);
:創(chuàng)建了一個(gè)初始值為5的 CountDownLatch 實(shí)例。latch.countDown();
:每個(gè)線程在完成任務(wù)后調(diào)用此方法遞減計(jì)數(shù)器的值。latch.await();
:主線程在調(diào)用此方法時(shí)會(huì)阻塞,直到計(jì)數(shù)器的值為零。ExecutorService executor = Executors.newFixedThreadPool(5);
:創(chuàng)建了一個(gè)大小為5的線程池。executor.submit(() -> {...});
:提交了5個(gè)任務(wù)到線程池。executor.shutdown();
:關(guān)閉線程池。這個(gè)示例展示了如何在多線程環(huán)境中使用 CountDownLatch 來確保主線程等待所有子線程完成任務(wù)后再繼續(xù)執(zhí)行。由于 CountDownLatch 提供了線程間的同步點(diǎn),它適合用于需要等待多個(gè)線程完成任務(wù)的場景。
代碼分析:
以下是 CountDownLatch 類的一些關(guān)鍵方法的代碼分析:
CountDownLatch(int count)
: 這個(gè)構(gòu)造方法用于創(chuàng)建一個(gè) CountDownLatch 對(duì)象,并初始化計(jì)數(shù)器的值。await()
: 這個(gè)方法用于使當(dāng)前線程等待,直到計(jì)數(shù)器的值為零。如果計(jì)數(shù)器的值不為零,當(dāng)前線程會(huì)阻塞。countDown()
: 這個(gè)方法用于遞減計(jì)數(shù)器的值。每個(gè)線程在完成自己的操作后調(diào)用此方法。getCount()
: 這個(gè)方法用于獲取當(dāng)前計(jì)數(shù)器的值。isLatchOpen()
: 這個(gè)方法用于檢查計(jì)數(shù)器的值是否為零。如果計(jì)數(shù)器的值為零,則返回 true,否則返回 false。這些方法都使用了 Object 類的 wait() 和 notify() 方法來實(shí)現(xiàn)線程間的同步。當(dāng)線程到達(dá) await 方法時(shí),它會(huì)調(diào)用 wait() 方法,這會(huì)導(dǎo)致線程進(jìn)入等待狀態(tài)。當(dāng)其他線程調(diào)用 countDown() 方法并遞減計(jì)數(shù)器的值時(shí),會(huì)調(diào)用 notify() 方法來喚醒等待的線程。
一個(gè)允許一組線程互相等待的同步輔助類,直到所有線程都達(dá)到某個(gè)屏障點(diǎn)后才繼續(xù)執(zhí)行。
實(shí)現(xiàn)原理:
CyclicBarrier 是一個(gè)同步輔助類,用于讓一組線程到達(dá)一個(gè)屏障(barrier)時(shí)被阻塞,直到最后一個(gè)線程到達(dá)屏障時(shí),屏障才會(huì)開門,所有被屏障攔截的線程才會(huì)繼續(xù)運(yùn)行。CyclicBarrier 的名稱中的 “Cyclic” 指的是它可以被重用。當(dāng)所有參與者到達(dá)屏障時(shí),它們會(huì)執(zhí)行 barrierAction 指定的動(dòng)作,這個(gè)動(dòng)作只會(huì)被最后一個(gè)到達(dá)屏障的線程執(zhí)行。
CyclicBarrier 使用一個(gè)內(nèi)部計(jì)數(shù)器來跟蹤到達(dá)屏障的線程數(shù)量。
當(dāng)線程到達(dá)屏障時(shí),它會(huì)阻塞,直到計(jì)數(shù)器的值為零。
CyclicBarrier 允許在屏障打開后重新使用它,而不是每次使用后都必須創(chuàng)建一個(gè)新的。
作用:
CyclicBarrier 的主要作用是在多線程環(huán)境中提供一個(gè)線程間的同步點(diǎn),使得一組線程在完成各自的任務(wù)后,能夠同時(shí)繼續(xù)執(zhí)行后續(xù)操作。它適用于以下場景:
示例代碼:
以下是一個(gè)簡單的 CyclicBarrier 使用示例:
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class CyclicBarrierExample {
public static void main(String[] args) {
// 創(chuàng)建一個(gè)CyclicBarrier,初始值為5
CyclicBarrier barrier = new CyclicBarrier(5);
// 創(chuàng)建一個(gè)線程池
ExecutorService executor = Executors.newFixedThreadPool(5);
// 提交任務(wù)到線程池,每個(gè)任務(wù)都會(huì)到達(dá)屏障并執(zhí)行后續(xù)操作
for (int i = 0; i < 5; i++) {
final int taskNumber = i;
executor.submit(() -> {
System.out.println("Task " + taskNumber + " is running");
try {
barrier.await();
} catch (InterruptedException | BrokenBarrierException e) {
Thread.currentThread().interrupt();
}
System.out.println("Task " + taskNumber + " is completed");
});
}
// 主線程等待所有任務(wù)完成后繼續(xù)執(zhí)行
try {
barrier.await();
} catch (InterruptedException | BrokenBarrierException e) {
Thread.currentThread().interrupt();
}
System.out.println("All tasks are completed, main thread continues");
// 關(guān)閉線程池
executor.shutdown();
}
}
在這個(gè)示例中,我們創(chuàng)建了一個(gè) CyclicBarrier 并使用一個(gè)線程池來模擬5個(gè)并發(fā)任務(wù)。每個(gè)任務(wù)在完成自己的操作后會(huì)到達(dá)屏障并等待其他任務(wù)也到達(dá)屏障。當(dāng)所有任務(wù)都到達(dá)屏障時(shí),它們會(huì)繼續(xù)執(zhí)行后續(xù)操作。主線程在調(diào)用 barrier.await() 方法時(shí)會(huì)阻塞,直到所有子線程都到達(dá)屏障。
解釋:
CyclicBarrier barrier = new CyclicBarrier(5);
:創(chuàng)建了一個(gè)初始值為5的 CyclicBarrier 實(shí)例。barrier.await();
:每個(gè)線程在完成任務(wù)后調(diào)用此方法到達(dá)屏障并等待其他線程。ExecutorService executor = Executors.newFixedThreadPool(5);
:創(chuàng)建了一個(gè)大小為5的線程池。executor.submit(() -> {...});
:提交了5個(gè)任務(wù)到線程池。executor.shutdown();
:關(guān)閉線程池。這個(gè)示例展示了如何在多線程環(huán)境中使用 CyclicBarrier 來確保一組線程在完成各自的任務(wù)后,能夠同時(shí)繼續(xù)執(zhí)行后續(xù)操作。由于 CyclicBarrier 提供了線程間的同步點(diǎn),它適合用于需要線程同步執(zhí)行的場景。
代碼分析:
以下是 CyclicBarrier 類的一些關(guān)鍵方法的代碼分析:
CyclicBarrier(int parties)
: 這個(gè)構(gòu)造方法用于創(chuàng)建一個(gè) CyclicBarrier 對(duì)象,并指定屏障的參與者數(shù)量。await()
: 這個(gè)方法用于使當(dāng)前線程等待,直到計(jì)數(shù)器的值為零。如果計(jì)數(shù)器的值不為零,當(dāng)前線程會(huì)阻塞。await(long timeout, TimeUnit unit)
: 這個(gè)方法與 await() 類似,但它允許設(shè)置一個(gè)超時(shí)時(shí)間。如果其他線程在超時(shí)時(shí)間內(nèi)還沒有到達(dá)屏障,當(dāng)前線程將返回 false。reset()
: 這個(gè)方法用于重置屏障,將其計(jì)數(shù)器的值重置為初始值。getNumberWaiting()
: 這個(gè)方法用于獲取當(dāng)前等待在屏障上的線程數(shù)量。getParties()
: 這個(gè)方法用于獲取屏障的參與者數(shù)量。isBroken()
: 這個(gè)方法用于檢查屏障是否被破壞。如果屏障被破壞,所有等待的線程都會(huì)被中斷。這些方法都使用了 Object 類的 wait() 和 notify() 方法來實(shí)現(xiàn)線程間的同步。當(dāng)線程到達(dá) await 方法時(shí),它會(huì)調(diào)用 wait() 方法,這會(huì)導(dǎo)致線程進(jìn)入等待狀態(tài)。當(dāng)其他線程到達(dá)屏障時(shí),會(huì)調(diào)用 notify() 方法來喚醒等待的線程。
一個(gè)計(jì)數(shù)信號(hào)量,可以用來限制可以同時(shí)訪問某個(gè)特定資源的線程數(shù)量。
實(shí)現(xiàn)原理:
Exchanger 是一個(gè)同步輔助類,用于實(shí)現(xiàn)兩個(gè)線程間的數(shù)據(jù)交換。當(dāng)兩個(gè)線程都到達(dá) Exchanger 指定的交換點(diǎn)時(shí),它們可以交換彼此的數(shù)據(jù)。如果只有一個(gè)線程到達(dá)交換點(diǎn),它會(huì)阻塞,直到另一個(gè)線程也到達(dá)交換點(diǎn)。
Exchanger 使用一個(gè)內(nèi)部同步機(jī)制來跟蹤到達(dá)交換點(diǎn)的線程數(shù)量。
當(dāng)線程到達(dá)交換點(diǎn)時(shí),它會(huì)阻塞,直到另一個(gè)線程也到達(dá)交換點(diǎn)。
當(dāng)兩個(gè)線程都到達(dá)交換點(diǎn)時(shí),它們可以交換彼此的數(shù)據(jù)。
作用:
Exchanger 的主要作用是在多線程環(huán)境中提供一個(gè)線程間的同步點(diǎn),使得兩個(gè)線程可以交換數(shù)據(jù)。它適用于以下場景:
示例代碼:
以下是一個(gè)簡單的 Exchanger 使用示例:
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Exchanger;
public class ExchangerExample {
public static void main(String[] args) {
// 創(chuàng)建一個(gè)Exchanger
Exchanger<String> exchanger = new Exchanger<>();
// 創(chuàng)建一個(gè)線程池
ExecutorService executor = Executors.newFixedThreadPool(2);
// 提交任務(wù)到線程池,第一個(gè)線程會(huì)生成數(shù)據(jù)并等待交換
executor.submit(() -> {
String data = "Data from the first thread";
try {
String receivedData = exchanger.exchange(data);
System.out.println("Received data from the second thread: " + receivedData);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
// 提交任務(wù)到線程池,第二個(gè)線程會(huì)生成數(shù)據(jù)并交換
executor.submit(() -> {
String data = "Data from the second thread";
try {
String receivedData = exchanger.exchange(data);
System.out.println("Received data from the first thread: " + receivedData);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
// 關(guān)閉線程池
executor.shutdown();
}
}
在這個(gè)示例中,我們創(chuàng)建了一個(gè) Exchanger 并使用一個(gè)線程池來模擬兩個(gè)線程。第一個(gè)線程在交換點(diǎn)等待,并準(zhǔn)備好交換數(shù)據(jù)。第二個(gè)線程在交換點(diǎn)準(zhǔn)備好數(shù)據(jù),并等待第一個(gè)線程到達(dá)交換點(diǎn)。當(dāng)兩個(gè)線程都到達(dá)交換點(diǎn)時(shí),它們可以交換數(shù)據(jù)。
解釋:
Exchanger<String> exchanger = new Exchanger<>();
:創(chuàng)建了一個(gè)新的 Exchanger 實(shí)例。String data = "Data from the first thread";
:第一個(gè)線程準(zhǔn)備的數(shù)據(jù)。exchanger.exchange(data);
:第一個(gè)線程在交換點(diǎn)等待,并準(zhǔn)備好交換數(shù)據(jù)。String receivedData = exchanger.exchange(data);
:第二個(gè)線程在交換點(diǎn)準(zhǔn)備好數(shù)據(jù),并等待第一個(gè)線程到達(dá)交換點(diǎn)。ExecutorService executor = Executors.newFixedThreadPool(2);
:創(chuàng)建了一個(gè)大小為2的線程池。executor.submit(() -> {...});
:提交了兩個(gè)任務(wù)到線程池。executor.shutdown();
:關(guān)閉線程池。這個(gè)示例展示了如何在多線程環(huán)境中使用 Exchanger 來確保兩個(gè)線程在某個(gè)點(diǎn)同步執(zhí)行并交換數(shù)據(jù)。由于 Exchanger 提供了線程間的同步點(diǎn),它適合用于需要線程間數(shù)據(jù)交換的場景。
代碼分析:
以下是 Semaphore 類的一些關(guān)鍵方法的代碼分析:
Semaphore(int permits)
: 這個(gè)構(gòu)造方法用于創(chuàng)建一個(gè) Semaphore 對(duì)象,并指定信號(hào)量的初始值。acquire()
: 這個(gè)方法用于嘗試獲取一個(gè)資源。如果信號(hào)量的值大于零,當(dāng)前線程可以繼續(xù)執(zhí)行;如果信號(hào)量的值等于零,當(dāng)前線程將阻塞。acquire(int permits)
: 這個(gè)方法與 acquire() 類似,但它允許指定要獲取的資源數(shù)量。release()
: 這個(gè)方法用于釋放一個(gè)資源。它會(huì)增加信號(hào)量的值,從而允許其他被阻塞的線程繼續(xù)執(zhí)行。tryAcquire()
: 這個(gè)方法用于嘗試獲取一個(gè)資源,但不阻塞。如果信號(hào)量的值大于零,當(dāng)前線程可以繼續(xù)執(zhí)行;如果信號(hào)量的值等于零,該方法將返回 false。tryAcquire(int permits)
: 這個(gè)方法與 tryAcquire() 類似,但它允許指定要獲取的資源數(shù)量。這些方法都使用了 Object 類的 wait() 和 notify() 方法來實(shí)現(xiàn)線程間的同步。當(dāng)線程到達(dá) acquire 方法時(shí),它會(huì)調(diào)用 wait() 方法,這會(huì)導(dǎo)致線程進(jìn)入等待狀態(tài)。當(dāng)其他線程調(diào)用 release 方法并釋放資源時(shí),會(huì)調(diào)用 notify() 方法來喚醒等待的線程。
一個(gè)用于在并發(fā)線程之間交換數(shù)據(jù)的工具,適用于遺傳算法、流水線設(shè)計(jì)等場景。
Exchanger 是一個(gè)用于線程間交換數(shù)據(jù)的同步輔助類,它允許兩個(gè)線程在某個(gè)點(diǎn)交換它們的數(shù)據(jù)。當(dāng)一個(gè)線程準(zhǔn)備好數(shù)據(jù)時(shí),它會(huì)等待另一個(gè)線程準(zhǔn)備好數(shù)據(jù),然后它們可以交換數(shù)據(jù)。如果一個(gè)線程準(zhǔn)備好數(shù)據(jù)而另一個(gè)線程還沒有準(zhǔn)備好,那么第一個(gè)線程會(huì)阻塞,直到第二個(gè)線程準(zhǔn)備好數(shù)據(jù)。
實(shí)現(xiàn)原理:
Exchanger 的實(shí)現(xiàn)原理是基于 Object 類的 wait() 和 notify() 方法。當(dāng)一個(gè)線程到達(dá)交換點(diǎn)時(shí),它會(huì)調(diào)用 exchange() 方法,該方法會(huì)嘗試將該線程的數(shù)據(jù)與另一個(gè)線程的數(shù)據(jù)交換。如果另一個(gè)線程還沒有準(zhǔn)備好數(shù)據(jù),那么第一個(gè)線程會(huì)阻塞,直到第二個(gè)線程到達(dá)交換點(diǎn)并準(zhǔn)備好數(shù)據(jù)。
Exchanger 使用一個(gè)內(nèi)部同步機(jī)制來跟蹤到達(dá)交換點(diǎn)的線程數(shù)量。
當(dāng)線程到達(dá)交換點(diǎn)時(shí),它會(huì)阻塞,直到另一個(gè)線程也到達(dá)交換點(diǎn)。
當(dāng)兩個(gè)線程都到達(dá)交換點(diǎn)時(shí),它們可以交換彼此的數(shù)據(jù)。
作用:
Exchanger 的主要作用是在多線程環(huán)境中提供一個(gè)線程間的同步點(diǎn),使得兩個(gè)線程可以交換數(shù)據(jù)。它適用于以下場景:
示例代碼:
以下是一個(gè)簡單的 Exchanger 使用示例:
import java.util.concurrent.Exchanger;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class ExchangerExample {
public static void main(String[] args) {
// 創(chuàng)建一個(gè)Exchanger
Exchanger<String> exchanger = new Exchanger<>();
// 創(chuàng)建一個(gè)線程池
ExecutorService executor = Executors.newFixedThreadPool(2);
// 提交任務(wù)到線程池,第一個(gè)線程會(huì)生成數(shù)據(jù)并等待交換
executor.submit(() -> {
String data = "Data from the first thread";
try {
String receivedData = exchanger.exchange(data);
System.out.println("Received data from the second thread: " + receivedData);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
// 提交任務(wù)到線程池,第二個(gè)線程會(huì)生成數(shù)據(jù)并交換
executor.submit(() -> {
String data = "Data from the second thread";
try {
String receivedData = exchanger.exchange(data);
System.out.println("Received data from the first thread: " + receivedData);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
// 關(guān)閉線程池
executor.shutdown();
}
}
在這個(gè)示例中,我們創(chuàng)建了一個(gè) Exchanger 并使用一個(gè)線程池來模擬兩個(gè)線程。第一個(gè)線程在交換點(diǎn)等待,并準(zhǔn)備好交換數(shù)據(jù)。第二個(gè)線程在交換點(diǎn)準(zhǔn)備好數(shù)據(jù),并等待第一個(gè)線程到達(dá)交換點(diǎn)。當(dāng)兩個(gè)線程都到達(dá)交換點(diǎn)時(shí),它們可以交換數(shù)據(jù)。
解釋:
Exchanger<String> exchanger = new Exchanger<>();
:創(chuàng)建了一個(gè)新的 Exchanger 實(shí)例。String data = "Data from the first thread";
:第一個(gè)線程準(zhǔn)備的數(shù)據(jù)。String receivedData = exchanger.exchange(data);
:第一個(gè)線程在交換點(diǎn)等待,并準(zhǔn)備好交換數(shù)據(jù)。String receivedData = exchanger.exchange(data);
:第二個(gè)線程在交換點(diǎn)準(zhǔn)備好數(shù)據(jù),并等待第一個(gè)線程到達(dá)交換點(diǎn)。ExecutorService executor = Executors.newFixedThreadPool(2);
:創(chuàng)建了一個(gè)大小為2的線程池。executor.submit(() -> {...});
:提交了兩個(gè)任務(wù)到線程池。executor.shutdown();
:關(guān)閉線程池。這個(gè)示例展示了如何在多線程環(huán)境中使用 Exchanger 來確保兩個(gè)線程在某個(gè)點(diǎn)同步執(zhí)行并交換數(shù)據(jù)。由于 Exchanger 提供了線程間的同步點(diǎn),它適合用于需要線程間數(shù)據(jù)交換的場景。
在這個(gè)示例中,兩個(gè)線程分別準(zhǔn)備了一些數(shù)據(jù),并嘗試通過 Exchanger 進(jìn)行交換。如果一個(gè)線程到達(dá)交換點(diǎn)時(shí),另一個(gè)線程還沒有準(zhǔn)備好數(shù)據(jù),那么第一個(gè)線程會(huì)阻塞,直到第二個(gè)線程也到達(dá)交換點(diǎn)并準(zhǔn)備好數(shù)據(jù)。當(dāng)兩個(gè)線程都到達(dá)交換點(diǎn)時(shí),它們可以交換數(shù)據(jù),然后繼續(xù)執(zhí)行。
代碼分析:
在 Java 中,Exchanger 類的實(shí)現(xiàn)原理基于 Object 類的 wait() 和 notify() 方法。以下是 Exchanger 類的一些關(guān)鍵方法的實(shí)現(xiàn)原理:
boolean exchange(V x)
:這個(gè)方法允許一個(gè)線程嘗試交換數(shù)據(jù)。如果另一個(gè)線程已經(jīng)準(zhǔn)備好數(shù)據(jù),那么這兩個(gè)線程將交換數(shù)據(jù)。如果另一個(gè)線程還沒有準(zhǔn)備好數(shù)據(jù),那么當(dāng)前線程會(huì)阻塞,直到另一個(gè)線程到達(dá)交換點(diǎn)并準(zhǔn)備好數(shù)據(jù)。boolean exchange(V x, long timeout, TimeUnit unit)
:這個(gè)方法與 exchange(V x)
類似,但它允許設(shè)置一個(gè)超時(shí)時(shí)間。如果另一個(gè)線程在超時(shí)時(shí)間內(nèi)還沒有準(zhǔn)備好數(shù)據(jù),那么當(dāng)前線程將返回 false。V exchange(V x, Phaser phaser)
:這個(gè)方法允許一個(gè)線程嘗試交換數(shù)據(jù),并且使用一個(gè) Phaser 來管理線程的同步。如果另一個(gè)線程已經(jīng)準(zhǔn)備好數(shù)據(jù),那么這兩個(gè)線程將交換數(shù)據(jù)。如果另一個(gè)線程還沒有準(zhǔn)備好數(shù)據(jù),那么當(dāng)前線程會(huì)阻塞,直到另一個(gè)線程到達(dá)交換點(diǎn)并準(zhǔn)備好數(shù)據(jù)。
這些方法都使用了 Object 類的 wait()
和 notify()
方法來實(shí)現(xiàn)線程間的同步。當(dāng)一個(gè)線程到達(dá)交換點(diǎn)時(shí),它會(huì)調(diào)用 wait()
方法,這會(huì)導(dǎo)致線程進(jìn)入等待狀態(tài)。當(dāng)另一個(gè)線程到達(dá)交換點(diǎn)并調(diào)用 notify()
方法時(shí),第一個(gè)線程會(huì)被喚醒,并且可以繼續(xù)執(zhí)行。
一個(gè)可重用的同步屏障,適用于類似于 CyclicBarrier 的場景,但提供了更靈活的注冊和注銷機(jī)制。
實(shí)現(xiàn)原理:
Phaser 是一個(gè)可重用的同步屏障,它允許一組線程互相等待,直到它們都到達(dá)某個(gè)屏障點(diǎn)。與 CyclicBarrier 類似,Phaser 允許重用同一個(gè)屏障,這意味著在所有線程到達(dá)屏障點(diǎn)后,可以重新使用該屏障。
Phaser 使用一個(gè)內(nèi)部計(jì)數(shù)器來跟蹤到達(dá)屏障點(diǎn)的線程數(shù)量。
當(dāng)線程到達(dá)屏障點(diǎn)時(shí),它會(huì)阻塞,直到所有線程都到達(dá)屏障點(diǎn)。
Phaser 允許在屏障點(diǎn)被觸發(fā)后重新使用它,而不是每次使用后都必須創(chuàng)建一個(gè)新的。
作用:
Phaser 的主要作用是在多線程環(huán)境中提供一個(gè)線程間的同步點(diǎn),使得一組線程在完成各自的任務(wù)后,能夠同時(shí)繼續(xù)執(zhí)行后續(xù)操作。它適用于以下場景:
示例代碼:
以下是一個(gè)簡單的 Phaser 使用示例:
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Phaser;
public class PhaserExample {
public static void main(String[] args) {
// 創(chuàng)建一個(gè)Phaser,初始值為5
Phaser phaser = new Phaser(5);
// 創(chuàng)建一個(gè)線程池
ExecutorService executor = Executors.newFixedThreadPool(5);
// 提交任務(wù)到線程池,每個(gè)任務(wù)都會(huì)到達(dá)屏障點(diǎn)并執(zhí)行后續(xù)操作
for (int i = 0; i < 5; i++) {
final int taskNumber = i;
executor.submit(() -> {
System.out.println("Task " + taskNumber + " is running");
try {
phaser.arriveAndAwaitAdvance();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
System.out.println("Task " + taskNumber + " is completed");
});
}
// 主線程等待所有任務(wù)完成后繼續(xù)執(zhí)行
try {
phaser.arriveAndAwaitAdvance();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
System.out.println("All tasks are completed, main thread continues");
// 關(guān)閉線程池
executor.shutdown();
}
}
在這個(gè)示例中,我們創(chuàng)建了一個(gè) Phaser 并使用一個(gè)線程池來模擬5個(gè)并發(fā)任務(wù)。每個(gè)任務(wù)在完成自己的操作后會(huì)到達(dá)屏障點(diǎn)并等待其他任務(wù)也到達(dá)屏障點(diǎn)。當(dāng)所有任務(wù)都到達(dá)屏障點(diǎn)時(shí),它們會(huì)繼續(xù)執(zhí)行后續(xù)操作。主線程在調(diào)用 phaser.arriveAndAwaitAdvance() 方法時(shí)會(huì)阻塞,直到所有子線程都到達(dá)屏障點(diǎn)。
解釋:
Phaser phaser = new Phaser(5);
:創(chuàng)建了一個(gè)初始值為5的 Phaser 實(shí)例。phaser.arriveAndAwaitAdvance();
:每個(gè)線程在完成任務(wù)后調(diào)用此方法到達(dá)屏障點(diǎn)并等待其他線程。ExecutorService executor = Executors.newFixedThreadPool(5);
:創(chuàng)建了一個(gè)大小為5的線程池。executor.submit(() -> {...});
:提交了5個(gè)任務(wù)到線程池。executor.shutdown();
:關(guān)閉線程池。這個(gè)示例展示了如何在多線程環(huán)境中使用 Phaser 來確保一組線程在完成各自的任務(wù)后,能夠同時(shí)繼續(xù)執(zhí)行后續(xù)操作。由于 Phaser 提供了線程間的同步點(diǎn),它適合用于需要線程同步執(zhí)行的場景。
代碼分析:
以下是 Phaser 類的一些關(guān)鍵方法的代碼分析:
Phaser(int parties)
: 這個(gè)構(gòu)造方法用于創(chuàng)建一個(gè) Phaser 對(duì)象,并指定屏障的參與者數(shù)量。arrive()
: 這個(gè)方法用于使當(dāng)前線程到達(dá)屏障點(diǎn)。每次調(diào)用此方法時(shí),計(jì)數(shù)器的值會(huì)遞減。arriveAndAwaitAdvance()
: 這個(gè)方法與 arrive() 類似,但它會(huì)阻塞當(dāng)前線程,直到計(jì)數(shù)器的值變?yōu)榱恪?/li>
awaitAdvance()
: 這個(gè)方法用于阻塞當(dāng)前線程,直到屏障點(diǎn)被觸發(fā)。getRegisteredParties()
: 這個(gè)方法用于獲取當(dāng)前注冊在屏障上的線程數(shù)量。getArrivedParties()
: 這個(gè)方法用于獲取已經(jīng)到達(dá)屏障的線程數(shù)量。isTerminated()
: 這個(gè)方法用于檢查屏障是否已經(jīng)終止。如果屏障已經(jīng)終止,返回 true;否則返回 false。forceTermination()
: 這個(gè)方法用于強(qiáng)制終止屏障。它將計(jì)數(shù)器的值設(shè)置為零,并喚醒所有等待的線程。
這些方法都使用了 Object 類的 wait()
和 notify()
方法來實(shí)現(xiàn)線程間的同步。當(dāng)線程到達(dá) arriveAndAwaitAdvance()
方法時(shí),它會(huì)調(diào)用 wait()
方法,這會(huì)導(dǎo)致線程進(jìn)入等待狀態(tài)。當(dāng)其他線程到達(dá)屏障點(diǎn)并調(diào)用 arrive()
方法時(shí),會(huì)調(diào)用 notify()
方法來喚醒等待的線程。
以上就是 V哥給大家整理的12個(gè)并發(fā)相關(guān)的數(shù)據(jù)結(jié)構(gòu),這些并發(fā)數(shù)據(jù)結(jié)構(gòu)是 Java 并發(fā)編程的基礎(chǔ),它們在 java.util.concurrent
包(J.U.C)中提供。使用這些數(shù)據(jù)結(jié)構(gòu)可以幫助開發(fā)者編寫出高效且線程安全的并發(fā)程序,分布式應(yīng)用開發(fā)的項(xiàng)目中,你會(huì)使用到的。Java并發(fā)
更多建議: