跳表(Skip List)是一種隨機化的數據結構,基于有序鏈表,通過在鏈表上增加多級索引來提高數據的查找效率。它是由 William Pugh 在 1990 年提出的。
Redis 的有序集合(Sorted Set)使用跳躍表(Skip List)作為底層實現,主要是因為跳躍表在性能、實現復雜度和靈活性等方面具有顯著優(yōu)勢,可以替代平衡樹的數據結構。
跳躍表是一種擴展的有序鏈表,通過維護多個層級的索引來提高查找效率。每個節(jié)點包含一個數據元素和一組指向其他節(jié)點的指針,這些指針分布在不同的層級。最底層的鏈表包含所有元素,而每一層的節(jié)點數量逐漸減少。這樣,查找操作可以從高層開始,快速跳過不需要的元素,減少遍歷的節(jié)點數,從而提高查找效率。
正是因為有這些優(yōu)勢,Redis 選擇使用跳躍表來實現有序集合,而不是紅黑樹或其他數據結構。這使得 Redis 在處理有序集合時能夠高效地支持插入、刪除和查找操作,同時保持元素的有序性,非常適合實現如排行榜、范圍查詢等功能。
為了講明白跳表的原理,現在我們來模擬一個簡單的跳表實現。
在 Java 中模擬實現 Redis 的 Sorted Set 跳表,我們需要定義跳表的數據結構,包括節(jié)點和跳表本身。以下是一個簡單的實現:
import java.util.Random;
public class SkipList {
private static final double P = 0.5; // 節(jié)點晉升的概率
private static final int MAX_LEVEL = 16; // 最大層數
private int levelCount; // 當前跳表的層數
private Node head; // 頭節(jié)點
private int size; // 跳表中元素的數量
private Random random; // 隨機數生成器
public SkipList() {
this.levelCount = 0;
this.size = 0;
this.head = new Node(-1, Integer.MIN_VALUE, MAX_LEVEL);
this.random = new Random();
}
// 節(jié)點定義
private class Node {
int value;
int key;
Node[] forward; // 向前指針數組
Node(int value, int key, int level) {
this.value = value;
this.key = key;
this.forward = new Node[level + 1];
}
}
// 隨機生成節(jié)點的層數
private int randomLevel() {
int level = 0;
while (random.nextFloat() < P && level < MAX_LEVEL) {
level++;
}
return level;
}
// 搜索指定值的節(jié)點
public Node search(int key) {
Node current = head;
for (int i = levelCount - 1; i >= 0; i--) {
while (current.forward[i] != null && current.forward[i].key < key) {
current = current.forward[i]; // 沿著當前層的指針移動
}
}
current = current.forward[0]; // 移動到第0層
return current;
}
// 插入節(jié)點
public void insert(int key, int value) {
Node[] update = new Node[MAX_LEVEL + 1];
Node current = head;
// 查找插入位置
for (int i = levelCount - 1; i >= 0; i--) {
while (current.forward[i] != null && current.forward[i].key < key) {
current = current.forward[i];
}
update[i] = current;
}
int level = randomLevel(); // 隨機生成層數
if (level > levelCount) {
levelCount = level;
for (int i = levelCount - 1; i >= 0; i--) {
update[i] = head;
}
}
current = new Node(value, key, level);
for (int i = 0; i < level; i++) {
current.forward[i] = update[i].forward[i];
update[i].forward[i] = current;
}
size++;
}
// 刪除節(jié)點
public void delete(int key) {
Node[] update = new Node[MAX_LEVEL + 1];
Node current = head;
// 查找要刪除的節(jié)點
for (int i = levelCount - 1; i >= 0; i--) {
while (current.forward[i] != null && current.forward[i].key < key) {
current = current.forward[i];
}
update[i] = current;
}
current = current.forward[0];
if (current != null && current.key == key) {
for (int i = 0; i < levelCount; i++) {
if (update[i].forward[i] != current) {
break;
}
update[i].forward[i] = current.forward[i];
}
size--;
while (levelCount > 0 && head.forward[levelCount - 1] == null) {
levelCount--;
}
}
}
// 打印跳表
public void printList() {
Node current = head.forward[0];
while (current != null) {
System.out.println(current.key + ":" + current.value);
current = current.forward[0];
}
}
// 主函數測試跳表
public static void main(String[] args) {
SkipList list = new SkipList();
list.insert(3, 100);
list.insert(6, 300);
list.insert(4, 400);
list.insert(7, 700);
list.insert(5, 500);
list.printList();
list.delete(3);
list.printList();
}
}
下面,V 哥來解釋一下:
P
隨機生成的,這樣可以模擬出不同高度的索引層。從這個簡化版的跳表實現可以看到跳表的基本操作??梢詭椭覀兝斫馓韺崿F的原理。
在 Java 中實現跳表的并發(fā)操作需要考慮線程安全問題??梢酝ㄟ^以下方法來實現:
synchronized
關鍵字來確保同一時間只有一個線程可以執(zhí)行方法。ReentrantLock
或其他并發(fā)鎖來控制對跳表的并發(fā)訪問。AtomicReference
或 AtomicReferenceArray
來確保節(jié)點的原子更新。ConcurrentHashMap
等并發(fā)集合作為輔助工具來實現線程安全的跳表。
以下是一個使用 synchronized
關鍵字實現線程安全的跳表的示例:
import java.util.Random;
public class ConcurrentSkipList {
private static final double P = 0.5; // 節(jié)點晉升的概率
private static final int MAX_LEVEL = 16; // 最大層數
private int levelCount; // 當前跳表的層數
private Node head; // 頭節(jié)點
private int size; // 跳表中元素的數量
private Random random; // 隨機數生成器
public ConcurrentSkipList() {
this.levelCount = 0;
this.size = 0;
this.head = new Node(-1, Integer.MIN_VALUE, MAX_LEVEL);
this.random = new Random();
}
private class Node {
int value;
int key;
Node[] forward; // 向前指針數組
Node(int value, int key, int level) {
this.value = value;
this.key = key;
this.forward = new Node[level + 1];
}
}
private int randomLevel() {
int level = 0;
while (random.nextFloat() < P && level < MAX_LEVEL) {
level++;
}
return level;
}
public synchronized Node search(int key) {
Node current = head;
for (int i = levelCount - 1; i >= 0; i--) {
while (current.forward[i] != null && current.forward[i].key < key) {
current = current.forward[i];
}
}
current = current.forward[0];
return current;
}
public synchronized void insert(int key, int value) {
Node[] update = new Node[MAX_LEVEL + 1];
Node current = head;
for (int i = levelCount - 1; i >= 0; i--) {
while (current.forward[i] != null && current.forward[i].key < key) {
current = current.forward[i];
}
update[i] = current;
}
int level = randomLevel();
if (level > levelCount) {
for (int i = levelCount; i < level; i++) {
update[i] = head;
}
levelCount = level;
}
current = new Node(value, key, level);
for (int i = 0; i < level; i++) {
current.forward[i] = update[i].forward[i];
update[i].forward[i] = current;
}
size++;
}
public synchronized void delete(int key) {
Node[] update = new Node[MAX_LEVEL + 1];
Node current = head;
for (int i = levelCount - 1; i >= 0; i--) {
while (current.forward[i] != null && current.forward[i].key < key) {
current = current.forward[i];
}
update[i] = current;
}
current = current.forward[0];
if (current != null && current.key == key) {
for (int i = 0; i < levelCount; i++) {
if (update[i].forward[i] != current) {
break;
}
update[i].forward[i] = current.forward[i];
}
size--;
while (levelCount > 0 && head.forward[levelCount - 1] == null) {
levelCount--;
}
}
}
public synchronized void printList() {
Node current = head.forward[0];
while (current != null) {
System.out.println(current.key + ":" + current.value);
current = current.forward[0];
}
}
public static void main(String[] args) {
ConcurrentSkipList list = new ConcurrentSkipList();
list.insert(3, 100);
list.insert(6, 300);
list.insert(4, 400);
list.insert(7, 700);
list.insert(5, 500);
list.printList();
list.delete(3);
list.printList();
}
}
在這個示例中,我們使用了 synchronized
關鍵字來確保 search
、insert
和 delete
方法是線程安全的。這會鎖定當前對象,確保同一時間只有一個線程可以執(zhí)行這些方法。
請注意,雖然 synchronized
可以提供線程安全,但它也可能導致性能瓶頸,特別是在高并發(fā)環(huán)境下。在實際應用中,可以考慮使用更細粒度的鎖,如 ReentrantLock
,或者使用原子引用和其他并發(fā)工具來提高性能。
ReentrantLock
的實現方式
使用 ReentrantLock
實現跳表的并發(fā)操作可以提供比 synchronized
更細粒度的鎖定,從而提高并發(fā)性能。ReentrantLock
允許您在不同的方法中鎖定和解鎖,甚至可以在不同的類中使用同一個鎖對象。
以下是使用 ReentrantLock
實現線程安全的跳表的示例:
import java.util.Random;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class ConcurrentSkipList {
private static final double P = 0.5; // 節(jié)點晉升的概率
private static final int MAX_LEVEL = 16; // 最大層數
private final Lock lock = new ReentrantLock(); // 創(chuàng)建一個 ReentrantLock 對象
private int levelCount; // 當前跳表的層數
private Node head; // 頭節(jié)點
private int size; // 跳表中元素的數量
private Random random; // 隨機數生成器
public ConcurrentSkipList() {
this.levelCount = 0;
this.size = 0;
this.head = new Node(Integer.MIN_VALUE, MAX_LEVEL);
this.random = new Random();
}
private class Node {
int value;
int key;
Node[] forward; // 向前指針數組
Node(int key, int level) {
this.key = key;
this.forward = new Node[level + 1];
}
}
private int randomLevel() {
int level = 0;
while (random.nextFloat() < P && level < MAX_LEVEL) {
level++;
}
return level;
}
public void search(int key) {
lock.lock(); // 加鎖
try {
Node current = head;
for (int i = levelCount - 1; i >= 0; i--) {
while (current.forward[i] != null && current.forward[i].key < key) {
current = current.forward[i];
}
}
current = current.forward[0];
// ... 處理找到的節(jié)點
} finally {
lock.unlock(); // 釋放鎖
}
}
public void insert(int key, int value) {
lock.lock(); // 加鎖
try {
Node[] update = new Node[MAX_LEVEL + 1];
Node current = head;
for (int i = levelCount - 1; i >= 0; i--) {
while (current.forward[i] != null && current.forward[i].key < key) {
current = current.forward[i];
}
update[i] = current;
}
int level = randomLevel();
if (level > levelCount) {
levelCount = level;
}
current = new Node(value, key, level);
for (int i = 0; i < level; i++) {
current.forward[i] = update[i].forward[i];
update[i].forward[i] = current;
}
size++;
} finally {
lock.unlock(); // 釋放鎖
}
}
public void delete(int key) {
lock.lock(); // 加鎖
try {
Node[] update = new Node[MAX_LEVEL + 1];
Node current = head;
for (int i = levelCount - 1; i >= 1; i--) {
while (current.forward[i] != null && current.forward[i].key < key) {
current = current.forward[i];
}
update[i] = current;
}
current = current.forward[0];
if (current != null && current.key == key) {
for (int i = 0; i < levelCount; i++) {
if (update[i].forward[i] != current) {
break;
}
update[i].forward[i] = current.forward[i];
}
size--;
while (levelCount > 0 && head.forward[levelCount - 1] == null) {
levelCount--;
}
}
} finally {
lock.unlock(); // 釋放鎖
}
}
public void printList() {
lock.lock(); // 加鎖
try {
Node current = head.forward[0];
while (current != null) {
System.out.println(current.key + ":" + current.value);
current = current.forward[0];
}
} finally {
lock.unlock(); // 釋放鎖
}
}
public static void main(String[] args) {
ConcurrentSkipList list = new ConcurrentSkipList();
list.insert(3, 100);
list.insert(6, 300);
list.insert(4, 400);
list.insert(7, 700);
list.insert(5, 500);
list.printList();
list.delete(3);
list.printList();
}
}
在這個示例中,我們使用了 ReentrantLock
對象來控制對跳表的并發(fā)訪問。每個公共方法在執(zhí)行之前都會調用 lock.lock()
加鎖,并在執(zhí)行完畢后(包括正常執(zhí)行和異常退出)調用 lock.unlock()
釋放鎖。
使用 ReentrantLock
的好處是它提供了比 synchronized
更靈活的鎖定機制,例如:
ReentrantLock
允許線程在嘗試獲取鎖時被中斷。ReentrantLock
提供了 tryLock()
方法,允許線程嘗試獲取鎖而不無限等待。ReentrantLock
還提供了 tryLock(long timeout, TimeUnit unit)
方法,允許線程在指定時間內等待鎖。ReentrantLock
可以選擇是否使用公平鎖,公平鎖會按照線程請求鎖的順序來分配鎖。ReentrantLock
可以與多個 Condition
對象配合使用,而 synchronized
只能與一個條件變量配合使用。理解了以上代碼實現原理后,我們再來理解 Redis Sorted Set 就不難了。
Redis 的 Sorted Set 是一種包含元素和關聯(lián)分數的數據結構,它能夠根據分數對元素進行排序。Sorted Set 在 Redis 中的內部實現可以是跳躍表(Skip List)和字典(Hash Table)的組合,或者是壓縮列表(Zip List),具體使用哪種實現取決于 Sorted Set 的大小和元素的長度。
當 Sorted Set 的元素數量較多或者元素長度較長時,Redis 使用跳躍表和字典來實現 Sorted Set。跳躍表是一種概率平衡的數據結構,它通過多級索引來提高搜索效率,類似于二分查找。字典則用于快速查找和更新操作。
跳躍表的每個節(jié)點包含以下信息:
字典則存儲了元素到分數的映射,以便快速訪問。
當 Sorted Set 的元素數量較少(默認小于128個),并且所有元素的長度都小于64字節(jié)時,Redis 使用壓縮列表來存儲 Sorted Set。壓縮列表是一種連續(xù)內存塊的順序存儲結構,它將所有的元素和分數緊湊地存儲在一起,以節(jié)省內存空間。
Sorted Set 常用于以下場景:
在 Redis 源碼中,Sorted Set 的實現主要在 t_zset.c
文件中。插入操作(zaddCommand
)會根據 Sorted Set 的編碼類型(跳躍表或壓縮列表)來執(zhí)行不同的邏輯。如果是跳躍表編碼,那么插入操作會涉及到字典的更新和跳躍表節(jié)點的添加。如果是壓縮列表編碼,則會檢查是否需要轉換為跳躍表編碼。
Sorted Set 是 Redis 提供的一種強大的有序數據結構,它結合了跳躍表和字典的優(yōu)點,提供了高效的插入、刪除、更新和范圍查詢操作。通過合理的使用 Sorted Set,可以有效地解決許多實際問題。如何以上內容對你有幫助,懇請點贊轉發(fā),V 哥在此感謝各位兄弟的支持。88,洗洗睡了。
更多建議: