一、共享模型之内存
1. Java内存模型 - JMM
可见性 - volatile
先来看一个现象,main线程对run变量的修改对于t线程不可见,导致了t线程无法停止:
package cn.itcast.test;import lombok.extern.slf4j.Slf4j;import static cn.itcast.n2.util.Sleeper.sleep;@Slf4j(topic = "c.Test32")
public class Test32 {// 易变static boolean run = true;public static void main(String[] args) throws InterruptedException {Thread t = new Thread(()->{while(true){if(!run) {break;}}});t.start();sleep(1);run = false; // 线程t不会如预想的停下来}
}
解决方案一:volatile关键字,它可以用来修饰成员变量和静态成员变量,他可以避免线程从自己的工作缓存中查找变量的值,必须到主存中获取它的值,线程操作volatile变量都是直接操作主存。
volatile static boolean run = true;
解决方案二:synchronized
package cn.itcast.test;import lombok.extern.slf4j.Slf4j;import static cn.itcast.n2.util.Sleeper.sleep;@Slf4j(topic = "c.Test32")
public class Test32 {static boolean run = true;// 锁对象final static Object lock = new Object();public static void main(String[] args) throws InterruptedException {Thread t = new Thread(()->{while(true){synchronized (lock) {if(!run) {break;}}}});t.start();sleep(1);synchronized (lock) {run = false;}}
}
可见性 vs 原子性
有序性 - volatile
2. 终止模式之两阶段终止模式
package cn.itcast.test;import lombok.extern.slf4j.Slf4j;@Slf4j(topic = "c.TwoPhaseTermination")
public class Test13 {public static void main(String[] args) throws InterruptedException {TwoPhaseTermination tpt = new TwoPhaseTermination();tpt.start();//tpt.start();//tpt.start();Thread.sleep(3500);log.debug("停止监控");tpt.stop();}
}@Slf4j(topic = "c.TwoPhaseTermination")
class TwoPhaseTermination {// 监控线程private Thread monitorThread;// 停止标记private volatile boolean stop = false;// 判断是否执行过 start 方法private boolean starting = false;// 启动监控线程public void start() {synchronized (this) {if (starting) { // falsereturn;}starting = true;}monitorThread = new Thread(() -> {while (true) {Thread current = Thread.currentThread();// 是否被打断if (stop) {log.debug("料理后事");break;}try {Thread.sleep(1000);log.debug("执行监控记录");} catch (InterruptedException e) {e.printStackTrace();// 重新设置打断标记// current.interrupt();}}}, "monitor");monitorThread.start();}// 停止监控线程public void stop() {stop = true;monitorThread.interrupt();}
}
3. 同步模式之Balking
Balking(犹豫)模式用在一个线程发现另一个线程或本线程已经做了某一件相同的事,那么本线程就无需再做了,直接结束返回。
package cn.itcast.test;import lombok.extern.slf4j.Slf4j;@Slf4j(topic = "c.TwoPhaseTermination")
public class Test13 {public static void main(String[] args) throws InterruptedException {TwoPhaseTermination tpt = new TwoPhaseTermination();tpt.start();//tpt.start();//tpt.start();Thread.sleep(3500);log.debug("停止监控");tpt.stop();}
}@Slf4j(topic = "c.TwoPhaseTermination")
class TwoPhaseTermination {// 监控线程private Thread monitorThread;// 停止标记private volatile boolean stop = false;// 判断是否执行过 start 方法private boolean starting = false;// 启动监控线程public void start() {synchronized (this) {// Balking模式if (starting) { // falsereturn;}starting = true;}monitorThread = new Thread(() -> {while (true) {Thread current = Thread.currentThread();// 是否被打断if (stop) {log.debug("料理后事");break;}try {Thread.sleep(1000);log.debug("执行监控记录");} catch (InterruptedException e) {e.printStackTrace();// 重新设置打断标记// current.interrupt();}}}, "monitor");monitorThread.start();}// 停止监控线程public void stop() {stop = true;monitorThread.interrupt();}
}
练习题:
balking模式习题:希望doInit()方法仅被调用一次,下面的实现是否有问题,为什么?
package cn.itcast.test;import lombok.extern.slf4j.Slf4j;@Slf4j(topic = "c.Test")
public class Test {static volatile boolean intialized = false;static int y;public static void main(String[] args){init();}static void init() {if(intialized) {return;}doInit();intialized = true;}private static void doInit() {System.out.println("111");}
}
- 假设有多个线程同时调用
init()
方法。 - 如果一个线程检查了
intialized
变量的值,发现它为false
,并且在执行doInit()
的过程中,其他线程也调用了init()
方法。这可能会导致多个线程同时执行doInit()
方法,造成重复初始化。
- 虽然
volatile
可以解决可见性问题,但它并不能保证原子性。如果一个线程中执行了intialized = true
,但在doInit()
方法执行完之前,另一个线程可能进入init()
方法并执行doInit()
。
解决方案一:加锁
static void init() { synchronized (Test.class) { if (intialized) { return; } doInit(); intialized = true; }
}
解决方案二:双重检查锁定DCL
static void init() { if (intialized) { return; } synchronized (Test.class) { if (intialized) { return; } doInit(); intialized = true; }
}
解决方案三:使用java.util.concurrent包下的工具类
import java.util.concurrent.atomic.AtomicBoolean; static AtomicBoolean initialized = new AtomicBoolean(false); static void init() { if (initialized.get()) { return; } synchronized (Test.class) { if (initialized.get()) { return; } doInit(); initialized.set(true); }
}
4. double-checked locking问题
public class DCLSingleton { private static volatile DCLSingleton instance = null; private DCLSingleton() {} public static DCLSingleton getInstance() { if (instance == null) { synchronized (DCLSingleton.class) { if (instance == null) { instance = new DCLSingleton(); } } } return instance; }
}
5. happens-before
package cn.itcast.test;import cn.itcast.n2.util.Sleeper;
import lombok.extern.slf4j.Slf4j;@Slf4j(topic = "c.Test")
public class Test {static int x;public static void main(String[] args){Thread t2 = new Thread(() -> {while(true) {if(Thread.currentThread().isInterrupted()) {System.out.println(x);break;}}}, "t2");t2.start();new Thread(() -> {Sleeper.sleep(1);x = 10;t2.interrupt();}, "t1").start();while (!t2.isInterrupted()) {Thread.yield();}System.out.println(x);}
}
package cn.itcast.test;import cn.itcast.n2.util.Sleeper;
import lombok.extern.slf4j.Slf4j;@Slf4j(topic = "c.Test")
public class Test {volatile static int x;static int y;public static void main(String[] args){new Thread(() -> {y = 10;x = 20;}, "t1").start();new Thread(() -> {System.out.println(y);System.out.println(x);}, "t2").start();}
}
6. 线程安全单例习题
单例模式有很多实现方法,懒汉、饿汉、静态内部类、枚举类,试分析每种实现下获取单例对象(即调用getInstance)时的线程安全,并思考注释中的问题
- 饿汉式:类加载就会导致该单例对象被创建
- 懒汉式:类加载不会导致该单例对象被创建,而是首次使用该对象时才会创建
实现1:饿汉式
package cn.itcast.test;import java.io.Serializable;// 问题1:为什么类上加final
// 问题2:如果实现了序列化接口,还要做什么来防止反序列化破坏单例
public final class Singleton implements Serializable {// 问题3:为什么构造方法设置为私有?是否能防止反射创建新的实例?private Singleton() {}// 问题4:这样初始化是否能够保证单例对象创建时的线程安全?private static final Singleton INSTANCE = new Singleton();// 问题5:为什么提供静态方法而不是直接将INSTANCE设置为public?public static Singleton getInstance() {return INSTANCE;}
}
问题1:为什么类上加final?
将类声明为 final
是为了防止其他类继承该类,子类覆盖方法。单例模式的核心是保证一个类只有一个实例,如果允许其他类继承该类,那么子类可能会创建自己的实例,从而破坏单例的特性。因此,使用 final
关键字可以确保这一点。
问题 2:如果实现了序列化接口,还要做什么来防止反序列化破坏单例?
实现 Serializable
接口的类在反序列化时会创建新的实例,因此在单例模式中,为了防止反序列化破坏单例,通常需要实现 readResolve
方法,返回现有的实例。
private Object readResolve() { return INSTANCE;
}
这个方法会在反序列化之后被调用,从而确保返回的是同一个单例实例。
问题 3:为什么构造方法设置为私有?是否能防止反射创建新的实例?
构造方法设置为私有是为了防止外部通过 new
关键字创建新的实例。这是实现单例模式的一个基本步骤。
然而,私有构造方法并不能完全防止反射创建新的实例,反射可以绕过访问控制。如果在反射中调用了私有构造方法,确实可以创建新的实例。为了防止这一点,可以在构造方法中抛出异常,或者在 if
语句检查是否已经创建过实例。
private static boolean instanceCreated = false; private Singleton() { if (instanceCreated) { throw new IllegalStateException("Instance already created"); } instanceCreated = true;
}
问题 4:这样初始化是否能够保证单例对象创建时的线程安全?
是的,这样的初始化方式能够保证线程安全。因为 INSTANCE
是在类加载时被静态初始化的,Java 的类加载机制能够确保在多线程环境中只会初始化一次。此外,静态初始化是在类加载期间完成的,保证了构造方法不会被多次调用。
问题 5:为什么提供静态方法而不是直接将 INSTANCE
设置为 public
?
提供静态方法 getInstance()
而不是将 INSTANCE
设置为 public
主要是出于以下考虑:
-
控制访问:通过提供方法,可以控制如何访问单例,可能在将来需要添加额外的逻辑,例如懒加载、初始化等。
-
封装性:将实例隐藏在类内部提供了一层封装,增强了类的封装性和维护性。
-
灵活性:如果未来需要修改单例的获取方式(例如,可能转变为懒加载),只需在
getInstance()
方法中进行更改,而不影响外部使用代码。
实现2:枚举类
package cn.itcast.test;enum Singleton {INSTANCE;
}
问题1:枚举单例是如何限制实例个数的?
答:枚举类型在 Java 中是一个特殊的类,它的实例在类加载时就被创建,并且只能有限个实例(在这里只有一个,INSTANCE
)。Java 语言通过枚举的特性,确保每个枚举实例都是唯一的,因此通过枚举可以自动限制实例个数。
问题2:枚举单例在创建时是否有并发问题?
答:没有。Java 的枚举类型在类被加载时就会创建实例,因为枚举实例的创建由 JVM 控制,确保线程安全,不会发生并发问题。无论多少线程同时访问 INSTANCE
,它们总是获取同一个实例。
问题3:枚举单例能否被反射破坏单例?
答:不能破坏。在 Java 中,枚举实例是由 Java 虚拟机直接控制的,反射无法直接创建新的枚举实例,因为每个枚举值在 Enum 类中都被定义为 public static final
的常量。如果尝试通过反射创建新的枚举实例,JVM 会抛出 IllegalArgumentException
。
问题4:枚举单例能否被反序列化破坏单例?
答:不能被破坏。Java 的枚举在反序列化时,会确保返回已有的枚举实例,而不会创建新的实例。这是由 JVM 的处理机制所保障的。因此,枚举单例在反序列化过程中不会被破坏。
问题5:枚举单例属于懒汉式还是饿汉式?
答:枚举单例可以被视为“饿汉式”单例。这是因为枚举实例是在类加载时就被创建,而不是在首次访问时懒加载。但是,它的加载是按照 Java 的启动顺序来完成的,因此它在某种程度上结合了懒加载和饿汉式的优点,保证了线程安全且避免了反序列化和反射攻击。
问题6:枚举单例如果希望加入一些单例创建时的初始化逻辑该如何做?
答:可以使用枚举的构造函数来增加初始化逻辑。在枚举的构造函数中,可以添加任何需要的初始化代码。示例如下:
enum Singleton { INSTANCE; Singleton() { // 这里加入初始化逻辑 // 例如: System.out.println("Singleton instance is being initialized"); }
}
通过这个方式,当枚举实例 INSTANCE
被加载时,提供的初始化逻辑会被执行。
实现3:懒汉式
public final class Singleton {private Singleton() {}private static Singleton INSTANCE = null;// 分析这里的线程安全,并说明有什么缺点public static synchronized Singleton getInstance() {if (INSTANCE != null) {return INSTANCE;}INSTANCE = new Singleton();return INSTANCE;}
}
-
性能问题:由于每次调用
getInstance()
方法都需要获取锁,这在某些情况下会导致性能下降。如果getInstance()
方法被频繁调用,由于多线程访问的竞争,可能会导致线程在获取锁时产生阻塞,从而影响程序的响应速度。
实现4:双重检查模式DCL
public final class Singleton { private Singleton() {} private static volatile Singleton INSTANCE = null; public static Singleton getInstance() { if (INSTANCE == null) { synchronized (Singleton.class) { if (INSTANCE == null) { INSTANCE = new Singleton(); } } } return INSTANCE; }
}
实现5:静态内部类
public final class Singleton {private Singleton() {}// 问题1:属于懒汉式还是饿汉式private static class LazyHolder {static final Singleton INSTANCE = new Singleton();}// 问题2:在创建时是否有并发问题public static Singleton getInstance() {return LazyHolder.INSTANCE;}
}
问题 1:属于懒汉式还是饿汉式
答:该实现属于懒汉式单例。虽然 Singleton
周期的持有者 LazyHolder
是一个静态内部类,并且环境中持有其引用,但实际的 Singleton
实例(INSTANCE
)是在首次调用 getInstance()
方法时才会被创建。这种方式确保了在真正需要时才创建单例实例,从而达到懒加载的效果。
问题 2:在创建时是否有并发问题
答:没有并发问题。由于 Singleton
类使用了静态内部类 LazyHolder
,INSTANCE
的创建是由 Java 的类加载机制保证的。只有在第一次调用 getInstance()
方法时,LazyHolder
类才会被加载,而它的静态字段 INSTANCE
只有在加载时才会被初始化。这个初始化过程是线程安全的,JVM 会保证静态变量在多线程环境下得到正确的初始化。因此,这种实现方式不会有并发问题。
二、共享模型之无锁
1. CAS 与 volatile
package cn.itcast.test;import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;public class TestAccount {public static void main(String[] args) {Account account = new AccountCas(10000);Account.demo(account);}
}class AccountCas implements Account {private AtomicInteger balance;public AccountCas(int balance) {this.balance = new AtomicInteger(balance);}@Overridepublic Integer getBalance() {return balance.get();}@Overridepublic void withdraw(Integer amount) {/*while(true) {// 获取余额的最新值int prev = balance.get();// 要修改的余额int next = prev - amount;// 真正修改if(balance.compareAndSet(prev, next)) {break;}}*/balance.getAndAdd(-1 * amount);}
}class AccountUnsafe implements Account {private Integer balance;public AccountUnsafe(Integer balance) {this.balance = balance;}@Overridepublic Integer getBalance() {synchronized (this) {return this.balance;}}@Overridepublic void withdraw(Integer amount) {synchronized (this) {this.balance -= amount;}}
}interface Account {// 获取余额Integer getBalance();// 取款void withdraw(Integer amount);/*** 方法内会启动 1000 个线程,每个线程做 -10 元 的操作* 如果初始余额为 10000 那么正确的结果应当是 0*/static void demo(Account account) {List<Thread> ts = new ArrayList<>();for (int i = 0; i < 1000; i++) {ts.add(new Thread(() -> {account.withdraw(10);}));}long start = System.nanoTime();ts.forEach(Thread::start);ts.forEach(t -> {try {t.join();} catch (InterruptedException e) {e.printStackTrace();}});long end = System.nanoTime();System.out.println(account.getBalance()+ " cost: " + (end-start)/1000_000 + " ms");}
}
为什么无锁效率高
CAS的特点
2. 原子整数
AtomicInteger、AtomicLong、AtomicBoolean
package cn.itcast.test;import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.IntUnaryOperator;public class Test34 {public static void main(String[] args) {AtomicInteger i = new AtomicInteger(5);System.out.println(i.incrementAndGet()); // ++i 6, 6System.out.println(i.getAndIncrement()); // i++ 6, 7System.out.println(i.getAndAdd(5)); // 7 , 12System.out.println(i.addAndGet(5)); // 17, 17// 读取到 设置的值i.updateAndGet(value -> value * 10); // 170System.out.println(updateAndGet(i, p -> p / 2)); // 85// i.getAndUpdate()System.out.println(i.get()); // 85}public static int updateAndGet(AtomicInteger i, IntUnaryOperator operator) {while (true) {int prev = i.get();int next = operator.applyAsInt(prev);if (i.compareAndSet(prev, next)) {return next;}}}
}
3. 原子引用
AtomicReference、AtomicMarkableReference、AtomicStampedReference
AtomicReference
package cn.itcast.test;import lombok.extern.slf4j.Slf4j;import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;@Slf4j(topic = "c.Test35")
public class Test35 {public static void main(String[] args) {DecimalAccount.demo(new DecimalAccountCas(new BigDecimal("10000")));}
}class DecimalAccountCas implements DecimalAccount {private AtomicReference<BigDecimal> balance;public DecimalAccountCas(BigDecimal balance) {this.balance = new AtomicReference<>(balance);}@Overridepublic BigDecimal getBalance() {return balance.get();}@Overridepublic void withdraw(BigDecimal amount) {while(true) {BigDecimal prev = balance.get();BigDecimal next = prev.subtract(amount);if (balance.compareAndSet(prev, next)) {break;}}}
}interface DecimalAccount {// 获取余额BigDecimal getBalance();// 取款void withdraw(BigDecimal amount);/*** 方法内会启动 1000 个线程,每个线程做 -10 元 的操作* 如果初始余额为 10000 那么正确的结果应当是 0*/static void demo(DecimalAccount account) {List<Thread> ts = new ArrayList<>();for (int i = 0; i < 1000; i++) {ts.add(new Thread(() -> {account.withdraw(BigDecimal.TEN);}));}ts.forEach(Thread::start);ts.forEach(t -> {try {t.join();} catch (InterruptedException e) {e.printStackTrace();}});System.out.println(account.getBalance());}
}
ABA问题 - AtomicStampedReference
AtomicStampedReference
package cn.itcast.test;import lombok.extern.slf4j.Slf4j;import java.util.concurrent.atomic.AtomicStampedReference;import static cn.itcast.n2.util.Sleeper.sleep;@Slf4j(topic = "c.Test36")
public class Test36 {static AtomicStampedReference<String> ref = new AtomicStampedReference<>("A", 0);public static void main(String[] args) throws InterruptedException {log.debug("main start...");// 获取值 AString prev = ref.getReference();// 获取版本号int stamp = ref.getStamp();log.debug("版本 {}", stamp);// 如果中间有其它线程干扰,发生了 ABA 现象other();sleep(1);// 尝试改为 Clog.debug("change A->C {}", ref.compareAndSet(prev, "C", stamp, stamp + 1));}private static void other() {new Thread(() -> {log.debug("change A->B {}", ref.compareAndSet(ref.getReference(), "B", ref.getStamp(), ref.getStamp() + 1));log.debug("更新版本为 {}", ref.getStamp());}, "t1").start();sleep(0.5);new Thread(() -> {log.debug("change B->A {}", ref.compareAndSet(ref.getReference(), "A", ref.getStamp(), ref.getStamp() + 1));log.debug("更新版本为 {}", ref.getStamp());}, "t2").start();}
}
AtomicMarkableReference
package cn.itcast.test;import lombok.extern.slf4j.Slf4j;import java.util.concurrent.atomic.AtomicMarkableReference;import static cn.itcast.n2.util.Sleeper.sleep;@Slf4j(topic = "c.Test38")
public class Test38 {public static void main(String[] args) throws InterruptedException {GarbageBag bag = new GarbageBag("装满了垃圾");// 参数2 mark 可以看作一个标记,表示垃圾袋满了AtomicMarkableReference<GarbageBag> ref = new AtomicMarkableReference<>(bag, true);log.debug("start...");GarbageBag prev = ref.getReference();log.debug(prev.toString());new Thread(() -> {log.debug("start...");bag.setDesc("空垃圾袋");ref.compareAndSet(bag, bag, true, false);log.debug(bag.toString());},"保洁阿姨").start();sleep(1);log.debug("想换一只新垃圾袋?");boolean success = ref.compareAndSet(prev, new GarbageBag("空垃圾袋"), true, false);log.debug("换了么?" + success);log.debug(ref.getReference().toString());}
}class GarbageBag {String desc;public GarbageBag(String desc) {this.desc = desc;}public void setDesc(String desc) {this.desc = desc;}@Overridepublic String toString() {return super.toString() + " " + desc;}
}
4. 原子数组
AtomicIntegerArray、AtomicLongArray、AtomicReferenceArray
package cn.itcast.test;import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicIntegerArray;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;public class Test39 {public static void main(String[] args) {demo(() -> new int[10],(array) -> array.length,(array, index) -> array[index]++,array -> System.out.println(Arrays.toString(array)));demo(() -> new AtomicIntegerArray(10),(array) -> array.length(),(array, index) -> array.getAndIncrement(index),array -> System.out.println(array));}/*** 参数1,提供数组、可以是线程不安全数组或线程安全数组* 参数2,获取数组长度的方法* 参数3,自增方法,回传 array, index* 参数4,打印数组的方法*/// supplier 提供者 无中生有 ()->结果// function 函数 一个参数一个结果 (参数)->结果 , BiFunction (参数1,参数2)->结果// consumer 消费者 一个参数没结果 (参数)->void, BiConsumer (参数1,参数2)->private static <T> void demo(Supplier<T> arraySupplier,Function<T, Integer> lengthFun,BiConsumer<T, Integer> putConsumer,Consumer<T> printConsumer) {List<Thread> ts = new ArrayList<>();T array = arraySupplier.get();int length = lengthFun.apply(array);for (int i = 0; i < length; i++) {// 每个线程对数组作 10000 次操作ts.add(new Thread(() -> {for (int j = 0; j < 10000; j++) {putConsumer.accept(array, j % length);}}));}ts.forEach(t -> t.start()); // 启动所有线程ts.forEach(t -> {try {t.join();} catch (InterruptedException e) {e.printStackTrace();}}); // 等所有线程结束printConsumer.accept(array);}
}
5. 字段更新器
AtomicReferenceFieldUpdater
AtomicIntegerFieldUpdater
AtomicLongFieldUpdater
利用字段更新器,可以针对对象的某个域(Field)进行原子操作,只能配合volatile修饰的字段使用,否则会出现异常
package cn.itcast.test;import lombok.extern.slf4j.Slf4j;import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;@Slf4j(topic = "c.Test40")
public class Test40 {public static void main(String[] args) {Student stu = new Student();AtomicReferenceFieldUpdater updater =AtomicReferenceFieldUpdater.newUpdater(Student.class, String.class, "name");System.out.println(updater.compareAndSet(stu, null, "张三"));System.out.println(stu);}
}class Student {volatile String name;@Overridepublic String toString() {return "Student{" +"name='" + name + '\'' +'}';}
}
6. 原子累加器
累加器性能比较
package cn.itcast.test;import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.LongAdder;
import java.util.function.Consumer;
import java.util.function.Supplier;public class Test41 {public static void main(String[] args) {for (int i = 0; i < 5; i++) {demo(() -> new AtomicLong(0),(adder) -> adder.getAndIncrement());}for (int i = 0; i < 5; i++) {demo(() -> new LongAdder(),adder -> adder.increment());}}/*() -> 结果 提供累加器对象(参数) -> 执行累加操作*/private static <T> void demo(Supplier<T> adderSupplier, Consumer<T> action) {T adder = adderSupplier.get();List<Thread> ts = new ArrayList<>();// 4 个线程,每人累加 50 万for (int i = 0; i < 4; i++) {ts.add(new Thread(() -> {for (int j = 0; j < 500000; j++) {action.accept(adder);}}));}long start = System.nanoTime();ts.forEach(t -> t.start());ts.forEach(t -> {try {t.join();} catch (InterruptedException e) {e.printStackTrace();}});long end = System.nanoTime();System.out.println(adder + " cost:" + (end - start) / 1000_000);}
}
源码之LongAdder
cas锁
package cn.itcast.test;import lombok.extern.slf4j.Slf4j;import java.util.concurrent.atomic.AtomicInteger;import static cn.itcast.n2.util.Sleeper.sleep;@Slf4j(topic = "c.Test42")
public class LockCas {// 0 没加锁// 1 加锁private AtomicInteger state = new AtomicInteger(0);public void lock() {while (true) {if (state.compareAndSet(0, 1)) {break;}}}public void unlock() {log.debug("unlock...");state.set(0);}public static void main(String[] args) {LockCas lock = new LockCas();new Thread(() -> {log.debug("begin1...");lock.lock();try {log.debug("lock1...");sleep(1);} finally {lock.unlock();}}).start();new Thread(() -> {log.debug("begin2...");lock.lock();try {log.debug("lock2...");} finally {lock.unlock();}}).start();}
}
原理之伪共享
public void add(long x) {Cell[] as; long b, v; int m; Cell a;if ((as = cells) != null || !casBase(b = base, b + x)) {boolean uncontended = true;if (as == null || (m = as.length - 1) < 0 ||(a = as[getProbe() & m]) == null ||!(uncontended = a.cas(v = a.value, v + x)))longAccumulate(x, null, uncontended);}}
final void longAccumulate(long x, LongBinaryOperator fn,boolean wasUncontended) {int h;if ((h = getProbe()) == 0) {ThreadLocalRandom.current(); // force initializationh = getProbe();wasUncontended = true;}boolean collide = false; // True if last slot nonemptyfor (;;) {Cell[] as; Cell a; int n; long v;if ((as = cells) != null && (n = as.length) > 0) {if ((a = as[(n - 1) & h]) == null) {if (cellsBusy == 0) { // Try to attach new CellCell r = new Cell(x); // Optimistically createif (cellsBusy == 0 && casCellsBusy()) {boolean created = false;try { // Recheck under lockCell[] rs; int m, j;if ((rs = cells) != null &&(m = rs.length) > 0 &&rs[j = (m - 1) & h] == null) {rs[j] = r;created = true;}} finally {cellsBusy = 0;}if (created)break;continue; // Slot is now non-empty}}collide = false;}else if (!wasUncontended) // CAS already known to failwasUncontended = true; // Continue after rehashelse if (a.cas(v = a.value, ((fn == null) ? v + x :fn.applyAsLong(v, x))))break;else if (n >= NCPU || cells != as)collide = false; // At max size or staleelse if (!collide)collide = true;else if (cellsBusy == 0 && casCellsBusy()) {try {if (cells == as) { // Expand table unless staleCell[] rs = new Cell[n << 1];for (int i = 0; i < n; ++i)rs[i] = as[i];cells = rs;}} finally {cellsBusy = 0;}collide = false;continue; // Retry with expanded table}h = advanceProbe(h);}else if (cellsBusy == 0 && cells == as && casCellsBusy()) {boolean init = false;try { // Initialize tableif (cells == as) {Cell[] rs = new Cell[2];rs[h & 1] = new Cell(x);cells = rs;init = true;}} finally {cellsBusy = 0;}if (init)break;}else if (casBase(v = base, ((fn == null) ? v + x :fn.applyAsLong(v, x))))break; // Fall back on using base}}
7. Unsafe
package cn.itcast.test;import lombok.Data;
import sun.misc.Unsafe;import java.lang.reflect.Field;public class TestUnsafe {public static void main(String[] args) throws NoSuchFieldException, IllegalAccessException {Field theUnsafe = Unsafe.class.getDeclaredField("theUnsafe");theUnsafe.setAccessible(true);Unsafe unsafe = (Unsafe) theUnsafe.get(null);System.out.println(unsafe);// 1. 获取域的偏移地址long idOffset = unsafe.objectFieldOffset(Teacher.class.getDeclaredField("id"));long nameOffset = unsafe.objectFieldOffset(Teacher.class.getDeclaredField("name"));Teacher t = new Teacher();// 2. 执行 cas 操作unsafe.compareAndSwapInt(t, idOffset, 0, 1);unsafe.compareAndSwapObject(t, nameOffset, null, "张三");// 3. 验证System.out.println(t);}
}
@Data
class Teacher {volatile int id;volatile String name;
}
自己实现一个AtomicInteger
package cn.itcast.n4;import sun.misc.Unsafe;import java.lang.reflect.Field;public class UnsafeAccessor {private static final Unsafe unsafe;static {try {Field theUnsafe = Unsafe.class.getDeclaredField("theUnsafe");theUnsafe.setAccessible(true);unsafe = (Unsafe) theUnsafe.get(null);} catch (NoSuchFieldException | IllegalAccessException e) {throw new Error(e);}}public static Unsafe getUnsafe() {return unsafe;}
}
package cn.itcast.test;import cn.itcast.n4.UnsafeAccessor;
import lombok.extern.slf4j.Slf4j;
import sun.misc.Unsafe;@Slf4j(topic = "c.Test42")
public class Test42 {public static void main(String[] args) {Account.demo(new MyAtomicInteger(10000));}
}class MyAtomicInteger implements Account {private volatile int value;private static final long valueOffset;private static final Unsafe UNSAFE;static {UNSAFE = UnsafeAccessor.getUnsafe();try {valueOffset = UNSAFE.objectFieldOffset(MyAtomicInteger.class.getDeclaredField("value"));} catch (NoSuchFieldException e) {e.printStackTrace();throw new RuntimeException(e);}}public int getValue() {return value;}public void decrement(int amount) {while(true) {int prev = this.value;int next = prev - amount;if (UNSAFE.compareAndSwapInt(this, valueOffset, prev, next)) {break;}}}public MyAtomicInteger(int value) {this.value = value;}@Overridepublic Integer getBalance() {return getValue();}@Overridepublic void withdraw(Integer amount) {decrement(amount);}
}
三、共享模型之不可变
1. 日期转换的问题
不可变类 DateTimeFormatter 的使用:
package cn.itcast.n7;import lombok.extern.slf4j.Slf4j;import java.text.SimpleDateFormat;
import java.time.format.DateTimeFormatter;
import java.time.temporal.TemporalAccessor;@Slf4j(topic = "c.Test1")
public class Test1 {public static void main(String[] args) {DateTimeFormatter stf = DateTimeFormatter.ofPattern("yyyy-MM-dd");for (int i = 0; i < 10; i++) {new Thread(() -> {TemporalAccessor parse = stf.parse("1951-04-21");log.debug("{}", parse);}).start();}// test();}private static void test() {SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");for (int i = 0; i < 10; i++) {new Thread(() -> {synchronized (sdf) {try {log.debug("{}", sdf.parse("1951-04-21"));} catch (Exception e) {log.error("{}", e);}}}).start();}}
}
2. 不可变设计
享元模式
package cn.itcast.n7;import lombok.extern.slf4j.Slf4j;import java.sql.*;
import java.util.Map;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicIntegerArray;public class Test3 {public static void main(String[] args) {Pool pool = new Pool(2);for (int i = 0; i < 5; i++) {new Thread(() -> {Connection conn = pool.borrow();try {Thread.sleep(new Random().nextInt(1000));} catch (InterruptedException e) {e.printStackTrace();}pool.free(conn);}).start();}}
}@Slf4j(topic = "c.Pool")
class Pool {// 1. 连接池大小private final int poolSize;// 2. 连接对象数组private Connection[] connections;// 3. 连接状态数组 0 表示空闲, 1 表示繁忙private AtomicIntegerArray states;// 4. 构造方法初始化public Pool(int poolSize) {this.poolSize = poolSize;this.connections = new Connection[poolSize];this.states = new AtomicIntegerArray(new int[poolSize]);for (int i = 0; i < poolSize; i++) {connections[i] = new MockConnection("连接" + (i+1));}}// 5. 借连接public Connection borrow() {while(true) {for (int i = 0; i < poolSize; i++) {// 获取空闲连接if(states.get(i) == 0) {if (states.compareAndSet(i, 0, 1)) {log.debug("borrow {}", connections[i]);return connections[i];}}}// 如果没有空闲连接,当前线程进入等待synchronized (this) {try {log.debug("wait...");this.wait();} catch (InterruptedException e) {e.printStackTrace();}}}}// 6. 归还连接public void free(Connection conn) {for (int i = 0; i < poolSize; i++) {if (connections[i] == conn) {states.set(i, 0);synchronized (this) {log.debug("free {}", conn);this.notifyAll();}break;}}}
}class MockConnection implements Connection {private String name;public MockConnection(String name) {this.name = name;}@Overridepublic String toString() {return "MockConnection{" +"name='" + name + '\'' +'}';}@Overridepublic Statement createStatement() throws SQLException {return null;}@Overridepublic PreparedStatement prepareStatement(String sql) throws SQLException {return null;}@Overridepublic CallableStatement prepareCall(String sql) throws SQLException {return null;}@Overridepublic String nativeSQL(String sql) throws SQLException {return null;}@Overridepublic void setAutoCommit(boolean autoCommit) throws SQLException {}@Overridepublic boolean getAutoCommit() throws SQLException {return false;}@Overridepublic void commit() throws SQLException {}@Overridepublic void rollback() throws SQLException {}@Overridepublic void close() throws SQLException {}@Overridepublic boolean isClosed() throws SQLException {return false;}@Overridepublic DatabaseMetaData getMetaData() throws SQLException {return null;}@Overridepublic void setReadOnly(boolean readOnly) throws SQLException {}@Overridepublic boolean isReadOnly() throws SQLException {return false;}@Overridepublic void setCatalog(String catalog) throws SQLException {}@Overridepublic String getCatalog() throws SQLException {return null;}@Overridepublic void setTransactionIsolation(int level) throws SQLException {}@Overridepublic int getTransactionIsolation() throws SQLException {return 0;}@Overridepublic SQLWarning getWarnings() throws SQLException {return null;}@Overridepublic void clearWarnings() throws SQLException {}@Overridepublic Statement createStatement(int resultSetType, int resultSetConcurrency) throws SQLException {return null;}@Overridepublic PreparedStatement prepareStatement(String sql, int resultSetType, int resultSetConcurrency) throws SQLException {return null;}@Overridepublic CallableStatement prepareCall(String sql, int resultSetType, int resultSetConcurrency) throws SQLException {return null;}@Overridepublic Map<String, Class<?>> getTypeMap() throws SQLException {return null;}@Overridepublic void setTypeMap(Map<String, Class<?>> map) throws SQLException {}@Overridepublic void setHoldability(int holdability) throws SQLException {}@Overridepublic int getHoldability() throws SQLException {return 0;}@Overridepublic Savepoint setSavepoint() throws SQLException {return null;}@Overridepublic Savepoint setSavepoint(String name) throws SQLException {return null;}@Overridepublic void rollback(Savepoint savepoint) throws SQLException {}@Overridepublic void releaseSavepoint(Savepoint savepoint) throws SQLException {}@Overridepublic Statement createStatement(int resultSetType, int resultSetConcurrency, int resultSetHoldability) throws SQLException {return null;}@Overridepublic PreparedStatement prepareStatement(String sql, int resultSetType, int resultSetConcurrency, int resultSetHoldability) throws SQLException {return null;}@Overridepublic CallableStatement prepareCall(String sql, int resultSetType, int resultSetConcurrency, int resultSetHoldability) throws SQLException {return null;}@Overridepublic PreparedStatement prepareStatement(String sql, int autoGeneratedKeys) throws SQLException {return null;}@Overridepublic PreparedStatement prepareStatement(String sql, int[] columnIndexes) throws SQLException {return null;}@Overridepublic PreparedStatement prepareStatement(String sql, String[] columnNames) throws SQLException {return null;}@Overridepublic Clob createClob() throws SQLException {return null;}@Overridepublic Blob createBlob() throws SQLException {return null;}@Overridepublic NClob createNClob() throws SQLException {return null;}@Overridepublic SQLXML createSQLXML() throws SQLException {return null;}@Overridepublic boolean isValid(int timeout) throws SQLException {return false;}@Overridepublic void setClientInfo(String name, String value) throws SQLClientInfoException {}@Overridepublic void setClientInfo(Properties properties) throws SQLClientInfoException {}@Overridepublic String getClientInfo(String name) throws SQLException {return null;}@Overridepublic Properties getClientInfo() throws SQLException {return null;}@Overridepublic Array createArrayOf(String typeName, Object[] elements) throws SQLException {return null;}@Overridepublic Struct createStruct(String typeName, Object[] attributes) throws SQLException {return null;}@Overridepublic void setSchema(String schema) throws SQLException {}@Overridepublic String getSchema() throws SQLException {return null;}@Overridepublic void abort(Executor executor) throws SQLException {}@Overridepublic void setNetworkTimeout(Executor executor, int milliseconds) throws SQLException {}@Overridepublic int getNetworkTimeout() throws SQLException {return 0;}@Overridepublic <T> T unwrap(Class<T> iface) throws SQLException {return null;}@Overridepublic boolean isWrapperFor(Class<?> iface) throws SQLException {return false;}
}
final原理
3. 无状态
四、并发工具
1. 自定义线程池
package cn.itcast.n8;import lombok.extern.slf4j.Slf4j;
import org.springframework.core.log.LogDelegateFactory;import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashSet;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;@Slf4j(topic = "c.TestPool")
public class TestPool {public static void main(String[] args) {ThreadPool threadPool = new ThreadPool(2,1000, TimeUnit.MILLISECONDS, 1, (queue, task)->{// 1. 死等
// queue.put(task);// 2) 带超时等待
// queue.offer(task, 1500, TimeUnit.MILLISECONDS);// 3) 让调用者放弃任务执行
// log.debug("放弃{}", task);// 4) 让调用者抛出异常
// throw new RuntimeException("任务执行失败 " + task);// 5) 让调用者自己执行任务task.run();});for (int i = 0; i < 15; i++) {int j = i;threadPool.execute(() -> {try {Thread.sleep(1000L);} catch (InterruptedException e) {e.printStackTrace();}log.debug("{}", j);});}}
}@FunctionalInterface // 拒绝策略
interface RejectPolicy<T> {void reject(BlockingQueue<T> queue, T task);
}@Slf4j(topic = "c.ThreadPool")
class ThreadPool {// 任务队列private BlockingQueue<Runnable> taskQueue;// 线程集合private HashSet<Worker> workers = new HashSet<>();// 核心线程数private int coreSize;// 获取任务时的超时时间private long timeout;private TimeUnit timeUnit;private RejectPolicy<Runnable> rejectPolicy;// 执行任务public void execute(Runnable task) {// 当任务数没有超过 coreSize 时,直接交给 worker 对象执行// 如果任务数超过 coreSize 时,加入任务队列暂存synchronized (workers) {if(workers.size() < coreSize) {Worker worker = new Worker(task);log.debug("新增 worker{}, {}", worker, task);workers.add(worker);worker.start();} else {
// taskQueue.put(task);// 1) 死等// 2) 带超时等待// 3) 让调用者放弃任务执行// 4) 让调用者抛出异常// 5) 让调用者自己执行任务taskQueue.tryPut(rejectPolicy, task);}}}public ThreadPool(int coreSize, long timeout, TimeUnit timeUnit, int queueCapcity, RejectPolicy<Runnable> rejectPolicy) {this.coreSize = coreSize;this.timeout = timeout;this.timeUnit = timeUnit;this.taskQueue = new BlockingQueue<>(queueCapcity);this.rejectPolicy = rejectPolicy;}class Worker extends Thread{private Runnable task;public Worker(Runnable task) {this.task = task;}@Overridepublic void run() {// 执行任务// 1) 当 task 不为空,执行任务// 2) 当 task 执行完毕,再接着从任务队列获取任务并执行
// while(task != null || (task = taskQueue.take()) != null) {while(task != null || (task = taskQueue.poll(timeout, timeUnit)) != null) {try {log.debug("正在执行...{}", task);task.run();} catch (Exception e) {e.printStackTrace();} finally {task = null;}}synchronized (workers) {log.debug("worker 被移除{}", this);workers.remove(this);}}}
}
@Slf4j(topic = "c.BlockingQueue")
class BlockingQueue<T> {// 1. 任务队列private Deque<T> queue = new ArrayDeque<>();// 2. 锁private ReentrantLock lock = new ReentrantLock();// 3. 生产者条件变量private Condition fullWaitSet = lock.newCondition();// 4. 消费者条件变量private Condition emptyWaitSet = lock.newCondition();// 5. 容量private int capcity;public BlockingQueue(int capcity) {this.capcity = capcity;}// 带超时阻塞获取public T poll(long timeout, TimeUnit unit) {lock.lock();try {// 将 timeout 统一转换为 纳秒long nanos = unit.toNanos(timeout);while (queue.isEmpty()) {try {// 返回值是剩余时间if (nanos <= 0) {return null;}nanos = emptyWaitSet.awaitNanos(nanos);} catch (InterruptedException e) {e.printStackTrace();}}T t = queue.removeFirst();fullWaitSet.signal();return t;} finally {lock.unlock();}}// 阻塞获取public T take() {lock.lock();try {while (queue.isEmpty()) {try {emptyWaitSet.await();} catch (InterruptedException e) {e.printStackTrace();}}T t = queue.removeFirst();fullWaitSet.signal();return t;} finally {lock.unlock();}}// 阻塞添加public void put(T task) {lock.lock();try {while (queue.size() == capcity) {try {log.debug("等待加入任务队列 {} ...", task);fullWaitSet.await();} catch (InterruptedException e) {e.printStackTrace();}}log.debug("加入任务队列 {}", task);queue.addLast(task);emptyWaitSet.signal();} finally {lock.unlock();}}// 带超时时间阻塞添加public boolean offer(T task, long timeout, TimeUnit timeUnit) {lock.lock();try {long nanos = timeUnit.toNanos(timeout);while (queue.size() == capcity) {try {if(nanos <= 0) {return false;}log.debug("等待加入任务队列 {} ...", task);nanos = fullWaitSet.awaitNanos(nanos);} catch (InterruptedException e) {e.printStackTrace();}}log.debug("加入任务队列 {}", task);queue.addLast(task);emptyWaitSet.signal();return true;} finally {lock.unlock();}}// 获取大小public int size() {lock.lock();try {return queue.size();} finally {lock.unlock();}}public void tryPut(RejectPolicy<T> rejectPolicy, T task) {lock.lock();try {// 判断队列是否满if(queue.size() == capcity) {rejectPolicy.reject(this, task);} else { // 有空闲log.debug("加入任务队列 {}", task);queue.addLast(task);emptyWaitSet.signal();}} finally {lock.unlock();}}
}
2. ThreadPoolExecutor
线程池状态
构造方法
newFixedThreadPool
package cn.itcast.n8;import lombok.extern.slf4j.Slf4j;import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;@Slf4j(topic = "c.TestExecutors")
public class TestExecutors {public static void main(String[] args) throws InterruptedException {test1();}private static void test1() {ExecutorService pool = Executors.newFixedThreadPool(2, new ThreadFactory() {private AtomicInteger t = new AtomicInteger(1);@Overridepublic Thread newThread(Runnable r) {return new Thread(r, "mypool_t" + t.getAndIncrement());}});pool.execute(() -> {log.debug("1");});pool.execute(() -> {log.debug("2");});pool.execute(() -> {log.debug("3");});}
}
newCachedThreadPool
package cn.itcast.n8;import lombok.extern.slf4j.Slf4j;import java.util.concurrent.SynchronousQueue;import static cn.itcast.n2.util.Sleeper.sleep;@Slf4j(topic = "c.TestSynchronousQueue")
public class TestSynchronousQueue {public static void main(String[] args) {SynchronousQueue<Integer> integers = new SynchronousQueue<>();new Thread(() -> {try {log.debug("putting {} ", 1);integers.put(1);log.debug("{} putted...", 1);log.debug("putting...{} ", 2);integers.put(2);log.debug("{} putted...", 2);} catch (InterruptedException e) {e.printStackTrace();}},"t1").start();sleep(1);new Thread(() -> {try {log.debug("taking {}", 1);integers.take();} catch (InterruptedException e) {e.printStackTrace();}},"t2").start();sleep(1);new Thread(() -> {try {log.debug("taking {}", 2);integers.take();} catch (InterruptedException e) {e.printStackTrace();}},"t3").start();}
}
newSingleThreadExecutor
package cn.itcast.n8;import lombok.extern.slf4j.Slf4j;import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;@Slf4j(topic = "c.TestExecutors")
public class TestExecutors {public static void main(String[] args) throws InterruptedException {test2();}public static void test2() {ExecutorService pool = Executors.newSingleThreadExecutor();pool.execute(() -> {log.debug("1");int i = 1 / 0;});pool.execute(() -> {log.debug("2");});pool.execute(() -> {log.debug("3");});}
}
提交任务
package cn.itcast.n8;import lombok.extern.slf4j.Slf4j;import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;@Slf4j(topic = "c.TestSubmit")
public class TestSubmit {public static void main(String[] args) throws ExecutionException, InterruptedException {ExecutorService pool = Executors.newFixedThreadPool(1);method3(pool);}private static void method3(ExecutorService pool) throws InterruptedException, ExecutionException {String result = pool.invokeAny(Arrays.asList(() -> {log.debug("begin 1");Thread.sleep(1000);log.debug("end 1");return "1";},() -> {log.debug("begin 2");Thread.sleep(500);log.debug("end 2");return "2";},() -> {log.debug("begin 3");Thread.sleep(2000);log.debug("end 3");return "3";}));log.debug("{}", result);}private static void method2(ExecutorService pool) throws InterruptedException {List<Future<String>> futures = pool.invokeAll(Arrays.asList(() -> {log.debug("begin");Thread.sleep(1000);return "1";},() -> {log.debug("begin");Thread.sleep(500);return "2";},() -> {log.debug("begin");Thread.sleep(2000);return "3";}));futures.forEach( f -> {try {log.debug("{}", f.get());} catch (InterruptedException | ExecutionException e) {e.printStackTrace();}});}private static void method1(ExecutorService pool) throws InterruptedException, ExecutionException {Future<String> future = pool.submit(() -> {log.debug("running");Thread.sleep(1000);return "ok";});log.debug("{}", future.get());}
}
关闭线程池
任务调度线程池
package cn.itcast.n8;import lombok.extern.slf4j.Slf4j;import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.*;import static cn.itcast.n2.util.Sleeper.sleep;@Slf4j(topic = "c.TestTimer")
public class TestTimer {public static void main(String[] args) throws ExecutionException, InterruptedException {/*ScheduledExecutorService pool = Executors.newScheduledThreadPool(1);pool.schedule(() -> {try {log.debug("task1");int i = 1 / 0;} catch (Exception e) {log.error("error:", e);}}, 1, TimeUnit.SECONDS);*/ExecutorService pool = Executors.newFixedThreadPool(1);Future<?> f = pool.submit(() -> {try {log.debug("task1");int i = 1 / 0;} catch (Exception e) {log.error("error:", e);}});log.debug("result: {}", f.get());}private static void method3() {ScheduledExecutorService pool = Executors.newScheduledThreadPool(1);log.debug("start...");// 定时执行任务pool.scheduleAtFixedRate(() -> {log.debug("running...");}, 1, 1, TimeUnit.SECONDS);/*pool.scheduleWithFixedDelay(() -> {log.debug("running...");sleep(2);}, 1, 1, TimeUnit.SECONDS);*/}// 前一个任务的异常不会影响到之后的任务private static void method2(ScheduledExecutorService pool) {pool.schedule(() -> {log.debug("task1");int i = 1 / 0;}, 1, TimeUnit.SECONDS);pool.schedule(() -> {log.debug("task2");}, 1, TimeUnit.SECONDS);}// 前一个任务的执行或异常会影响到之后的任务private static void method1() {Timer timer = new Timer();TimerTask task1 = new TimerTask() {@Overridepublic void run() {log.debug("task 1");sleep(2);}};TimerTask task2 = new TimerTask() {@Overridepublic void run() {log.debug("task 2");}};log.debug("start...");timer.schedule(task1, 1000);timer.schedule(task2, 1000);}
}
定时任务
package cn.itcast.n8;import java.time.DayOfWeek;
import java.time.Duration;
import java.time.LocalDateTime;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;public class TestSchedule {// 如何让每周四 18:00:00 定时执行任务?public static void main(String[] args) {// 获取当前时间LocalDateTime now = LocalDateTime.now();System.out.println(now);// 获取周四时间LocalDateTime time = now.withHour(18).withMinute(0).withSecond(0).withNano(0).with(DayOfWeek.THURSDAY);// 如果 当前时间 > 本周周四,必须找到下周周四if(now.compareTo(time) > 0) {time = time.plusWeeks(1);}System.out.println(time);// initailDelay 代表当前时间和周四的时间差// period 一周的间隔时间long initailDelay = Duration.between(now, time).toMillis();long period = 1000 * 60 * 60 * 24 * 7;ScheduledExecutorService pool = Executors.newScheduledThreadPool(1);pool.scheduleAtFixedRate(() -> {System.out.println("running...");}, initailDelay, period, TimeUnit.MILLISECONDS);}
}
Tomcat线程池
3. Fork / Join
概念
package cn.itcast.n8;import lombok.extern.slf4j.Slf4j;import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;@Slf4j(topic = "c.TestForkJoin2")
public class TestForkJoin2 {public static void main(String[] args) {ForkJoinPool pool = new ForkJoinPool(4);System.out.println(pool.invoke(new MyTask(5)));// new MyTask(5) 5+ new MyTask(4) 4 + new MyTask(3) 3 + new MyTask(2) 2 + new MyTask(1)}
}// 1~n 之间整数的和
@Slf4j(topic = "c.MyTask")
class MyTask extends RecursiveTask<Integer> {private int n;public MyTask(int n) {this.n = n;}@Overridepublic String toString() {return "{" + n + '}';}@Overrideprotected Integer compute() {// 如果 n 已经为 1,可以求得结果了if (n == 1) {log.debug("join() {}", n);return n;}// 将任务进行拆分(fork)AddTask1 t1 = new AddTask1(n - 1);t1.fork();log.debug("fork() {} + {}", n, t1);// 合并(join)结果int result = n + t1.join();log.debug("join() {} + {} = {}", n, t1, result);return result;}
}
改进:二分思想
@Slf4j(topic = "c.AddTask")
class AddTask3 extends RecursiveTask<Integer> {int begin;int end;public AddTask3(int begin, int end) {this.begin = begin;this.end = end;}@Overridepublic String toString() {return "{" + begin + "," + end + '}';}@Overrideprotected Integer compute() {if (begin == end) {log.debug("join() {}", begin);return begin;}if (end - begin == 1) {log.debug("join() {} + {} = {}", begin, end, end + begin);return end + begin;}int mid = (end + begin) / 2;AddTask3 t1 = new AddTask3(begin, mid);t1.fork();AddTask3 t2 = new AddTask3(mid + 1, end);t2.fork();log.debug("fork() {} + {} = ?", t1, t2);int result = t1.join() + t2.join();log.debug("join() {} + {} = {}", t1, t2, result);return result;}
}
4. JUC
(1)AQS原理
概述
package cn.itcast.n8;import lombok.extern.slf4j.Slf4j;import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;import static cn.itcast.n2.util.Sleeper.sleep;@Slf4j(topic = "c.TestAqs")
public class TestAqs {public static void main(String[] args) {MyLock lock = new MyLock();new Thread(() -> {lock.lock();try {log.debug("locking1...");sleep(1);} finally {log.debug("unlocking1...");lock.unlock();}},"t1").start();new Thread(() -> {lock.lock();try {log.debug("locking2...");} finally {log.debug("unlocking2...");lock.unlock();}},"t2").start();}
}// 自定义锁(不可重入锁)
class MyLock implements Lock {// 独占锁 同步器类class MySync extends AbstractQueuedSynchronizer {@Overrideprotected boolean tryAcquire(int arg) {if(compareAndSetState(0, 1)) {// 加上了锁,并设置 owner 为当前线程setExclusiveOwnerThread(Thread.currentThread());return true;}return false;}@Overrideprotected boolean tryRelease(int arg) {setExclusiveOwnerThread(null);setState(0); // 写屏障return true;}@Override // 是否持有独占锁protected boolean isHeldExclusively() {return getState() == 1;}public Condition newCondition() {return new ConditionObject();}}private MySync sync = new MySync();@Override // 加锁(不成功会进入等待队列)public void lock() {sync.acquire(1);}@Override // 加锁,可打断public void lockInterruptibly() throws InterruptedException {sync.acquireInterruptibly(1);}@Override // 尝试加锁(一次)public boolean tryLock() {return sync.tryAcquire(1);}@Override // 尝试加锁,带超时public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {return sync.tryAcquireNanos(1, unit.toNanos(time));}@Override // 解锁public void unlock() {sync.release(1);}@Override // 创建条件变量public Condition newCondition() {return sync.newCondition();}
}
(2)ReentrantLock原理
非公平锁实现原理
可重入原理
可打断原理
不可打断模式
可打断模式
公平锁原理
非公平锁
公平锁
条件变量实现原理
(3)读写锁
ReentrantReadWriteLock
package cn.itcast.n8;import lombok.extern.slf4j.Slf4j;import java.util.concurrent.locks.ReentrantReadWriteLock;import static cn.itcast.n2.util.Sleeper.sleep;@Slf4j(topic = "c.TestReadWriteLock")
public class TestReadWriteLock {public static void main(String[] args) throws InterruptedException {DataContainer dataContainer = new DataContainer();new Thread(() -> {dataContainer.read();}, "t1").start();new Thread(() -> {dataContainer.write();}, "t2").start();new Thread(() -> {dataContainer.read();}, "t3").start();}
}@Slf4j(topic = "c.DataContainer")
class DataContainer {private Object data;private ReentrantReadWriteLock rw = new ReentrantReadWriteLock();private ReentrantReadWriteLock.ReadLock r = rw.readLock();private ReentrantReadWriteLock.WriteLock w = rw.writeLock();public Object read() {log.debug("获取读锁...");r.lock();try {log.debug("读取");sleep(1);return data;} finally {log.debug("释放读锁...");r.unlock();}}public void write() {log.debug("获取写锁...");w.lock();try {log.debug("写入");sleep(1);} finally {log.debug("释放写锁...");w.unlock();}}
}
StampedLock
- StampedLock不支持条件变量
- StampedLock不支持可重入
package cn.itcast.n8;import lombok.extern.slf4j.Slf4j;import java.util.concurrent.locks.StampedLock;import static cn.itcast.n2.util.Sleeper.sleep;@Slf4j(topic = "c.TestStampedLock")
public class TestStampedLock {public static void main(String[] args) {DataContainerStamped dataContainer = new DataContainerStamped(1);new Thread(() -> {dataContainer.read(1);}, "t1").start();sleep(0.5);new Thread(() -> {dataContainer.write(1);}, "t2").start();}
}@Slf4j(topic = "c.DataContainerStamped")
class DataContainerStamped {private int data;private final StampedLock lock = new StampedLock();public DataContainerStamped(int data) {this.data = data;}public int read(int readTime) {long stamp = lock.tryOptimisticRead();log.debug("optimistic read locking...{}", stamp);sleep(readTime);if (lock.validate(stamp)) {log.debug("read finish...{}, data:{}", stamp, data);return data;}// 锁升级 - 读锁log.debug("updating to read lock... {}", stamp);try {stamp = lock.readLock();log.debug("read lock {}", stamp);sleep(readTime);log.debug("read finish...{}, data:{}", stamp, data);return data;} finally {log.debug("read unlock {}", stamp);lock.unlockRead(stamp);}}public void write(int newData) {long stamp = lock.writeLock();log.debug("write lock {}", stamp);try {sleep(2);this.data = newData;} finally {log.debug("write unlock {}", stamp);lock.unlockWrite(stamp);}}
}
缓存更新策略
package cn.itcast.n8;import java.util.*;
import java.util.concurrent.locks.ReentrantReadWriteLock;public class TestGenericDao {public static void main(String[] args) {GenericDao dao = new GenericDaoCached();System.out.println("============> 查询");String sql = "select * from emp where empno = ?";int empno = 7369;Emp emp = dao.queryOne(Emp.class, sql, empno);System.out.println(emp);emp = dao.queryOne(Emp.class, sql, empno);System.out.println(emp);emp = dao.queryOne(Emp.class, sql, empno);System.out.println(emp);System.out.println("============> 更新");dao.update("update emp set sal = ? where empno = ?", 800, empno);emp = dao.queryOne(Emp.class, sql, empno);System.out.println(emp);}
}class GenericDaoCached extends GenericDao {private GenericDao dao = new GenericDao();private Map<SqlPair, Object> map = new HashMap<>();private ReentrantReadWriteLock rw = new ReentrantReadWriteLock();@Overridepublic <T> List<T> queryList(Class<T> beanClass, String sql, Object... args) {return dao.queryList(beanClass, sql, args);}@Overridepublic <T> T queryOne(Class<T> beanClass, String sql, Object... args) {// 先从缓存中找,找到直接返回SqlPair key = new SqlPair(sql, args);;rw.readLock().lock();try {T value = (T) map.get(key);if(value != null) {return value;}} finally {rw.readLock().unlock();}rw.writeLock().lock();try {// 多个线程T value = (T) map.get(key);if(value == null) {// 缓存中没有,查询数据库value = dao.queryOne(beanClass, sql, args);map.put(key, value);}return value;} finally {rw.writeLock().unlock();}}@Overridepublic int update(String sql, Object... args) {rw.writeLock().lock();try {// 先更新库int update = dao.update(sql, args);// 清空缓存map.clear();return update;} finally {rw.writeLock().unlock();}}class SqlPair {private String sql;private Object[] args;public SqlPair(String sql, Object[] args) {this.sql = sql;this.args = args;}@Overridepublic boolean equals(Object o) {if (this == o) {return true;}if (o == null || getClass() != o.getClass()) {return false;}SqlPair sqlPair = (SqlPair) o;return Objects.equals(sql, sqlPair.sql) &&Arrays.equals(args, sqlPair.args);}@Overridepublic int hashCode() {int result = Objects.hash(sql);result = 31 * result + Arrays.hashCode(args);return result;}}}
(4)Semaphore
package cn.itcast.n8;import lombok.extern.slf4j.Slf4j;import java.util.concurrent.Semaphore;import static cn.itcast.n2.util.Sleeper.sleep;@Slf4j(topic = "c.TestSemaphore")
public class TestSemaphore {public static void main(String[] args) {// 1. 创建 semaphore 对象Semaphore semaphore = new Semaphore(3);// 2. 10个线程同时运行for (int i = 0; i < 10; i++) {new Thread(() -> {try {semaphore.acquire();} catch (InterruptedException e) {e.printStackTrace();}try {log.debug("running...");sleep(1);log.debug("end...");} finally {semaphore.release();}}).start();}}
}
package cn.itcast.n8;import lombok.extern.slf4j.Slf4j;import java.sql.*;
import java.util.Map;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.Executor;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicIntegerArray;public class TestPoolSemaphore {public static void main(String[] args) {Pool pool = new Pool(2);for (int i = 0; i < 5; i++) {new Thread(() -> {Connection conn = pool.borrow();try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}pool.free(conn);}).start();}}
}@Slf4j(topic = "c.Pool")
class Pool {// 1. 连接池大小private final int poolSize;// 2. 连接对象数组private Connection[] connections;// 3. 连接状态数组 0 表示空闲, 1 表示繁忙private AtomicIntegerArray states;private Semaphore semaphore;// 4. 构造方法初始化public Pool(int poolSize) {this.poolSize = poolSize;// 让许可数与资源数一致this.semaphore = new Semaphore(poolSize);this.connections = new Connection[poolSize];this.states = new AtomicIntegerArray(new int[poolSize]);for (int i = 0; i < poolSize; i++) {connections[i] = new MockConnection("连接" + (i+1));}}// 5. 借连接public Connection borrow() {// t1, t2, t3// 获取许可try {semaphore.acquire(); // 没有许可的线程,在此等待} catch (InterruptedException e) {e.printStackTrace();}for (int i = 0; i < poolSize; i++) {// 获取空闲连接if(states.get(i) == 0) {if (states.compareAndSet(i, 0, 1)) {log.debug("borrow {}", connections[i]);return connections[i];}}}// 不会执行到这里return null;}// 6. 归还连接public void free(Connection conn) {for (int i = 0; i < poolSize; i++) {if (connections[i] == conn) {states.set(i, 0);log.debug("free {}", conn);semaphore.release();break;}}}
}class MockConnection implements Connection {private String name;public MockConnection(String name) {this.name = name;}@Overridepublic String toString() {return "MockConnection{" +"name='" + name + '\'' +'}';}@Overridepublic Statement createStatement() throws SQLException {return null;}@Overridepublic PreparedStatement prepareStatement(String sql) throws SQLException {return null;}@Overridepublic CallableStatement prepareCall(String sql) throws SQLException {return null;}@Overridepublic String nativeSQL(String sql) throws SQLException {return null;}@Overridepublic void setAutoCommit(boolean autoCommit) throws SQLException {}@Overridepublic boolean getAutoCommit() throws SQLException {return false;}@Overridepublic void commit() throws SQLException {}@Overridepublic void rollback() throws SQLException {}@Overridepublic void close() throws SQLException {}@Overridepublic boolean isClosed() throws SQLException {return false;}@Overridepublic DatabaseMetaData getMetaData() throws SQLException {return null;}@Overridepublic void setReadOnly(boolean readOnly) throws SQLException {}@Overridepublic boolean isReadOnly() throws SQLException {return false;}@Overridepublic void setCatalog(String catalog) throws SQLException {}@Overridepublic String getCatalog() throws SQLException {return null;}@Overridepublic void setTransactionIsolation(int level) throws SQLException {}@Overridepublic int getTransactionIsolation() throws SQLException {return 0;}@Overridepublic SQLWarning getWarnings() throws SQLException {return null;}@Overridepublic void clearWarnings() throws SQLException {}@Overridepublic Statement createStatement(int resultSetType, int resultSetConcurrency) throws SQLException {return null;}@Overridepublic PreparedStatement prepareStatement(String sql, int resultSetType, int resultSetConcurrency) throws SQLException {return null;}@Overridepublic CallableStatement prepareCall(String sql, int resultSetType, int resultSetConcurrency) throws SQLException {return null;}@Overridepublic Map<String, Class<?>> getTypeMap() throws SQLException {return null;}@Overridepublic void setTypeMap(Map<String, Class<?>> map) throws SQLException {}@Overridepublic void setHoldability(int holdability) throws SQLException {}@Overridepublic int getHoldability() throws SQLException {return 0;}@Overridepublic Savepoint setSavepoint() throws SQLException {return null;}@Overridepublic Savepoint setSavepoint(String name) throws SQLException {return null;}@Overridepublic void rollback(Savepoint savepoint) throws SQLException {}@Overridepublic void releaseSavepoint(Savepoint savepoint) throws SQLException {}@Overridepublic Statement createStatement(int resultSetType, int resultSetConcurrency, int resultSetHoldability) throws SQLException {return null;}@Overridepublic PreparedStatement prepareStatement(String sql, int resultSetType, int resultSetConcurrency, int resultSetHoldability) throws SQLException {return null;}@Overridepublic CallableStatement prepareCall(String sql, int resultSetType, int resultSetConcurrency, int resultSetHoldability) throws SQLException {return null;}@Overridepublic PreparedStatement prepareStatement(String sql, int autoGeneratedKeys) throws SQLException {return null;}@Overridepublic PreparedStatement prepareStatement(String sql, int[] columnIndexes) throws SQLException {return null;}@Overridepublic PreparedStatement prepareStatement(String sql, String[] columnNames) throws SQLException {return null;}@Overridepublic Clob createClob() throws SQLException {return null;}@Overridepublic Blob createBlob() throws SQLException {return null;}@Overridepublic NClob createNClob() throws SQLException {return null;}@Overridepublic SQLXML createSQLXML() throws SQLException {return null;}@Overridepublic boolean isValid(int timeout) throws SQLException {return false;}@Overridepublic void setClientInfo(String name, String value) throws SQLClientInfoException {}@Overridepublic void setClientInfo(Properties properties) throws SQLClientInfoException {}@Overridepublic String getClientInfo(String name) throws SQLException {return null;}@Overridepublic Properties getClientInfo() throws SQLException {return null;}@Overridepublic Array createArrayOf(String typeName, Object[] elements) throws SQLException {return null;}@Overridepublic Struct createStruct(String typeName, Object[] attributes) throws SQLException {return null;}@Overridepublic void setSchema(String schema) throws SQLException {}@Overridepublic String getSchema() throws SQLException {return null;}@Overridepublic void abort(Executor executor) throws SQLException {}@Overridepublic void setNetworkTimeout(Executor executor, int milliseconds) throws SQLException {}@Overridepublic int getNetworkTimeout() throws SQLException {return 0;}@Overridepublic <T> T unwrap(Class<T> iface) throws SQLException {return null;}@Overridepublic boolean isWrapperFor(Class<?> iface) throws SQLException {return false;}
}
(5)CountdownLatch
package cn.itcast.n8;import lombok.extern.slf4j.Slf4j;
import org.springframework.web.client.RestTemplate;import java.util.Arrays;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;import static cn.itcast.n2.util.Sleeper.sleep;@Slf4j(topic = "c.TestCountDownLatch")
public class TestCountDownLatch {public static void main(String[] args) throws InterruptedException, ExecutionException {test3();}private static void test5() {CountDownLatch latch = new CountDownLatch(3);ExecutorService service = Executors.newFixedThreadPool(4);service.submit(() -> {log.debug("begin...");sleep(1);latch.countDown();log.debug("end...{}", latch.getCount());});service.submit(() -> {log.debug("begin...");sleep(1.5);latch.countDown();log.debug("end...{}", latch.getCount());});service.submit(() -> {log.debug("begin...");sleep(2);latch.countDown();log.debug("end...{}", latch.getCount());});service.submit(()->{try {log.debug("waiting...");latch.await();log.debug("wait end...");} catch (InterruptedException e) {e.printStackTrace();}});}private static void test4() throws InterruptedException {CountDownLatch latch = new CountDownLatch(3);new Thread(() -> {log.debug("begin...");sleep(1);latch.countDown();log.debug("end...{}", latch.getCount());}).start();new Thread(() -> {log.debug("begin...");sleep(2);latch.countDown();log.debug("end...{}", latch.getCount());}).start();new Thread(() -> {log.debug("begin...");sleep(1.5);latch.countDown();log.debug("end...{}", latch.getCount());}).start();log.debug("waiting...");latch.await();log.debug("wait end...");}// 应用之同步等待多个远程调用结束private static void test3() throws InterruptedException, ExecutionException {RestTemplate restTemplate = new RestTemplate();log.debug("begin");ExecutorService service = Executors.newCachedThreadPool();CountDownLatch latch = new CountDownLatch(4);Future<Map<String,Object>> f1 = service.submit(() -> {Map<String, Object> response = restTemplate.getForObject("http://localhost:8080/order/{1}", Map.class, 1);return response;});Future<Map<String, Object>> f2 = service.submit(() -> {Map<String, Object> response1 = restTemplate.getForObject("http://localhost:8080/product/{1}", Map.class, 1);return response1;});Future<Map<String, Object>> f3 = service.submit(() -> {Map<String, Object> response1 = restTemplate.getForObject("http://localhost:8080/product/{1}", Map.class, 2);return response1;});Future<Map<String, Object>> f4 = service.submit(() -> {Map<String, Object> response3 = restTemplate.getForObject("http://localhost:8080/logistics/{1}", Map.class, 1);return response3;});System.out.println(f1.get());System.out.println(f2.get());System.out.println(f3.get());System.out.println(f4.get());log.debug("执行完毕");service.shutdown();}// 应用之同步等待多线程准备完毕private static void test2() throws InterruptedException {AtomicInteger num = new AtomicInteger(0);ExecutorService service = Executors.newFixedThreadPool(10, (r) -> {return new Thread(r, "t" + num.getAndIncrement());});CountDownLatch latch = new CountDownLatch(10);String[] all = new String[10];Random r = new Random();for (int j = 0; j < 10; j++) {int x = j;service.submit(() -> {for (int i = 0; i <= 100; i++) {try {Thread.sleep(r.nextInt(100));} catch (InterruptedException e) {}all[x] = Thread.currentThread().getName() + "(" + (i + "%") + ")";System.out.print("\r" + Arrays.toString(all));}latch.countDown();});}latch.await();System.out.println("\n游戏开始...");service.shutdown();}
}
(6)CyclicBarrier
package cn.itcast.n8;import lombok.extern.slf4j.Slf4j;import java.util.concurrent.*;import static cn.itcast.n2.util.Sleeper.sleep;@Slf4j(topic = "c.TestCyclicBarrier")
public class TestCyclicBarrier {public static void main(String[] args) {ExecutorService service = Executors.newFixedThreadPool(3);CyclicBarrier barrier = new CyclicBarrier(2, ()-> {log.debug("task1, task2 finish...");});for (int i = 0; i < 3; i++) { // task1 task2 service.submit(() -> {log.debug("task1 begin...");sleep(1);try {barrier.await(); // 2-1=1} catch (InterruptedException | BrokenBarrierException e) {e.printStackTrace();}});service.submit(() -> {log.debug("task2 begin...");sleep(2);try {barrier.await(); // 1-1=0} catch (InterruptedException | BrokenBarrierException e) {e.printStackTrace();}});}service.shutdown();}private static void test1() {ExecutorService service = Executors.newFixedThreadPool(5);for (int i = 0; i < 3; i++) {CountDownLatch latch = new CountDownLatch(2);service.submit(() -> {log.debug("task1 start...");sleep(1);latch.countDown();});service.submit(() -> {log.debug("task2 start...");sleep(2);latch.countDown();});try {latch.await();} catch (InterruptedException e) {e.printStackTrace();}log.debug("task1 task2 finish...");}service.shutdown();}
}
(7)线程安全集合类
(8)ConcurrentHashMap
package cn.itcast.n8;import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.LongAdder;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.IntStream;public class TestWordCount {public static void main(String[] args) {demo(// 创建 map 集合// 创建 ConcurrentHashMap 对不对?() -> new ConcurrentHashMap<String, LongAdder>(8,0.75f,8),(map, words) -> {for (String word : words) {// 如果缺少一个 key,则计算生成一个 value , 然后将 key value 放入 map// a 0LongAdder value = map.computeIfAbsent(word, (key) -> new LongAdder());// 执行累加value.increment(); // 2/*// 检查 key 有没有Integer counter = map.get(word);int newValue = counter == null ? 1 : counter + 1;// 没有 则 putmap.put(word, newValue);*/}});}private static void demo2() {Map<String, Integer> collect = IntStream.range(1, 27).parallel().mapToObj(idx -> readFromFile(idx)).flatMap(list -> list.stream()).collect(Collectors.groupingBy(Function.identity(), Collectors.summingInt(w -> 1)));System.out.println(collect);}private static <V> void demo(Supplier<Map<String, V>> supplier, BiConsumer<Map<String, V>, List<String>> consumer) {Map<String, V> counterMap = supplier.get();// key value// a 200// b 200List<Thread> ts = new ArrayList<>();for (int i = 1; i <= 26; i++) {int idx = i;Thread thread = new Thread(() -> {List<String> words = readFromFile(idx);consumer.accept(counterMap, words);});ts.add(thread);}ts.forEach(t -> t.start());ts.forEach(t -> {try {t.join();} catch (InterruptedException e) {e.printStackTrace();}});System.out.println(counterMap);}public static List<String> readFromFile(int i) {ArrayList<String> words = new ArrayList<>();try (BufferedReader in = new BufferedReader(new InputStreamReader(new FileInputStream("tmp/" + i + ".txt")))) {while (true) {String word = in.readLine();if (word == null) {break;}words.add(word);}return words;} catch (IOException e) {throw new RuntimeException(e);}}
}
(9)LinkedBlockingQueue
(10)ConcurrentLinkedQueue
(11)CopyOnWriteArrayList
五、异步模式之工作线程
1. 定义
2. 饥饿
package cn.itcast.n8;import lombok.extern.slf4j.Slf4j;import java.util.Arrays;
import java.util.List;
import java.util.Random;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;@Slf4j(topic = "c.TestDeadLock")
public class TestStarvation {static final List<String> MENU = Arrays.asList("地三鲜", "宫保鸡丁", "辣子鸡丁", "烤鸡翅");static Random RANDOM = new Random();static String cooking() {return MENU.get(RANDOM.nextInt(MENU.size()));}public static void main(String[] args) {// 不同的任务类型使用不同的线程池ExecutorService waiterPool = Executors.newFixedThreadPool(1);ExecutorService cookPool = Executors.newFixedThreadPool(1);waiterPool.execute(() -> {log.debug("处理点餐...");Future<String> f = cookPool.submit(() -> {log.debug("做菜");return cooking();});try {log.debug("上菜: {}", f.get());} catch (InterruptedException | ExecutionException e) {e.printStackTrace();}});waiterPool.execute(() -> {log.debug("处理点餐...");Future<String> f = cookPool.submit(() -> {log.debug("做菜");return cooking();});try {log.debug("上菜: {}", f.get());} catch (InterruptedException | ExecutionException e) {e.printStackTrace();}});}
}