JUC并发编程 07——Java中的并发工具类
作者:mmseoamin日期:2023-12-21

一.等待多线程完成的 CountDownLatch

CountDownLatch 允许一个或多个线程等待其他线程完成操作。

join

假如有这样一个需求:我们需要解析一个 Excel 里多个 sheet(表)的数据,此时可以考虑使用多线程,每个线程解析一个 sheet 里的数据,等到所有的 sheet 都解析完之后,程序需要提示解析完成。在这个需求中,要实现主线程等待所有线程完成 sheet 的解析操作,最简单的做法是使用 join() 方法

JUC并发编程 07——Java中的并发工具类,第1张join()方法的原理     

主要两点,线程如何被阻塞,线程又是如何被唤醒

join()方法是Thread类中的,所以我们可以直接查看源码,找到join()方法,如下:

JUC并发编程 07——Java中的并发工具类,第2张

调用了另一个重载方法,参数为 0,继续看

JUC并发编程 07——Java中的并发工具类,第3张

从源码来看,实际上join方法就是调用了wait方法来使得线程阻塞,一直到线程结束运行。注意到,join方法前的synchronized修饰符,它相当于:

public final void join(long millis){
 synchronized(this){
        //代码块
    }
}

也就是说加锁的对象即调用这个锁的线程对象,在main()方法中即为parser1/parser2,持有这个锁的是主线程即main()方法,也就是说代码相当于如下:

//parser1.join()前的代码
synchronized (parser1) {
 // 调用者线程进入 parser1 的 waitSet 等待, 直到 parser1 运行结束
 while (parser1.isAlive()) {
   parser1.wait(0);
 }
}
//parser1.join()后的代码

也因此主线程进入等待队列,直到 parser1 线程结束。wait 方法被调用以后,是让持有锁的线程进入等待队列,即主线程,因此 parser1  线程并不会被阻塞。

那么问题在于,这里只看到了wait方法,却并没有看到 notify 或者是 notifyAll 方法,那么主线程在那里被唤醒呢?

唤醒进程的方法位于jvm中,在线程(调用join方法的线程)结束后,会调用该方法,最后唤醒主线程。

CountDownLatch 

在JDK1.5之后的并发包中提供的 CountDownLatch 也可以实现 join 的功能,并且比 join 的功能更多。

JUC并发编程 07——Java中的并发工具类,第4张

CountDownLatch 的构造函数接收一个 int 类型的参数作为计数器,如果你想等待 N 个点完成,这里就传入N。

当我们调用 CountDownLatch 的 countDown 方法时,N 就会减 1,CountDownLatch 的 await 方法会阻塞当前线程,直到 N 变成 0。由于 countDown 方法可以用在任何地方,所以这里说的 N 哥点,可以是 N 个线程,也可以是 1 个线程里的 N 个执行步骤。用在多个线程时,只需要把这个 CountDownLatch 的引用传递到线程里即可。

二.同步屏障 CyclicBarrier

CyclicBarrier 的字面意思是可循环使用(Cyclic)的屏障(Barrier)。它要做的事情是,让一组线程到达一个屏障(也可以叫同步点)时被阻塞,直到最后一个线程到达屏障时,屏障才会开门,所有被屏障拦截的线程才会继续运行。

CyclicBarrier 默认的构造方法是 CyclicBarrier (int parties),其参数表示屏障拦截的线程数量,每个线程调用 await 方法告诉 CyclicBarrier 我已经到达了屏障,然后当前线程被阻塞。假设parties为 3,如果第 3 个线程没有到达屏障,那么之前到达屏障的 2 个线程都不会继续执行。

CyclicBarrier 还提供了一个更高级的构造函数 CyclicBarrier (int parties, Runnable barrierAction),当所有的线程到达屏障时,优先执行 barrierAction,方便处理更复杂的业务场景。

JUC并发编程 07——Java中的并发工具类,第5张

JUC并发编程 07——Java中的并发工具类,第6张

三.CyclicBarrier 和 CountDownLatch 的区别

  • CountDownLatch的计数器只能使用一次,而CyclicBarrier的计数器可以使用reset()方法重置,可以使用多次,所以CyclicBarrier能够处理更为复杂的场景;
  • CyclicBarrier还提供了一些其他有用的方法,比如getNumberWaiting()方法可以获得CyclicBarrier阻塞的线程数量,isBroken()方法用来了解阻塞的线程是否被中断;
  • CountDownLatch允许一个或多个线程等待一组事件的产生,而CyclicBarrier用于等待其他线程运行到栅栏位置。

    四.控制并发线程数的 Semaphore

    Semaphore(信号量)是用来控制同时访问特定资源的线程数量,它通过协调各个线程,以保证合理的使用公共资源。

    应用场景

    Semaphore 可以用于做流量控制,特别是公用资源有限的应用场景,比如数据库连接。假如有一个需求,要读取几万个文件的数据,因为都是IO密集型任务,我们可以启动几十个线程并发地读取,但是如果读到内存后,还需要存储到数据库中,而数据库的连接数只有10个,这时我们必须控制只有10个线程同时获取数据库连接保存数据,否则会报错无法获取数据库连接。这个时候就可以使用Semaphore来做流量控制。

    JUC并发编程 07——Java中的并发工具类,第7张

    • Semaphore(int permits):表示允许permits个线程获取许可证,也就是最大并发数是permits。
    • acquire():获取一个许可证,在 获取到许可证 或者 被其他线程调用中断 之前线程一直处于阻塞状态
    • release():归还许可证
    • tryAcquire():尝试获得许可证,返回获取许可证成功或失败,不阻塞线程
    • availablePermits():返回此信号量中当前可用的许可证数量
    • getQueueLength():获取等待队列里阻塞的线程数
    • hasQueuedThreads():等待队列里是否还存在等待线程

      Semaphore实现原理

      ①Semaphore初始化

      Semaphore semaphore=new Semaphore(2);

      当调用new Semaphore(permits) 方法时,默认会创建一个非公平的锁的同步阻塞队列。把初始令牌数量赋值给同步队列的state状态,state的值就代表当前所剩余的令牌数量。

      初始化完成后同步队列信息如下图:

      JUC并发编程 07——Java中的并发工具类,第8张

      ②获取许可证

      semaphore.acquire();
      1. 当前线程会尝试去同步队列获取一个许可证,获取许可证的过程也就是使用原子的操作去修改同步队列的state,获取一个令牌则修改为state=state-1。
      2. 当计算出来的state<0,则代表令牌数量不足,此时会创建一个Node节点加入阻塞队列,挂起当前线程。
      3. 当计算出来的state>=0,则代表获取令牌成功。

      ③释放许可证

      semaphore.release();
      1. 线程会尝试释放一个许可证,释放许可证的过程也就是把同步队列的state修改为state=state+1的过程
      2. 释放许可证成功之后,同时会唤醒同步队列中的一个线程。
      3. 被唤醒的节点会重新尝试去修改state=state-1 的操作,如果state>=0则获取令牌成功,否则重新进入阻塞队列,挂起线程。