CountDownLatch 允许一个或多个线程等待其他线程完成操作。
假如有这样一个需求:我们需要解析一个 Excel 里多个 sheet(表)的数据,此时可以考虑使用多线程,每个线程解析一个 sheet 里的数据,等到所有的 sheet 都解析完之后,程序需要提示解析完成。在这个需求中,要实现主线程等待所有线程完成 sheet 的解析操作,最简单的做法是使用 join() 方法。
join()方法的原理
主要两点,线程如何被阻塞,线程又是如何被唤醒
join()方法是Thread类中的,所以我们可以直接查看源码,找到join()方法,如下:
调用了另一个重载方法,参数为 0,继续看
从源码来看,实际上join方法就是调用了wait方法来使得线程阻塞,一直到线程结束运行。注意到,join方法前的synchronized修饰符,它相当于:
public final void join(long millis){ synchronized(this){ //代码块 } }
也就是说加锁的对象即调用这个锁的线程对象,在main()方法中即为parser1/parser2,持有这个锁的是主线程即main()方法,也就是说代码相当于如下:
//parser1.join()前的代码 synchronized (parser1) { // 调用者线程进入 parser1 的 waitSet 等待, 直到 parser1 运行结束 while (parser1.isAlive()) { parser1.wait(0); } } //parser1.join()后的代码
也因此主线程进入等待队列,直到 parser1 线程结束。wait 方法被调用以后,是让持有锁的线程进入等待队列,即主线程,因此 parser1 线程并不会被阻塞。
那么问题在于,这里只看到了wait方法,却并没有看到 notify 或者是 notifyAll 方法,那么主线程在那里被唤醒呢?
唤醒进程的方法位于jvm中,在线程(调用join方法的线程)结束后,会调用该方法,最后唤醒主线程。
在JDK1.5之后的并发包中提供的 CountDownLatch 也可以实现 join 的功能,并且比 join 的功能更多。
CountDownLatch 的构造函数接收一个 int 类型的参数作为计数器,如果你想等待 N 个点完成,这里就传入N。
当我们调用 CountDownLatch 的 countDown 方法时,N 就会减 1,CountDownLatch 的 await 方法会阻塞当前线程,直到 N 变成 0。由于 countDown 方法可以用在任何地方,所以这里说的 N 哥点,可以是 N 个线程,也可以是 1 个线程里的 N 个执行步骤。用在多个线程时,只需要把这个 CountDownLatch 的引用传递到线程里即可。
CyclicBarrier 的字面意思是可循环使用(Cyclic)的屏障(Barrier)。它要做的事情是,让一组线程到达一个屏障(也可以叫同步点)时被阻塞,直到最后一个线程到达屏障时,屏障才会开门,所有被屏障拦截的线程才会继续运行。
CyclicBarrier 默认的构造方法是 CyclicBarrier (int parties),其参数表示屏障拦截的线程数量,每个线程调用 await 方法告诉 CyclicBarrier 我已经到达了屏障,然后当前线程被阻塞。假设parties为 3,如果第 3 个线程没有到达屏障,那么之前到达屏障的 2 个线程都不会继续执行。
CyclicBarrier 还提供了一个更高级的构造函数 CyclicBarrier (int parties, Runnable barrierAction),当所有的线程到达屏障时,优先执行 barrierAction,方便处理更复杂的业务场景。
Semaphore(信号量)是用来控制同时访问特定资源的线程数量,它通过协调各个线程,以保证合理的使用公共资源。
Semaphore 可以用于做流量控制,特别是公用资源有限的应用场景,比如数据库连接。假如有一个需求,要读取几万个文件的数据,因为都是IO密集型任务,我们可以启动几十个线程并发地读取,但是如果读到内存后,还需要存储到数据库中,而数据库的连接数只有10个,这时我们必须控制只有10个线程同时获取数据库连接保存数据,否则会报错无法获取数据库连接。这个时候就可以使用Semaphore来做流量控制。
①Semaphore初始化
Semaphore semaphore=new Semaphore(2);
当调用new Semaphore(permits) 方法时,默认会创建一个非公平的锁的同步阻塞队列。把初始令牌数量赋值给同步队列的state状态,state的值就代表当前所剩余的令牌数量。
初始化完成后同步队列信息如下图:
②获取许可证
semaphore.acquire();
③释放许可证
semaphore.release();