Java并发48:并发集合系列-基于CAS算法的非阻塞双向无界队列ConcurrentLinkedDueue

news/2024/7/4 7:43:45

[超级链接:Java并发学习系列-绪论]
[系列序章:Java并发43:并发集合系列-序章]


原文地址:https://www.jianshu.com/p/602b3240afaf

ConcurrentLinkedDeque 双向链表结构的无界并发队列,从JDK 7开始加入到J.U.C的行列中,使用CAS实现并发安全。

与 ConcurrentLinkedQueue 的区别是该阻塞队列同时支持FIFO和FILO两种操作方式,即可以从队列的头和尾同时操作(插入/删除)

ConcurrentLinkedDeque 适合“多生产,多消费”的场景。

内存一致性遵循:对 ConcurrentLinkedDeque 的插入操作先行发生于(happen-before)访问或移除操作。

相较于 ConcurrentLinkedQueue,ConcurrentLinkedDeque 由于是双端队列,所以在操作和概念上会更加复杂,来一起看下。

概述

ConcurrentLinkedDeque(后面称CLD) 的实现方式继承了 ConcurrentLinkedQueue 和 LinkedTransferQueue的思想。

ConcurrentLinkedDeque 在非阻塞算法的实现方面与 ConcurrentLinkedQueue 基本一致。

关于 ConcurrentLinkedQueue,请参考笔者的上一篇文章:Java并发47:并发集合系列-基于CAS算法的非阻塞单向无界队列ConcurrentLinkedQueue。

数据结构

这里写图片描述

重要属性:

//头节点
private transient volatile Node<E> head;
//尾节点
private transient volatile Node<E> tail;
//终止节点
private static final Node<Object> PREV_TERMINATOR, NEXT_TERMINATOR;
//移除节点时更新链表属性的阀值
private static final int HOPS = 2;

和ConcurrentLinkedQueue一样,CLD 内部也只维护了head和tail属性,对 head/tail 节点也使用了“不变性”和“可变性”约束,不过跟 ConcurrentLinkedQueue 有些许差异,我们来看一下:

head/tail 的不变性:

  • 第一个节点总是能以O(1)的时间复杂度从 head 通过 prev 链接到达;
  • 最后一个节点总是能以O(1)的时间复杂度从 tail 通过 next 链接到达;
  • 所有live节点(item不为null的节点),都能从第一个节点通过调用 succ() 方法遍历可达;
  • 所有live节点(item不为null的节点),都能从最后一个节点通过调用 pred() 方法遍历可达;
  • head/tail 不能为 null;
  • head 节点的 next 域不能引用到自身;
  • head/tail 不会是GC-unlinked节点(但它可能是unlink节点)。

head/tail的可变性:

  • head/tail 节点的 item 域可能为 null,也可能不为 null;
  • head/tail 节点可能从first/last/tail/head 节点访问时不可达;
  • tail 节点的 next 域可以引用到自身。

除此之外,再来看看CLD中另外两个属性:

  • PREV_TERMINATOR:prev的终止节点,next指向自身,即PREV_TERMINATOR.next = PREV_TERMINATOR。在 first 节点出列后,会把first.next指向自身(first.next=first),然后把prev设为PREV_TERMINATOR。
  • NEXT_TERMINATOR:next的终止节点,prev指向自身,即NEXT_TERMINATOR.pre = NEXT_TERMINATOR。在 last 节点出列后,会把last.prev指向自身(last.prev=last),然后把next设为NEXT_TERMINATOR。

添加(入列)

CLD的添加方法包括:offer(E)、add(E)、push(E)、addFirst(E)、addLast(E)、offerFirst(E)、offerLast(E)

所有这些操作都是通过linkFirst(E)或linkLast(E)来实现的。

linkFirst(E) / linkLast(E)

/**
 * Links e as first element.
 */
private void linkFirst(E e) {
    checkNotNull(e);
    final Node<E> newNode = new Node<E>(e);

    restartFromHead:
    for (;;)
        //从head节点往前寻找first节点
        for (Node<E> h = head, p = h, q;;) {
            if ((q = p.prev) != null &&
                (q = (p = q).prev) != null)
                // Check for head updates every other hop.
                // If p == q, we are sure to follow head instead.
                //如果head被修改,返回head重新查找
                p = (h != (h = head)) ? h : q;
            else if (p.next == p) // 自链接节点,重新查找
                continue restartFromHead;
            else {
                // p is first node
                newNode.lazySetNext(p); // CAS piggyback
                if (p.casPrev(null, newNode)) {
                    // Successful CAS is the linearization point
                    // for e to become an element of this deque,
                    // and for newNode to become "live".
                    if (p != h) // hop two nodes at a time 跳两个节点时才修改head
                        casHead(h, newNode);  // Failure is OK.
                    return;
                }
                // Lost CAS race to another thread; re-read prev
            }
        }
}

/**
 * Links e as last element.
 */
private void linkLast(E e) {
    checkNotNull(e);
    final Node<E> newNode = new Node<E>(e);

    restartFromTail:
    for (;;)
        //从tail节点往后寻找last节点
        for (Node<E> t = tail, p = t, q;;) {
            if ((q = p.next) != null &&
                (q = (p = q).next) != null)
                // Check for tail updates every other hop.
                // If p == q, we are sure to follow tail instead.
                //如果tail被修改,返回tail重新查找
                p = (t != (t = tail)) ? t : q;
            else if (p.prev == p) // 自链接节点,重新查找
                continue restartFromTail;
            else {
                // p is last node
                newNode.lazySetPrev(p); // CAS piggyback
                if (p.casNext(null, newNode)) {
                    // Successful CAS is the linearization point
                    // for e to become an element of this deque,
                    // and for newNode to become "live".
                    if (p != t) // hop two nodes at a time 跳两个节点时才修改tail
                        casTail(t, newNode);  // Failure is OK.
                    return;
                }
                // Lost CAS race to another thread; re-read next
            }
        }
}

说明:

linkFirst是插入新节点到队列头的主函数,执行流程如下:

  • 首先从 head 节点开始向前循环找到 first 节点(p.prev==null&&p.next!=p);
  • 然后通过lazySetNext设置新节点的 next 节点为 first;
  • 然后 CAS 修改 first 的 prev 为新节点。

注意这里 CAS 指令成功后会判断 first 节点是否已经跳了两个节点,只有在跳了两个节点才会 CAS 更新 head,这也是为了节省 CAS 指令执行开销。

linkLast是插入新节点到队列尾,执行流程与linkFirst一致,不多赘述,具体见源码。


获取(出列)

CLD的获取方法分两种:

  • 获取节点:peek、peekFirst 、peekLast、getFirst、getLast,都是通过peekFirst 、peekLast实现。
  • 获取并移除节点: poll、pop、remove、pollFirst、pollLast、removeFirst、removeLast,都是通过pollFirst、pollLast实现。

pollFirst、pollLast包括了peekFirst 、peekLast的实现,都是找到并返回 first/last 节点。

不同的是,pollFirst、pollLastpeekFirst 、peekLast多了unlink这一步。

所以这里我们只对pollFirst和pollLast两个方法进行解析。

首先来看一下pollFirst()

/**获取并移除队列首节点*/
public E pollFirst() {
    for (Node<E> p = first(); p != null; p = succ(p)) {
        E item = p.item;
        if (item != null && p.casItem(item, null)) {
            unlink(p);
            return item;
        }
    }
    return null;
}

说明:

pollFirst()用于找到链表中首个 item 不为 null 的节点,并返回节点的item

注意并不是first节点,因为first节点的item可以为null

涉及的内部方法较多,不过都很简单,我们通过穿插代码方式分析:

1.首先通过first()方法找到first节点,first 节点必须为 active 节点(p.prev==null&&p.next!=p)。first()源码如下:

Node<E> first() {
    restartFromHead:
    for (;;)
        //从head开始往前找
        for (Node<E> h = head, p = h, q;;) {
            if ((q = p.prev) != null &&
                (q = (p = q).prev) != null)
                // Check for head updates every other hop.
                // If p == q, we are sure to follow head instead.
                //如果head被修改则返回新的head重新查找,否则继续向前(prev)查找
                p = (h != (h = head)) ? h : q;
            else if (p == h
                     // It is possible that p is PREV_TERMINATOR,
                     // but if so, the CAS is guaranteed to fail.
                    //找到的节点不是head节点,CAS修改head
                     || casHead(h, p))
                return p;
            else
                continue restartFromHead;
        }
}

2.如果first.item==null(这里是允许的,具体见上面我们对 first/last 节点的介绍),则继续调用succ方法寻找后继节点。succ源码如下:

/**返回指定节点的的后继节点,如果指定节点的next指向自己,返回first节点*/
final Node<E> succ(Node<E> p) {
    // TODO: should we skip deleted nodes here?
    Node<E> q = p.next;
    return (p == q) ? first() : q;
}

3.CAS 修改节点的 item 为 null(即 “逻辑删除-logical deletion”),然后调用unlink(p)方法解除节点链接,最后返回 item。

小结

本章与 ConcurrentLinkedQueue 一篇中的非阻塞算法基本一致,只是为双端操作定义了几个可供操作的节点类型


http://www.niftyadmin.cn/n/2815559.html

相关文章

jsp-EL表达式简介

表达式语言&#xff08;Expression Language&#xff0c;EL&#xff09;,EL表达式是用"${}"括起来的脚本&#xff0c;用来更方便的读取对象&#xff01; EL表达式主要用来读取数据&#xff0c;进行内容的显示&#xff01; 使用EL表达式可以方便地读取对象中的属性、提…

Java并发49:并发集合系列-基于独占锁+链表实现的单向阻塞无界队列LinkedBlockingQueue

[超级链接&#xff1a;Java并发学习系列-绪论] [系列序章&#xff1a;Java并发43:并发集合系列-序章] 原文地址&#xff1a;http://www.importnew.com/25583.html 一、前言 前面介绍了使用CAS实现的非阻塞队列ConcurrentLinkedQueue&#xff0c;下面就来介绍下使用独占锁实现…

正式入驻CSDN博客

正式入驻CSDN博客&#xff0c;以前新浪博客文章会逐步搬过来&#xff0c;敬请期待。

在linux环境下重启oracle数据库,解决密码过期的问题

&#xff08;1&#xff09; 以oracle身份登录数据库&#xff0c;命令&#xff1a;su – oracle &#xff08;2&#xff09; 进入Sqlplus控制台&#xff0c;命令&#xff1a;sqlplus /nolog &#xff08;3&#xff09; 以系统管理员登录&#xff0c;命令&#xff1a;connect /as…

Java并发50:并发集合系列-基于独占锁实现的双向阻塞队列LinkedBlockingDeque

[超级链接&#xff1a;Java并发学习系列-绪论] [系列序章&#xff1a;Java并发43:并发集合系列-序章] 原文地址&#xff1a;http://ifeve.com/concurrent-collections-3/ 关于与LinkedBlockingDeque类似的单向队列LinkedBlockingQueue可以参考&#xff1a;Java并发49 使用阻…

hibernatenbsp;注解配置一对多关系

从Hibernate 2.5开始就可以使用annotation实现实体关系的映射了&#xff0c;减少了配置hbm文件的繁琐&#xff0c;而且annotation也是一种趋势&#xff0c;现在的SSH2的整合都是完全可以用annotation来实现。在以前实现一对多关联的关联式都是使用hbm文件&#xff0c;今天我们来…

React 作为一个 UI 运行时(一、宿主环境和渲染器)

很多教程把React介绍为一个UI框架。这很合理因为它就是一个UI库&#xff0c;这就是react标语的意思。这篇文章不会叫你任何关于建立用户界面的知识&#xff0c;但是会帮助你更生层次的理解React编程模型。这是一篇深入解析的文章&#xff0c;对初学者不太适合。在这篇文章我将通…

Java并发51:并发集合系列-基于独占锁+数组实现的单向阻塞有界队列ArrayBlockingQueue

[超级链接&#xff1a;Java并发学习系列-绪论] [系列序章&#xff1a;Java并发43:并发集合系列-序章] 原文地址&#xff1a;http://www.importnew.com/25566.html 一、 前言 上节介绍了无界链表方式的阻塞队列LinkedBlockingQueue&#xff0c;本节来研究下有界使用数组方式实…