背景
我们知道hashmap是一个线程不安全的数据结构,在多线程编程的时候,多个线程同时向hashmap中put元素的时候,会发生数据丢失。多线程put操作后,再get操作导致死循环。
多线程put非NULL元素后,get操作得到NULL值。
使用
为了保证并发安全,我们使用hashmap的时候,建议是使用ConcurrentHashMap。
底层原理
1.7的时候,底层数据结构是大数组Segment(容量为16)和小数组HashEntry。默认是16个Segmement,每个HashEntry会存放一些键值对或者链表。
Segment继承了可重入锁,有加锁和释放锁的操作,这样就能保证多个线程访问ConcurrentHashMap的时候,同一时间只能有一个线程能够操作相应的节点,保证了线程安全。
性能相对于hashtable而言,效率提高了16倍。即当线程访问一个Segment 的时候,只对这一个Segment加锁,对于其他段的Segment,则可以继续被其他线程访问,不会有冲突。
1.8的时候,底层数据结构更新为数组+链表+红黑树。1.8不再有分段锁的设计,而是采用CAS和synchrionzed来保证并发安全。
CAS主要是用于put的过程中进行初始化,synchronzied主要是用于往map中插入元素的时候保证线程安全。
采用CAS自旋重试的方式进行初始化,是为了保证只有一个线程完成map的初始化问题,因为多个线程同时初始化,会产生数据丢失的问题。这边使用CAS原子操作,通过修改sizeCtl变量成功与否来代表是否抢占到了锁。如果抢占到了,由该线程完成map的初始化工作;
如果没抢占到,那就进行While循环,自旋重试,直到该map初始化成功,循环自动退出,自旋也随之结束。
初始化结束以后,对于一个put(key, val)操作,首先计算出该key的哈希值,从而得到该key在数组中的插入位置。
首先使用CAS的方式插入key-val, 先判断该位置是否为null, 为null, CAS插入元素。如果成功,则该key-val成功插入;
如果不为null,说明发生了碰撞,改用synchronized关键字,对头节点加锁,然后将key-val插入链表或者红黑树。
插入前,判断是链表还是红黑树,不同的结构处理方式不同;
如果是链表,那么从链表的头节点开始向下遍历,遍历的每个节点:
使用equlas判断key相等否,相等,则修改该key 的value; 否则,把当前的key value插入链表的最后一个节点。
如果是红黑树,那么putTreeVal完成值的存储。
要说明的是,这种锁控制在单个数据节点上,16位的数组可以支持16个线程并发写入数据。
源码
public V put(K key, V value) {return putVal(key, value, false);}/** Implementation for put and putIfAbsent */final V putVal(K key, V value, boolean onlyIfAbsent) {//1、数据检查if (key == null || value == null) throw new NullPointerException();//2、求key哈希int hash = spread(key.hashCode());int binCount = 0; //记录遍历的节点数,可以用于判断是否要链表转化为红黑树for (Node<K,V>[] tab = table;;) { //死循环Node<K,V> f; int n, i, fh;if (tab == null || (n = tab.length) == 0) //检查 table 是否初始化tab = initTable();//使用哈希值计算索引 i 并检查该位置是否为空。如果为空,使用 CAS 操作插入新节点,并跳出循环。else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) { if (casTabAt(tab, i, null,new Node<K,V>(hash, key, value, null)))break; // no lock when adding to empty bin}else if ((fh = f.hash) == MOVED) // MOVEDd标志用于判断是否已经节点迁移//当一个桶(bin)中的所有节点都被迁移到新的数组中后,原来的位置上会放置一个特殊的转发节点,表示这个桶已经处理完毕。此时,转发节点的 hash 字段会被设置为 MOVED(即 -1)。tab = helpTransfer(tab, f);//协助迁移else { //如果碰撞了 需要使用synchronized,放弃cas,f是table那个碰撞节点V oldVal = null;synchronized (f) {if (tabAt(tab, i) == f) { // 经典的双重检查,防止当前线程获取table锁之前,tabAt(tab, i)被其它线程改变了if (fh >= 0) {// 哈希值>=0代表是链表,<0代表是红黑树binCount = 1;for (Node<K,V> e = f;; ++binCount) {K ek;if (e.hash == hash &&((ek = e.key) == key ||(ek != null && key.equals(ek)))) { // 三比较,hashcode==hashcode,key==key,key.equals(key)oldVal = e.val;if (!onlyIfAbsent)e.val = value;break;}Node<K,V> pred = e;if ((e = e.next) == null) {pred.next = new Node<K,V>(hash, key,value, null);break;}}}else if (f instanceof TreeBin) { // 红黑树Node<K,V> p;binCount = 2; //binCount 被初始化为 2,因为红黑树中的节点数计算方式不同于链表。 具体原因我不知道if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,value)) != null) {oldVal = p.val;if (!onlyIfAbsent)p.val = value;}}}}if (binCount != 0) { // 判断是否要扩容 TREEIFY_THRESHOLD=8if (binCount >= TREEIFY_THRESHOLD)treeifyBin(tab, i);if (oldVal != null)return oldVal;break;}}}addCount(1L, binCount); //计数 里面通过cas维护元素个数。return null;}