java.util.concurrent(J.U.C)大大提高了并发性能,AQS 被认为是 J.U.C 的核心。
CountdownLatch
用来控制一个线程等待多个线程。
维护了一个计数器 cnt,每次调用 countDown() 方法会让计数器的值减 1,减到 0 的时候,那些因为调用 await() 方法而在等待的线程就会被唤醒。
public class CountdownLatchExample { public static void main(String[] args) throws InterruptedException { final int totalThread = 10; CountDownLatch countDownLatch = new CountDownLatch(totalThread); ExecutorService executorService = Executors.newCachedThreadPool(); for (int i = 0; i < totalThread; i++) { executorService.execute(() -> { System.out.print("run.."); countDownLatch.countDown(); }); } countDownLatch.await(); System.out.println("end"); executorService.shutdown(); }}run..run..run..run..run..run..run..run..run..run..end复制代码
CyclicBarrier
用来控制多个线程互相等待,只有当多个线程都到达时,这些线程才会继续执行。
和 CountdownLatch 相似,都是通过维护计数器来实现的。线程执行 await() 方法之后计数器会减 1,并进行等待,直到计数器为 0,所有调用 await() 方法而在等待的线程才能继续执行。
CyclicBarrier 和 CountdownLatch 的一个区别是,CyclicBarrier 的计数器通过调用 reset() 方法可以循环使用,所以它才叫做循环屏障。
CyclicBarrier 有两个构造函数,其中 parties 指示计数器的初始值,barrierAction 在所有线程都到达屏障的时候会执行一次。
public CyclicBarrier(int parties, Runnable barrierAction) { if (parties <= 0) throw new IllegalArgumentException(); this.parties = parties; this.count = parties; this.barrierCommand = barrierAction;}public CyclicBarrier(int parties) { this(parties, null);}复制代码
public class CyclicBarrierExample { public static void main(String[] args) { final int totalThread = 10; CyclicBarrier cyclicBarrier = new CyclicBarrier(totalThread); ExecutorService executorService = Executors.newCachedThreadPool(); for (int i = 0; i < totalThread; i++) { executorService.execute(() -> { System.out.print("before.."); try { cyclicBarrier.await(); } catch (InterruptedException | BrokenBarrierException e) { e.printStackTrace(); } System.out.print("after.."); }); } executorService.shutdown(); }}before..before..before..before..before..before..before..before..before..before..after..after..after..after..after..after..after..after..after..after..复制代码
Semaphore
Semaphore 类似于操作系统中的信号量,可以控制对互斥资源的访问线程数。
以下代码模拟了对某个服务的并发请求,每次只能有 3 个客户端同时访问,请求总数为 10。
public class SemaphoreExample { public static void main(String[] args) { final int clientCount = 3; final int totalRequestCount = 10; Semaphore semaphore = new Semaphore(clientCount); ExecutorService executorService = Executors.newCachedThreadPool(); for (int i = 0; i < totalRequestCount; i++) { executorService.execute(()->{ try { semaphore.acquire(); System.out.print(semaphore.availablePermits() + " "); } catch (InterruptedException e) { e.printStackTrace(); } finally { semaphore.release(); } }); } executorService.shutdown(); }}复制代码
2 1 2 2 2 2 2 1 2 2复制代码