深入了解跳表(Skip List):Redis Sorted Set 的高效數據結構

2024-12-17 12:00 更新

跳表(Skip List)是一種隨機化的數據結構,基于有序鏈表,通過在鏈表上增加多級索引來提高數據的查找效率。它是由 William Pugh 在 1990 年提出的。

為什么 Redis 中的 Sorted Set 使用跳躍表

Redis 的有序集合(Sorted Set)使用跳躍表(Skip List)作為底層實現,主要是因為跳躍表在性能、實現復雜度和靈活性等方面具有顯著優(yōu)勢,可以替代平衡樹的數據結構。

跳躍表的原理

跳躍表是一種擴展的有序鏈表,通過維護多個層級的索引來提高查找效率。每個節(jié)點包含一個數據元素和一組指向其他節(jié)點的指針,這些指針分布在不同的層級。最底層的鏈表包含所有元素,而每一層的節(jié)點數量逐漸減少。這樣,查找操作可以從高層開始,快速跳過不需要的元素,減少遍歷的節(jié)點數,從而提高查找效率。

  • 查找過程:從最高層的頭節(jié)點開始,沿著索引節(jié)點向右查找,如果當前節(jié)點的下一個節(jié)點的值大于或等于查找的目標值,則向下移動到下一層繼續(xù)查找;否則,向右移動到下一個索引節(jié)點。這個過程一直持續(xù)到找到目標節(jié)點或到達最底層鏈表。

  • 插入和刪除操作:跳躍表支持高效的動態(tài)插入和刪除,時間復雜度均為 O(log n)。插入時,首先找到插入位置,然后根據隨機函數決定新節(jié)點的層數,最后在相應的層中插入節(jié)點。

跳躍表的優(yōu)勢

  1. 簡單性:跳躍表的實現相對簡單,易于理解和維護,而平衡樹(如紅黑樹)的實現較為復雜,容易出錯。

  1. 高效的范圍查詢:跳躍表在進行范圍查詢時效率更高,因為它是有序的鏈表,可以直接遍歷后繼節(jié)點,而平衡樹需要中序遍歷,復雜度更高。

  1. 靈活性:跳躍表的層數可以根據需要動態(tài)調整,適應不同的查詢需求。

  1. 高并發(fā)性能:跳躍表的節(jié)點可以支持多個線程同時插入或刪除節(jié)點,而平衡樹和哈希表通常需要復雜的并發(fā)控制。

  1. 空間效率:跳躍表的空間復雜度為 O(n),并且通過調整節(jié)點的抽取間隔,可以靈活平衡空間和時間復雜度。

正是因為有這些優(yōu)勢,Redis 選擇使用跳躍表來實現有序集合,而不是紅黑樹或其他數據結構。這使得 Redis 在處理有序集合時能夠高效地支持插入、刪除和查找操作,同時保持元素的有序性,非常適合實現如排行榜、范圍查詢等功能。

為了講明白跳表的原理,現在我們來模擬一個簡單的跳表實現。

在 Java 中模擬實現 Redis 的 Sorted Set 跳表,我們需要定義跳表的數據結構,包括節(jié)點和跳表本身。以下是一個簡單的實現:

import java.util.Random;


public class SkipList {


    private static final double P = 0.5; // 節(jié)點晉升的概率
    private static final int MAX_LEVEL = 16; // 最大層數
    private int levelCount; // 當前跳表的層數
    private Node head; // 頭節(jié)點
    private int size; // 跳表中元素的數量
    private Random random; // 隨機數生成器


    public SkipList() {
        this.levelCount = 0;
        this.size = 0;
        this.head = new Node(-1, Integer.MIN_VALUE, MAX_LEVEL);
        this.random = new Random();
    }


    // 節(jié)點定義
    private class Node {
        int value;
        int key;
        Node[] forward; // 向前指針數組


        Node(int value, int key, int level) {
            this.value = value;
            this.key = key;
            this.forward = new Node[level + 1];
        }
    }


    // 隨機生成節(jié)點的層數
    private int randomLevel() {
        int level = 0;
        while (random.nextFloat() < P && level < MAX_LEVEL) {
            level++;
        }
        return level;
    }


    // 搜索指定值的節(jié)點
    public Node search(int key) {
        Node current = head;
        for (int i = levelCount - 1; i >= 0; i--) {
            while (current.forward[i] != null && current.forward[i].key < key) {
                current = current.forward[i]; // 沿著當前層的指針移動
            }
        }
        current = current.forward[0]; // 移動到第0層
        return current;
    }


    // 插入節(jié)點
    public void insert(int key, int value) {
        Node[] update = new Node[MAX_LEVEL + 1];
        Node current = head;


        // 查找插入位置
        for (int i = levelCount - 1; i >= 0; i--) {
            while (current.forward[i] != null && current.forward[i].key < key) {
                current = current.forward[i];
            }
            update[i] = current;
        }


        int level = randomLevel(); // 隨機生成層數
        if (level > levelCount) {
            levelCount = level;
            for (int i = levelCount - 1; i >= 0; i--) {
                update[i] = head;
            }
        }


        current = new Node(value, key, level);
        for (int i = 0; i < level; i++) {
            current.forward[i] = update[i].forward[i];
            update[i].forward[i] = current;
        }
        size++;
    }


    // 刪除節(jié)點
    public void delete(int key) {
        Node[] update = new Node[MAX_LEVEL + 1];
        Node current = head;


        // 查找要刪除的節(jié)點
        for (int i = levelCount - 1; i >= 0; i--) {
            while (current.forward[i] != null && current.forward[i].key < key) {
                current = current.forward[i];
            }
            update[i] = current;
        }


        current = current.forward[0];
        if (current != null && current.key == key) {
            for (int i = 0; i < levelCount; i++) {
                if (update[i].forward[i] != current) {
                    break;
                }
                update[i].forward[i] = current.forward[i];
            }
            size--;
            while (levelCount > 0 && head.forward[levelCount - 1] == null) {
                levelCount--;
            }
        }
    }


    // 打印跳表
    public void printList() {
        Node current = head.forward[0];
        while (current != null) {
            System.out.println(current.key + ":" + current.value);
            current = current.forward[0];
        }
    }


    // 主函數測試跳表
    public static void main(String[] args) {
        SkipList list = new SkipList();
        list.insert(3, 100);
        list.insert(6, 300);
        list.insert(4, 400);
        list.insert(7, 700);
        list.insert(5, 500);
        list.printList();
        list.delete(3);
        list.printList();
    }
}

下面,V 哥來解釋一下:

  1. 跳表節(jié)點:每個節(jié)點包含一個值、一個鍵和一個向前指針數組。向前指針數組存儲了指向同一層下一個節(jié)點的引用。

  1. 隨機層數:每個節(jié)點的層數是根據預設的概率 P 隨機生成的,這樣可以模擬出不同高度的索引層。

  1. 搜索操作:從最高層開始,沿著當前層的指針移動,直到找到插入點或到達底層。

  1. 插入操作:首先找到插入位置,然后根據隨機生成的層數創(chuàng)建新節(jié)點,并更新每一層的指針。

  1. 刪除操作:找到要刪除的節(jié)點,然后逐層更新指針,最后減少跳表的層數。

  1. 打印跳表:從底層的頭節(jié)點開始,遍歷打印每個節(jié)點的鍵和值。

從這個簡化版的跳表實現可以看到跳表的基本操作??梢詭椭覀兝斫馓韺崿F的原理。

如何在 Java 中實現跳表的并發(fā)操作?

在 Java 中實現跳表的并發(fā)操作需要考慮線程安全問題??梢酝ㄟ^以下方法來實現:

  1. 使用同步塊:在每個公共方法中使用 synchronized 關鍵字來確保同一時間只有一個線程可以執(zhí)行方法。

  1. 使用鎖:使用 ReentrantLock 或其他并發(fā)鎖來控制對跳表的并發(fā)訪問。

  1. 使用原子引用:使用 AtomicReferenceAtomicReferenceArray 來確保節(jié)點的原子更新。

  1. 使用并發(fā)集合:使用 ConcurrentHashMap 等并發(fā)集合作為輔助工具來實現線程安全的跳表。

以下是一個使用 synchronized 關鍵字實現線程安全的跳表的示例:

import java.util.Random;


public class ConcurrentSkipList {


    private static final double P = 0.5; // 節(jié)點晉升的概率
    private static final int MAX_LEVEL = 16; // 最大層數
    private int levelCount; // 當前跳表的層數
    private Node head; // 頭節(jié)點
    private int size; // 跳表中元素的數量
    private Random random; // 隨機數生成器


    public ConcurrentSkipList() {
        this.levelCount = 0;
        this.size = 0;
        this.head = new Node(-1, Integer.MIN_VALUE, MAX_LEVEL);
        this.random = new Random();
    }


    private class Node {
        int value;
        int key;
        Node[] forward; // 向前指針數組


        Node(int value, int key, int level) {
            this.value = value;
            this.key = key;
            this.forward = new Node[level + 1];
        }
    }


    private int randomLevel() {
        int level = 0;
        while (random.nextFloat() < P && level < MAX_LEVEL) {
            level++;
        }
        return level;
    }


    public synchronized Node search(int key) {
        Node current = head;
        for (int i = levelCount - 1; i >= 0; i--) {
            while (current.forward[i] != null && current.forward[i].key < key) {
                current = current.forward[i];
            }
        }
        current = current.forward[0];
        return current;
    }


    public synchronized void insert(int key, int value) {
        Node[] update = new Node[MAX_LEVEL + 1];
        Node current = head;


        for (int i = levelCount - 1; i >= 0; i--) {
            while (current.forward[i] != null && current.forward[i].key < key) {
                current = current.forward[i];
            }
            update[i] = current;
        }


        int level = randomLevel();
        if (level > levelCount) {
            for (int i = levelCount; i < level; i++) {
                update[i] = head;
            }
            levelCount = level;
        }


        current = new Node(value, key, level);
        for (int i = 0; i < level; i++) {
            current.forward[i] = update[i].forward[i];
            update[i].forward[i] = current;
        }
        size++;
    }


    public synchronized void delete(int key) {
        Node[] update = new Node[MAX_LEVEL + 1];
        Node current = head;


        for (int i = levelCount - 1; i >= 0; i--) {
            while (current.forward[i] != null && current.forward[i].key < key) {
                current = current.forward[i];
            }
            update[i] = current;
        }


        current = current.forward[0];
        if (current != null && current.key == key) {
            for (int i = 0; i < levelCount; i++) {
                if (update[i].forward[i] != current) {
                    break;
                }
                update[i].forward[i] = current.forward[i];
            }
            size--;
            while (levelCount > 0 && head.forward[levelCount - 1] == null) {
                levelCount--;
            }
        }
    }


    public synchronized void printList() {
        Node current = head.forward[0];
        while (current != null) {
            System.out.println(current.key + ":" + current.value);
            current = current.forward[0];
        }
    }


    public static void main(String[] args) {
        ConcurrentSkipList list = new ConcurrentSkipList();
        list.insert(3, 100);
        list.insert(6, 300);
        list.insert(4, 400);
        list.insert(7, 700);
        list.insert(5, 500);
        list.printList();
        list.delete(3);
        list.printList();
    }
}

在這個示例中,我們使用了 synchronized 關鍵字來確保 search、insertdelete 方法是線程安全的。這會鎖定當前對象,確保同一時間只有一個線程可以執(zhí)行這些方法。

請注意,雖然 synchronized 可以提供線程安全,但它也可能導致性能瓶頸,特別是在高并發(fā)環(huán)境下。在實際應用中,可以考慮使用更細粒度的鎖,如 ReentrantLock,或者使用原子引用和其他并發(fā)工具來提高性能。

使用 ReentrantLock 的實現方式

使用 ReentrantLock 實現跳表的并發(fā)操作可以提供比 synchronized 更細粒度的鎖定,從而提高并發(fā)性能。ReentrantLock 允許您在不同的方法中鎖定和解鎖,甚至可以在不同的類中使用同一個鎖對象。

以下是使用 ReentrantLock 實現線程安全的跳表的示例:

import java.util.Random;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;


public class ConcurrentSkipList {


    private static final double P = 0.5; // 節(jié)點晉升的概率
    private static final int MAX_LEVEL = 16; // 最大層數
    private final Lock lock = new ReentrantLock(); // 創(chuàng)建一個 ReentrantLock 對象
    private int levelCount; // 當前跳表的層數
    private Node head; // 頭節(jié)點
    private int size; // 跳表中元素的數量
    private Random random; // 隨機數生成器


    public ConcurrentSkipList() {
        this.levelCount = 0;
        this.size = 0;
        this.head = new Node(Integer.MIN_VALUE, MAX_LEVEL);
        this.random = new Random();
    }


    private class Node {
        int value;
        int key;
        Node[] forward; // 向前指針數組


        Node(int key, int level) {
            this.key = key;
            this.forward = new Node[level + 1];
        }
    }


    private int randomLevel() {
        int level = 0;
        while (random.nextFloat() < P && level < MAX_LEVEL) {
            level++;
        }
        return level;
    }


    public void search(int key) {
        lock.lock(); // 加鎖
        try {
            Node current = head;
            for (int i = levelCount - 1; i >= 0; i--) {
                while (current.forward[i] != null && current.forward[i].key < key) {
                    current = current.forward[i];
                }
            }
            current = current.forward[0];
            // ... 處理找到的節(jié)點
        } finally {
            lock.unlock(); // 釋放鎖
        }
    }


    public void insert(int key, int value) {
        lock.lock(); // 加鎖
        try {
            Node[] update = new Node[MAX_LEVEL + 1];
            Node current = head;
            for (int i = levelCount - 1; i >= 0; i--) {
                while (current.forward[i] != null && current.forward[i].key < key) {
                    current = current.forward[i];
                }
                update[i] = current;
            }


            int level = randomLevel();
            if (level > levelCount) {
                levelCount = level;
            }


            current = new Node(value, key, level);
            for (int i = 0; i < level; i++) {
                current.forward[i] = update[i].forward[i];
                update[i].forward[i] = current;
            }
            size++;
        } finally {
            lock.unlock(); // 釋放鎖
        }
    }


    public void delete(int key) {
        lock.lock(); // 加鎖
        try {
            Node[] update = new Node[MAX_LEVEL + 1];
            Node current = head;
            for (int i = levelCount - 1; i >= 1; i--) {
                while (current.forward[i] != null && current.forward[i].key < key) {
                    current = current.forward[i];
                }
                update[i] = current;
            }


            current = current.forward[0];
            if (current != null && current.key == key) {
                for (int i = 0; i < levelCount; i++) {
                    if (update[i].forward[i] != current) {
                        break;
                    }
                    update[i].forward[i] = current.forward[i];
                }
                size--;
                while (levelCount > 0 && head.forward[levelCount - 1] == null) {
                    levelCount--;
                }
            }
        } finally {
            lock.unlock(); // 釋放鎖
        }
    }


    public void printList() {
        lock.lock(); // 加鎖
        try {
            Node current = head.forward[0];
            while (current != null) {
                System.out.println(current.key + ":" + current.value);
                current = current.forward[0];
            }
        } finally {
            lock.unlock(); // 釋放鎖
        }
    }


    public static void main(String[] args) {
        ConcurrentSkipList list = new ConcurrentSkipList();
        list.insert(3, 100);
        list.insert(6, 300);
        list.insert(4, 400);
        list.insert(7, 700);
        list.insert(5, 500);
        list.printList();
        list.delete(3);
        list.printList();
    }
}

在這個示例中,我們使用了 ReentrantLock 對象來控制對跳表的并發(fā)訪問。每個公共方法在執(zhí)行之前都會調用 lock.lock() 加鎖,并在執(zhí)行完畢后(包括正常執(zhí)行和異常退出)調用 lock.unlock() 釋放鎖。

使用 ReentrantLock 的好處是它提供了比 synchronized 更靈活的鎖定機制,例如:

  1. 可中斷的鎖定ReentrantLock 允許線程在嘗試獲取鎖時被中斷。

  1. 嘗試非阻塞鎖定ReentrantLock 提供了 tryLock() 方法,允許線程嘗試獲取鎖而不無限等待。

  1. 超時獲取鎖ReentrantLock 還提供了 tryLock(long timeout, TimeUnit unit) 方法,允許線程在指定時間內等待鎖。

  1. 公平鎖ReentrantLock 可以選擇是否使用公平鎖,公平鎖會按照線程請求鎖的順序來分配鎖。

  1. 多個條件變量ReentrantLock 可以與多個 Condition 對象配合使用,而 synchronized 只能與一個條件變量配合使用。

理解了以上代碼實現原理后,我們再來理解 Redis Sorted Set 就不難了。

Redis 的 Sorted Set 是一種包含元素和關聯(lián)分數的數據結構,它能夠根據分數對元素進行排序。Sorted Set 在 Redis 中的內部實現可以是跳躍表(Skip List)和字典(Hash Table)的組合,或者是壓縮列表(Zip List),具體使用哪種實現取決于 Sorted Set 的大小和元素的長度。

跳躍表 + 字典實現

當 Sorted Set 的元素數量較多或者元素長度較長時,Redis 使用跳躍表和字典來實現 Sorted Set。跳躍表是一種概率平衡的數據結構,它通過多級索引來提高搜索效率,類似于二分查找。字典則用于快速查找和更新操作。

跳躍表的每個節(jié)點包含以下信息:

  • 元素(member)
  • 分數(score)
  • 后退指針(backward)
  • 多層前進指針(forward),每一層都是一個有序鏈表

字典則存儲了元素到分數的映射,以便快速訪問。

壓縮列表實現

當 Sorted Set 的元素數量較少(默認小于128個),并且所有元素的長度都小于64字節(jié)時,Redis 使用壓縮列表來存儲 Sorted Set。壓縮列表是一種連續(xù)內存塊的順序存儲結構,它將所有的元素和分數緊湊地存儲在一起,以節(jié)省內存空間。

應用場景

Sorted Set 常用于以下場景:

  1. 排行榜:例如游戲中的玩家分數排名。
  2. 范圍查詢:例如獲取一定分數范圍內的用戶。
  3. 任務調度:例如按照任務的優(yōu)先級執(zhí)行。
  4. 實時排名:例如股票價格的實時排名。

代碼分析

在 Redis 源碼中,Sorted Set 的實現主要在 t_zset.c 文件中。插入操作(zaddCommand)會根據 Sorted Set 的編碼類型(跳躍表或壓縮列表)來執(zhí)行不同的邏輯。如果是跳躍表編碼,那么插入操作會涉及到字典的更新和跳躍表節(jié)點的添加。如果是壓縮列表編碼,則會檢查是否需要轉換為跳躍表編碼。

總結

Sorted Set 是 Redis 提供的一種強大的有序數據結構,它結合了跳躍表和字典的優(yōu)點,提供了高效的插入、刪除、更新和范圍查詢操作。通過合理的使用 Sorted Set,可以有效地解決許多實際問題。如何以上內容對你有幫助,懇請點贊轉發(fā),V 哥在此感謝各位兄弟的支持。88,洗洗睡了。

以上內容是否對您有幫助:
在線筆記
App下載
App下載

掃描二維碼

下載編程獅App

公眾號
微信公眾號

編程獅公眾號