1. 并发容器
1.1 并发容器种类
jdk 1.5 引入了并发容器,同之前的同步容器相比,主要解决了2个问题:
1. 根据具体场景,尽量避免synchronized,提供并发性
2. 定义了一些并发安全的复合操作,保证并发环境下的迭代不会出错(但未必每次看到的都是最新的数据)
- ConcurrentHashMap:替代同步Map(Collections.synchronized(new HashMap())),分段锁;增加了原子复合操作,如putIfAbsent()、replace();
- CopyOnWriteArrayList:替代List;迭代过程保证不出错,除了加锁,另一种方法是克隆容器对象;
- CopyOnWriteArraySet:替代Set;
- ConcurrentLinkedQueue:先进先出队列,非阻塞;
- ConcurrentSkipListMap:替代SoredMap(Collections.synchronizedMap(new TreeMap()));
- ConcurrentSkipListSet:替代SoredSet(Collections.synchronizedSet(new TreeMap()));
1.2 ConcurrentHashMap
ConcurrentHashMap内部结构
结构:Segment数组+HashEntry链表数组,Segment是一种可重入锁ReentrantLock。
读操作:定位1个元素需要2次Hash操作,第一次Hash定位到Segment,第二次Hash定位到元素所在链表的头部。
写操作:只对元素所在的Segment加锁,如需扩容,则只对Segment扩容。
Segment结构。
static final class Segment<K,V> extends ReentrantLock implements Serializable {
transient volatile int count; // Segment中元素的数量
transient int modCount; // 对table大小造成影响的操作数量(如put、remove)
transient int threshold; // 阈值,Segment中元素超过该阈值后会对Segment扩容
transient volatile HashEntry<K,V>[] table; // 链表数组,每个元素代表了一个链表的头部
final float loadFactor; // 负载因子,用于确定threshold
}
HashEntry结构。
static final class HashEntry<K,V> {
final K key;
final int hash;
volatile V value;
final HashEntry<K,V> next;
}
初始化
Segment、HashEntry的个数都是2的n次方个,便于执行Hash计算。
get操作
get操作的高效之处在于整个get过程不需要加锁,除非读到的值是空的才会加锁重读。不加锁的原因是get方法中使用的共享变量都是volatile的,保证线程间的可见性。
ConcurrentHashMap的get方法、segmentFor方法。
public V get(Object key) {
int hash = hash(key.hashCode()); // 对hashCode再哈希,为了减少哈希冲突
return segmentFor(hash).get(key, hash);
}
final Segment<K,V> segmentFor(int hash) {
return segments[(hash >>> segmentShift) & segmentMask];
}
Segment的get方法。
V get(Object key, int hash) {
if (count != 0) { // read-volatile
HashEntry<K,V> e = getFirst(hash);
while (e != null) {
if (e.hash == hash && key.equals(e.key)) {
V v = e.value;
if (v != null)
return v;
return readValueUnderLock(e); // recheck 读到空值,加锁重读
}
e = e.next;
}
}
return null;
}
put操作
需要加锁完成。如果Segment中的元素数量超过了阈值,需要对Segment扩容,再进行rehash。如果找到要更新的元素,则更新其value,否则生成一个新的HashEntry加到Segment头部。
remove操作
先确定要删除的元素位置,不过删除的方式不是将待删除元素前面一个元素的next指向后一个元素,因为HashEntry中的next是final的,一经赋值不可修改。所以定位到待删除元素的位置后,将待删除元素前的元素复制一遍,然后一个个重新接到链表上。
如上图,现在要删除元素3,删除后链表如下图。
// All entries following removed node can stay
// in list, but all preceding ones need to be
// cloned.
++modCount;
HashEntry<K,V> newFirst = e.next;
for (HashEntry<K,V> p = first; p != e; p = p.next)
newFirst = new HashEntry<K,V>(p.key, p.hash, newFirst, p.value);
tab[index] = newFirst;
size操作
不加锁统计2遍各个Segment的大小,如果2次统计modCount没有发生变化,则返回第一次统计值,否则加锁统计一次所有Segment的大小。
参考
Java并发编程:并发容器之ConcurrentHashMap(转载)
聊聊并发(四)深入分析ConcurrentHashMap
1.3 CopyOnWriteArrayList
读的时候不加锁,写的时候加锁,用于读多写少的并发场景。
回顾2个常识:
1. Java中“=”操作只是将引用与对象关联,线程A将线程B中的引用指向另一个对象,他们之间不会发生ConcurrentModificationException。
2. Java中2个引用指向同一个对象,若引用A指向另一个对象,则不会影响引用B,引用B仍然指向原来的对象。
复制数组使用的是Arrays.copyOf方法,其内部调用的System.arraycopy的native方法。对于数组元素为引用类型的,则只在新数组中将元素引用指向旧数组相应的元素;对于非引用类型,则会将旧数组中的元素复制到新数组。参考StackOverflow上的回答
注意点:
1. 减少写时扩容开销,根据实际需要初始化CopyOnWriteMap的大小
2. 使用批量添加,因为每次添加容器都会进行复制
缺点:
1. 内存占用问题
2. 数据一致性问题
参考
Java并发编程:并发容器之CopyOnWriteArrayList(转载)
java CopyOnWriteArrayList的使用
2. 阻塞队列
非阻塞队列
PriorityQueue、LinkedList。
主要方法:
add(E e):插入队尾。成功,返回true,失败(队列已满),抛异常;
remove():移除队首元素。成功,返回true,失败(队列空),抛异常;
offer(E e):插入队尾。成功,返回true,失败(队列已满),返回false;
poll():移除并获取队首元素。成功,返回队首元素,失败返回null;
peek:获取队首元素。成功,返回队首元素,失败返回null。
建议使用offer、poll、peek,因为可以根据返回值判断操作是否成功。非阻塞队列中的方法都没有进行同步措施。
阻塞队列
ArrayBlockingQueue:基于数组,先进先出,有界。创建时需指定容量大小,并且可指定公平性非公平性。
LinkedBlockingQueue:基于链表,先进先出,有界。创建时若不指定容量大小,则默认大小为Integer.MAX_VALUE。
PriorityBlockingQueue:按照元素优先级排序,并按照优先级出队。无界(即容量无上限)。
DelayQueue:基于PriorityQueue,延时阻塞队列,无界。无界阻塞队列,插入数据时永远不会被阻塞,只有取数据时才会被阻塞。
主要方法:
包含了非阻塞队列的5个方法,且均进行了同步措施。除此之外,还提供了以下4个方法:
put(E e):向队尾存入元素,若队列满,则等待;
take():取队首元素,若队列空,则等待;
offer(E e, long timeout, TimeUnit unit):向队尾存入元素,若队列满,则等待一定时间,若超时仍未成功,则返回false,否则返回true;
poll(long timeout, TimeUnit unit):取队首元素,若队列空,则等待一定时间,若超时仍未取到,则返回null,否则返回取得的元素。
阻塞队列原理
以ArrayBlockingQueue为例,
public class ArrayBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {
private static final long serialVersionUID = -817911632652898426L;
/** The queued items */
private final E[] items;
/** items index for next take, poll or remove */
private int takeIndex;
/** items index for next put, offer, or add. */
private int putIndex;
/** Number of items in the queue */
private int count;
/*
* Concurrency control uses the classic two-condition algorithm
* found in any textbook.
*/
/** Main lock guarding all access */
private final ReentrantLock lock;
/** Condition for waiting takes */
private final Condition notEmpty;
/** Condition for waiting puts */
private final Condition notFull;
}