深入了解跳表(Skip List):Redis Sorted Set 的高效數(shù)據(jù)結(jié)構(gòu)

2024-12-17 12:00 更新

跳表(Skip List)是一種隨機(jī)化的數(shù)據(jù)結(jié)構(gòu),基于有序鏈表,通過在鏈表上增加多級(jí)索引來提高數(shù)據(jù)的查找效率。它是由 William Pugh 在 1990 年提出的。

為什么 Redis 中的 Sorted Set 使用跳躍表

Redis 的有序集合(Sorted Set)使用跳躍表(Skip List)作為底層實(shí)現(xiàn),主要是因?yàn)樘S表在性能、實(shí)現(xiàn)復(fù)雜度和靈活性等方面具有顯著優(yōu)勢(shì),可以替代平衡樹的數(shù)據(jù)結(jié)構(gòu)。

跳躍表的原理

跳躍表是一種擴(kuò)展的有序鏈表,通過維護(hù)多個(gè)層級(jí)的索引來提高查找效率。每個(gè)節(jié)點(diǎn)包含一個(gè)數(shù)據(jù)元素和一組指向其他節(jié)點(diǎn)的指針,這些指針分布在不同的層級(jí)。最底層的鏈表包含所有元素,而每一層的節(jié)點(diǎn)數(shù)量逐漸減少。這樣,查找操作可以從高層開始,快速跳過不需要的元素,減少遍歷的節(jié)點(diǎn)數(shù),從而提高查找效率。

  • 查找過程:從最高層的頭節(jié)點(diǎn)開始,沿著索引節(jié)點(diǎn)向右查找,如果當(dāng)前節(jié)點(diǎn)的下一個(gè)節(jié)點(diǎn)的值大于或等于查找的目標(biāo)值,則向下移動(dòng)到下一層繼續(xù)查找;否則,向右移動(dòng)到下一個(gè)索引節(jié)點(diǎn)。這個(gè)過程一直持續(xù)到找到目標(biāo)節(jié)點(diǎn)或到達(dá)最底層鏈表。

  • 插入和刪除操作:跳躍表支持高效的動(dòng)態(tài)插入和刪除,時(shí)間復(fù)雜度均為 O(log n)。插入時(shí),首先找到插入位置,然后根據(jù)隨機(jī)函數(shù)決定新節(jié)點(diǎn)的層數(shù),最后在相應(yīng)的層中插入節(jié)點(diǎn)。

跳躍表的優(yōu)勢(shì)

  1. 簡(jiǎn)單性:跳躍表的實(shí)現(xiàn)相對(duì)簡(jiǎn)單,易于理解和維護(hù),而平衡樹(如紅黑樹)的實(shí)現(xiàn)較為復(fù)雜,容易出錯(cuò)。

  1. 高效的范圍查詢:跳躍表在進(jìn)行范圍查詢時(shí)效率更高,因?yàn)樗怯行虻逆湵?,可以直接遍歷后繼節(jié)點(diǎn),而平衡樹需要中序遍歷,復(fù)雜度更高。

  1. 靈活性:跳躍表的層數(shù)可以根據(jù)需要?jiǎng)討B(tài)調(diào)整,適應(yīng)不同的查詢需求。

  1. 高并發(fā)性能:跳躍表的節(jié)點(diǎn)可以支持多個(gè)線程同時(shí)插入或刪除節(jié)點(diǎn),而平衡樹和哈希表通常需要復(fù)雜的并發(fā)控制。

  1. 空間效率:跳躍表的空間復(fù)雜度為 O(n),并且通過調(diào)整節(jié)點(diǎn)的抽取間隔,可以靈活平衡空間和時(shí)間復(fù)雜度。

正是因?yàn)橛羞@些優(yōu)勢(shì),Redis 選擇使用跳躍表來實(shí)現(xiàn)有序集合,而不是紅黑樹或其他數(shù)據(jù)結(jié)構(gòu)。這使得 Redis 在處理有序集合時(shí)能夠高效地支持插入、刪除和查找操作,同時(shí)保持元素的有序性,非常適合實(shí)現(xiàn)如排行榜、范圍查詢等功能。

為了講明白跳表的原理,現(xiàn)在我們來模擬一個(gè)簡(jiǎn)單的跳表實(shí)現(xiàn)。

在 Java 中模擬實(shí)現(xiàn) Redis 的 Sorted Set 跳表,我們需要定義跳表的數(shù)據(jù)結(jié)構(gòu),包括節(jié)點(diǎn)和跳表本身。以下是一個(gè)簡(jiǎn)單的實(shí)現(xiàn):

import java.util.Random;


public class SkipList {


    private static final double P = 0.5; // 節(jié)點(diǎn)晉升的概率
    private static final int MAX_LEVEL = 16; // 最大層數(shù)
    private int levelCount; // 當(dāng)前跳表的層數(shù)
    private Node head; // 頭節(jié)點(diǎn)
    private int size; // 跳表中元素的數(shù)量
    private Random random; // 隨機(jī)數(shù)生成器


    public SkipList() {
        this.levelCount = 0;
        this.size = 0;
        this.head = new Node(-1, Integer.MIN_VALUE, MAX_LEVEL);
        this.random = new Random();
    }


    // 節(jié)點(diǎn)定義
    private class Node {
        int value;
        int key;
        Node[] forward; // 向前指針數(shù)組


        Node(int value, int key, int level) {
            this.value = value;
            this.key = key;
            this.forward = new Node[level + 1];
        }
    }


    // 隨機(jī)生成節(jié)點(diǎn)的層數(shù)
    private int randomLevel() {
        int level = 0;
        while (random.nextFloat() < P && level < MAX_LEVEL) {
            level++;
        }
        return level;
    }


    // 搜索指定值的節(jié)點(diǎn)
    public Node search(int key) {
        Node current = head;
        for (int i = levelCount - 1; i >= 0; i--) {
            while (current.forward[i] != null && current.forward[i].key < key) {
                current = current.forward[i]; // 沿著當(dāng)前層的指針移動(dòng)
            }
        }
        current = current.forward[0]; // 移動(dòng)到第0層
        return current;
    }


    // 插入節(jié)點(diǎn)
    public void insert(int key, int value) {
        Node[] update = new Node[MAX_LEVEL + 1];
        Node current = head;


        // 查找插入位置
        for (int i = levelCount - 1; i >= 0; i--) {
            while (current.forward[i] != null && current.forward[i].key < key) {
                current = current.forward[i];
            }
            update[i] = current;
        }


        int level = randomLevel(); // 隨機(jī)生成層數(shù)
        if (level > levelCount) {
            levelCount = level;
            for (int i = levelCount - 1; i >= 0; i--) {
                update[i] = head;
            }
        }


        current = new Node(value, key, level);
        for (int i = 0; i < level; i++) {
            current.forward[i] = update[i].forward[i];
            update[i].forward[i] = current;
        }
        size++;
    }


    // 刪除節(jié)點(diǎn)
    public void delete(int key) {
        Node[] update = new Node[MAX_LEVEL + 1];
        Node current = head;


        // 查找要?jiǎng)h除的節(jié)點(diǎn)
        for (int i = levelCount - 1; i >= 0; i--) {
            while (current.forward[i] != null && current.forward[i].key < key) {
                current = current.forward[i];
            }
            update[i] = current;
        }


        current = current.forward[0];
        if (current != null && current.key == key) {
            for (int i = 0; i < levelCount; i++) {
                if (update[i].forward[i] != current) {
                    break;
                }
                update[i].forward[i] = current.forward[i];
            }
            size--;
            while (levelCount > 0 && head.forward[levelCount - 1] == null) {
                levelCount--;
            }
        }
    }


    // 打印跳表
    public void printList() {
        Node current = head.forward[0];
        while (current != null) {
            System.out.println(current.key + ":" + current.value);
            current = current.forward[0];
        }
    }


    // 主函數(shù)測(cè)試跳表
    public static void main(String[] args) {
        SkipList list = new SkipList();
        list.insert(3, 100);
        list.insert(6, 300);
        list.insert(4, 400);
        list.insert(7, 700);
        list.insert(5, 500);
        list.printList();
        list.delete(3);
        list.printList();
    }
}

下面,V 哥來解釋一下:

  1. 跳表節(jié)點(diǎn):每個(gè)節(jié)點(diǎn)包含一個(gè)值、一個(gè)鍵和一個(gè)向前指針數(shù)組。向前指針數(shù)組存儲(chǔ)了指向同一層下一個(gè)節(jié)點(diǎn)的引用。

  1. 隨機(jī)層數(shù):每個(gè)節(jié)點(diǎn)的層數(shù)是根據(jù)預(yù)設(shè)的概率 P 隨機(jī)生成的,這樣可以模擬出不同高度的索引層。

  1. 搜索操作:從最高層開始,沿著當(dāng)前層的指針移動(dòng),直到找到插入點(diǎn)或到達(dá)底層。

  1. 插入操作:首先找到插入位置,然后根據(jù)隨機(jī)生成的層數(shù)創(chuàng)建新節(jié)點(diǎn),并更新每一層的指針。

  1. 刪除操作:找到要?jiǎng)h除的節(jié)點(diǎn),然后逐層更新指針,最后減少跳表的層數(shù)。

  1. 打印跳表:從底層的頭節(jié)點(diǎn)開始,遍歷打印每個(gè)節(jié)點(diǎn)的鍵和值。

從這個(gè)簡(jiǎn)化版的跳表實(shí)現(xiàn)可以看到跳表的基本操作??梢詭椭覀兝斫馓韺?shí)現(xiàn)的原理。

如何在 Java 中實(shí)現(xiàn)跳表的并發(fā)操作?

在 Java 中實(shí)現(xiàn)跳表的并發(fā)操作需要考慮線程安全問題??梢酝ㄟ^以下方法來實(shí)現(xiàn):

  1. 使用同步塊:在每個(gè)公共方法中使用 synchronized 關(guān)鍵字來確保同一時(shí)間只有一個(gè)線程可以執(zhí)行方法。

  1. 使用鎖:使用 ReentrantLock 或其他并發(fā)鎖來控制對(duì)跳表的并發(fā)訪問。

  1. 使用原子引用:使用 AtomicReferenceAtomicReferenceArray 來確保節(jié)點(diǎn)的原子更新。

  1. 使用并發(fā)集合:使用 ConcurrentHashMap 等并發(fā)集合作為輔助工具來實(shí)現(xiàn)線程安全的跳表。

以下是一個(gè)使用 synchronized 關(guān)鍵字實(shí)現(xiàn)線程安全的跳表的示例:

import java.util.Random;


public class ConcurrentSkipList {


    private static final double P = 0.5; // 節(jié)點(diǎn)晉升的概率
    private static final int MAX_LEVEL = 16; // 最大層數(shù)
    private int levelCount; // 當(dāng)前跳表的層數(shù)
    private Node head; // 頭節(jié)點(diǎn)
    private int size; // 跳表中元素的數(shù)量
    private Random random; // 隨機(jī)數(shù)生成器


    public ConcurrentSkipList() {
        this.levelCount = 0;
        this.size = 0;
        this.head = new Node(-1, Integer.MIN_VALUE, MAX_LEVEL);
        this.random = new Random();
    }


    private class Node {
        int value;
        int key;
        Node[] forward; // 向前指針數(shù)組


        Node(int value, int key, int level) {
            this.value = value;
            this.key = key;
            this.forward = new Node[level + 1];
        }
    }


    private int randomLevel() {
        int level = 0;
        while (random.nextFloat() < P && level < MAX_LEVEL) {
            level++;
        }
        return level;
    }


    public synchronized Node search(int key) {
        Node current = head;
        for (int i = levelCount - 1; i >= 0; i--) {
            while (current.forward[i] != null && current.forward[i].key < key) {
                current = current.forward[i];
            }
        }
        current = current.forward[0];
        return current;
    }


    public synchronized void insert(int key, int value) {
        Node[] update = new Node[MAX_LEVEL + 1];
        Node current = head;


        for (int i = levelCount - 1; i >= 0; i--) {
            while (current.forward[i] != null && current.forward[i].key < key) {
                current = current.forward[i];
            }
            update[i] = current;
        }


        int level = randomLevel();
        if (level > levelCount) {
            for (int i = levelCount; i < level; i++) {
                update[i] = head;
            }
            levelCount = level;
        }


        current = new Node(value, key, level);
        for (int i = 0; i < level; i++) {
            current.forward[i] = update[i].forward[i];
            update[i].forward[i] = current;
        }
        size++;
    }


    public synchronized void delete(int key) {
        Node[] update = new Node[MAX_LEVEL + 1];
        Node current = head;


        for (int i = levelCount - 1; i >= 0; i--) {
            while (current.forward[i] != null && current.forward[i].key < key) {
                current = current.forward[i];
            }
            update[i] = current;
        }


        current = current.forward[0];
        if (current != null && current.key == key) {
            for (int i = 0; i < levelCount; i++) {
                if (update[i].forward[i] != current) {
                    break;
                }
                update[i].forward[i] = current.forward[i];
            }
            size--;
            while (levelCount > 0 && head.forward[levelCount - 1] == null) {
                levelCount--;
            }
        }
    }


    public synchronized void printList() {
        Node current = head.forward[0];
        while (current != null) {
            System.out.println(current.key + ":" + current.value);
            current = current.forward[0];
        }
    }


    public static void main(String[] args) {
        ConcurrentSkipList list = new ConcurrentSkipList();
        list.insert(3, 100);
        list.insert(6, 300);
        list.insert(4, 400);
        list.insert(7, 700);
        list.insert(5, 500);
        list.printList();
        list.delete(3);
        list.printList();
    }
}

在這個(gè)示例中,我們使用了 synchronized 關(guān)鍵字來確保 search、insertdelete 方法是線程安全的。這會(huì)鎖定當(dāng)前對(duì)象,確保同一時(shí)間只有一個(gè)線程可以執(zhí)行這些方法。

請(qǐng)注意,雖然 synchronized 可以提供線程安全,但它也可能導(dǎo)致性能瓶頸,特別是在高并發(fā)環(huán)境下。在實(shí)際應(yīng)用中,可以考慮使用更細(xì)粒度的鎖,如 ReentrantLock,或者使用原子引用和其他并發(fā)工具來提高性能。

使用 ReentrantLock 的實(shí)現(xiàn)方式

使用 ReentrantLock 實(shí)現(xiàn)跳表的并發(fā)操作可以提供比 synchronized 更細(xì)粒度的鎖定,從而提高并發(fā)性能。ReentrantLock 允許您在不同的方法中鎖定和解鎖,甚至可以在不同的類中使用同一個(gè)鎖對(duì)象。

以下是使用 ReentrantLock 實(shí)現(xiàn)線程安全的跳表的示例:

import java.util.Random;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;


public class ConcurrentSkipList {


    private static final double P = 0.5; // 節(jié)點(diǎn)晉升的概率
    private static final int MAX_LEVEL = 16; // 最大層數(shù)
    private final Lock lock = new ReentrantLock(); // 創(chuàng)建一個(gè) ReentrantLock 對(duì)象
    private int levelCount; // 當(dāng)前跳表的層數(shù)
    private Node head; // 頭節(jié)點(diǎn)
    private int size; // 跳表中元素的數(shù)量
    private Random random; // 隨機(jī)數(shù)生成器


    public ConcurrentSkipList() {
        this.levelCount = 0;
        this.size = 0;
        this.head = new Node(Integer.MIN_VALUE, MAX_LEVEL);
        this.random = new Random();
    }


    private class Node {
        int value;
        int key;
        Node[] forward; // 向前指針數(shù)組


        Node(int key, int level) {
            this.key = key;
            this.forward = new Node[level + 1];
        }
    }


    private int randomLevel() {
        int level = 0;
        while (random.nextFloat() < P && level < MAX_LEVEL) {
            level++;
        }
        return level;
    }


    public void search(int key) {
        lock.lock(); // 加鎖
        try {
            Node current = head;
            for (int i = levelCount - 1; i >= 0; i--) {
                while (current.forward[i] != null && current.forward[i].key < key) {
                    current = current.forward[i];
                }
            }
            current = current.forward[0];
            // ... 處理找到的節(jié)點(diǎn)
        } finally {
            lock.unlock(); // 釋放鎖
        }
    }


    public void insert(int key, int value) {
        lock.lock(); // 加鎖
        try {
            Node[] update = new Node[MAX_LEVEL + 1];
            Node current = head;
            for (int i = levelCount - 1; i >= 0; i--) {
                while (current.forward[i] != null && current.forward[i].key < key) {
                    current = current.forward[i];
                }
                update[i] = current;
            }


            int level = randomLevel();
            if (level > levelCount) {
                levelCount = level;
            }


            current = new Node(value, key, level);
            for (int i = 0; i < level; i++) {
                current.forward[i] = update[i].forward[i];
                update[i].forward[i] = current;
            }
            size++;
        } finally {
            lock.unlock(); // 釋放鎖
        }
    }


    public void delete(int key) {
        lock.lock(); // 加鎖
        try {
            Node[] update = new Node[MAX_LEVEL + 1];
            Node current = head;
            for (int i = levelCount - 1; i >= 1; i--) {
                while (current.forward[i] != null && current.forward[i].key < key) {
                    current = current.forward[i];
                }
                update[i] = current;
            }


            current = current.forward[0];
            if (current != null && current.key == key) {
                for (int i = 0; i < levelCount; i++) {
                    if (update[i].forward[i] != current) {
                        break;
                    }
                    update[i].forward[i] = current.forward[i];
                }
                size--;
                while (levelCount > 0 && head.forward[levelCount - 1] == null) {
                    levelCount--;
                }
            }
        } finally {
            lock.unlock(); // 釋放鎖
        }
    }


    public void printList() {
        lock.lock(); // 加鎖
        try {
            Node current = head.forward[0];
            while (current != null) {
                System.out.println(current.key + ":" + current.value);
                current = current.forward[0];
            }
        } finally {
            lock.unlock(); // 釋放鎖
        }
    }


    public static void main(String[] args) {
        ConcurrentSkipList list = new ConcurrentSkipList();
        list.insert(3, 100);
        list.insert(6, 300);
        list.insert(4, 400);
        list.insert(7, 700);
        list.insert(5, 500);
        list.printList();
        list.delete(3);
        list.printList();
    }
}

在這個(gè)示例中,我們使用了 ReentrantLock 對(duì)象來控制對(duì)跳表的并發(fā)訪問。每個(gè)公共方法在執(zhí)行之前都會(huì)調(diào)用 lock.lock() 加鎖,并在執(zhí)行完畢后(包括正常執(zhí)行和異常退出)調(diào)用 lock.unlock() 釋放鎖。

使用 ReentrantLock 的好處是它提供了比 synchronized 更靈活的鎖定機(jī)制,例如:

  1. 可中斷的鎖定ReentrantLock 允許線程在嘗試獲取鎖時(shí)被中斷。

  1. 嘗試非阻塞鎖定ReentrantLock 提供了 tryLock() 方法,允許線程嘗試獲取鎖而不無限等待。

  1. 超時(shí)獲取鎖ReentrantLock 還提供了 tryLock(long timeout, TimeUnit unit) 方法,允許線程在指定時(shí)間內(nèi)等待鎖。

  1. 公平鎖ReentrantLock 可以選擇是否使用公平鎖,公平鎖會(huì)按照線程請(qǐng)求鎖的順序來分配鎖。

  1. 多個(gè)條件變量ReentrantLock 可以與多個(gè) Condition 對(duì)象配合使用,而 synchronized 只能與一個(gè)條件變量配合使用。

理解了以上代碼實(shí)現(xiàn)原理后,我們?cè)賮砝斫?Redis Sorted Set 就不難了。

Redis 的 Sorted Set 是一種包含元素和關(guān)聯(lián)分?jǐn)?shù)的數(shù)據(jù)結(jié)構(gòu),它能夠根據(jù)分?jǐn)?shù)對(duì)元素進(jìn)行排序。Sorted Set 在 Redis 中的內(nèi)部實(shí)現(xiàn)可以是跳躍表(Skip List)和字典(Hash Table)的組合,或者是壓縮列表(Zip List),具體使用哪種實(shí)現(xiàn)取決于 Sorted Set 的大小和元素的長(zhǎng)度。

跳躍表 + 字典實(shí)現(xiàn)

當(dāng) Sorted Set 的元素?cái)?shù)量較多或者元素長(zhǎng)度較長(zhǎng)時(shí),Redis 使用跳躍表和字典來實(shí)現(xiàn) Sorted Set。跳躍表是一種概率平衡的數(shù)據(jù)結(jié)構(gòu),它通過多級(jí)索引來提高搜索效率,類似于二分查找。字典則用于快速查找和更新操作。

跳躍表的每個(gè)節(jié)點(diǎn)包含以下信息:

  • 元素(member)
  • 分?jǐn)?shù)(score)
  • 后退指針(backward)
  • 多層前進(jìn)指針(forward),每一層都是一個(gè)有序鏈表

字典則存儲(chǔ)了元素到分?jǐn)?shù)的映射,以便快速訪問。

壓縮列表實(shí)現(xiàn)

當(dāng) Sorted Set 的元素?cái)?shù)量較少(默認(rèn)小于128個(gè)),并且所有元素的長(zhǎng)度都小于64字節(jié)時(shí),Redis 使用壓縮列表來存儲(chǔ) Sorted Set。壓縮列表是一種連續(xù)內(nèi)存塊的順序存儲(chǔ)結(jié)構(gòu),它將所有的元素和分?jǐn)?shù)緊湊地存儲(chǔ)在一起,以節(jié)省內(nèi)存空間。

應(yīng)用場(chǎng)景

Sorted Set 常用于以下場(chǎng)景:

  1. 排行榜:例如游戲中的玩家分?jǐn)?shù)排名。
  2. 范圍查詢:例如獲取一定分?jǐn)?shù)范圍內(nèi)的用戶。
  3. 任務(wù)調(diào)度:例如按照任務(wù)的優(yōu)先級(jí)執(zhí)行。
  4. 實(shí)時(shí)排名:例如股票價(jià)格的實(shí)時(shí)排名。

代碼分析

在 Redis 源碼中,Sorted Set 的實(shí)現(xiàn)主要在 t_zset.c 文件中。插入操作(zaddCommand)會(huì)根據(jù) Sorted Set 的編碼類型(跳躍表或壓縮列表)來執(zhí)行不同的邏輯。如果是跳躍表編碼,那么插入操作會(huì)涉及到字典的更新和跳躍表節(jié)點(diǎn)的添加。如果是壓縮列表編碼,則會(huì)檢查是否需要轉(zhuǎn)換為跳躍表編碼。

總結(jié)

Sorted Set 是 Redis 提供的一種強(qiáng)大的有序數(shù)據(jù)結(jié)構(gòu),它結(jié)合了跳躍表和字典的優(yōu)點(diǎn),提供了高效的插入、刪除、更新和范圍查詢操作。通過合理的使用 Sorted Set,可以有效地解決許多實(shí)際問題。如何以上內(nèi)容對(duì)你有幫助,懇請(qǐng)點(diǎn)贊轉(zhuǎn)發(fā),V 哥在此感謝各位兄弟的支持。88,洗洗睡了。

以上內(nèi)容是否對(duì)您有幫助:
在線筆記
App下載
App下載

掃描二維碼

下載編程獅App

公眾號(hào)
微信公眾號(hào)

編程獅公眾號(hào)