java中的并发工具类(CountDownLatch 、CyclicBarrier )

it2022-05-05  40

1.等待多线程完成的CountDownLatch

  CountDownLatch是一个同步工具类,用来协调多个线程之间的同步,起到线程之间的通信(而不是用作互斥的作用)。CountDownLatch能够使一个线程在等待另外一些线程完成各自工作之后,再继续执行。使用一个计数器进行实现。计数器初始值为线程的数量。当每一个线程完成自己任务后,计数器的值就会减一。当计数器的值为0时,表示所有的线程都已经完成了任务,然后在CountDownLatch上等待的线程就可以恢复执行任务。

  CountDownLatch是一次性的,计数器的值只能在构造方法中初始化一次,之后没有任何机制再次对其设置值,当CountDownLatch使用完毕后,它不能再次被使用。

  如果有某个线理得比慢,我不可能线程一直等待,所以可以使用另外一个指定时间await方法——awaitlong timeTimeUnit unit),个方法等待特定后,就会不再阻塞当前线程。

简单例子:

public class CountDownLatchTest { static CountDownLatch c = new CountDownLatch(2); public static void main(String[] args) throws InterruptedException { new Thread(new Runnable() { @Override public void run() { System.out.println(1); c.countDown(); System.out.println(2); c.countDown(); } }).start(); c.await(); System.out.println("3"); } }

输出结果:1 2 3

 

public class CountDownLatchTest { static CountDownLatch c = new CountDownLatch(2); public static void main(String[] args) throws InterruptedException { new Thread(new Runnable() { @Override public void run() { c.countDown(); System.out.println(1); } }).start(); new Thread(new Runnable() { @Override public void run() { c.countDown(); System.out.println(2); } }).start(); c.await(); System.out.println("3"); } }

输出结果: 2 1 3

 

2.同步屏障CyclicBarrier

  CyclicBarrier的字面意思是可循使用(Cyclic)的屏障(Barrier)。它要做的事情是,组线程到达一个屏障(也可以叫同步点)被阻塞,直到最后一个线程到达屏障,屏障才会,所有被屏障截的线程才会继续运行。 

  它是循环栅栏,栅栏就是一种障碍物.假如我们将计数器设置为10,那么凑齐第一批10个线程后,计数器就会归零,然后接着凑齐下一批10个线程,这就是循环栅栏的含义。

 

public class CyclicBarrierTest { static CyclicBarrier c = new CyclicBarrier(3); public static void main(String[] args) { new Thread(new Runnable() { @Override public void run() { try { c.await(); } catch (Exception e) { } System.out.println(1); } }).start(); try { c.await(); } catch (Exception e) { } System.out.println(2); } }

线程和子线程会永等待,没有第三个线await方法,即没有第三个线程到达屏障,所以之前到达屏障的两个线程都不会继续执行。 这里所说的“屏障”就是(c.await()),也就是所有的线程全部停止在这里。

 

 

public class CyclicBarrierTest { static CyclicBarrier c = new CyclicBarrier(2, new A()); public static void main(String[] args) { new Thread(new Runnable() { @Override public void run() { try { c.await(); } catch (Exception e) { } System.out.println("...child thread .... "); } }).start(); try { c.await(); } catch (Exception e) { } System.out.println(".... main thread..."); } static class A implements Runnable { @Override public void run() { System.out.println("A Runable ..."); } } }

CyclicBarrier提供一个更高的构造函数CyclicBarrierint partiesRunnable barrierAction),用于在线程到达屏障barrierAction,方便理更复业务场

 

public class BankWaterService implements Runnable { /** * 创建4个屏障,处理完之后执行当前类的run方法 */ private CyclicBarrier c = new CyclicBarrier(4, this); /** * 假设只有4个sheet,所以只启动4个线程 */ private Executor executor = Executors.newFixedThreadPool(4); /** * 保存每个sheet计算出的银流结果 */ private ConcurrentHashMap<String, Integer> sheetBankWaterCount = new ConcurrentHashMap<String, Integer>(); private void count() { for (int i = 0; i< 4; i++) { executor.execute(new Runnable() { @Override public void run() { // 计算当前sheet的银流数据,计算代码省略 sheetBankWaterCount .put(Thread.currentThread().getName(), 1); // 银流计算完成,插入一个屏障 try { c.await(); } catch (InterruptedException | BrokenBarrierException e) { e.printStackTrace(); } } }); } } @Override public void run() { int result = 0; // 汇总每个sheet计算出的结果 for (Map.Entry<String, Integer> sheet : sheetBankWaterCount.entrySet()) { result += sheet.getValue(); } // 将结果输出 sheetBankWaterCount.put("result", result); System.out.println(result); } public static void main(String[] args) { BankWaterService bankWaterCount = new BankWaterService(); bankWaterCount.count(); } }

 

3.CyclicBarrierCountDownLatch的区

CountDownLatch数器只能使用一次,而CyclicBarrier数器可以使用reset()方法重置。所以CyclicBarrier理更业务场景。例如,如果错误,可以重置器,并让线程重新行一次 。

而且CountDownLatch不会阻止当前线程,而CyclicBarrier将当前线程阻止在屏障。

 

CyclicBarrier提供其他有用的方法,比如getNumberWaiting方法可以CyclicBarrier阻塞的线程数量。isBroken()方法用来了解阻塞的线程是否被中断。 

import java.util.concurrent.BrokenBarrierException;import java.util.concurrent.CyclicBarrier;public class CyclicBarrierTest3 { static CyclicBarrier c = new CyclicBarrier(2); public static void main(String[] args) throws InterruptedException, BrokenBarrierException { Thread thread = new Thread(new Runnable() { @Override public void run() { try { System.out.println("child thread before.."); c.await(); System.out.println("child thread after.."); } catch (Exception e) { } } }); thread.start(); thread.interrupt(); try { System.out.println("main thread before..."); c.await(); System.out.println("main thread after..."); } catch (Exception e) { System.out.println(c.isBroken()); } }}

输出结果:

  main thread before...

  child thread before..

  true

 

 

 

 

 

 

 

转载于:https://www.cnblogs.com/z-3FENG/p/9599127.html


最新回复(0)