java-concurrent-util

java concurrent util #

AbstractQueuedSynchronizer #

抽象队列化同步器。

特性:

  • 非共享资源获取/释放(acquire/release)
  • 共享资源获取/释放(acquireShared/releaseShared)
  • 等待队列:尝试获取共享/非共享资源失败时,加入等待队列并调用LockSupport.park方法将当前线程阻塞,释放资源后调用LockSupport.unpark方法唤醒等待队列里的下一个线程。

Semaphore #

信号量。

特性:

  • 许可(permits):可访问资源的并发量。
  • 获取许可(acquire):线程通过acquire方法获取许可,许可数量不足时线程阻塞。
  • 释放许可(release):线程释放占有的许可,以供其他线程获取。
  • 线程竞争策略:支持公平(fair)和非公平(infair)。

用法:

  • 限制并发线程数量,超过并发量的线程进入阻塞状态或进入失败流程。

CyclicBarrier #

循环屏障。

特性:

  • 屏障点(barrier):即await方法调用点,所有线程执行至此都会开始等待,只有最后一个线程执行至此才会解除屏障(唤醒所有等待线程)。
  • 屏障线程数量:此参数通过构造方法传入,是一个固定值。
  • 屏障结束任务:此任务通过构造方法传入,屏障解除时立即执行。
  • 屏障异常(BrokenBarrierException):await方法超时、线程被中断、屏障结束任务执行异常,会导致所有线程屏障点抛出此异常。
  • 可重用:屏障正常解除后立即开启下一轮屏障。

使用场景:

  • 不恰当的比方,在联机游戏中,只有当最后一个玩家准备好后,所有玩家才会同时进入对局,否则一直处于等待状态。

CountDownLatch #

倒计数锁扣。

特性:

  • 同步点:通过await方法,线程进入等待,直到计数器归零被唤醒。
  • 倒计数及通知:通过countDown方法,计数器减一,当计数器归零时,唤醒所有等待线程。
  • 初始计数值:通过构造方法传入。
  • 不可重用:计数器归零后,无法再次使用。

使用场景:

  • 并发计算与汇总:将任务分成若干可并发的子任务,每完成一个子任务计数器就减一,完成所有子任务后,才执行汇总任务。

StampedLock #

戳记锁。

特性:

  • 写锁(WriteLock):独占锁。相关方法:writeLock、unlockWrite。
  • 读锁(ReadLock):共享锁。相关方法:readLock、unlockRead。
  • 乐观读锁:在读写锁基础上,引入了乐观读锁功能,适用于读多写少的场景。相关方法:tryOptimisticRead、validate。

乐观读锁用法:

  • 先调用乐观读锁方法获取当前戳记(调用 tryOptimisticRead 方法)。
  • 读取数据。
  • 验证戳记是否过期(调用 validate 方法),若已过期,则退回到悲观读锁重新读取数据(调用 readLock、unlockRead 方法对读取操作加锁)。
  • 最后自由使用读取到的数据。

CompletableFuture #

特性:

  • 异步处理结果:CompletableFuture 可以异步处理结果、获取结果,而 Future 获取结果时会阻塞。
  • 链式调用
  • 创建异步任务:静态方法 runAsync,supplyAsync
  • 任务组合:thenApply,thenAccept
  • 异常处理:exceptionally、handle
  • 并行处理:thenCombine、allOf

使用示例:

java
// 创建异步任务
CompletableFuture<Void> f = CompletableFuture.runAsync(() -> {
    // do something
});

// 创建异步任务(有返回值)
CompletableFuture<String> f = CompletableFuture.supplyAsync(() -> "something");

// 任务完成后返回新结果
CompletableFuture<Void> f = CompletableFuture.supplyAsync(() -> "something")
    .thenApply(result -> "result=" + result);

// 任务完成后消费结果
CompletableFuture<Void> f = CompletableFuture.supplyAsync(() -> "something")
    .thenAccept(result -> System.out.println(result));

// 任务完成后执行操作
CompletableFuture<Void> f = CompletableFuture.supplyAsync(() -> "something")
    .thenRun(() -> System.out.println("done"));

// 合并两个任务的结果
CompletableFuture<String> f1 = CompletableFuture.supplyAsync(() -> "resultA");
CompletableFuture<String> f2 = CompletableFuture.supplyAsync(() -> "resultB");
f1.thenCombine(f2, (resultA, resultB) -> resultA + resultB);

// 一个任务的输出作为另一个任务的输入
CompletableFuture<String> f = CompletableFuture.supplyAsync(() -> "resultA")
    .thenCompose(result -> CompletableFuture.supplyAsync(() -> result + "resultB"))

// 发生异常后返回新结果
CompletableFuture<String> f = CompletableFuture.supplyAsync(() -> {
    if (true) throw new RuntimeException("something wrong");
    return "something";
}).exceptionally(ex -> "nothing");

// 任务完成或发生异常后进行处理
CompletableFuture<String> f = CompletableFuture.supplyAsync(() -> {
    if (true) throw new RuntimeException("something wrong");
    return "something";
}).handle((result, ex) -> {
    if (ex != null) {
        return "nothing";
    }
    return result;
});

// 等待所有任务完成
CompletableFuture<Void> f = CompletableFuture.allOf(f1, f2, f3)
    .thenRun(() -> System.out.println("all done"));

// 等待任意一个任务完成
CompletableFuture<Void> f = CompletableFuture.anyOf(f1, f2, f3)
    .thenAccept(result -> System.out.println("any done: " + reuslt));

// 手动指定线程池:
// supplyAsync(Supplier<U>, Executor)
// runAsync(Runnable, Executor)
// thenApplyAsync(Function<T, U>, Executor)
// thenAcceptAsync(Comsumer<T>, Executor)
// thenRunAsync(Runnable, Executor)
// ...

ForkJoinPool #

特性:

  • Fork(分解):任务被递归分解为更小的子任务,直到不可再分。
  • Join(合并):子任务执行完毕后,将结果合并。
  • 工作窃取算法:空闲的工作线程会从其他繁忙线程的工作队列中窃取任务。

关键类:

  • ForkJoinPool:线程池
  • ForkJoinTask:任务基础抽象类
  • RecursiveTask/RecursiveAction:递归任务,一个有返回值,一个无返回值,都是ForkJoinTask的子类。

使用示例:

java
// 计算斐波那契数列第n项值
public class Task extends RecursiveTask<Integer> {
    private final int input;

    public Task(int input) {
        this.input = input;
    }

    @Override
    protected Integer compute() {
        if (input <= 1) {
            return 1;
        }
        Task sub1 = new Task(input - 1);
        sub1.fork();
        Task sub2 = new Task(input - 2);
        return sub2.compute() + sub1.join();
    }
}

public class Main {
    public static void main(String[] args) {
        ForkJoinPool pool = (ForkJoinPool) Executors.newWorkStealingPool();
        int input = 10;
        int result = pool.invoke(new Task(input));
    }
}
2024年12月9日