本文正在参与「金石方案 . 分割6万现金大奖」

在JDK的并发包里供给了几个非常有用的并发东西类。CountDownLatch、CyclicBarrier和Semaphore东西类供给了一种并发流程控制的手段,Exchanger东西类供给了在线程间交流数据的一种办法。

它们都在java.util.concurrent包下。先总体归纳一下都有哪些东西类,它们有什么效果,然后再别离介绍它们的首要运用办法和原理。

效果
CountDownLatch 线程等候直到计数器减为0时开端作业
CyclicBarrier 效果跟CountDownLatch相似,可是能够重复运用
Semaphore 约束线程的数量
Exchanger 两个线程交流数据

下面别离介绍这几个类。

CountDownLatch

概述

CountDownLatch能够使一个或多个线程等候其他线程各自履行完毕后再履行。

CountDownLatch界说了一个计数器,和一个堵塞行列, 当计数器的值递减为0之前,堵塞行列里边的线程处于挂起状况,当计数器递减到0时会唤醒堵塞行列一切线程,这里的计数器是一个标志,能够表示一个使命一个线程,也能够表示一个倒计时器

事例

玩吃鸡游戏的时分,正式开端游戏之前,肯定会加载一些前置场景,例如:“加载地图”、“加载人物模型”、“加载背景音乐”等。

public class CountDownLatchDemo {
    // 界说前置使命线程
    static class PreTaskThread implements Runnable {
        private String task;
        private CountDownLatch countDownLatch;
        public PreTaskThread(String task, CountDownLatch countDownLatch) {
            this.task = task;
            this.countDownLatch = countDownLatch;
        }
        @Override
        public void run() {
            try {
                Random random = new Random();
                Thread.sleep(random.nextInt(1000));
                System.out.println(task + " - 使命完结");
                countDownLatch.countDown();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
    public static void main(String[] args) {
        // 假定有三个模块需求加载
        CountDownLatch countDownLatch = new CountDownLatch(3);
        // 主使命
        new Thread(() -> {
            try {
                System.out.println("等候数据加载...");
                System.out.println(String.format("还有%d个前置使命", countDownLatch.getCount()));
                countDownLatch.await();
                System.out.println("数据加载完结,正式开端游戏!");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }).start();
        // 前置使命
        new Thread(new PreTaskThread("加载地图数据", countDownLatch)).start();
        new Thread(new PreTaskThread("加载人物模型", countDownLatch)).start();
        new Thread(new PreTaskThread("加载背景音乐", countDownLatch)).start();
    }
}
输出:
等候数据加载...
还有3个前置使命
加载地图数据 - 使命完结
加载人物模型 - 使命完结
加载背景音乐 - 使命完结
数据加载完结,正式开端游戏!

原理

CountDownLatch的办法很简略,如下:

// 结构办法:
public CountDownLatch(int count)
public void await() // 等候
public boolean await(long timeout, TimeUnit unit) // 超时等候
public void countDown() // count - 1
public long getCount() // 获取当时还有多少count

CountDownLatch结构器中的计数值(count)实际上便是闭锁需求等候的线程数量。这个值只能被设置一次,并且CountDownLatch没有供给任何机制去重新设置这个计数值

与CountDownLatch的第一次交互是主线程等候其他线程。主线程必须在发动其他线程后当即调用CountDownLatch.await()办法。这样主线程的操作就会在这个办法上堵塞,直到其他线程完结各自的使命。

其他N 个线程必须引证闭锁目标,因为他们需求通知CountDownLatch目标,他们现已完结了各自的使命。这种通知机制是经过CountDownLatch.countDown()办法来完结的;每调用一次这个办法,在结构函数中初始化的count值就减1。所以当N个线程都调 用了这个办法,count的值等于0,然后主线程就能经过await()办法,康复履行自己的使命。

源码剖析

CountDownLatch有一个内部类叫做Sync,它承继了AbstractQueuedSynchronizer类,其间维护了一个整数state,并且确保了修改state的可见性和原子性,源码如下:

private static final class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 4982264981922014374L;
        Sync(int count) {
            setState(count);
        }
        int getCount() {
            return getState();
        }
        protected int tryAcquireShared(int acquires) {
            return (getState() == 0) ? 1 : -1;
        }
        protected boolean tryReleaseShared(int releases) {
            // Decrement count; signal when transition to zero
            for (;;) {
                int c = getState();
                if (c == 0)
                    return false;
                int nextc = c - 1;
                if (compareAndSetState(c, nextc))
                    return nextc == 0;
            }
        }
    }

创建CountDownLatch实例时,也会创建一个Sync的实例,一起把计数器的值传给Sync实例,源码如下:

public CountDownLatch(int count) {
  if (count < 0) throw new IllegalArgumentException("count < 0");
  this.sync = new Sync(count);
}

countDown办法中,只调用了Sync实例的releaseShared办法,源码如下:

public void countDown() {
    sync.releaseShared(1);
}

其间的releaseShared办法,先对计数器进行减1操作,假如减1后的计数器为0,唤醒被await办法堵塞的一切线程,源码如下:

public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) { //对计数器进行减一操作
        doReleaseShared();//假如计数器为0,唤醒被await办法堵塞的一切线程
        return true;
    }
    return false;
}

其间的tryReleaseShared办法,先获取当时计数器的值,假如计数器为0时,就直接回来;假如不为0时,运用CAS办法对计数器进行减1操作,源码如下:

protected boolean tryReleaseShared(int releases) {
    for (;;) {//死循环,假如CAS操作失利就会不断持续测验。
        int c = getState();//获取当时计数器的值。
        if (c == 0)// 计数器为0时,就直接回来。
            return false;
        int nextc = c-1;
        if (compareAndSetState(c, nextc))// 运用CAS办法对计数器进行减1操作
            return nextc == 0;//假如操作成功,回来计数器是否为0
    }
}

await办法中,只调用了Sync实例的acquireSharedInterruptibly办法,源码如下:

public void await() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}

其间acquireSharedInterruptibly办法,判别计数器是否为0,假如不为0则堵塞当时线程,源码如下:

public final void acquireSharedInterruptibly(int arg)
        throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    if (tryAcquireShared(arg) < 0)//判别计数器是否为0
        doAcquireSharedInterruptibly(arg);//假如不为0则堵塞当时线程
}

其间tryAcquireShared办法,是AbstractQueuedSynchronizer中的一个模板办法,其具体完结在Sync类中,其首要是判别计数器是否为零,假如为零则回来1,假如不为零则回来-1,源码如下:

protected int tryAcquireShared(int acquires) {
    return (getState() == 0) ? 1 : -1;
}

CyclicBarrier

概述

CyclicBarrier 翻译为中文是循环(Cyclic)栅栏(Barrier)的意思,它的大约含义是完结一个可循环运用的屏障。

CyclicBarrier 效果是让一组线程相互等候,当抵达一个共同点时,一切之前等候的线程再持续履行,且 CyclicBarrier 功用可重复运用,运用reset()办法重置屏障。

事例

同样用玩游戏的例子。假如玩一个游戏有多个“关卡”,那运用CountDownLatch明显不太合适,那需求为每个关卡都创建一个实例。那咱们能够运用CyclicBarrier来完结每个关卡的数据加载等候功用。

public class CyclicBarrierDemo {
    static class PreTaskThread implements Runnable {
        private String task;
        private CyclicBarrier cyclicBarrier;
        public PreTaskThread(String task, CyclicBarrier cyclicBarrier) {
            this.task = task;
            this.cyclicBarrier = cyclicBarrier;
        }
        @Override
        public void run() {
            // 假定一共三个关卡
            for (int i = 1; i < 4; i++) {
                try {
                    Random random = new Random();
                    Thread.sleep(random.nextInt(1000));
                    System.out.println(String.format("关卡%d的使命%s完结", i, task));
                    cyclicBarrier.await();
                } catch (InterruptedException | BrokenBarrierException e) {
                    e.printStackTrace();
                }
            }
        }
    }
    public static void main(String[] args) {
        CyclicBarrier cyclicBarrier = new CyclicBarrier(3, () -> {
            System.out.println("本关卡一切前置使命完结,开端游戏...");
        });
        new Thread(new PreTaskThread("加载地图数据", cyclicBarrier)).start();
        new Thread(new PreTaskThread("加载人物模型", cyclicBarrier)).start();
        new Thread(new PreTaskThread("加载背景音乐", cyclicBarrier)).start();
    }
}
输出:
关卡1的使命加载背景音乐完结
关卡1的使命加载地图数据完结
关卡1的使命加载人物模型完结
本关卡一切前置使命完结,开端游戏...
关卡2的使命加载人物模型完结
关卡2的使命加载背景音乐完结
关卡2的使命加载地图数据完结
本关卡一切前置使命完结,开端游戏...
关卡3的使命加载背景音乐完结
关卡3的使命加载地图数据完结
关卡3的使命加载人物模型完结
本关卡一切前置使命完结,开端游戏...

与CountDownLatch有一些不同。CyclicBarrier没有分为await()countDown(),而是只要独自的一个await()办法。

一旦调用await()办法的线程数量等于结构办法中传入的使命总量,就代表抵达屏障了。CyclicBarrier答应咱们在抵达屏障的时分能够履行一个使命,能够在结构办法传入一个Runnable类型的目标。

源码剖析

结构函数:

public CyclicBarrier(int parties, Runnable barrierAction) {
  if (parties <= 0) throw new IllegalArgumentException();
  this.parties = parties;
  this.count = parties;
  this.barrierCommand = barrierAction;
}
public CyclicBarrier(int parties) {
  this(parties, null);
}

默认barrierAction是null,这个参数是Runnable参数,当最终线程抵达的时分履行的使命,上述事例便是在抵达屏障时,输出“本关卡一切前置使命完结,开端游戏…”。parties 是参与的线程数。

接着看下await办法,有两个重载,区别是是否有等候超时,源码如下:

 public int await() throws InterruptedException, BrokenBarrierException {
        try {
            return dowait(false, 0L);
        } catch (TimeoutException toe) {
            throw new Error(toe); // cannot happen
        }
    }
public int await(long timeout, TimeUnit unit)
    throws InterruptedException,
           BrokenBarrierException,
           TimeoutException {
    return dowait(true, unit.toNanos(timeout));
}

重点看下dowait(),核心逻辑便是这个办法,源码如下:

private int dowait(boolean timed, long nanos)
        throws InterruptedException, BrokenBarrierException,
               TimeoutException {       
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            // 每次运用屏障都会生成一个实例
            final Generation g = generation;
            // 假如被损坏了就抛反常
            if (g.broken)
                throw new BrokenBarrierException();
            // 线程中断检测
            if (Thread.interrupted()) {
                breakBarrier();
                throw new InterruptedException();
            }
            // 剩余的等候线程数
            int index = --count;
            // 最终线程抵达时 
            if (index == 0) {  // tripped
                // 标记使命是否被履行(便是传进入的runable参数)
                boolean ranAction = false;
                try {
                    final Runnable command = barrierCommand;
                    // 履行使命
                    if (command != null)
                        command.run();
                    ranAction = true;
                    // 完结后 进行下一组 初始化 generation 初始化 count 并唤醒一切等候的线程 
                    nextGeneration();
                    return 0;
                } finally {
                    if (!ranAction)
                        breakBarrier();
                }
            }
            // index 不为0时 进入自旋
            for (;;) {
                try {
                    // 先判别超时 没超时就持续等着
                    if (!timed)
                        trip.await();
                        // 假如超出指定时间 调用 awaitNanos 超时了开释锁
                    else if (nanos > 0L)
                        nanos = trip.awaitNanos(nanos);
                        // 中断反常捕获
                } catch (InterruptedException ie) {
                    // 判别是否被损坏
                    if (g == generation && ! g.broken) {
                        breakBarrier();
                        throw ie;
                    } else {
                        // 否则的话中断当时线程
                        Thread.currentThread().interrupt();
                    }
                }
                // 被损坏抛反常
                if (g.broken)
                    throw new BrokenBarrierException();
                // 正常调用 就回来 
                if (g != generation)
                    return index;
                // 超时了而被唤醒的情况 调用 breakBarrier()
                if (timed && nanos <= 0L) {
                    breakBarrier();
                    throw new TimeoutException();
                }
            }
        } finally {
            lock.unlock();
        }
    }

总结下dowait()办法的逻辑:

  1. 线程调用后,会查看barrier的状况、线程状况,反常状况会中断。
  2. 在初始化CyclicBarrier时,设置的资源值count,会进行--count
  3. 当10个线程中前9个线程,履行dowait()后,因为count!=0,因此会进行for(;;),在内部会履行Condition的trip.await()办法,进行堵塞。
  4. 堵塞结束的条件有:超时、被唤醒、线程中断。
  5. 当第10个线程履行dowait()后,因为count==0,会先查看并履行command的内容。
  6. 最终履行nextGeneration(),在内部调用trip.signalAll()唤醒一切trip.await()的线程。

假如被损坏了怎样康复呢?来看下reset()办法,源码如下:

public void reset() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        breakBarrier();   // break the current generation
        nextGeneration(); // start a new generation
    } finally {
        lock.unlock();
    }
}

源码很简略,break之后重新生成新的实例,对应的会重新初始化count,在dowaitindex==0也调用了nextGeneration,所以说它是能够循环运用的。

与CountDonwLatch的区别

  • CountDownLatch减计数,CyclicBarrier加计数。

  • CountDownLatch是一次性的,CyclicBarrier能够重用。

  • CountDownLatch和CyclicBarrier都有让多个线程等候同步然后再开端下一步动作的意思,可是CountDownLatch的下一步的动作施行者是主线程,具有不可重复性;而CyclicBarrier的下一步动作施行者仍是“其他线程”本身,具有往复屡次施行动作的特色。

Semaphore

概述

Semaphore 一般译作 信号量,它也是一种线程同步东西,首要用于多个线程对共享资源进行并行操作的一种东西类。它代表了一种答应的概念,是否答应多线程对同一资源进行操作的答应,运用 Semaphore 能够控制并发拜访资源的线程个数。

运用场景

Semaphore 的运用场景首要用于流量控制

比方数据库衔接,一起运用的数据库衔接会有数量约束,数据库衔接不能超过必定的数量,当衔接抵达了约束数量后,后边的线程只能排队等前面的线程开释数据库衔接后才干获得数据库衔接。

比方泊车场的场景中,一个泊车场有有限数量的车位,一起能够容纳多少台车,车位满了之后只要等里边的车脱离泊车场外面的车才干够进入。

事例

模仿一下泊车场的事务场景:

在进入泊车场之前会有一个提示牌,上面显示着泊车位还有多少,当车位为 0 时,不能进入泊车场,当车位不为 0 时,才会答应车辆进入泊车场。所以泊车场有几个要害因素:泊车场车位的总容量,当一辆车进入时,泊车场车位的总容量 – 1,当一辆车脱离时,总容量 + 1,泊车场车位缺乏时,车辆只能在泊车场外等候。

public class SemaphoreDemo {
    private static Semaphore semaphore = new Semaphore(10);
    public static void main(String[] args) {
        for (int i = 0; i < 100; i++) {
            Thread thread = new Thread(new Runnable() {
                @Override
                public void run() {
                    System.out.println("欢迎 " + Thread.currentThread().getName() + " 来到泊车场");
                    // 判别是否答应泊车
                    if (semaphore.availablePermits() == 0) {
                        System.out.println("车位缺乏,请耐性等候");
                    }
                    try {
                        // 测验获取
                        semaphore.acquire();
                        System.out.println(Thread.currentThread().getName() + " 进入泊车场");
                        Thread.sleep(new Random().nextInt(10000));// 模仿车辆在泊车场逗留的时间
                        System.out.println(Thread.currentThread().getName() + " 驶出泊车场");
                        semaphore.release();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }, i + "号车");
            thread.start();
        }
    }
}

Semaphore 的初始容量,也便是只要 10 个车位,咱们用这 10 个车位来控制 100 辆车的流量,所以成果和咱们料想的很相似,即大部分车都在等候状况。可是一起仍答应一些车驶入泊车场,驶入泊车场的车辆,就会 semaphore.acquire 占用一个车位,驶出泊车场时,就会 semaphore.release 让出一个车位,让后边的车再次驶入。

原理

Semaphore内部有一个承继了AQS的同步器Sync,重写了tryAcquireShared办法。在这个办法里,会去测验获取资源。

假如获取失利(想要的资源数量小于现在已有的资源数量),就会回来一个负数(代表测验获取资源失利)。然后当时线程就会进入AQS的等候行列。

Exchanger

概述

Exchanger类用于两个线程交流数据。它支撑泛型,也便是说你能够在两个线程之间传送任何数据。一个线程在完结必定的事务后想与另一个线程交流数据,则第一个先拿出数据的线程会一向等候第二个线程,直到第二个线程拿着数据到来时才干彼此交流对应数据。

事例

事例1:A同学和B同学交流各自保藏的大片。

public class ExchangerDemo {
    public static void main(String[] args) throws InterruptedException {
        Exchanger<String> stringExchanger = new Exchanger<>();
        Thread studentA = new Thread(() -> {
            try {
                String dataA = "A同学保藏多年的大片";
                String dataB = stringExchanger.exchange(dataA);
                System.out.println("A同学得到了" + dataB);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        System.out.println("这个时分A同学是堵塞的,在等候B同学的大片");
        Thread.sleep(1000);
        Thread studentB = new Thread(() -> {
            try {
                String dataB = "B同学保藏多年的大片";
                String dataA = stringExchanger.exchange(dataB);
                System.out.println("B同学得到了" + dataA);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        studentA.start();
        studentB.start();
    }
}
输出:
这个时分A同学是堵塞的,在等候B同学的大片
A同学得到了B同学保藏多年的大片
B同学得到了A同学保藏多年的大片

能够看到,当一个线程调用exchange办法后,它是处于堵塞状况的,只要当另一个线程也调用了exchange办法,它才会持续向下履行。

Exchanger类还有一个有超时参数的办法,假如在指定时间内没有另一个线程调用exchange,就会抛出一个超时反常。

public V exchange(V x, long timeout, TimeUnit unit)

事例2:A同学被放鸽子,买卖失利。

public class ExchangerDemo {
    public static void main(String[] args) {
        Exchanger<String> stringExchanger = new Exchanger<>();
        Thread studentA = new Thread(() -> {
            String dataB = null;
            try {
                String dataA = "A同学保藏多年的大片";
                dataB = stringExchanger.exchange(dataA,5, TimeUnit.SECONDS);
                System.out.println("A同学得到了" + dataB);
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (TimeoutException e) {
                System.out.println("等候超时-TimeoutException");
            }
            System.out.println("A同学得到了:"+dataB);
        });
        studentA.start();
    }
}
输出:
等候超时-TimeoutException
A同学得到了:null

原理

Exchanger类底层要害的技术有:

  • 运用CAS自旋指令完结数据交流;
  • 运用LockSupport的park办法使交流线程进入休眠等候,运用LockSupport的unpark办法唤醒等候线程。
  • 此外还声明晰一个Node目标用于存储交流数据。

Exchanger一般用于两个线程之间更方便地在内存中交流数据,因为其支撑泛型,所以咱们能够传输任何的数据,比方IO流或许IO缓存。依据JDK里边的注释的说法,能够总结为一下特性:

  • 此类供给对外的操作是同步的;
  • 用于成对出现的线程之间交流数据;
  • 能够视作双向的同步行列;
  • 可应用于遗传算法、流水线规划等场景。

需求留意的是,exchange是能够重复运用的。也便是说,两个线程能够运用Exchanger在内存中不断地再交流数据。

小结

本文合作一些应用场景介绍了JDK中供给的几个并发东西类,简略剖析了一下运用原理及事务场景,作业中,一旦有对应的事务场景,能够试试这些东西类。

期望收到你的点赞、保藏、关注~~

参考资料:《java并发编程的艺术》、《深入浅出Java多线程》等。