《并发编程的艺术》p36:JMM不保证64位的long型和double型变量的写操作具有原子性。
使用ConcourrentHashMap
面试中可能经常会被问到HashMap和HashTable的区别,其中最重要的就是前者并不是线程安全的,但其实在高并发的情形下,后者的效率低的不像话甚至不可用,所以在jdk7之后出现了线程高效且安全的ConcurrentHashMap。
当并发严重时,某线程若是调用了同步方法,另外的线程将进入阻塞/轮询状态,既不能put也不能get,但ConcurrentHashMap是不同的,它采用了锁的分段技术,将数据分段存储,不同的数据持有不同的锁,这样可用性会大大高于HashTable,所以在实际开发中我们都用ConcurrentHashMap取代HashTable。
CAS、乐观锁与自旋
通常在自主开发并发代码中,习惯于直接使用悲观锁实现线程安全,譬如synchronized在任何情况下都会上锁。但是若并发量不高,这一举措会好散很多不必要的性能,这时采用CAS算法或许是最佳的选择,CAS,即compareAndSet(或称compareAndSwap),来自于sun.misc.Unsafe类,是基于系统的native方法,在此我们只讨论它的作用:比较指定位置的内存位置和预设原值的位置是否一致,如一致则赋值(认为并没有其他线程干扰),反之直接返回,先看一段代码(代码来自ReentrantLock公平所加锁最底层的tryAcquire方法,参数为重入次数状态位):
protected final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); //state是volatile变量 if (c == 0) { if (isFirst(current)&&compareAndSetState(0, acquires)) { //选择等待队列最前的线程进行CAS比较,符合结果为真为真,就加锁 setExclusiveOwnerThread(current); return true; } }else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; }
这里对于重入状态state采用了CAS更新,这里的CAS:
protectedfinal boolean compareAndSetState(int expect,int update){ return unsafe.compareAndSwapInt(this,stateOffset,expect,update); }
其中,方法四个参数从左至右依次代表:原对象(变量),在内存中的地址偏移量,期望值,更新值。只有当this对象的stateOffset地址的值符合expect值时才会返回true并且对状态更新值为update。unsafe的native的CAS方法是且必须是原子性的。
并不招人喜欢的自旋
在业务逻辑中,我们通常会在循环中使用CAS,当并发引起冲突时,进行重新调整参数并尝试直到成功赋值,但有时并不那么称心如意,高并发下,可能尝试经常会失败,产生自旋锁现象,不断尝试反而会降低效率,所以在实际使用中要根据并发量选择合适的锁。
ABA问题
若是CAS中的值被另外的线程修改后又改了回来(A-B-A),CAS算法看不到该过程,单从结果看是并没有发生修改,理所当然的直接赋值,这是可能就会因为值的更新产生业务逻辑问题(实际上产生的前提条件要更复杂一些),解决方式是添加版本戳,利用版本来判断是否发生修改(AtomicStampedReference类已经有了类似的实现)
阻塞和轮询
上面采用的CAS+自旋(轮询)方式是与阻塞-唤醒对应的锁竞争手段,因为阻塞状态需要被被动的唤醒,所以显然自旋操作在需要的锁被释放时能够更快地获得锁对象或是修改值
使用redis实现的分布式线程锁
分布式环境下也会有并发现象,这时我们并不能用锁来约束他们的并发,因为他们并没有共享的域来提供一个共享的锁,涉及到相同资源抢占的时候我们可能会使用数据库的锁来约束行为,但是不是所有的资源都能用数据库的锁来约束的,这时候我们就需要用他们能够共享的资源定义锁的标志位,最常见的当然就是redis分布式锁,而且redis本身的操作是串行的,不会引发问题,(此处利用zookeeper也是一种解决方案,在此暂且不讨论)。
大致的处理思路是:在线程想要进行同步操作之前获取redis的指定key(作为锁),若不存在则视为无锁并执行,将key赋值,视为上锁直到执行完毕删除该条数据,若存在则视为锁被占用,自旋尝试获取锁。以下是这个锁的概念工具类:
public class RedisLock { private static final String GET_RESULT = "OK"; private static final String RELEASE_RESULT = "1"; private static final String SET_IF_NOT_EXIST = "NX"; //为空才set private static final String SET_WITH_EXPIRE_TIME = "PX"; //单位毫秒 /** * 获取redis锁 * @param jedis redis客户端 * @param lockKey 锁标识 key * @param requestId 锁的持有者,加锁的请求 * @param expireTime 锁过期时间 * @return */ public static boolean getLock(Jedis jedis, String lockKey, String requestId, int expireTime){ /** * NXXX nxxx NX|XX, NX -- Only set the key if it does not already exist. * XX-- Only set the key if it already exist. * expx EX|PX, expire time units: EX = seconds; PX = milliseconds * time expire time in the units of {@param #expx} * Returns:Status code reply */ String result = jedis.set(lockKey, requestId, SET_IF_NOT_EXIST, SET_WITH_EXPIRE_TIME, expireTime); if (GET_RESULT.equals(result)) { return true; } return false; } /** * 释放锁 * @param jedis * @param lockKey * @param requestId * @return */ public static boolean releaseLock(Jedis jedis, String lockKey, String requestId){ if (jedis.get(lockKey).equals(requestId)) { //校验当前锁的持有人与但概念请求是否相同 //执行在这里时,如果锁被其它请求重新获取到了,此时就不该删除了 jedis.del(lockKey); } } }
参考链接:多线程并发问题解决之redis锁