java.util.concurrency.atomic.LongAdder
是Java8新增的一个类,提供了原子累计值的方法。根据文档的描述其性能要优于AtomicLong,下面是一个简单的测试对比demo(平台:MBP):
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 @State(Scope.Group) @BenchmarkMode(Mode.AverageTime) @OutputTimeUnit(TimeUnit.NANOSECONDS) @Warmup(iterations = 3, time = 5, timeUnit = TimeUnit.SECONDS) @Measurement(iterations = 3, time = 5, timeUnit = TimeUnit.SECONDS) public class AtomicToLongAdder { private AtomicInteger atomicInteger; private LongAdder longAdder; @Setup public void setup () { atomicInteger = new AtomicInteger (); longAdder = new LongAdder (); } @Benchmark @Group("rw") @GroupThreads(20) public Integer atomicInc () { return atomicInteger.incrementAndGet(); } @Benchmark @Group("rw") @GroupThreads(1) public long atomicGet () { return atomicInteger.get(); } @Benchmark @Group("ld") @GroupThreads(20) public void longAdderInc () { longAdder.increment(); } @Benchmark @Group("ld") @GroupThreads(1) public long longAdderGet () { return longAdder.sum(); } public static void main (String[] args) throws RunnerException { Options opt = new OptionsBuilder () .include(AtomicToLongAdder.class.getSimpleName()) .forks(1 ) .build(); new Runner (opt).run(); } }
1 2 3 4 5 6 7 Benchmark Mode Cnt Score Error Units AtomicToLongAdder.ld avgt 3 91.834 ± 60.177 ns/op AtomicToLongAdder.ld:longAdderGet avgt 3 260.959 ± 508.102 ns/op AtomicToLongAdder.ld:longAdderInc avgt 3 83.378 ± 83.854 ns/op AtomicToLongAdder.rw avgt 3 328.864 ± 89.448 ns/op AtomicToLongAdder.rw:atomicGet avgt 3 132.681 ± 346.180 ns/op AtomicToLongAdder.rw:atomicInc avgt 3 338.673 ± 79.338 ns/op
这里测试时基于JDK1.8进行的,从上面结果可以,看出在多线程情况下,LongAdder的写性能要比AtomicInteger要好,但是读性能相对来说要差很多,所以LongAdder适合于一些多写少读的环境。
AtomicLong从Java8开始针对x86平台进行了优化,使用XADD替换了CAS操作,我们知道JUC下面提供的原子类都是基于Unsafe类实现的,并由Unsafe来提供CAS的能力。CAS (compare-and-swap)本质上是由现代CPU在硬件级实现的原子指令,允许进行无阻塞,多线程的数据操作同时兼顾了安全性以及效率。大部分情况下,CAS都能够提供不错的性能,但是在高竞争的情况下开销可能会成倍增长,具体的研究可以参考这篇文章 , 我们直接看下代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 public class AtomicLong {public final long incrementAndGet () { return unsafe.getAndAddLong(this , valueOffset, 1L ) + 1L ; } } public final class Unsafe {public final long getAndAddLong (Object var1, long var2, long var4) { long var6; do { var6 = this .getLongVolatile(var1, var2); } while (!this .compareAndSwapLong(var1, var2, var6, var6 + var4)); return var6; } }
getAndAddLong方法会以volatile的语义去读需要自增的域的最新值,然后通过CAS去尝试更新,正常情况下会直接成功后返回,但是在高并发下可能会同时有很多线程同时尝试这个过程,也就是说线程A读到的最新值可能实际已经过期了,因此需要在while循环中不断的重试,造成很多不必要的开销,而xadd的相对来说会更高效一点,伪码如下,最重要的是下面这段代码是原子的,也就是说其他线程不能打断它的执行或者看到中间值,这条指令是在硬件级直接支持的:
1 2 3 4 5 function FetchAndAdd (address location, int inc) { int value := *location *location := value + inc return value }
而LongAdder的性能比上面那种还要好很多,于是就研究了一下。首先它有一个基础的值base,在发生竞争的情况下,会有一个Cell数组用于将不同线程的操作离散到不同的节点上去(会根据需要扩容,最大为CPU核数),sum()会将所有Cell数组中的value和base累加作为返回值。核心的思想就是将AtomicLong一个value的更新压力分散到多个value中去,从而降低更新热点。
1 2 3 public class LongAdder extends Striped64 implements Serializable {}
LongAdder继承自Striped64,Striped64内部维护了一个懒加载的数组以及一个额外的base实例域,数组的大小是2的N次方,使用每个线程Thread内部的哈希值访问。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 abstract class Striped64 extends Number { static final int NCPU = Runtime.getRuntime().availableProcessors(); transient volatile Cell[] cells; @sun .misc.Contended static final class Cell { volatile long value; Cell(long x) { value = x; } final boolean cas (long cmp, long val) { return UNSAFE.compareAndSwapLong(this , valueOffset, cmp, val); } private static final sun.misc.Unsafe UNSAFE; private static final long valueOffset; static { try { UNSAFE = sun.misc.Unsafe.getUnsafe(); Class<?> ak = Cell.class; valueOffset = UNSAFE.objectFieldOffset (ak.getDeclaredField("value" )); } catch (Exception e) { throw new Error (e); } } } }
数组的元素是Cell类,可以看到Cell类用Contended注解修饰,这里主要是解决false sharing(伪共享的问题),具体的可以看这篇文章伪共享(false sharing)并发编程无声的性能杀手 。 下面是LongAdder中的部分源码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 public class LongAdder { public void add (long x) { Cell[] as; long b, v; int m; Cell a; if ((as = cells) != null || !casBase(b = base, b + x)) { boolean uncontended = true ; if (as == null || (m = as.length - 1 ) < 0 || (a = as[getProbe() & m]) == null || !(uncontended = a.cas(v = a.value, v + x))) longAccumulate(x, null , uncontended); } } public long sum () { Cell[] as = cells; Cell a; long sum = base; if (as != null ) { for (int i = 0 ; i < as.length; ++i) { if ((a = as[i]) != null ) sum += a.value; } } return sum; } } @sun .misc.Contended static final class Cell { volatile long value; Cell(long x) { value = x; } final boolean cas (long cmp, long val) { return UNSAFE.compareAndSwapLong(this , valueOffset, cmp, val); } private static final sun.misc.Unsafe UNSAFE; private static final long valueOffset; static { try { UNSAFE = sun.misc.Unsafe.getUnsafe(); Class<?> ak = Cell.class; valueOffset = UNSAFE.objectFieldOffset (ak.getDeclaredField("value" )); } catch (Exception e) { throw new Error (e); } } } abstract class Striped64 extends Number { final void longAccumulate (long x, LongBinaryOperator fn, boolean wasUncontended) { int h; if ((h = getProbe()) == 0 ) { ThreadLocalRandom.current(); h = getProbe(); wasUncontended = true ; } boolean collide = false ; for (;;) { Cell[] as; Cell a; int n; long v; if ((as = cells) != null && (n = as.length) > 0 ) { if ((a = as[(n - 1 ) & h]) == null ) { if (cellsBusy == 0 ) { Cell r = new Cell (x); if (cellsBusy == 0 && casCellsBusy()) { boolean created = false ; try { Cell[] rs; int m, j; if ((rs = cells) != null && (m = rs.length) > 0 && rs[j = (m - 1 ) & h] == null ) { rs[j] = r; created = true ; } } finally { cellsBusy = 0 ; } if (created) break ; continue ; } } collide = false ; } else if (!wasUncontended) wasUncontended = true ; else if (a.cas(v = a.value, ((fn == null ) ? v + x : fn.applyAsLong(v, x)))) break ; else if (n >= NCPU || cells != as) collide = false ; else if (!collide) collide = true ; else if (cellsBusy == 0 && casCellsBusy()) { try { if (cells == as) { Cell[] rs = new Cell [n << 1 ]; for (int i = 0 ; i < n; ++i) rs[i] = as[i]; cells = rs; } } finally { cellsBusy = 0 ; } collide = false ; continue ; } h = advanceProbe(h); } else if (cellsBusy == 0 && cells == as && casCellsBusy()) { boolean init = false ; try { if (cells == as) { Cell[] rs = new Cell [2 ]; rs[h & 1 ] = new Cell (x); cells = rs; init = true ; } } finally { cellsBusy = 0 ; } if (init) break ; } else if (casBase(v = base, ((fn == null ) ? v + x : fn.applyAsLong(v, x)))) break ; } } final boolean casCellsBusy () { return UNSAFE.compareAndSwapInt(this , CELLSBUSY, 0 , 1 ); } static final int getProbe () { return UNSAFE.getInt(Thread.currentThread(), PROBE); } private static final sun.misc.Unsafe UNSAFE; private static final long BASE; private static final long CELLSBUSY; private static final long PROBE; static { try { UNSAFE = sun.misc.Unsafe.getUnsafe(); Class<?> sk = Striped64.class; BASE = UNSAFE.objectFieldOffset (sk.getDeclaredField("base" )); CELLSBUSY = UNSAFE.objectFieldOffset (sk.getDeclaredField("cellsBusy" )); Class<?> tk = Thread.class; PROBE = UNSAFE.objectFieldOffset (tk.getDeclaredField("threadLocalRandomProbe" )); } catch (Exception e) { throw new Error (e); } } }
由于Cell相对来说比较占内存,因此这里采用懒加载的方式,在无竞争的情况下直接更新base域,在第一次发生竞争的时候(CAS失败)就会创建一个大小为2的cells数组,每次扩容都是加倍,只到达到CPU核数。同时我们知道扩容数组等行为需要只能有一个线程同时执行,因此需要一个锁,这里通过CAS更新cellsBusy来实现一个简单的spin lock。 数组访问索引是通过Thread里的threadLocalRandomProbe域取模实现的,这个域是ThreadLocalRandom更新的,cells的数组大小被限制为CPU的核数,因为即使有超过核数个线程去更新,但是每个线程也只会和一个CPU绑定,更新的时候顶多会有cpu核数个线程,因此我们只需要通过hash将不同线程的更新行为离散到不同的slot即可。 我们知道线程、线程池会被关闭或销毁,这个时候可能这个线程之前占用的slot就会变成没人用的,但我们也不能清除掉,因为一般web应用都是长时间运行的,线程通常也会动态创建、销毁,很可能一段时间后又会被其他线程占用,而对于短时间运行的,例如单元测试,清除掉有啥意义呢?
总结 总的来说,LongAdder从性能上来说要远远好于AtomicLong,一般情况下是可以直接替代AtomicLong使用的,Netty也通过一个接口封装了这两个类,在Java8下直接采用LongAdder。但是AtomicLong的一系列方法不仅仅可以自增,还可以获取更新后的值,如果是例如获取一个全局唯一的ID还是采用AtomicLong会方便一点。
参考
Java并发工具类之LongAdder原理总结