CountDownLatch

CountDownLatch 是一个线程同步东西,能够协调多个线程的同步。能够使当时线程等候其他一个或多个线程履行结束后主动康复履行。它相当所以一个计数器,初始化时指定需求履行的线程数量,每个线程结束之后计数器 -1,到计数器为 0时,在康复此前堵塞的线程。在一些场景如果不借助这种东西,代码可能便不那么「高雅」。

举个例子

有这样一个需求,需求在两个线程处理数据,在组合两个成果之后处理后续逻辑,如果不必 CountDownLatch,可能会这样写:

public class ThreadTest7 {
  volatile int seq = 0;
  public static void main(String[] args) {
    ThreadTest7 test7 = new ThreadTest7();
    new SomethingProcessor("thread1", value -> {
      test7.seq++;
      System.out.println("callback:" + value);
      test7.tryDoSomethingElse();
    }).start();
    new SomethingProcessor("thread2", value -> {
      test7.seq++;
      System.out.println("callback:" + value);
      test7.tryDoSomethingElse();
    }).start();
  }
  private void tryDoSomethingElse(){
    if(seq == 2){
      System.out.println("Doing something!");
    }
  }
}
class SomethingProcessor extends Thread{
  private String name;
  private ProcessorCallback callback;
  SomethingProcessor(String name,ProcessorCallback callback){
    this.name = name;
    this.callback = callback;
  }
  @Override
  public void run() {
    try {
      Thread.sleep((long) (1000 * Math.random()));
    } catch (InterruptedException e) {
      throw new RuntimeException(e);
    }
    callback.onCallback(name + "Fake processor!");
  }
  interface ProcessorCallback{
    void onCallback(String value);
  }
}
=========================================================
callback:thread1Fake processor!
callback:thread2Fake processor!
Doing something!

这样的代码应该也并不少见,业务逻辑是需求两个线程都处理结束之后再组合处理之后的逻辑。而处理使命的两个线程相互不通讯,无法知道其他线程的状况,所以两个线程都需求去调用后续办法,在这个办法内回去判断是不是一切线程都履行结束,如果是则继续,不是则 retrun

代码看起来没问题,功能也ok,不过不太「美观」。现在只要两个线程,所以你需求履行两次 tryDoSomethingElse(),那如果是十个八个线程呢?是不是感觉就头疼了?而且费事的是,如果调用 tryDoSomethingElse() 就意味着你的业务线程需求耦合其他逻辑,这便增加了代码耦合度,背离了咱们代码架构的准则。

如果这时候,你用 CountDownLatch ,则能够更「漂亮」地解决问题,来看下怎么改造:

public class ThreadTest7 {
  public static void main(String[] args) {
    CountDownLatch latch = new CountDownLatch(2);
    ThreadTest7 test7 = new ThreadTest7();
    new SomethingProcessor("thread1", value -> {
      System.out.println("callback:" + value);
    }, latch).start();
    new SomethingProcessor("thread2", value -> {
      System.out.println("callback:" + value);
    }, latch).start();
    try {
      latch.await();
    } catch (InterruptedException e) {
      throw new RuntimeException(e);
    }
    test7.doSomethingElse();
  }
  private void doSomethingElse() {
    System.out.println("Doing something!");
  }
}
class SomethingProcessor extends Thread {
  private String name;
  private ProcessorCallback callback;
  private CountDownLatch countDownLatch;
  SomethingProcessor(String name, ProcessorCallback callback, CountDownLatch latch) {
    this.name = name;
    this.callback = callback;
    countDownLatch = latch;
  }
  @Override
  public void run() {
    try {
      Thread.sleep((long) (1000 * Math.random()));
    } catch (InterruptedException e) {
      throw new RuntimeException(e);
    }
    callback.onCallback(name + "Fake processor!");
    countDownLatch.countDown();
  }
  interface ProcessorCallback {
    void onCallback(String value);
  }
}

能够看到,咱们去掉了计数器 seq ,声明一个 CounDownLatch(2) 注入到子线程。这样,在子线程的回调中就不需求去管后续操作是什么,需求尝试调用什么办法了。只要两个线程没有履行完,则主线程会被堵塞,待子线程悉数履行结束之后便主动康复履行。这样咱们是不是就将业务线程与主线程的逻辑解耦了?

怎么运用

CountDownLatch 的运用十分简单,这个类只要 300+ 行代码,只要 4 个 public 的办法,咱们能够自己阅读下。运用时先通过结构办法声明需求履行的子线程数量 CountDownLatch(int count),主要运用的办法有两个 await()countDown():

  • await(),会堵塞当时线程,直到 latch「计数器」 的数量为 0 时才康复
  • countDown(),每次调用 latch「计数器」 数量会减 1 ,当 latch 数量为 0 时会唤醒此前堵塞的线程

在需求等候其他线程 finish 的当地调用 await(),在子线程履行结束时调用 countDown() ,就能够了。

运用场景

CountDownLatch 的工作流程是当时线程等候,直到其他线程(使命)履行结束之后才回复。所以,有两个运用场景比较典型:

  • 大型 App 主页展现:App 主页需求展现多个服务内容, 如广告、商品、电影等等,它们不太可能在同一个服务里,所以需求有多个异步请求,在一切请求处理完成之后再展现。
  • 初始化:在运用某些才干时,需求初始化多个服务,在都初始化完成后再继续业务处理。

CyclicBarrier

字面意思循环栅栏或叫循环屏障,通过它能够完成让一组线程等候至某个状况之后再悉数一起履行。叫做循环是因为当一切等候线程都被释放以后,CyclicBarrier 能够被重用。咱们暂时把这个状况就叫做barrier,当调用await()办法之后,线程就处于barrier了。

主要办法:

public CyclicBarrier(int parties, Runnable barrierAction)

另外还有个单参数的结构函数,就相当于此处的 barrierAction 为空,这儿就不另外列了。parties 就是需求让多少个线程(使命)等候至 barrier 状况。barrierAction 是在一切线程都抵达 barrier 状况后,会优先履行「在被堵塞的线程之前」的内容。

public int await()

此办法用来挂起当时线程,直至一切线程都抵达barrier状况再一起康复履行后续使命

运用场景

比如,有一款游戏,就相似 PUBG 吧,能够创建房间,在房间到达四人且预备后开端游戏。或许跑步吧,100米竞赛赛场有八个赛道,需求等候每个赛道的运动员都预备好才干开端竞赛。咱们来用 CyclicBarrier 完成这样的需求。

public static void main(String[] args) {
  final CyclicBarrier cyclicBarrier = new CyclicBarrier(4,()->{
    System.out.println("一切玩家预备结束,能够开端游戏了,开端前预备!");
  });
  for (int i = 0; i < 4; i++) {
    new Thread("玩家「"+i+"」"){
      @Override
      public void run() {
        try {
          System.out.println(Thread.currentThread().getName() + "正在预备");
          Thread.sleep((long) (1000 * Math.random()));
          System.out.println(Thread.currentThread().getName() + "预备好了");
          cyclicBarrier.await();
          System.out.println(Thread.currentThread().getName() + "开端玩!");
        } catch (InterruptedException e) {
          e.printStackTrace();
        } catch (BrokenBarrierException e) {
          e.printStackTrace();
        }
      }
    }.start();
  }
}
============================================================================
玩家「0」正在预备
玩家「3」正在预备
玩家「1」正在预备
玩家「2」正在预备
玩家「1」预备好了
玩家「3」预备好了
玩家「2」预备好了
玩家「0」预备好了
一切玩家预备结束,能够开端游戏了,开端前预备!
玩家「0」开端玩!
玩家「1」开端玩!
玩家「3」开端玩!
玩家「2」开端玩!

await()就相当于玩家在游戏界面内点了「预备」按钮,表明现已预备好了「即抵达Barrier」。使命线程不知道其他线程状况,只能奉告 CyclicBarrier「房主」我现已预备好了。当一切人都预备好了后,房主便能够先完成游戏开端前的预备工作再通知一切玩家,能够开端游戏了。

CountDownLatch 与 CyclicBarrier 的差异

  • 它们都是用来控制一个或多个线程与一组线程之间的同步东西
  • CountDownLatch 的状况无法重置,它的计数器在被归0后无法再康复。CyclicBarrier 则提供了 reset 办法去重置状况,它能够被重复运用。
  • 运用场景不太相同,CountDownLatch 是在计数器归 0 之后去康复之前被堵塞的线程,只康复这一个。而 CycliBarrier 是在 parties 线程数都抵达栅栏「调用await()」后去康复一切此前被堵塞的线程。康复的线程数量是 parties个。

最后

关于CounDownLatchCyclicBarrier的运用就跟咱们聊这些,期望对咱们会有协助,也欢迎咱们多多沟通,谢谢!