Java并发指南14:Java并发容器ConcurrentSkipListMap与CopyOnWriteArrayList

本文涉及的产品
容器镜像服务 ACR,镜像仓库100个 不限时长
简介: 这位大侠,这是我的公众号:程序员江湖。 分享程序员面试与技术的那些事。 干货满满,关注就送。  原文出处http://cmsblogs.com/ 『chenssy』 到目前为止,我们在Java世界里看到了两种实现key-value的数据结构:Hash、TreeMap,这两种数据结构各自都有着优缺点。

原文出处http://cmsblogs.com/ 『chenssy

到目前为止我们在Java世界里看到了两种实现key-value的数据结构Hash、TreeMap这两种数据结构各自都有着优缺点。

  1. Hash表插入、查找最快为O(1)如使用链表实现则可实现无锁数据有序化需要显式的排序操作。
  2. 红黑树插入、查找为O(logn)但常数项较小无锁实现的复杂性很高一般需要加锁数据天然有序。

然而这次介绍第三种实现key-value的数据结构SkipList。SkipList有着不低于红黑树的效率但是其原理和实现的复杂度要比红黑树简单多了。

SkipList

什么是SkipListSkip List 称之为跳表它是一种可以替代平衡树的数据结构其数据元素默认按照key值升序天然有序。Skip list让已排序的数据分布在多层链表中以0-1随机数决定一个数据的向上攀升与否通过“空间来换取时间”的一个算法在每个节点中增加了向前的指针在插入、删除、查找时可以忽略一些不可能涉及到的结点从而提高了效率。

我们先看一个简单的链表如下

1499585893174201707090001

如果我们需要查询9、21、30则需要比较次数为3 + 6 + 8 = 17 次那么有没有优化方案呢有我们将该链表中的某些元素提炼出来作为一个比较“索引”如下

1499586063109201707090002

我们先与这些索引进行比较来决定下一个元素是往右还是下走由于存在“索引”的缘故导致在检索的时候会大大减少比较的次数。当然元素不是很多很难体现出优势当元素足够多的时候这种索引结构就会大显身手。

SkipList的特性

SkipList具备如下特性

  1. 由很多层结构组成level是通过一定的概率随机产生的
  2. 每一层都是一个有序的链表默认是升序也可以根据创建映射时所提供的Comparator进行排序具体取决于使用的构造方法
  3. 最底层(Level 1)的链表包含所有元素
  4. 如果一个元素出现在Level i 的链表中则它在Level i 之下的链表也都会出现
  5. 每个节点包含两个指针一个指向同一链表中的下一个元素一个指向下面一层的元素

我们将上图再做一些扩展就可以变成一个典型的SkipList结构了

1499590828559201707090003

SkipList的查找

SkipListd的查找算法较为简单对于上面我们我们要查找元素21其过程如下

  1. 比较3大于往后找9
  2. 比9大继续往后找25但是比25小则从9的下一层开始找16
  3. 16的后面节点依然为25则继续从16的下一层找
  4. 找到21

如图

201707090004

红色虚线代表路径。

SkipList的插入

SkipList的插入操作主要包括

  1. 查找合适的位置。这里需要明确一点就是在确认新节点要占据的层次K时采用丢硬币的方式完全随机。如果占据的层次K大于链表的层次则重新申请新的层否则插入指定层次
  2. 申请新的节点
  3. 调整指针

假定我们要插入的元素为23经过查找可以确认她是位于25后9、16、21前。当然需要考虑申请的层次K。

如果层次K > 3

需要申请新层次Level 4

201707090005

如果层次 K = 2

直接在Level 2 层插入即可

201707090006

这里会涉及到以个算法通过丢硬币决定层次K该算法我们通过后面ConcurrentSkipListMap源码来分析。还有一个需要注意的地方就是在K层插入元素后需要确保所有小于K层的层次都应该出现新节点。

SkipList的删除

删除节点和插入节点思路基本一致找到节点删除节点调整指针。

比如删除节点9如下

201707090007

ConcurrentSkipListMap

通过上面我们知道SkipList采用空间换时间的算法其插入和查找的效率O(logn)其效率不低于红黑树但是其原理和实现的复杂度要比红黑树简单多了。一般来说会操作链表List就会对SkipList毫无压力。

ConcurrentSkipListMap其内部采用SkipLis数据结构实现。为了实现SkipListConcurrentSkipListMap提供了三个内部类来构建这样的链表结构Node、Index、HeadIndex。其中Node表示最底层的单链表有序节点、Index表示为基于Node的索引层HeadIndex用来维护索引层次。到这里我们可以这样说ConcurrentSkipListMap是通过HeadIndex维护索引层次通过Index从最上层开始往下层查找一步一步缩小查询范围最后到达最底层Node时就只需要比较很小一部分数据了。在JDK中的关系如下图

201707090008

** Node **

    static final class Node<K,V> {
        final K key;
        volatile Object value;
        volatile ConcurrentSkipListMap.Node<K, V> next;
        
        /** 省略些许代码 */
    }static final class Node<K,V> {
        final K key;
        volatile Object value;
        volatile ConcurrentSkipListMap.Node<K, V> next;
        
        /** 省略些许代码 */
    }

Node的结构和一般的单链表毫无区别key-value和一个指向下一个节点的next。

Index

    static class Index<K,V> {
        final ConcurrentSkipListMap.Node<K,V> node;
        final ConcurrentSkipListMap.Index<K,V> down;
        volatile ConcurrentSkipListMap.Index<K,V> right;

        /** 省略些许代码 */
    }static class Index<K,V> {
        final ConcurrentSkipListMap.Node<K,V> node;
        final ConcurrentSkipListMap.Index<K,V> down;
        volatile ConcurrentSkipListMap.Index<K,V> right;

        /** 省略些许代码 */
    }

Index提供了一个基于Node节点的索引Node一个指向下一个Index的right一个指向下层的down节点。

HeadIndex

    static final class HeadIndex<K,V> extends Index<K,V> {
        final int level;  //索引层从1开始Node单链表层为0
        HeadIndex(Node<K,V> node, Index<K,V> down, Index<K,V> right, int level) {
            super(node, down, right);
            this.level = level;
        }
    }static final class HeadIndex<K,V> extends Index<K,V> {
        final int level;  //索引层从1开始Node单链表层为0
        HeadIndex(Node<K,V> node, Index<K,V> down, Index<K,V> right, int level) {
            super(node, down, right);
            this.level = level;
        }
    }

HeadIndex内部就一个level来定义层级。

ConcurrentSkipListMap提供了四个构造函数每个构造函数都会调用initialize()方法进行初始化工作。

    final void initialize() {
        keySet = null;
        entrySet = null;
        values = null;
        descendingMap = null;
        randomSeed = seedGenerator.nextInt() | 0x0100; // ensure nonzero
        head = new ConcurrentSkipListMap.HeadIndex<K,V>(new ConcurrentSkipListMap.Node<K,V>(null, BASE_HEADER, null),
                null, null, 1);
    }final void initialize() {
        keySet = null;
        entrySet = null;
        values = null;
        descendingMap = null;
        randomSeed = seedGenerator.nextInt() | 0x0100; // ensure nonzero
        head = new ConcurrentSkipListMap.HeadIndex<K,V>(new ConcurrentSkipListMap.Node<K,V>(null, BASE_HEADER, null),
                null, null, 1);
    }

注意initialize()方法不仅仅只在构造函数中被调用如cloneclear、readObject时都会调用该方法进行初始化步骤。这里需要注意randomSeed的初始化。

 private transient int randomSeed;
 randomSeed = seedGenerator.nextInt() | 0x0100; // ensure nonzeroprivate transient int randomSeed;
 randomSeed = seedGenerator.nextInt() | 0x0100; // ensure nonzero

randomSeed一个简单的随机数生成器在后面介绍。

put操作

CoucurrentSkipListMap提供了put()方法用于将指定值与此映射中的指定键关联。源码如下

    public V put(K key, V value) {
        if (value == null)
            throw new NullPointerException();
        return doPut(key, value, false);
    }public V put(K key, V value) {
        if (value == null)
            throw new NullPointerException();
        return doPut(key, value, false);
    }

首先判断value如果为null则抛出NullPointerException否则调用doPut方法其实如果各位看过JDK的源码的话应该对这样的操作很熟悉了JDK源码里面很多方法都是先做一些必要性的验证后然后通过调用do**()方法进行真正的操作。

doPut()方法内容较多我们分步分析。

    private V doPut(K key, V value, boolean onlyIfAbsent) {
        Node<K,V> z;             // added node
        if (key == null)
            throw new NullPointerException();
        // 比较器
        Comparator<? super K> cmp = comparator;
        outer: for (;;) {
            for (Node<K, V> b = findPredecessor(key, cmp), n = b.next; ; ) {

            /** 省略代码 */private V doPut(K key, V value, boolean onlyIfAbsent) {
        Node<K,V> z;             // added node
        if (key == null)
            throw new NullPointerException();
        // 比较器
        Comparator<? super K> cmp = comparator;
        outer: for (;;) {
            for (Node<K, V> b = findPredecessor(key, cmp), n = b.next; ; ) {

            /** 省略代码 */

doPut()方法有三个参数除了key,value外还有一个boolean类型的onlyIfAbsent该参数作用与如果存在当前key时该做何动作。当onlyIfAbsent为false时替换value为true时则返回该value。用代码解释为

   if (!map.containsKey(key))
      return map.put(key, value);
  else
       return map.get(key);if (!map.containsKey(key))
      return map.put(key, value);
  else
       return map.get(key);

首先判断key是否为null如果为null则抛出NullPointerException从这里我们可以确认ConcurrentSkipList是不支持key或者value为null的。然后调用findPredecessor()方法传入key来确认位置。findPredecessor()方法其实就是确认key要插入的位置。

   private Node<K,V> findPredecessor(Object key, Comparator<? super K> cmp) {
        if (key == null)
            throw new NullPointerException(); // don't postpone errors
        for (;;) {
            // 从head节点开始head是level最高级别的headIndex
            for (Index<K,V> q = head, r = q.right, d;;) {

                // r != null表示该节点右边还有节点需要比较
                if (r != null) {
                    Node<K,V> n = r.node;
                    K k = n.key;
                    // value == null表示该节点已经被删除了
                    // 通过unlink()方法过滤掉该节点
                    if (n.value == null) {
                        //删掉r节点
                        if (!q.unlink(r))
                            break;           // restart
                        r = q.right;         // reread r
                        continue;
                    }

                    // value != null节点存在
                    // 如果key 大于r节点的key 则往前进一步
                    if (cpr(cmp, key, k) > 0) {
                        q = r;
                        r = r.right;
                        continue;
                    }
                }

                // 到达最右边如果dowm == null表示指针已经达到最下层了直接返回该节点
                if ((d = q.down) == null)
                    return q.node;
                q = d;
                r = d.right;
            }
        }
    }private Node<K,V> findPredecessor(Object key, Comparator<? super K> cmp) {
        if (key == null)
            throw new NullPointerException(); // don't postpone errors
        for (;;) {
            // 从head节点开始head是level最高级别的headIndex
            for (Index<K,V> q = head, r = q.right, d;;) {

                // r != null表示该节点右边还有节点需要比较
                if (r != null) {
                    Node<K,V> n = r.node;
                    K k = n.key;
                    // value == null表示该节点已经被删除了
                    // 通过unlink()方法过滤掉该节点
                    if (n.value == null) {
                        //删掉r节点
                        if (!q.unlink(r))
                            break;           // restart
                        r = q.right;         // reread r
                        continue;
                    }

                    // value != null节点存在
                    // 如果key 大于r节点的key 则往前进一步
                    if (cpr(cmp, key, k) > 0) {
                        q = r;
                        r = r.right;
                        continue;
                    }
                }

                // 到达最右边如果dowm == null表示指针已经达到最下层了直接返回该节点
                if ((d = q.down) == null)
                    return q.node;
                q = d;
                r = d.right;
            }
        }
    }

findPredecessor()方法意思非常明确寻找前辈。从最高层的headIndex开始向右一步一步比较直到right为null或者右边节点的Node的key大于当前key为止然后再向下寻找依次重复该过程直到down为null为止即找到了前辈看返回的结果注意是Node不是Item所以插入的位置应该是最底层的Node链表。

在这个过程中ConcurrentSkipListMap赋予了该方法一个其他的功能就是通过判断节点的value是否为null如果为null表示该节点已经被删除了通过调用unlink()方法删除该节点。

        final boolean unlink(Index<K,V> succ) {
            return node.value != null && casRight(succ, succ.right);
        }final boolean unlink(Index<K,V> succ) {
            return node.value != null && casRight(succ, succ.right);
        }

删除节点过程非常简单更改下right指针即可。

通过findPredecessor()找到前辈节点后做什么呢看下面

 for (Node<K,V> b = findPredecessor(key, cmp), n = b.next;;) {
        // 前辈节点的next != null
        if (n != null) {
            Object v; int c;
            Node<K,V> f = n.next;

            // 不一致读主要原因是并发有节点捷足先登
            if (n != b.next)               // inconsistent read
                break;

            // n.value == null该节点已经被删除了
            if ((v = n.value) == null) {   // n is deleted
                n.helpDelete(b, f);
                break;
            }

            // 前辈节点b已经被删除
            if (b.value == null || v == n) // b is deleted
                break;

            // 节点大于往前移
            if ((c = cpr(cmp, key, n.key)) > 0) {
                b = n;
                n = f;
                continue;
            }

            // c == 0 表示找到一个key相等的节点根据onlyIfAbsent参数来做判断
            // onlyIfAbsent ==false则通过casValue替换value
            // onlyIfAbsent == true返回该value
            if (c == 0) {
                if (onlyIfAbsent || n.casValue(v, value)) {
                    @SuppressWarnings("unchecked") V vv = (V)v;
                    return vv;
                }
                break; // restart if lost race to replace value
            }
            // else c < 0; fall through
        }

        // 将key-value包装成一个node插入
        z = new Node<K,V>(key, value, n);
        if (!b.casNext(n, z))
            break;         // restart if lost race to append to b
        break outer;
    }for (Node<K,V> b = findPredecessor(key, cmp), n = b.next;;) {
        // 前辈节点的next != null
        if (n != null) {
            Object v; int c;
            Node<K,V> f = n.next;

            // 不一致读主要原因是并发有节点捷足先登
            if (n != b.next)               // inconsistent read
                break;

            // n.value == null该节点已经被删除了
            if ((v = n.value) == null) {   // n is deleted
                n.helpDelete(b, f);
                break;
            }

            // 前辈节点b已经被删除
            if (b.value == null || v == n) // b is deleted
                break;

            // 节点大于往前移
            if ((c = cpr(cmp, key, n.key)) > 0) {
                b = n;
                n = f;
                continue;
            }

            // c == 0 表示找到一个key相等的节点根据onlyIfAbsent参数来做判断
            // onlyIfAbsent ==false则通过casValue替换value
            // onlyIfAbsent == true返回该value
            if (c == 0) {
                if (onlyIfAbsent || n.casValue(v, value)) {
                    @SuppressWarnings("unchecked") V vv = (V)v;
                    return vv;
                }
                break; // restart if lost race to replace value
            }
            // else c < 0; fall through
        }

        // 将key-value包装成一个node插入
        z = new Node<K,V>(key, value, n);
        if (!b.casNext(n, z))
            break;         // restart if lost race to append to b
        break outer;
    }

找到合适的位置后就是在该位置插入节点咯。插入节点的过程比较简单就是将key-value包装成一个Node然后通过casNext()方法加入到链表当中。当然是插入之前需要进行一系列的校验工作。

在最下层插入节点后下一步工作是什么新建索引。前面博主提过在插入节点的时候会根据采用抛硬币的方式来决定新节点所插入的层次由于存在并发的可能ConcurrentSkipListMap采用ThreadLocalRandom来生成随机数。如下

int rnd = ThreadLocalRandom.nextSecondarySeed(); rnd = ThreadLocalRandom.nextSecondarySeed();

抛硬币决定层次的思想很简单就是通过抛硬币如果硬币为正面则层次level + 1 否则停止如下

            // 抛硬币决定层次
            while (((rnd >>>= 1) & 1) != 0)
                ++level;// 抛硬币决定层次
            while (((rnd >>>= 1) & 1) != 0)
                ++level;

在阐述SkipList插入节点的时候说明了决定的层次level会分为两种情况进行处理一是如果层次level大于最大的层次话则需要新增一层否则就在相应层次以及小于该level的层次进行节点新增处理。

level <= headIndex.level

            // 如果决定的层次level比最高层次head.level小直接生成最高层次的index
            // 由于需要确认每一层次的down所以需要从最下层依次往上生成
            if (level <= (max = h.level)) {
                for (int i = 1; i <= level; ++i)
                    idx = new ConcurrentSkipListMap.Index<K,V>(z, idx, null);
            }// 如果决定的层次level比最高层次head.level小直接生成最高层次的index
            // 由于需要确认每一层次的down所以需要从最下层依次往上生成
            if (level <= (max = h.level)) {
                for (int i = 1; i <= level; ++i)
                    idx = new ConcurrentSkipListMap.Index<K,V>(z, idx, null);
            }

 

从底层开始小于level的每一层都初始化一个index每次的node都指向新加入的nodedown指向下一层的item右侧next全部为null。整个处理过程非常简单为小于level的每一层初始化一个index然后加入到原来的index链条中去。

level > headIndex.level

           // leve > head.level 则新增一层
            else { // try to grow by one level
                // 新增一层
                level = max + 1;

                // 初始化 level个item节点
                @SuppressWarnings("unchecked")
                ConcurrentSkipListMap.Index<K,V>[] idxs =
                        (ConcurrentSkipListMap.Index<K,V>[])new ConcurrentSkipListMap.Index<?,?>[level+1];
                for (int i = 1; i <= level; ++i)
                    idxs[i] = idx = new ConcurrentSkipListMap.Index<K,V>(z, idx, null);

                //
                for (;;) {
                    h = head;
                    int oldLevel = h.level;
                    // 层次扩大了需要重新开始有新线程节点加入
                    if (level <= oldLevel) // lost race to add level
                        break;
                    // 新的头结点HeadIndex
                    ConcurrentSkipListMap.HeadIndex<K,V> newh = h;
                    ConcurrentSkipListMap.Node<K,V> oldbase = h.node;
                    // 生成新的HeadIndex节点该HeadIndex指向新增层次
                    for (int j = oldLevel+1; j <= level; ++j)
                        newh = new ConcurrentSkipListMap.HeadIndex<K,V>(oldbase, newh, idxs[j], j);

                    // HeadIndex CAS替换
                    if (casHead(h, newh)) {
                        h = newh;
                        idx = idxs[level = oldLevel];
                        break;
                    }
                }// leve > head.level 则新增一层
            else { // try to grow by one level
                // 新增一层
                level = max + 1;

                // 初始化 level个item节点
                @SuppressWarnings("unchecked")
                ConcurrentSkipListMap.Index<K,V>[] idxs =
                        (ConcurrentSkipListMap.Index<K,V>[])new ConcurrentSkipListMap.Index<?,?>[level+1];
                for (int i = 1; i <= level; ++i)
                    idxs[i] = idx = new ConcurrentSkipListMap.Index<K,V>(z, idx, null);

                //
                for (;;) {
                    h = head;
                    int oldLevel = h.level;
                    // 层次扩大了需要重新开始有新线程节点加入
                    if (level <= oldLevel) // lost race to add level
                        break;
                    // 新的头结点HeadIndex
                    ConcurrentSkipListMap.HeadIndex<K,V> newh = h;
                    ConcurrentSkipListMap.Node<K,V> oldbase = h.node;
                    // 生成新的HeadIndex节点该HeadIndex指向新增层次
                    for (int j = oldLevel+1; j <= level; ++j)
                        newh = new ConcurrentSkipListMap.HeadIndex<K,V>(oldbase, newh, idxs[j], j);

                    // HeadIndex CAS替换
                    if (casHead(h, newh)) {
                        h = newh;
                        idx = idxs[level = oldLevel];
                        break;
                    }
                }

当抛硬币决定的level大于最大层次level时需要新增一层进行处理。处理逻辑如下

  1. 初始化一个对应的index数组大小为level + 1然后为每个单位都创建一个index个中参数为Node为新增的Zdown为下一层indexright为null
  2. 通过for循环来进行扩容操作。从最高层进行处理新增一个HeadIndex个中参数节点Nodedown都为最高层的Node和HeadIndexright为刚刚创建的对应层次的indexlevel为相对应的层次level。最后通过CAS把当前的head与新加入层的head进行替换。

通过上面步骤我们发现尽管已经找到了前辈节点也将node插入了也确定确定了层次并生成了相应的Index但是并没有将这些Index插入到相应的层次当中所以下面的代码就是将index插入到相对应的层当中。

  // 从插入的层次level开始
    splice: for (int insertionLevel = level;;) {
        int j = h.level;
        //  从headIndex开始
        for (ConcurrentSkipListMap.Index<K,V> q = h, r = q.right, t = idx;;) {
            if (q == null || t == null)
                break splice;

            // r != null这里是找到相应层次的插入节点位置注意这里只横向找
            if (r != null) {
                ConcurrentSkipListMap.Node<K,V> n = r.node;

                int c = cpr(cmp, key, n.key);

                // n.value == null 解除关系r右移
                if (n.value == null) {
                    if (!q.unlink(r))
                        break;
                    r = q.right;
                    continue;
                }

                // key > n.key 右移
                if (c > 0) {
                    q = r;
                    r = r.right;
                    continue;
                }
            }

            // 上面找到节点要插入的位置这里就插入
            // 当前层是最顶层
            if (j == insertionLevel) {
                // 建立联系
                if (!q.link(r, t))
                    break; // restart
                if (t.node.value == null) {
                    findNode(key);
                    break splice;
                }
                // 标志的插入层 -- 如果== 0 表示已经到底了插入完毕退出循环
                if (--insertionLevel == 0)
                    break splice;
            }

            // 上面节点已经插入完毕了插入下一个节点
            if (--j >= insertionLevel && j < level)
                t = t.down;
            q = q.down;
            r = q.right;
        }
    }// 从插入的层次level开始
    splice: for (int insertionLevel = level;;) {
        int j = h.level;
        //  从headIndex开始
        for (ConcurrentSkipListMap.Index<K,V> q = h, r = q.right, t = idx;;) {
            if (q == null || t == null)
                break splice;

            // r != null这里是找到相应层次的插入节点位置注意这里只横向找
            if (r != null) {
                ConcurrentSkipListMap.Node<K,V> n = r.node;

                int c = cpr(cmp, key, n.key);

                // n.value == null 解除关系r右移
                if (n.value == null) {
                    if (!q.unlink(r))
                        break;
                    r = q.right;
                    continue;
                }

                // key > n.key 右移
                if (c > 0) {
                    q = r;
                    r = r.right;
                    continue;
                }
            }

            // 上面找到节点要插入的位置这里就插入
            // 当前层是最顶层
            if (j == insertionLevel) {
                // 建立联系
                if (!q.link(r, t))
                    break; // restart
                if (t.node.value == null) {
                    findNode(key);
                    break splice;
                }
                // 标志的插入层 -- 如果== 0 表示已经到底了插入完毕退出循环
                if (--insertionLevel == 0)
                    break splice;
            }

            // 上面节点已经插入完毕了插入下一个节点
            if (--j >= insertionLevel && j < level)
                t = t.down;
            q = q.down;
            r = q.right;
        }
    }

这段代码分为两部分看一部分是找到相应层次的该节点插入的位置第二部分在该位置插入然后下移。

至此ConcurrentSkipListMap的put操作到此就结束了。代码量有点儿多这里总结下

  1. 首先通过findPredecessor()方法找到前辈节点Node
  2. 根据返回的前辈节点以及key-value新建Node节点同时通过CAS设置next
  3. 设置节点Node再设置索引节点。采取抛硬币方式决定层次如果所决定的层次大于现存的最大层次则新增一层然后新建一个Item链表。
  4. 最后将新建的Item链表插入到SkipList结构中。

get操作

相比于put操作 get操作会简单很多其过程其实就只相当于put操作的第一步

  private V doGet(Object key) {
        if (key == null)
            throw new NullPointerException();
        Comparator<? super K> cmp = comparator;
        outer: for (;;) {
            for (ConcurrentSkipListMap.Node<K,V> b = findPredecessor(key, cmp), n = b.next;;) {
                Object v; int c;
                if (n == null)
                    break outer;
                ConcurrentSkipListMap.Node<K,V> f = n.next;
                if (n != b.next)                // inconsistent read
                    break;
                if ((v = n.value) == null) {    // n is deleted
                    n.helpDelete(b, f);
                    break;
                }
                if (b.value == null || v == n)  // b is deleted
                    break;
                if ((c = cpr(cmp, key, n.key)) == 0) {
                    @SuppressWarnings("unchecked") V vv = (V)v;
                    return vv;
                }
                if (c < 0)
                    break outer;
                b = n;
                n = f;
            }
        }
        return null;
    }private V doGet(Object key) {
        if (key == null)
            throw new NullPointerException();
        Comparator<? super K> cmp = comparator;
        outer: for (;;) {
            for (ConcurrentSkipListMap.Node<K,V> b = findPredecessor(key, cmp), n = b.next;;) {
                Object v; int c;
                if (n == null)
                    break outer;
                ConcurrentSkipListMap.Node<K,V> f = n.next;
                if (n != b.next)                // inconsistent read
                    break;
                if ((v = n.value) == null) {    // n is deleted
                    n.helpDelete(b, f);
                    break;
                }
                if (b.value == null || v == n)  // b is deleted
                    break;
                if ((c = cpr(cmp, key, n.key)) == 0) {
                    @SuppressWarnings("unchecked") V vv = (V)v;
                    return vv;
                }
                if (c < 0)
                    break outer;
                b = n;
                n = f;
            }
        }
        return null;
    }

与put操作第一步相似首先调用findPredecessor()方法找到前辈节点然后顺着right一直往右找即可同时在这个过程中同样承担了一个删除value为null的节点的职责。

remove操作

remove操作为删除指定key节点如下

    public V remove(Object key) {
        return doRemove(key, null);
    }public V remove(Object key) {
        return doRemove(key, null);
    }

直接调用doRemove()方法这里remove有两个参数一个是key另外一个是value所以doRemove方法即提供remove key也提供同时满足key-value。

 final V doRemove(Object key, Object value) {
        if (key == null)
            throw new NullPointerException();
        Comparator<? super K> cmp = comparator;
        outer: for (;;) {
            for (ConcurrentSkipListMap.Node<K,V> b = findPredecessor(key, cmp), n = b.next;;) {
                Object v; int c;
                if (n == null)
                    break outer;
                ConcurrentSkipListMap.Node<K,V> f = n.next;

                // 不一致读重新开始
                if (n != b.next)                    // inconsistent read
                    break;

                // n节点已删除
                if ((v = n.value) == null) {        // n is deleted
                    n.helpDelete(b, f);
                    break;
                }

                // b节点已删除
                if (b.value == null || v == n)      // b is deleted
                    break;

                if ((c = cpr(cmp, key, n.key)) < 0)
                    break outer;

                // 右移
                if (c > 0) {
                    b = n;
                    n = f;
                    continue;
                }

                /*
                 * 找到节点
                 */

                // value != null 表示需要同时校验key-value值
                if (value != null && !value.equals(v))
                    break outer;

                // CAS替换value
                if (!n.casValue(v, null))
                    break;
                if (!n.appendMarker(f) || !b.casNext(n, f))
                    findNode(key);                  // retry via findNode
                else {
                    // 清理节点
                    findPredecessor(key, cmp);      // clean index

                    // head.right == null表示该层已经没有节点删掉该层
                    if (head.right == null)
                        tryReduceLevel();
                }
                @SuppressWarnings("unchecked") V vv = (V)v;
                return vv;
            }
        }
        return null;
    }final V doRemove(Object key, Object value) {
        if (key == null)
            throw new NullPointerException();
        Comparator<? super K> cmp = comparator;
        outer: for (;;) {
            for (ConcurrentSkipListMap.Node<K,V> b = findPredecessor(key, cmp), n = b.next;;) {
                Object v; int c;
                if (n == null)
                    break outer;
                ConcurrentSkipListMap.Node<K,V> f = n.next;

                // 不一致读重新开始
                if (n != b.next)                    // inconsistent read
                    break;

                // n节点已删除
                if ((v = n.value) == null) {        // n is deleted
                    n.helpDelete(b, f);
                    break;
                }

                // b节点已删除
                if (b.value == null || v == n)      // b is deleted
                    break;

                if ((c = cpr(cmp, key, n.key)) < 0)
                    break outer;

                // 右移
                if (c > 0) {
                    b = n;
                    n = f;
                    continue;
                }

                /*
                 * 找到节点
                 */

                // value != null 表示需要同时校验key-value值
                if (value != null && !value.equals(v))
                    break outer;

                // CAS替换value
                if (!n.casValue(v, null))
                    break;
                if (!n.appendMarker(f) || !b.casNext(n, f))
                    findNode(key);                  // retry via findNode
                else {
                    // 清理节点
                    findPredecessor(key, cmp);      // clean index

                    // head.right == null表示该层已经没有节点删掉该层
                    if (head.right == null)
                        tryReduceLevel();
                }
                @SuppressWarnings("unchecked") V vv = (V)v;
                return vv;
            }
        }
        return null;
    }

调用findPredecessor()方法找到前辈节点然后通过右移然后比较找到后利用CAS把value替换为null然后判断该节点是不是这层唯一的index如果是的话调用tryReduceLevel()方法把这层干掉完成删除。

其实从这里可以看出remove方法仅仅是把Node的value设置null并没有真正删除该节点Node其实从上面的put操作、get操作我们可以看出他们在寻找节点的时候都会判断节点的value是否为null如果为null则调用unLink()方法取消关联关系如下

                    if (n.value == null) {
                        if (!q.unlink(r))
                            break;           // restart
                        r = q.right;         // reread r
                        continue;
                    }if (n.value == null) {
                        if (!q.unlink(r))
                            break;           // restart
                        r = q.right;         // reread r
                        continue;
                    }

size操作

ConcurrentSkipListMap的size()操作和ConcurrentHashMap不同它并没有维护一个全局变量来统计元素的个数所以每次调用该方法的时候都需要去遍历。

    public int size() {
        long count = 0;
        for (Node<K,V> n = findFirst(); n != null; n = n.next) {
            if (n.getValidValue() != null)
                ++count;
        }
        return (count >= Integer.MAX_VALUE) ? Integer.MAX_VALUE : (int) count;
    }public int size() {
        long count = 0;
        for (Node<K,V> n = findFirst(); n != null; n = n.next) {
            if (n.getValidValue() != null)
                ++count;
        }
        return (count >= Integer.MAX_VALUE) ? Integer.MAX_VALUE : (int) count;
    }

调用findFirst()方法找到第一个Node然后利用node的next去统计。最后返回统计数据最多能返回Integer.MAX_VALUE。注意这里在线程并发下是安全的。

ConcurrentSkipListMap过程其实不复杂相比于ConcurrentHashMap而言是简单的不能再简单了。对跳表SkipList熟悉的话ConcurrentSkipListMap 应该是盘中餐了。

Java并发编程并发容器之CopyOnWriteArrayList转载

 

  原文链接

  http://ifeve.com/java-copy-on-write/

  

  Copy-On-Write简称COW是一种用于程序设计中的优化策略。其基本思路是从一开始大家都在共享同一个内容当某个人想要修改这个内容的时候才会真正把内容Copy出去形成一个新的内容然后再改这是一种延时懒惰策略。从JDK1.5开始Java并发包里提供了两个使用CopyOnWrite机制实现的并发容器,它们是CopyOnWriteArrayList和CopyOnWriteArraySet。CopyOnWrite容器非常有用可以在非常多的并发场景中使用到。

什么是CopyOnWrite容器

  CopyOnWrite容器即写时复制的容器。通俗的理解是当我们往一个容器添加元素的时候不直接往当前容器添加而是先将当前容器进行Copy复制出一个新的容器然后新的容器里添加元素添加完元素之后再将原容器的引用指向新的容器。这样做的好处是我们可以对CopyOnWrite容器进行并发的读而不需要加锁因为当前容器不会添加任何元素。所以CopyOnWrite容器也是一种读写分离的思想读和写不同的容器。

CopyOnWriteArrayList的实现原理

  在使用CopyOnWriteArrayList之前我们先阅读其源码了解下它是如何实现的。以下代码是向CopyOnWriteArrayList中add方法的实现向CopyOnWriteArrayList里添加元素可以发现在添加的时候是需要加锁的否则多线程写的时候会Copy出N个副本出来。

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

/**

     * Appends the specified element to the end of this list.

     *

     * @param e element to be appended to this list

     * @return <tt>true</tt> (as specified by {@link Collection#add})

     */

    public boolean add(E e) {

    final ReentrantLock lock = this.lock;

    lock.lock();

    try {

        Object[] elements = getArray();

        int len = elements.length;

        Object[] newElements = Arrays.copyOf(elements, len + 1);

        newElements[len] = e;

        setArray(newElements);

        return true;

    finally {

        lock.unlock();

    }

    }

   读的时候不需要加锁如果读的时候有多个线程正在向CopyOnWriteArrayList添加数据读还是会读到旧的数据因为写的时候不会锁住旧的CopyOnWriteArrayList。

1

2

3

public E get(int index) {

    return get(getArray(), index);

}

   JDK中并没有提供CopyOnWriteMap我们可以参考CopyOnWriteArrayList来实现一个基本代码如下

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

import java.util.Collection;

import java.util.Map;

import java.util.Set;

 

public class CopyOnWriteMap<K, V> implements Map<K, V>, Cloneable {

    private volatile Map<K, V> internalMap;

 

    public CopyOnWriteMap() {

        internalMap = new HashMap<K, V>();

    }

 

    public V put(K key, V value) {

 

        synchronized (this) {

            Map<K, V> newMap = new HashMap<K, V>(internalMap);

            V val = newMap.put(key, value);

            internalMap = newMap;

            return val;

        }

    }

 

    public V get(Object key) {

        return internalMap.get(key);

    }

 

    public void putAll(Map<? extends K, ? extends V> newData) {

        synchronized (this) {

            Map<K, V> newMap = new HashMap<K, V>(internalMap);

            newMap.putAll(newData);

            internalMap = newMap;

        }

    }

}

   实现很简单只要了解了CopyOnWrite机制我们可以实现各种CopyOnWrite容器并且在不同的应用场景中使用。

CopyOnWrite的应用场景

  CopyOnWrite并发容器用于读多写少的并发场景。比如白名单黑名单商品类目的访问和更新场景假如我们有一个搜索网站用户在这个网站的搜索框中输入关键字搜索内容但是某些关键字不允许被搜索。这些不能被搜索的关键字会被放在一个黑名单当中黑名单每天晚上更新一次。当用户搜索时会检查当前关键字在不在黑名单当中如果在则提示不能搜索。实现代码如下

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

package com.ifeve.book;

 

import java.util.Map;

 

import com.ifeve.book.forkjoin.CopyOnWriteMap;

 

/**

 * 黑名单服务

 *

 * @author fangtengfei

 *

 */

public class BlackListServiceImpl {

 

    private static CopyOnWriteMap<String, Boolean> blackListMap = new CopyOnWriteMap<String, Boolean>(

            1000);

 

    public static boolean isBlackList(String id) {

        return blackListMap.get(id) == null false true;

    }

 

    public static void addBlackList(String id) {

        blackListMap.put(id, Boolean.TRUE);

    }

 

    /**

     * 批量添加黑名单

     *

     * @param ids

     */

    public static void addBlackList(Map<String,Boolean> ids) {

        blackListMap.putAll(ids);

    }

 

}

   代码很简单但是使用CopyOnWriteMap需要注意两件事情

  1. 减少扩容开销。根据实际需要初始化CopyOnWriteMap的大小避免写时CopyOnWriteMap扩容的开销。

  2. 使用批量添加。因为每次添加容器每次都会进行复制所以减少添加次数可以减少容器的复制次数。如使用上面代码里的addBlackList方法。

CopyOnWrite的缺点

  CopyOnWrite容器有很多优点但是同时也存在两个问题即内存占用问题和数据一致性问题。所以在开发的时候需要注意一下。

  内存占用问题。因为CopyOnWrite的写时复制机制所以在进行写操作的时候内存里会同时驻扎两个对象的内存旧的对象和新写入的对象注意:在复制的时候只是复制容器里的引用只是在写的时候会创建新对象添加到新容器里而旧容器的对象还在使用所以有两份对象内存。如果这些对象占用的内存比较大比如说200M左右那么再写入100M数据进去内存就会占用300M那么这个时候很有可能造成频繁的Yong GC和Full GC。之前我们系统中使用了一个服务由于每晚使用CopyOnWrite机制更新大对象造成了每晚15秒的Full GC应用响应时间也随之变长。

  针对内存占用问题可以通过压缩容器中的元素的方法来减少大对象的内存消耗比如如果元素全是10进制的数字可以考虑把它压缩成36进制或64进制。或者不使用CopyOnWrite容器而使用其他的并发容器如ConcurrentHashMap

  数据一致性问题。CopyOnWrite容器只能保证数据的最终一致性不能保证数据的实时一致性。所以如果你希望写入的的数据马上能读到请不要使用CopyOnWrite容器。

  

  下面这篇文章验证了CopyOnWriteArrayList和同步容器的性能

  http://blog.csdn.net/wind5shy/article/details/5396887

  下面这篇文章简单描述了CopyOnWriteArrayList的使用

  http://blog.csdn.net/imzoer/article/details/9751591

 

 å¨è¿éæå¥å¾çæè¿°

相关文章
|
13天前
|
缓存 安全 Java
Java并发编程进阶:深入理解Java内存模型
【4月更文挑战第6天】Java内存模型(JMM)是多线程编程的关键,定义了线程间共享变量读写的规则,确保数据一致性和可见性。主要包括原子性、可见性和有序性三大特性。Happens-Before原则规定操作顺序,内存屏障和锁则保障这些原则的实施。理解JMM和相关机制对于编写线程安全、高性能的Java并发程序至关重要。
|
2天前
|
存储 安全 Java
Java中的容器,线程安全和线程不安全
Java中的容器,线程安全和线程不安全
10 1
|
2天前
|
Java 开发者
Java中多线程并发控制的实现与优化
【4月更文挑战第17天】 在现代软件开发中,多线程编程已成为提升应用性能和响应能力的关键手段。特别是在Java语言中,由于其平台无关性和强大的运行时环境,多线程技术的应用尤为广泛。本文将深入探讨Java多线程的并发控制机制,包括基本的同步方法、死锁问题以及高级并发工具如java.util.concurrent包的使用。通过分析多线程环境下的竞态条件、资源争夺和线程协调问题,我们提出了一系列实现和优化策略,旨在帮助开发者构建更加健壮、高效的多线程应用。
3 0
|
3天前
|
存储 缓存 安全
Java并发基础之互斥同步、非阻塞同步、指令重排与volatile
在Java中,多线程编程常常涉及到共享数据的访问,这时候就需要考虑线程安全问题。Java提供了多种机制来实现线程安全,其中包括互斥同步(Mutex Synchronization)、非阻塞同步(Non-blocking Synchronization)、以及volatile关键字等。 互斥同步(Mutex Synchronization) 互斥同步是一种基本的同步手段,它要求在任何时刻,只有一个线程可以执行某个方法或某个代码块,其他线程必须等待。Java中的synchronized关键字就是实现互斥同步的常用手段。当一个线程进入一个synchronized方法或代码块时,它需要先获得锁,如果
21 0
|
11天前
|
存储 缓存 安全
【企业级理解】高效并发之Java内存模型
【企业级理解】高效并发之Java内存模型
|
18天前
|
安全 Java
Java中的多线程并发控制
在Java中,多线程是实现并发执行任务的一种重要方式。然而,随着多个线程同时访问共享资源,可能会导致数据不一致和其他并发问题。因此,了解并掌握Java中的多线程并发控制机制显得尤为重要。本文将深入探讨Java的多线程并发控制,包括synchronized关键字、Lock接口、Semaphore类以及CountDownLatch类等,并通过实例代码演示其使用方法和注意事项。
12 2
|
23天前
|
缓存 NoSQL Java
Java项目:支持并发的秒杀项目(基于Redis)
Java项目:支持并发的秒杀项目(基于Redis)
25 0
|
24天前
|
算法 安全 Java
Java中的并发编程:理解并发性能优化
在当今软件开发领域,多核处理器的普及使得并发编程变得更加重要。本文将深入探讨Java中的并发编程,介绍并发性能优化的关键技术,帮助开发人员更好地利用多核处理器提升应用程序性能。
|
1月前
|
Java 容器
Java常用组件、容器与布局
Java常用组件、容器与布局
14 0
|
1月前
|
安全 Java API
Java并发 - J.U.C并发容器类 list、set、queue
Queue API 阻塞是通过 condition 来实现的,可参考 Java 并发 - Lock 接口 ArrayBlockingQueue 阻塞 LinkedBlockingQueue 阻塞 ArrayQueue 非阻塞 LinkedQueue 非阻塞