原子引用
JUC 并发包提供了:
- AtomicReference
- AtomicMarkableReference
- AtomicStampedReference
AtomicReference 使用举例
public interface DecimalAccount {// 获取余额BigDecimal getBalance();// 取款void withdraw(BigDecimal amount);/*** 方法内会启动 1000 个线程,每个线程做 -10 元 的操作* 如果初始余额为 10000 那么正确的结果应当是 0*/static void demo(DecimalAccount account) {List<Thread> ts = new ArrayList<>();for (int i = 0; i < 1000; i++) {ts.add(new Thread(() -> {account.withdraw(BigDecimal.TEN);}));}ts.forEach(Thread::start);ts.forEach(t -> {try {t.join();} catch (InterruptedException e) {e.printStackTrace();}});System.out.println(account.getBalance());}
}
如何提供 DecimalAccount 实现,实现安全的取款操作
不安全实现
class DecimalAccountUnsafe implements DecimalAccount {BigDecimal balance;public DecimalAccountUnsafe(BigDecimal balance) {this.balance = balance;}@Overridepublic BigDecimal getBalance() {return balance;}@Overridepublic void withdraw(BigDecimal amount) {BigDecimal balance = this.getBalance();this.balance = balance.subtract(amount);}
}
安全实现-使用synchronized
class DecimalAccountSafeLock implements DecimalAccount {private final Object lock = new Object();BigDecimal balance;public DecimalAccountSafeLock(BigDecimal balance) {this.balance = balance;}@Overridepublic BigDecimal getBalance() {return balance;}@Overridepublic void withdraw(BigDecimal amount) {synchronized (lock) {BigDecimal balance = this.getBalance();this.balance = balance.subtract(amount);}}
}
安全实现-使用CAS
class DecimalAccountSafeCas implements DecimalAccount {AtomicReference<BigDecimal> ref;public DecimalAccountSafeCas(BigDecimal balance) {ref = new AtomicReference<>(balance);}@Overridepublic BigDecimal getBalance() {return ref.get();}@Overridepublic void withdraw(BigDecimal amount) {while (true) {BigDecimal prev = ref.get();BigDecimal next = prev.subtract(amount);if (ref.compareAndSet(prev, next)) {break;}}}
}
测试代码
DecimalAccount.demo(new DecimalAccountUnsafe(new BigDecimal("10000")));
DecimalAccount.demo(new DecimalAccountSafeLock(new BigDecimal("10000")));
DecimalAccount.demo(new DecimalAccountSafeCas(new BigDecimal("10000")));
运行结果
4310 cost: 425 ms
0 cost: 285 ms
0 cost: 274 ms
AtomicStampedReference 使用举例
使用 AtomicReference 时线程仅能判断出共享变量的值与最初值 A 是否相同,不能感知到从 A 改为 B 又改回 A 的情况,如果主线程 希望: 只要有其它线程【动过了】共享变量,那么自己的 cas 就算失败,这时,仅比较值是不够的,需要再加一个版本号,使用 AtomicStampedReference 可以实现
static AtomicStampedReference<String> ref = new AtomicStampedReference<>("A", 0);public static void main(String[] args) throws InterruptedException {log.debug("main start...");// 获取值 AString prev = ref.getReference();// 获取版本号int stamp = ref.getStamp();log.debug("版本 {}", stamp);// 如果中间有其它线程干扰,发生了 ABA 现象other();sleep(1);// 尝试改为 Clog.debug("change A->C {}", ref.compareAndSet(prev, "C", stamp, stamp + 1));
}private static void other() {new Thread(() -> {log.debug("change A->B {}", ref.compareAndSet(ref.getReference(), "B", ref.getStamp(), ref.getStamp() + 1));log.debug("更新版本为 {}", ref.getStamp());}, "t1").start();sleep(0.5);new Thread(() -> {log.debug("change B->A {}", ref.compareAndSet(ref.getReference(), "A", ref.getStamp(), ref.getStamp() + 1));log.debug("更新版本为 {}", ref.getStamp());}, "t2").start();
}
输出如下,最后因为版本号不一致没有修改成功
15:21:34.891 c.Test36 [main] - main start...
15:21:34.894 c.Test36 [main] - 版本 0
15:21:34.956 c.Test36 [t1] - change A->B true
15:21:34.956 c.Test36 [t1] - 更新版本为 1
15:21:35.457 c.Test36 [t2] - change B->A true
15:21:35.457 c.Test36 [t2] - 更新版本为 2
15:21:36.457 c.Test36 [main] - change A->C false
AtomicMarkableReference 使用举例
AtomicStampedReference 可以给原子引用加上版本号,追踪原子引用整个的变化过程,如: A -> B -> A -> C ,通过AtomicStampedReference,我们可以知道,引用变量中途被更改了几次。但是有时候,并不关心引用变量更改了几次,只是单纯的关心是否更改过,所以就有了 AtomicMarkableReference。
class GarbageBag {String desc;public GarbageBag(String desc) {this.desc = desc;}public void setDesc(String desc) {this.desc = desc;}@Overridepublic String toString() {return super.toString() + " " + desc;}
}
@Slf4j
public class TestABAAtomicMarkableReference {public static void main(String[] args) throws InterruptedException {GarbageBag bag = new GarbageBag("装满了垃圾");// 参数2 mark 可以看作一个标记,表示垃圾袋满了AtomicMarkableReference<GarbageBag> ref = new AtomicMarkableReference<>(bag, true);log.debug("主线程 start...");GarbageBag prev = ref.getReference();log.debug(prev.toString());new Thread(() -> {log.debug("打扫卫生的线程 start...");bag.setDesc("空垃圾袋");while (!ref.compareAndSet(bag, bag, true, false)) {}log.debug(bag.toString());}).start();Thread.sleep(1000);log.debug("主线程想换一只新垃圾袋?");boolean success = ref.compareAndSet(prev, new GarbageBag("空垃圾袋"), true, false);log.debug("换了么?" + success);log.debug(ref.getReference().toString());}
}
输出
15:30:09.264 [main] 主线程 start...
15:30:09.270 [main] cn.itcast.GarbageBag@5f0fd5a0 装满了垃圾
15:30:09.293 [Thread-1] 打扫卫生的线程 start...
15:30:09.294 [Thread-1] cn.itcast.GarbageBag@5f0fd5a0 空垃圾袋
15:30:10.294 [main] 主线程想换一只新垃圾袋?
15:30:10.294 [main] 换了么?false
15:30:10.294 [main] cn.itcast.GarbageBag@5f0fd5a0 空垃圾袋
原子数组
- AtomicIntegerArray
- AtomicLongArray
- AtomicReferenceArray
AtomicIntegerArray 使用举例
有如下方法
/**参数1,提供数组、可以是线程不安全数组或线程安全数组参数2,获取数组长度的方法参数3,自增方法,回传 array, index参数4,打印数组的方法
*/
// supplier 提供者 无中生有 ()->结果
// function 函数 一个参数一个结果 (参数)->结果 , BiFunction (参数1,参数2)->结果
// consumer 消费者 一个参数没结果 (参数)->void, BiConsumer (参数1,参数2)-> void
private static <T> void demo(Supplier<T> arraySupplier,Function<T, Integer> lengthFun,BiConsumer<T, Integer> putConsumer,Consumer<T> printConsumer ) {List<Thread> ts = new ArrayList<>();T array = arraySupplier.get();int length = lengthFun.apply(array);for (int i = 0; i < length; i++) {// 每个线程对数组作 10000 次操作ts.add(new Thread(() -> {for (int j = 0; j < 10000; j++) {putConsumer.accept(array, j%length);}}));}ts.forEach(t -> t.start()); // 启动所有线程ts.forEach(t -> {try {t.join();} catch (InterruptedException e) {e.printStackTrace();}}); // 等所有线程结束printConsumer.accept(array);
}
不安全的数组
demo(()->new int[10],(array)->array.length,(array, index) -> array[index]++,array-> System.out.println(Arrays.toString(array))
);
结果
[9870, 9862, 9774, 9697, 9683, 9678, 9679, 9668, 9680, 9698]
安全的数组
demo(()-> new AtomicIntegerArray(10),(array) -> array.length(),(array, index) -> array.getAndIncrement(index),array -> System.out.println(array)
);
结果
[10000, 10000, 10000, 10000, 10000, 10000, 10000, 10000, 10000, 10000]
字段更新器
- AtomicReferenceFieldUpdater // 域 字段
- AtomicIntegerFieldUpdater
- AtomicLongFieldUpdater
利用字段更新器,可以针对对象的某个域(Field)进行原子操作,只能配合 volatile 修饰的字段使用,否则会出现 IllegalArgumentException 异常
AtomicReferenceFieldUpdater 使用举例
public class Test {@Testpublic void test() {A a = new A();a.setNum(0);AtomicReferenceFieldUpdater<A, Integer> num = AtomicReferenceFieldUpdater.newUpdater(A.class, Integer.class, "num");List<Thread> ts = new ArrayList<>();for (int i = 0; i < 20; i++) {// 每个线程对数组作 10000 次操作ts.add(new Thread(() -> {for (int j = 0; j < 10000; j++) {while (true) {Integer prev = num.get(a);Integer next = prev + 1;if (num.compareAndSet(a, prev, next)) {break;}}}}));}ts.forEach(Thread::start);ts.forEach(t -> {try {t.join();} catch (InterruptedException e) {e.printStackTrace();}});System.out.println(num.get(a));}
}@Data
class A{volatile Integer num;
}
结果
200000
执行对比:
public class Test {@Testpublic void test() {A a = new A();a.setNum(new AtomicInteger(0));List<Thread> ts = new ArrayList<>();for (int i = 0; i < 20; i++) {// 每个线程对数组作 10000 次操作ts.add(new Thread(() -> {for (int j = 0; j < 1000; j++) {a.getNum().addAndGet(1);}}));}ts.forEach(Thread::start);ts.forEach(t -> {try {t.join();} catch (InterruptedException e) {e.printStackTrace();}});System.out.println(a.getNum().get());}
}@Data
class A {AtomicInteger num;
}
public class Test {@Testpublic void test() {A a = new A();a.setNum(new AtomicReference<>(0));List<Thread> ts = new ArrayList<>();for (int i = 0; i < 20; i++) {// 每个线程对数组作 10000 次操作ts.add(new Thread(() -> {for (int j = 0; j < 10000; j++) {while (true) {Integer prev = a.getNum().get();Integer next = prev + 1;if (a.getNum().compareAndSet(prev, next)) {break;}}}}));}ts.forEach(Thread::start);ts.forEach(t -> {try {t.join();} catch (InterruptedException e) {e.printStackTrace();}});System.out.println(a.getNum().get());}
}@Data
class A {AtomicReference<Integer> num;
}
AI 解释:
在第三个示例中,使用的是AtomicReference<Integer>,并且在每次递增时,需要显式地进行一个循环,在这个循环中不断尝试使用compareAndSet方法来安全地更新共享变量。在高并发的情况下,多个线程可能会竞争更新同一个变量,这可能导致多次失败的compareAndSet尝试,因而增加了循环的迭代次数。在第二个示例中,使用的是AtomicInteger,这是一个针对整数值的原子类。在这个示例中,使用了addAndGet方法,它是一个原子操作,无需显式循环来尝试更新。addAndGet方法会自动处理并发更新的问题,使得代码更加简洁,并且在性能上可能更优,因为它减少了失败更新尝试的可能性。在效率和性能方面,第二种方法(使用AtomicInteger和addAndGet)通常比第一种方法(使用AtomicReference<Integer>和显式循环)更优,原因包括:更少的重试次数:AtomicInteger的addAndGet方法通常会有更少的重试次数,因为它是专为数值运算设计的原子操作。
原子操作的优化:AtomicInteger的方法可能会更好地利用底层硬件的原子性支持,如使用CPU的原子指令集(比如compareAndSwap),从而提高操作的效率。
更简洁的代码:AtomicInteger提供了直接的方法来进行原子性数值操作,使得代码更简洁、更易于理解和维护。但是,实际性能的差异还是需要通过具体的性能测试来确定,因为它可能受到多种因素的影响,包括JVM实现、底层硬件、操作系统以及其他并发任务的特性。
--------------------------------------------------------------------------------------------
这第一段代码使用了AtomicReferenceFieldUpdater来以一种线程安全的方式更新A类的num字段。与前面的例子相比,这里有几个关键点需要注意:AtomicReferenceFieldUpdater的使用:AtomicReferenceFieldUpdater是一个原子更新器,它允许你以线程安全的方式更新指定类的指定volatile字段。这是一个高级工具,通常用于在不直接使用原子类(例如AtomicInteger)时优化内存使用和性能,因为它可以在不为每个字段创建额外对象的情况下实现字段的原子操作。字段定义:num字段定义为volatile类型的Integer,这意味着其修改对其他线程立即可见,但请注意volatile并不能保证复合(多步)操作的原子性。性能:尽管使用了AtomicReferenceFieldUpdater,但每次递增操作仍然需要显式的循环来尝试更新,类似于第三个示例中的AtomicReference<Integer>。这意味着高并发下可能会有多次失败的compareAndSet尝试,这可能导致性能上的开销。在性能和效率方面,尽管AtomicReferenceFieldUpdater提供了一种更轻量级的原子操作方式,这段代码的性能可能与第三个示例(使用AtomicReference<Integer>)相似,因为它也依赖于显式的重试循环来成功更新值。虽然AtomicReferenceFieldUpdater可能会比为每个变量创建一个AtomicReference对象更节省内存,但由于有失败重试的可能性,其性能可能不如第二个示例(使用AtomicInteger和addAndGet方法)。需要注意的是,AtomicReferenceFieldUpdater要求字段必须是volatile类型的,而且对于基本数据类型,比如int,应该使用相应的AtomicIntegerFieldUpdater。在此代码示例中,可能需要将Integer字段替换为int类型,并使用AtomicIntegerFieldUpdater,因为AtomicReferenceFieldUpdater通常是用于引用类型的字段。如果保持为Integer类型,可能会产生额外的装箱和拆箱操作,这将进一步影响性能。
原子累加器
LongAdder 使用举例
private static <T> void demo(Supplier<T> adderSupplier, Consumer<T> action) {T adder = adderSupplier.get();List<Thread> ts = new ArrayList<>();// 4 个线程,每人累加 50 万for (int i = 0; i < 40; i++) {ts.add(new Thread(() -> {for (int j = 0; j < 500000; j++) {action.accept(adder);}}));}ts.forEach(t -> t.start());ts.forEach(t -> {try {t.join();} catch (InterruptedException e) {e.printStackTrace();}});long end = System.nanoTime();System.out.println(adder + " cost:" + (end - start)/1000_000);
}
比较 AtomicLong 与 LongAdder
for (int i = 0; i < 5; i++) {demo(() -> new LongAdder(), adder -> adder.increment());
}for (int i = 0; i < 5; i++) {demo(() -> new AtomicLong(), adder -> adder.getAndIncrement());
}
测试结果:LongAdder 的累加性能更优,因为 LongAdder 在有竞争时,设置多个累加单元,Therad-0 累加 Cell[0],而 Thread-1 累加 Cell[1]… 总Cell个数不超过CPU核心数,最后将结果汇总。这样它们在累加时操作的不同的 Cell 变量,因此减少了 CAS 重试失败,从而提高性能
Unsafe类
Unsafe 对象提供了非常底层的,操作内存、线程的方法,Unsafe 对象不能直接调用,只能通过反射获得,前面的原子整数、原子引用这些类的底层都是调用 Unsafe,Unsafe 底层会调用 CPU 的 lock 打头的指令之类
获得 Unsafe 对象的代码:
public class UnsafeAccessor {static Unsafe unsafe;static {try { Field theUnsafe = Unsafe.class.getDeclaredField("theUnsafe 是private的");// 因为 theUnsafe 是 private 的theUnsafe.setAccessible(true);// 因为 theUnsafe 是静态的,从属于类,不属于对象,所以传null即可获得字段值unsafe = (Unsafe) theUnsafe.get(null);} catch (NoSuchFieldException | IllegalAccessException e) {throw new Error(e);}}static Unsafe getUnsafe() {return unsafe;}
}
使用 Unsafe 对象实现 CAS 操作
@Data
class Student {volatile int id;volatile String name;
}
Unsafe unsafe = UnsafeAccessor.getUnsafe();Field id = Student.class.getDeclaredField("id");
Field name = Student.class.getDeclaredField("name");// 获得成员变量的偏移量
long idOffset = UnsafeAccessor.unsafe.objectFieldOffset(id);
long nameOffset = UnsafeAccessor.unsafe.objectFieldOffset(name);Student student = new Student();
// 使用 cas 方法替换成员变量的值
UnsafeAccessor.unsafe.compareAndSwapInt(student, idOffset, 0, 20); // 返回 true
UnsafeAccessor.unsafe.compareAndSwapObject(student, nameOffset, null, "张三"); // 返回 true
System.out.println(student);
输出
Student(id=20, name=张三)
如何自己实现线程安全的整数自减操作?
class MyAtomicInteger {private volatile int num;static final Unsafe unsafe;static final long DATA_OFFSET;static {unsafe = UnsafeAccessor.getUnsafe();try {// num 属性在 DataContainer 对象中的偏移量,用于 Unsafe 直接访问该属性DATA_OFFSET = unsafe.objectFieldOffset(AtomicData.class.getDeclaredField("num"));} catch (NoSuchFieldException e) {throw new Error(e);}}public MyAtomicInteger(int num) {this.num = num;}public void decrease(int amount) {while(true) {int prev = this.num;int next = prev - amount;// cas 尝试修改 num 为 旧值 + amount,如果期间旧值被别的线程改了,返回 falseif (unsafe.compareAndSwapInt(this, DATA_OFFSET, prev, next)) {return;}}}public int getNum() {return num;}
}