Java中有许多工具类可以在并发场景中使用,简化并发编程难度,提高程序准确性。
CountDaowLatch 可以实现类似于fork-join模型提供的功能,在多线程场景中,用于等待其他线程完成的线程可以调用countDownLatch的await方法进入等待状态,只有当其他线程将CountDownLatch中保存的值递减到0时,等待线程才会继续运行。
使用方式 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 public class CountDownLatchTest { private static CountDownLatch countDownLatch = new CountDownLatch(2 ); public static void main (String[]args) throws InterruptedException { Thread threadOne = new Thread(new Runnable() { @Override public void run () { System.out.println("continue" ); countDownLatch.countDown(); System.out.println("continue" ); countDownLatch.countDown(); } }); threadOne.start(); countDownLatch.await(); System.out.println("over" ); } }
运行结果总是相同,主线程在运行到await时,会进入等待状态,只有当子线程两次执行完countDown之后,主线程才会继续执行。
实现原理 查看CountDownLatch的实现,可以看到在new一个CountDownLatch时,需要一个int类型的参数:
1 2 3 4 5 public CountDownLatch (int count) { if (count < 0 ) throw new IllegalArgumentException("count < 0" ); this .sync = new Sync(count); }
Sync是CountDownLatch的一个继承了AbstractQueuedSynchronizer的内部类,用于实现同步控制,具有一些列加锁和解锁的方法。构造CountDownLatch时传入的参数最终用来设置一个被volatile修饰的属性state。这个state值可以理解了当前所有线程可重入的获得了多少锁。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 public void countDown () { sync.releaseShared(1 ); } public final boolean releaseShared (int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true ; } return false ; } protected boolean tryReleaseShared (int releases) { for (;;) { int c = getState(); if (c == 0 ) return false ; int nextc = c-1 ; if (compareAndSetState(c, nextc)) return nextc == 0 ; } }
countDown会将当前的state值减1,这可以理解为释放一把锁的过程。
当主线程调用await方法之后:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 public void await () throws InterruptedException { sync.acquireSharedInterruptibly(1 ); } public final void acquireSharedInterruptibly (int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); if (tryAcquireShared(arg) < 0 ) doAcquireSharedInterruptibly(arg); } protected int tryAcquireShared (int acquires) { return (getState() == 0 ) ? 1 : -1 ; }
如果当前的state没有减到0,主线程就会去执行doAcquireSharedInterruptibly,这个方法会使得主线程不断死循环的去获取“锁”,或者直到中断,直到state减少到0,主线程才能得到“锁”,解除循环,继续执行。
Semaphore 使用方式 可以用于做流量控制,限制多线程对有限资源的访问,在多并发场景下,如果资源数量有限,只能够支持有限的线程的使用,可以使用信号量。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 public class SemaphoreTest { private static final int THREAD_COUNT = 30 ; private static ExecutorService threadPool = Executors.newFixedThreadPool(THREAD_COUNT); private static Semaphore s = new Semaphore(10 ); public static void main (String[] args) { for (int i = 0 ; i < 100 * THREAD_COUNT; i++) { threadPool.execute(new Runnable() { @Override public void run () { try { s.acquire(); printThreadCount(); s.release(); } catch (InterruptedException e) { e.printStackTrace(); } } }); } threadPool.shutdown(); } private static synchronized void printThreadCount () { System.out.println("当前可用许可证数:" +s.availablePermits() + " 等待线程数" + s.getQueueLength()); } }
实现原理 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 private final Sync sync;public Semaphore (int permits) { sync = new NonfairSync(permits); } Sync(int permits) { setState(permits); } protected final void setState (int newState) { state = newState; }
在初始化Semaphore时,默认构造非公平的同步器,传入参数为信号量的值。 在线程执行到acquire时,会尝试可中断的去获取信号量:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 public final void acquireSharedInterruptibly (int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); if (tryAcquireShared(arg) < 0 ) doAcquireSharedInterruptibly(arg); } protected int tryAcquireShared (int acquires) { return nonfairTryAcquireShared(acquires); } final int nonfairTryAcquireShared (int acquires) { for (;;) { int available = getState(); int remaining = available - acquires; if (remaining < 0 || compareAndSetState(available, remaining)) return remaining; } }
在释放信号量以后,会通过原子操作给state值加一,如果当前state的值大于0,会在等待队列中唤醒队列首部的线程去获得信号量。