并发容器、阻塞队列

2017/2/12 23:39 下午 posted in  并发编程  

1. 并发容器

1.1 并发容器种类

jdk 1.5 引入了并发容器,同之前的同步容器相比,主要解决了2个问题:
1. 根据具体场景,尽量避免synchronized,提供并发性
2. 定义了一些并发安全的复合操作,保证并发环境下的迭代不会出错(但未必每次看到的都是最新的数据)

  • ConcurrentHashMap:替代同步Map(Collections.synchronized(new HashMap())),分段锁;增加了原子复合操作,如putIfAbsent()、replace();
  • CopyOnWriteArrayList:替代List;迭代过程保证不出错,除了加锁,另一种方法是克隆容器对象;
  • CopyOnWriteArraySet:替代Set;
  • ConcurrentLinkedQueue:先进先出队列,非阻塞;
  • ConcurrentSkipListMap:替代SoredMap(Collections.synchronizedMap(new TreeMap()));
  • ConcurrentSkipListSet:替代SoredSet(Collections.synchronizedSet(new TreeMap()));

1.2 ConcurrentHashMap

ConcurrentHashMap内部结构

ConcurrentHashMap结构

结构:Segment数组+HashEntry链表数组,Segment是一种可重入锁ReentrantLock。
读操作:定位1个元素需要2次Hash操作,第一次Hash定位到Segment,第二次Hash定位到元素所在链表的头部。
写操作:只对元素所在的Segment加锁,如需扩容,则只对Segment扩容。

Segment结构。

static final class Segment<K,V> extends ReentrantLock implements Serializable {
    transient volatile int count; // Segment中元素的数量
    transient int modCount; // 对table大小造成影响的操作数量(如put、remove)
    transient int threshold; // 阈值,Segment中元素超过该阈值后会对Segment扩容
    transient volatile HashEntry<K,V>[] table; // 链表数组,每个元素代表了一个链表的头部
    final float loadFactor; // 负载因子,用于确定threshold
}

HashEntry结构。

static final class HashEntry<K,V> {
    final K key;
    final int hash;
    volatile V value;
    final HashEntry<K,V> next;
}

初始化

Segment、HashEntry的个数都是2的n次方个,便于执行Hash计算。

get操作

get操作的高效之处在于整个get过程不需要加锁,除非读到的值是空的才会加锁重读。不加锁的原因是get方法中使用的共享变量都是volatile的,保证线程间的可见性。

ConcurrentHashMap的get方法、segmentFor方法。

public V get(Object key) {
  int hash = hash(key.hashCode()); // 对hashCode再哈希,为了减少哈希冲突
  return segmentFor(hash).get(key, hash);
}
final Segment<K,V> segmentFor(int hash) {
    return segments[(hash >>> segmentShift) & segmentMask];
}

Segment的get方法。

V get(Object key, int hash) {
    if (count != 0) { // read-volatile
        HashEntry<K,V> e = getFirst(hash);
        while (e != null) {
            if (e.hash == hash && key.equals(e.key)) {
                V v = e.value;
                if (v != null)
                    return v;
                return readValueUnderLock(e); // recheck 读到空值,加锁重读
            }
            e = e.next;
        }
    }
    return null;
}

put操作

需要加锁完成。如果Segment中的元素数量超过了阈值,需要对Segment扩容,再进行rehash。如果找到要更新的元素,则更新其value,否则生成一个新的HashEntry加到Segment头部。

remove操作

先确定要删除的元素位置,不过删除的方式不是将待删除元素前面一个元素的next指向后一个元素,因为HashEntry中的next是final的,一经赋值不可修改。所以定位到待删除元素的位置后,将待删除元素前的元素复制一遍,然后一个个重新接到链表上。

remove元素前

如上图,现在要删除元素3,删除后链表如下图。
remove元素后

// All entries following removed node can stay
// in list, but all preceding ones need to be
// cloned.
++modCount;
HashEntry<K,V> newFirst = e.next;
for (HashEntry<K,V> p = first; p != e; p = p.next)
    newFirst = new HashEntry<K,V>(p.key, p.hash, newFirst, p.value);
tab[index] = newFirst;

size操作

不加锁统计2遍各个Segment的大小,如果2次统计modCount没有发生变化,则返回第一次统计值,否则加锁统计一次所有Segment的大小。

参考

Java并发编程:并发容器之ConcurrentHashMap(转载)
聊聊并发(四)深入分析ConcurrentHashMap

1.3 CopyOnWriteArrayList

读的时候不加锁,写的时候加锁,用于读多写少的并发场景。

回顾2个常识:
1. Java中“=”操作只是将引用与对象关联,线程A将线程B中的引用指向另一个对象,他们之间不会发生ConcurrentModificationException。
2. Java中2个引用指向同一个对象,若引用A指向另一个对象,则不会影响引用B,引用B仍然指向原来的对象。

复制数组使用的是Arrays.copyOf方法,其内部调用的System.arraycopy的native方法。对于数组元素为引用类型的,则只在新数组中将元素引用指向旧数组相应的元素;对于非引用类型,则会将旧数组中的元素复制到新数组。参考StackOverflow上的回答

注意点:
1. 减少写时扩容开销,根据实际需要初始化CopyOnWriteMap的大小
2. 使用批量添加,因为每次添加容器都会进行复制

缺点:
1. 内存占用问题
2. 数据一致性问题

参考

Java并发编程:并发容器之CopyOnWriteArrayList(转载)
java CopyOnWriteArrayList的使用

2. 阻塞队列

非阻塞队列

PriorityQueue、LinkedList。

主要方法:
add(E e):插入队尾。成功,返回true,失败(队列已满),抛异常;
remove():移除队首元素。成功,返回true,失败(队列空),抛异常;
offer(E e):插入队尾。成功,返回true,失败(队列已满),返回false;
poll():移除并获取队首元素。成功,返回队首元素,失败返回null;
peek:获取队首元素。成功,返回队首元素,失败返回null。

建议使用offer、poll、peek,因为可以根据返回值判断操作是否成功。非阻塞队列中的方法都没有进行同步措施。

阻塞队列

ArrayBlockingQueue:基于数组,先进先出,有界。创建时需指定容量大小,并且可指定公平性非公平性。
LinkedBlockingQueue:基于链表,先进先出,有界。创建时若不指定容量大小,则默认大小为Integer.MAX_VALUE。
PriorityBlockingQueue:按照元素优先级排序,并按照优先级出队。无界(即容量无上限)。
DelayQueue:基于PriorityQueue,延时阻塞队列,无界。无界阻塞队列,插入数据时永远不会被阻塞,只有取数据时才会被阻塞。

主要方法:
包含了非阻塞队列的5个方法,且均进行了同步措施。除此之外,还提供了以下4个方法:
put(E e):向队尾存入元素,若队列满,则等待;
take():取队首元素,若队列空,则等待;
offer(E e, long timeout, TimeUnit unit):向队尾存入元素,若队列满,则等待一定时间,若超时仍未成功,则返回false,否则返回true;
poll(long timeout, TimeUnit unit):取队首元素,若队列空,则等待一定时间,若超时仍未取到,则返回null,否则返回取得的元素。

阻塞队列原理

以ArrayBlockingQueue为例,

public class ArrayBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {
 
    private static final long serialVersionUID = -817911632652898426L;
     
    /** The queued items  */
    private final E[] items;
    /** items index for next take, poll or remove */
    private int takeIndex;
    /** items index for next put, offer, or add. */
    private int putIndex;
    /** Number of items in the queue */
    private int count;
     
    /*
    * Concurrency control uses the classic two-condition algorithm
    * found in any textbook.
    */
     
    /** Main lock guarding all access */
    private final ReentrantLock lock;
    /** Condition for waiting takes */
    private final Condition notEmpty;
    /** Condition for waiting puts */
    private final Condition notFull;
}

参考

Java并发编程:阻塞队列