在多线程编程中,同步工具是确保线程之间协同工作的重要组成部分。
CyclicBarrier(循环屏障)是Java中的一个强大的同步工具,它允许一组线程在达到某个共同点之前互相等待。
在本文中,我们将深入探讨CyclicBarrier的源码实现以及提供一些示例,以帮助您更好地理解和应用这个有趣的同步工具。
public class CyclicBarrier { private final ReentrantLock lock = new ReentrantLock(); private final Condition trip = lock.newCondition(); private final int parties; private int count; private final Runnable barrierCommand; }
await方法
public int await() throws InterruptedException, BrokenBarrierException { try { lock.lock(); if (Thread.interrupted()) throw new InterruptedException(); int index = --count; if (index == 0) { // 如果是最后一个到达的线程 boolean ranAction = false; try { final Runnable command = barrierCommand; if (command != null) command.run(); ranAction = true; return 0; } finally { if (!ranAction) breakBarrier(); // 执行失败,重置屏障状态 } } while (index > 0) { try { trip.await(); } catch (InterruptedException ie) { if (index == 1 && !broken) breakBarrier(); throw ie; } } if (broken) throw new BrokenBarrierException(); return index; } finally { lock.unlock(); } }
上述代码主要完成以下几个任务:
当一个大任务可以分解为多个子任务,每个子任务独立执行,但在某个点上需要等待所有子任务完成后再继续执行父任务。CyclicBarrier可以用来同步这些子任务的执行,确保它们在特定的屏障点上等待,然后一起继续执行。
假设我们有一个大型的数据处理任务,需要将数据分解为若干子任务并行处理,然后在所有子任务完成后进行结果的合并。CyclicBarrier 可以用来同步子任务的执行,确保在所有子任务都完成后再进行合并操作。
import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.CyclicBarrier; public class TaskDecompositionAndMergeExample { private static final int NUM_SUBTASKS = 3; private static final CyclicBarrier barrier = new CyclicBarrier(NUM_SUBTASKS, () -> { System.out.println("All subtasks have been completed. Merging results..."); }); public static void main(String[] args) { for (int i = 0; i < NUM_SUBTASKS; i++) { final int subtaskId = i; new Thread(() -> { // Perform individual subtask System.out.println("Subtask " + subtaskId + " is processing."); // Simulate some computation for the subtask try { Thread.sleep((long) (Math.random() * 1000)); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("Subtask " + subtaskId + " has completed."); try { // Wait for all subtasks to complete barrier.await(); } catch (InterruptedException | BrokenBarrierException e) { e.printStackTrace(); } }).start(); } } }
在并行计算中,当多个计算节点完成局部计算后,需要将它们的结果合并。CyclicBarrier可以用来等待所有计算节点完成局部计算,然后执行合并操作。
import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.CyclicBarrier; public class ParallelComputingExample { private static final int NUM_THREADS = 4; private static final CyclicBarrier barrier = new CyclicBarrier(NUM_THREADS, () -> { System.out.println("All threads have completed the computation. Merging results..."); }); public static void main(String[] args) { for (int i = 0; i < NUM_THREADS; i++) { final int threadId = i; new Thread(() -> { // Perform individual computation System.out.println("Thread " + threadId + " is performing computation."); // Simulate some computation for the thread try { Thread.sleep((long) (Math.random() * 1000)); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("Thread " + threadId + " has completed computation."); try { // Wait for all threads to complete computation barrier.await(); } catch (InterruptedException | BrokenBarrierException e) { e.printStackTrace(); } }).start(); } } }
在多线程游戏开发中,可能存在多个线程分别负责不同的任务,比如渲染、物理模拟、AI计算等。
在每一帧结束时,这些线程需要同步,确保下一帧开始时所有任务都已完成。CyclicBarrier可以在每一帧结束时等待所有任务完成,然后统一开始下一帧的计算。
比如我们在打匹配游戏的时候,十个人必须全部加载到100%,才可以开局。否则只要有一个人没有加载到100%,那这个游戏就不能开始。先加载完成的玩家必须等待最后一个玩家加载成功才可以。
public class CyclicBarrierDemo { private static CyclicBarrier cyclicBarrier; static class CyclicBarrierThread extends Thread{ @Override public void run() { System.out.println("玩家 " + Thread.currentThread().getName() + " 加载100%"); //等待 try { cyclicBarrier.await(); } catch (Exception e) { e.printStackTrace(); } } } public static void main(String[] args){ cyclicBarrier = new CyclicBarrier(10, new Runnable() { public void run() { System.out.println("玩家都加载好了,开始游戏...."); } }); for(int i = 0 ; i < 10 ; i++){ new CyclicBarrierThread().start(); } } }
玩家 Thread-0 加载100% 玩家 Thread-2 加载100% 玩家 Thread-3 加载100% 玩家 Thread-6 加载100% 玩家 Thread-1 加载100% 玩家 Thread-4 加载100% 玩家 Thread-5 加载100% 玩家 Thread-8 加载100% 玩家 Thread-7 加载100% 玩家 Thread-9 加载100% 玩家都加载好了,开始游戏....
在某些应用中,可能需要同时加载多个数据源,但要确保所有数据加载完成后再继续执行。CyclicBarrier可以用来等待所有数据加载完成,然后执行后续操作。
import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.CyclicBarrier; public class DataLoaderExample { private static final int NUM_THREADS = 3; private static final CyclicBarrier barrier = new CyclicBarrier(NUM_THREADS, () -> { System.out.println("All data loading threads have completed. Initiating further processing..."); }); public static void main(String[] args) { for (int i = 0; i < NUM_THREADS; i++) { final int threadId = i; new Thread(() -> { // Simulate data loading System.out.println("Thread " + threadId + " is loading data."); // Simulate data loading time try { Thread.sleep((long) (Math.random() * 1000)); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("Thread " + threadId + " has completed data loading."); try { // Wait for all data loading threads to complete barrier.await(); } catch (InterruptedException | BrokenBarrierException e) { e.printStackTrace(); } // Perform further processing after data loading is complete System.out.println("Thread " + threadId + " is performing further processing."); }).start(); } } }
CyclicBarrier可以与其他并发工具一起使用,例如 ExecutorService 和 CountDownLatch,以实现更复杂的多线程控制逻辑。
import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.CyclicBarrier; public class CyclicBarrierExample { private static final int NUM_THREADS = 3; private static final CyclicBarrier barrier = new CyclicBarrier(NUM_THREADS, () -> { System.out.println("All threads have reached the barrier. Let's continue!"); }); public static void main(String[] args) { for (int i = 0; i < NUM_THREADS; i++) { new Thread(() -> { try { // Perform individual tasks System.out.println(Thread.currentThread().getName() + " is performing individual tasks."); // Wait for all threads to reach the barrier barrier.await(); // Continue with collective tasks after reaching the barrier System.out.println(Thread.currentThread().getName() + " is performing collective tasks."); } catch (InterruptedException | BrokenBarrierException e) { e.printStackTrace(); } }).start(); } } }
大佬们可以收藏以备不时之需:
Spring Boot 专栏:http://t.csdnimg.cn/peKde
ChatGPT 专栏:http://t.csdnimg.cn/cU0na
Java 专栏:http://t.csdnimg.cn/YUz5e
Go 专栏:http://t.csdnimg.cn/Jfryo
Netty 专栏:http://t.csdnimg.cn/0Mp1H
Redis 专栏:http://t.csdnimg.cn/JuTue
Mysql 专栏:http://t.csdnimg.cn/p1zU9
架构之路 专栏:http://t.csdnimg.cn/bXAPS
通过本文,我们深入了解了CyclicBarrier的源码实现,并通过一个简单的示例演示了它的用法。
CyclicBarrier是一个强大的同步工具,可以帮助我们实现复杂的多线程协同任务。
在多线程编程中,理解和熟练使用这样的同步工具是至关重要的,能够确保线程之间的协同工作更加高效和可靠。
感谢您的支持和鼓励! 😊🙏
如果大家对相关文章感兴趣,可以关注公众号"架构殿堂",会持续更新AIGC,java基础面试题, netty, spring boot, spring cloud等系列文章,一系列干货随时送达!
上一篇:JavaScript快速入门