在上篇,我们利用线程池,信号量,倒计时相关类实现计数的功能,但运行结果总不能达到目标,我们将做以下改进。
1.首先附上源码,红色标注,是我们此次修改的地方
import javax.annotation.concurrent.ThreadSafe; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Semaphore; import java.util.concurrent.atomic.AtomicInteger; @ThreadSafe public class SafeCountExample { //请求总数 private static int clientTotal = 5000; //线程数量 private static int threadTotal = 200; private static AtomicInteger count = new AtomicInteger(0); public static void main(String[] args) throws Exception{ ExecutorService exec = Executors.newCachedThreadPool(); final Semaphore semaphore = new Semaphore(threadTotal); //每次固定数量的线程获取许可 final CountDownLatch countDownLatch = new CountDownLatch(clientTotal); for (int i = 0; i < clientTotal ; i++) { exec.execute(()->{ try { semaphore.acquire(); add(); semaphore.release(); }catch (Exception e){ e.printStackTrace(); } countDownLatch.countDown(); }); } countDownLatch.await(); exec.shutdown(); System.out.println(count.get()); } private static void add(){ count.incrementAndGet(); } }2.AtomicInteger详解
2.1 AtomicInteger是一个支持原子操作的Integer类,它提供了原子自增方法、原子自减方法以及原子赋值方法等。其底层是通过volatile和CAS实现的,其中volatile保证了内存可见性,CAS算法保证了原子性。因此接下来我们先了解下volatile和CAS,然后在研究下AtomicInteger,unsafe相关的源码。
2.2 CAS CAS(Compare And Swap)即比较并交换,CAS是乐观锁技术,当多个线程尝试使用CAS同时更新同一个变量时,只有其中一个线程能更新变量的值,而其它线程都失败,失败的线程并不会被挂起,而是被告知这次竞争中失败,并可以再次尝试。它包含三个参数:V内存值,预期值A,要修改的新值B。当且仅当预期值A和内存值V相同时,将内存值V修改为B,否则什么都不做。原理图如下所示:
2.3 volatile变量 volatile是一种稍弱的同步机制,用来确保将变量的更新操作通知到其他线程。当把变量声明为volatile类型后,编译器与运行时都会注意到这个变量是共享的,因此不会将该变量上的操作与其他内存操作一起重排序。volatile变量不会被缓存在寄存器或者对其他处理器不可见的地方,因此在读取volatile类型的变量时总返回最新写入的值。在访问volatile变量时不会执行加锁操作,因此也就不会使执行线程阻塞,因此volatile变量是一种比sychronized关键字更轻量级的同步机制。
2.4 源码分析
2.4.1 AtomicInteger 部分源码:
public class AtomicInteger extends Number implements java.io.Serializable { private static final Unsafe unsafe = Unsafe.getUnsafe();//调用类Unsafe private static final long valueOffset;//变量value的内存偏移量 private volatile int value;//volatile修饰的int变量value static { try { valueOffset = unsafe.objectFieldOffset (AtomicInteger.class.getDeclaredField("value")); } catch (Exception ex) { throw new Error(ex); } } public final int getAndSet(int newValue) {//设置新值并返回旧值 return unsafe.getAndSetInt(this, valueOffset, newValue); } public final boolean compareAndSet(int expect, int update) {//如果当前值为expect,则设置为update return unsafe.compareAndSwapInt(this, valueOffset, expect, update); } public final int getAndIncrement() {//当前值加1返回旧值 return unsafe.getAndAddInt(this, valueOffset, 1); } public final int getAndDecrement() {//当前值减1返回旧值 return unsafe.getAndAddInt(this, valueOffset, -1); } public final int getAndAdd(int delta) {//当前值增加delta,返回旧值 return unsafe.getAndAddInt(this, valueOffset, delta); } public final int incrementAndGet() {//当前值增加1返回新值 return unsafe.getAndAddInt(this, valueOffset, 1) + 1; } public final int decrementAndGet() {//当前值减1,返回新值 return unsafe.getAndAddInt(this, valueOffset, -1) - 1; } }2.4.1 unsafe部分源码 :
public final class Unsafe { private static final Unsafe theUnsafe; public final int getAndAddInt(Object var1, long var2, int var4) { int var5; do //变量var5 调用底层的方法获取底层当前的值 var5 = this.getIntVolatile(var1, var2); //变量var1位count计数对象,变量var2当前值,变量为var4增加量 } while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4)); return var5; } public final long getAndAddLong(Object var1, long var2, long var4) { long var6; do { var6 = this.getLongVolatile(var1, var2); } while(!this.compareAndSwapLong(var1, var2, var6, var6 + var4)); return var6; } public final int getAndSetInt(Object var1, long var2, int var4) { int var5; do { var5 = this.getIntVolatile(var1, var2); } while(!this.compareAndSwapInt(var1, var2, var5, var4)); return var5; } public final long getAndSetLong(Object var1, long var2, long var4) { long var6; do { var6 = this.getLongVolatile(var1, var2); } while(!this.compareAndSwapLong(var1, var2, var6, var4)); return var6; } public final Object getAndSetObject(Object var1, long var2, Object var4) { Object var5; do { var5 = this.getObjectVolatile(var1, var2); } while(!this.compareAndSwapObject(var1, var2, var5, var4)); return var5; } }分析:我们着重关注compareAndSwapInt(var1, var2, var5, var4),这个方法,我们看到这是一个底层的实现方法
//变量var5 调用底层的方法获取底层当前的值
//变量var1位count计数对象,变量var2当前值,变量为var4增加量
每次传入的count对象var1时,将底层的值var5与当前的值var2进行比较
i. 若相同就将底层的值更新为var5+var4
ii. 若不相同,则循环从底层获取最新的值,原因:在多线程的前提下,底层的值,可能已被别的线程修改。
因此:这样就保证变量的值为最新。
