java concurrent util #
AbstractQueuedSynchronizer #
抽象队列化同步器。
特性:
- 非共享资源获取/释放(acquire/release)
- 共享资源获取/释放(acquireShared/releaseShared)
- 等待队列:尝试获取共享/非共享资源失败时,加入等待队列并调用LockSupport.park方法将当前线程阻塞,释放资源后调用LockSupport.unpark方法唤醒等待队列里的下一个线程。
Semaphore #
信号量。
特性:
- 许可(permits):可访问资源的并发量。
- 获取许可(acquire):线程通过acquire方法获取许可,许可数量不足时线程阻塞。
- 释放许可(release):线程释放占有的许可,以供其他线程获取。
- 线程竞争策略:支持公平(fair)和非公平(infair)。
用法:
- 限制并发线程数量,超过并发量的线程进入阻塞状态或进入失败流程。
CyclicBarrier #
循环屏障。
特性:
- 屏障点(barrier):即await方法调用点,所有线程执行至此都会开始等待,只有最后一个线程执行至此才会解除屏障(唤醒所有等待线程)。
- 屏障线程数量:此参数通过构造方法传入,是一个固定值。
- 屏障结束任务:此任务通过构造方法传入,屏障解除时立即执行。
- 屏障异常(BrokenBarrierException):await方法超时、线程被中断、屏障结束任务执行异常,会导致所有线程屏障点抛出此异常。
- 可重用:屏障正常解除后立即开启下一轮屏障。
使用场景:
- 不恰当的比方,在联机游戏中,只有当最后一个玩家准备好后,所有玩家才会同时进入对局,否则一直处于等待状态。
CountDownLatch #
倒计数锁扣。
特性:
- 同步点:通过await方法,线程进入等待,直到计数器归零被唤醒。
- 倒计数及通知:通过countDown方法,计数器减一,当计数器归零时,唤醒所有等待线程。
- 初始计数值:通过构造方法传入。
- 不可重用:计数器归零后,无法再次使用。
使用场景:
- 并发计算与汇总:将任务分成若干可并发的子任务,每完成一个子任务计数器就减一,完成所有子任务后,才执行汇总任务。
StampedLock #
戳记锁。
特性:
- 写锁(WriteLock):独占锁。相关方法:writeLock、unlockWrite。
- 读锁(ReadLock):共享锁。相关方法:readLock、unlockRead。
- 乐观读锁:在读写锁基础上,引入了乐观读锁功能,适用于读多写少的场景。相关方法:tryOptimisticRead、validate。
乐观读锁用法:
- 先调用乐观读锁方法获取当前戳记(调用 tryOptimisticRead 方法)。
- 读取数据。
- 验证戳记是否过期(调用 validate 方法),若已过期,则退回到悲观读锁重新读取数据(调用 readLock、unlockRead 方法对读取操作加锁)。
- 最后自由使用读取到的数据。
CompletableFuture #
特性:
- 异步处理结果:CompletableFuture 可以异步处理结果、获取结果,而 Future 获取结果时会阻塞。
- 链式调用
- 创建异步任务:静态方法 runAsync,supplyAsync
- 任务组合:thenApply,thenAccept
- 异常处理:exceptionally、handle
- 并行处理:thenCombine、allOf
使用示例:
java
// 创建异步任务
CompletableFuture<Void> f = CompletableFuture.runAsync(() -> {
// do something
});
// 创建异步任务(有返回值)
CompletableFuture<String> f = CompletableFuture.supplyAsync(() -> "something");
// 任务完成后返回新结果
CompletableFuture<Void> f = CompletableFuture.supplyAsync(() -> "something")
.thenApply(result -> "result=" + result);
// 任务完成后消费结果
CompletableFuture<Void> f = CompletableFuture.supplyAsync(() -> "something")
.thenAccept(result -> System.out.println(result));
// 任务完成后执行操作
CompletableFuture<Void> f = CompletableFuture.supplyAsync(() -> "something")
.thenRun(() -> System.out.println("done"));
// 合并两个任务的结果
CompletableFuture<String> f1 = CompletableFuture.supplyAsync(() -> "resultA");
CompletableFuture<String> f2 = CompletableFuture.supplyAsync(() -> "resultB");
f1.thenCombine(f2, (resultA, resultB) -> resultA + resultB);
// 一个任务的输出作为另一个任务的输入
CompletableFuture<String> f = CompletableFuture.supplyAsync(() -> "resultA")
.thenCompose(result -> CompletableFuture.supplyAsync(() -> result + "resultB"))
// 发生异常后返回新结果
CompletableFuture<String> f = CompletableFuture.supplyAsync(() -> {
if (true) throw new RuntimeException("something wrong");
return "something";
}).exceptionally(ex -> "nothing");
// 任务完成或发生异常后进行处理
CompletableFuture<String> f = CompletableFuture.supplyAsync(() -> {
if (true) throw new RuntimeException("something wrong");
return "something";
}).handle((result, ex) -> {
if (ex != null) {
return "nothing";
}
return result;
});
// 等待所有任务完成
CompletableFuture<Void> f = CompletableFuture.allOf(f1, f2, f3)
.thenRun(() -> System.out.println("all done"));
// 等待任意一个任务完成
CompletableFuture<Void> f = CompletableFuture.anyOf(f1, f2, f3)
.thenAccept(result -> System.out.println("any done: " + reuslt));
// 手动指定线程池:
// supplyAsync(Supplier<U>, Executor)
// runAsync(Runnable, Executor)
// thenApplyAsync(Function<T, U>, Executor)
// thenAcceptAsync(Comsumer<T>, Executor)
// thenRunAsync(Runnable, Executor)
// ...
ForkJoinPool #
特性:
- Fork(分解):任务被递归分解为更小的子任务,直到不可再分。
- Join(合并):子任务执行完毕后,将结果合并。
- 工作窃取算法:空闲的工作线程会从其他繁忙线程的工作队列中窃取任务。
关键类:
- ForkJoinPool:线程池
- ForkJoinTask:任务基础抽象类
- RecursiveTask/RecursiveAction:递归任务,一个有返回值,一个无返回值,都是ForkJoinTask的子类。
使用示例:
java
// 计算斐波那契数列第n项值
public class Task extends RecursiveTask<Integer> {
private final int input;
public Task(int input) {
this.input = input;
}
@Override
protected Integer compute() {
if (input <= 1) {
return 1;
}
Task sub1 = new Task(input - 1);
sub1.fork();
Task sub2 = new Task(input - 2);
return sub2.compute() + sub1.join();
}
}
public class Main {
public static void main(String[] args) {
ForkJoinPool pool = (ForkJoinPool) Executors.newWorkStealingPool();
int input = 10;
int result = pool.invoke(new Task(input));
}
}