您现在的位置是:亿华云 > IT科技

并发编程之CyclicBarrier原理与使用

亿华云2025-10-03 02:07:59【IT科技】2人已围观

简介前言控制并发流程的工具类,作用就是帮助我们程序员更容易的让线程之间合作,让线程之间相互配合来满足业务逻辑。比如让线程A等待线程B执行完毕后再执行等合作策略。控制并发流程的工具类主要有:简介从字面意思看

 前言

控制并发流程的并发编程工具类,作用就是原理帮助我们程序员更容易的让线程之间合作,让线程之间相互配合来满足业务逻辑。使用比如让线程A等待线程B执行完毕后再执行等合作策略。并发编程

控制并发流程的原理工具类主要有:

简介

从字面意思看,这个类的使用中文意思是“循环栅栏”。大概的并发编程意思就是一个可循环利用的屏障。它的原理作用就是会让所有线程都等待完成后才会继续下一步行动。

举个例子,使用就像生活中我们会约朋友到某个餐厅一起吃饭,并发编程有些朋友可能会早到,原理有些朋友可能会晚到,使用但这个餐厅规定必须等到所有人到期之后才会让我们进去。并发编程这里的原理朋友们就各个线程,餐厅就是使用CyclicBarrier。

在JUC包中为我们提供了一个同步工具类能够很好的模拟这类场景,它就是CyclicBarrier类。利用CyclicBarrier类可以实现一组线程相互等待,当所有线程都到达某个屏障点后再进行后续的源码库操作。下图演示了这一过程。

应用场景

可用于多线程计数数据,最后合并计数结果的场景。

使用CyclicBarrier实现等待的线程都被称为参与方。参与方只需要执行cyclicBarrier.await() 就可以实现等待。由于CyclicBarrier内部维护了一个显示锁,这可以知道参与方中谁最后一个执行cyclicBarrier.await() 。当最后一个线程执行完,会使得使用相应CyclicBarrier实例的其他参与方被唤醒,而最后一个线程自身不会被暂停。其流程图如下:

public static void main(String[] args) {          CyclicBarrier cyclicBarrier = new CyclicBarrier(7,() ->{              System.out.println("****召唤神龙");         });         for(int i = 1;i <= 7; i++){              int finalI = i;             new Thread(() -> {                  System.out.println(Thread.currentThread().getName() + "\t 收集到第"+ finalI +"颗龙珠");                 try {                      cyclicBarrier.await();                 } catch (InterruptedException e) {                      e.printStackTrace();                 } catch (BrokenBarrierException e) {                      e.printStackTrace();                 }             },String.valueOf(i)).start();         }     } 

 源码分析

CyclicBarrier 类图

CyclicBarrier是包含了 “ReentrantLock对象lock” 和 “Condition对象trip”,它是通过独占锁实现的。

其内部主要变量和方法如下:

成员变量

//同步操作锁

private final ReentrantLock lock = new ReentrantLock();

//同步操作锁 private final ReentrantLock lock = new ReentrantLock(); //线程拦截器 private final Condition trip = lock.newCondition(); //每次拦截的线程数 private final int parties; //换代前执行的任务 private final Runnable barrierCommand; //表示栅栏的当前代 private Generation generation = new Generation(); //计数器 private int count; //静态内部类Generation private static class Generation {    boolean broken = false; } 

 可以看到 CyclicBarrier 内部是通过条件队列 trip 来对线程进行阻塞的,并且其内部维护了两个 int 型的变量 parties 和 count:

parties 表示每次拦截的线程数,该值在构造时进行赋值; count 是内部计数器,它的初始值和 parties 相同,以后随着每次 await 方法的香港云服务器调用而减 1,直到减为 0 就将所有线程唤醒。

CycliBarrier 有一个静态内部类 Generation,该类的对象代表栅栏的当前代,就像玩游戏时代表的本局有些,利用它可以实现循环等待。barrierCommand 表示换代前执行的任务,当 count 减为 0 时表示本局游戏结束,需要转到下一句。在转到下一句游戏之前会将所有阻塞的线程唤醒,在唤醒所有线程之前你可以通过指定 barrierCommand 来执行自己的任务。

构造函数

主要提供了两个构造方法

public CyclicBarrier(int parties) {   this(parties, null); } public CyclicBarrier(int parties, Runnable barrierAction) {      if (parties <= 0) throw new IllegalArgumentException();     // parties表示“必须同时到达barrier的线程个数”。     this.parties = parties;     // count表示“处在等待状态的线程个数”。     this.count = parties;     // barrierCommand表示“parties个线程到达barrier时,会执行的动作”。     this.barrierCommand = barrierAction; } 

 解析:

parties 是参与线程的个数 第二个构造方法有一个Runnable参数,这个参数的意思是最后一个到达线程要执行的动作。

重要方法

CyclicBarrier类最主要的功能就是站群服务器使先到达屏障点的线程阻塞并等待后面的线程,其中它提供了两种等待的方法,分别是定时等待和非定时等待。

await()方法

//非定时等待 public int await() throws InterruptedException, BrokenBarrierException {    try {      return dowait(false, 0L);   } catch (TimeoutException toe) {      throw new Error(toe);   } } //定时等待 public int await(long timeout, TimeUnit unit) throws InterruptedException, BrokenBarrierException, TimeoutException {    return dowait(true, unit.toNanos(timeout)); } 

 解析:

线程调用await()表示总结已经到达栅栏 BrokenBarrierException表示栅栏已经被破坏,破坏的原因可能是其中一个线程await()时被中断或者超时。

dowait()方法

可以看到不管是定时等待还是非定时等待,它们都调用了dowait方法,只不过是传入的参数不同而已。下面我们就来看看dowait方法都做了些什么。

//核心等待方法 private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException {    // 显示锁   final ReentrantLock lock = this.lock;   lock.lock();   try {      final Generation g = generation;     //检查当前栅栏是否被打翻     if (g.broken) {        throw new BrokenBarrierException();     }     //检查当前线程是否被中断     if (Thread.interrupted()) {        //如果当前线程被中断会做以下三件事       //1.打翻当前栅栏       //2.唤醒拦截的所有线程       //3.抛出中断异常       breakBarrier();       throw new InterruptedException();     }     //每次都将计数器的值减1     int index = --count;     //计数器的值减为0则需唤醒所有线程并转换到下一代     if (index == 0) {        boolean ranAction = false;       try {          //唤醒所有线程前先执行指定的任务         final Runnable command = barrierCommand;         if (command != null) {            command.run();         }         ranAction = true;         //唤醒所有线程并转到下一代         nextGeneration();         return 0;       } finally {          //确保在任务未成功执行时能将所有线程唤醒         if (!ranAction) {            breakBarrier();         }       }     }     //如果计数器不为0则执行此循环     for (;;) {        try {          //根据传入的参数来决定是定时等待还是非定时等待         if (!timed) {            trip.await();         }else if (nanos > 0L) {            nanos = trip.awaitNanos(nanos);         }       } catch (InterruptedException ie) {          //若当前线程在等待期间被中断则打翻栅栏唤醒其他线程         if (g == generation && ! g.broken) {            breakBarrier();           throw ie;         } else {            //若在捕获中断异常前已经完成在栅栏上的等待, 则直接调用中断操作           Thread.currentThread().interrupt();         }       }       //如果线程因为打翻栅栏操作而被唤醒则抛出异常       if (g.broken) {          throw new BrokenBarrierException();       }       //如果线程因为换代操作而被唤醒则返回计数器的值       if (g != generation) {          return index;       }       //如果线程因为时间到了而被唤醒则打翻栅栏并抛出异常       if (timed && nanos <= 0L) {          breakBarrier();         throw new TimeoutException();       }     }   } finally {      lock.unlock();   } } 

 上面执行的代码相对比较容易看懂,我们再来看一下执行流程:

获得显示锁,判断当前线程状态是否被中断,如果是,则执行 breakBarrier 方法,唤醒之前阻塞的所有线程,并将计数器重置; 计数器 count 减 1,如果 count == 0,表示最后一个线程达到栅栏,接着执行之前指定的 Runnable 接口,同时执行 nextGeneration 方法进入下一代; 否则,进入自旋,判断当前线程是进入定时等待还是非定时等待,如果在等待过程中被中断,执行 breakBarrier 方法,唤醒之前阻塞的所有线程; 判断是否是因为执行 breakBarrier 方法而被唤醒,如果是,则抛出异常; 判断是否是正常的换代操作而被唤醒,如果是,则返回计数器的值; 判断是否是超时而被唤醒,如果是,则唤醒之前阻塞的所有线程,并抛出异常; 释放锁。

breakBarrier()方法

private void breakBarrier() {   generation.broken = true;//栅栏被打破  count = parties;//重置count  trip.signalAll();//唤醒之前阻塞的线程 } 

 nextGeneration()方法

private void nextGeneration() {   //唤醒所以的线程  trip.signalAll();  //重置计数器  count = parties;  //重新开始  generation = new Generation(); } 

 reset()方法

接下来看看栅栏重置的方法

// 重置barrier到初始状态,所有还在等待中的线程最终会抛出BrokenBarrierException。 public void reset() {   final ReentrantLock lock = this.lock;     lock.lock();     try {       breakBarrier();   // break the current generation         nextGeneration(); // start a new generation     } finally {       lock.unlock();     } } 

 其它方法

CyclicBarrier 其它还提供了例如getParties,isBroken,getNumberWaiting等方法,都比较简单,其中除了getParties由于parties被final修饰不可变,其余方法都会先去获得互斥锁。

/**  * 获取当前这一轮是否已经broken。  */ public boolean isBroken() {      final ReentrantLock lock = this.lock;     lock.lock();     try {          return generation.broken;     } finally {          lock.unlock();     } } /**  * 获得当前在barrier中等待的线程数。  */ public int getNumberWaiting() {      final ReentrantLock lock = this.lock;     lock.lock();     try {          return parties - count;     } finally {          lock.unlock();     } } 

 总结

CountDownLatch和CyclicBarrier区别

CountDownLatch和CyclicBarrier都能够实现线程之间的等待,只不过它们侧重点不同: CountDownLatch一般用于一个或多个线程,等待其他线程执行完任务后,再才执行; CyclicBarrier一般用于一组线程互相等待至某个状态,然后这一组线程再同时执行; CountDownLatch 是一次性的,CyclicBarrier 是可循环利用的; CountDownLathch是一个计数器,线程完成一个记录一个,计数器递减,只能用一次。如下图:

CyclicBarrier的计数器更像一个阀门,需要所有线程都到达,然后继续执行,计数器递减,提供reset功能,可以多次使用。如下图:

PS:以上代码提交在 Github :

https://github.com/Niuh-Study/niuh-juc-final.git

PS:这里有一个技术交流群(QQ群:1158819530),方便大家一起交流,持续学习,共同进步,有需要的可以加一下。

很赞哦!(31)