Java并发学习笔记(九):Semaphore、CountdownLatch、CyclicBarrier

JUC

四、Semaphore

1、基本使用

信号量,用来限制能同时访问共享资源的线程上限。

public static void main(String[] args) {
    //创建Semaphore 对象,参数用户限定共享变量的个数
    Semaphore semaphore = new Semaphore(3);

    //10线程同时运行
    for (int i = 0; i < 10; i++) {
        new Thread(()->{
            try {
                 // 获取许可 
                semaphore.acquire();
                TheadPrint.print("run...");
                Thread.sleep(1000);
                TheadPrint.print("end...");
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                 // 释放许可 
                semaphore.release();
            }
        }).start();
    }
}

从下面的结果可以看出,每个时刻最多有三个线程在运行,Semaphore 起到效果

07:35:15.485 c.TestSemaphore [Thread-2] - running... 
07:35:15.485 c.TestSemaphore [Thread-1] - running... 
07:35:15.485 c.TestSemaphore [Thread-0] - running... 
07:35:16.490 c.TestSemaphore [Thread-2] - end... 
07:35:16.490 c.TestSemaphore [Thread-0] - end... 
07:35:16.490 c.TestSemaphore [Thread-1] - end... 
07:35:16.490 c.TestSemaphore [Thread-3] - running... 
07:35:16.490 c.TestSemaphore [Thread-5] - running... 
07:35:16.490 c.TestSemaphore [Thread-4] - running... 
07:35:17.490 c.TestSemaphore [Thread-5] - end... 
07:35:17.490 c.TestSemaphore [Thread-4] - end... 
07:35:17.490 c.TestSemaphore [Thread-3] - end... 
07:35:17.490 c.TestSemaphore [Thread-6] - running... 
07:35:17.490 c.TestSemaphore [Thread-7] - running... 
07:35:17.490 c.TestSemaphore [Thread-9] - running... 
07:35:18.491 c.TestSemaphore [Thread-6] - end... 
07:35:18.491 c.TestSemaphore [Thread-7] - end... 
07:35:18.491 c.TestSemaphore [Thread-9] - end... 
07:35:18.491 c.TestSemaphore [Thread-8] - running... 
07:35:19.492 c.TestSemaphore [Thread-8] - end... 
2、semaphore应用
  • 使用 Semaphore 限流,在访问高峰期时,让请求线程阻塞,高峰期过去再释放许可,当然它只适合限制单机 线程数量,并且仅是限制线程数,而不是限制资源数(例如连接数,请对比 Tomcat LimitLatch 的实现)
  • 用 Semaphore 实现简单连接池,对比『享元模式』下的实现(用wait notify),性能和可读性显然更好, 注意下面的实现中线程数和数据库连接数是相等的
class Pool {

    //定义连接池大小
    private final int poolSize;

    //连接对象数组
    private Connection[] connections;

    //定义连接状态数组 0:表示空闲,1:表示繁忙
    private AtomicIntegerArray states;

    //定义信号量
    private Semaphore semaphore;

    //构造方法初始化
    public Pool(int poolSize) {
        this.poolSize = poolSize;
        semaphore = new Semaphore(poolSize);
        this.connections = new Connection[poolSize];
        this.states = new AtomicIntegerArray(new int[poolSize]);
        for (int i = 0; i < poolSize; i++) {
            connections[i] = new MockConnection("conn " + i);
        }
    }

    //借连接
    public Connection borrow() {
        //首先获取许可,获取到许可才会继续向下执行
        try {
            semaphore.acquire();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        //遍历连接状态数组,找出空闲的连接并返回
        for (int i = 0; i < poolSize; i++) {
            if (states.get(i) == 0) {
                //为了先线程安全,需要使用CAS对连接状态进行设置
                if (states.compareAndSet(i, 0, 1)) {
                    TheadPrint.print(" borrow " + connections[i]);
                    return connections[i];
                }
            }
        }
        return null;
    }

    //归还连接
    public void free(Connection conn) {
        for (int i = 0; i < poolSize; i++) {
            if (connections[i] == conn) {
                //由于此时只有一个线程持有connections[i],所以不会有线程安全问题
                states.set(i, 0);
                //归还之后,归还信号量
                TheadPrint.print( " free " + conn);
                semaphore.release();
                break;
            }
        }
    }
}

测试代码:

public static void main(String[] args) {
    System.out.println(new SemaphoreConnectPoolTest().a);
    //创建2两个连接
    Pool pool = new Pool(2);
    //创建5个线程使用链接
    for (int i = 0; i < 5; i++) {
        new Thread(() -> {
            //创建连接
            Connection conn = pool.borrow();
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                //释放连接
                pool.free(conn);
            }
        }, "线程 " + i).start();
    }
}

从下面的结果可以看出每个时刻都只有两个线程在使用连接

21:33:28:379 线程 1:  borrow MockConnection{name='conn 1'}
21:33:28:381 线程 0:  borrow MockConnection{name='conn 0'}
21:33:29:388 线程 0:  free MockConnection{name='conn 0'}
21:33:29:390 线程 2:  borrow MockConnection{name='conn 0'}
21:33:29:390 线程 1:  free MockConnection{name='conn 1'}
21:33:29:391 线程 3:  borrow MockConnection{name='conn 1'}
21:33:30:391 线程 2:  free MockConnection{name='conn 0'}
21:33:30:391 线程 4:  borrow MockConnection{name='conn 0'}
21:33:30:405 线程 3:  free MockConnection{name='conn 1'}
21:33:31:392 线程 4:  free MockConnection{name='conn 0'}
3、semaphore原理

Semaphore 有点像一个停车场,permits 就好像停车位数量,当线程获得了 permits 就像是获得了停车位,然后 停车场显示空余车位减一
刚开始,permits(state)为 3,这时 5 个线程来获取资源,他们都会调用到AQS的tryAcquireShared方法来尝试获取:

//semaphore.acquire();
public void acquire() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}

public final void acquireSharedInterruptibly(int arg)
    throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    //调用tryAcquireShared尝试获取
    if (tryAcquireShared(arg) < 0)
        //获取失败(< 0),会进入doAcquireSharedInterruptibly
        doAcquireSharedInterruptibly(arg);
}


//调用tryAcquireShared最终会进入该方法中
final int nonfairTryAcquireShared(int acquires) {
    for (;;) {
        //获取当前state值
        int available = getState();
        //state--
        int remaining = available - acquires;
        //判断当前remaining的值
        if (remaining < 0 ||
            //如果remaining>=0,则会使用CAS设置state的值为remaining并返回
            compareAndSetState(available, remaining))
            //如果remaining<0,则不更改state直接返回
            return remaining;
    }
}

在这里插入图片描述
假设其中 Thread-1,Thread-2,Thread-4 cas 竞争成功,而 Thread-0 和 Thread-3 竞争失败,进入 AQS 队列 park 阻塞

//tryAcquireShared(arg) < 0会进入该方法,
//也就是Thread-0 和 Thread-3 会进入该方法
private void doAcquireSharedInterruptibly(int arg)
    throws InterruptedException {
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        //将自己添加到阻塞队列
        for (;;) {
            final Node p = node.predecessor();
            if (p == head) {
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
            }
            //进行休眠等待唤醒
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

在这里插入图片描述
这时 Thread-4 释放了 permits,状态如下
在这里插入图片描述
接下来 Thread-0 竞争成功,permits 再次设置为 0,设置自己为 head 节点,断开原来的 head 节点,unpark 接 下来的 Thread-3 节点,但由于 permits 是 0,因此 Thread-3 在尝试不成功后再次进入 park 状态

在这里插入图片描述

五、CountdownLatch

用来进行线程同步协作,等待所有线程完成倒计时。
其中构造参数用来初始化等待计数值,await() 用来等待计数归零,countDown() 用来让计数减一

示例:

public static void main(String[] args) throws InterruptedException {

    CountDownLatch latch = new CountDownLatch(3);

    new Thread(()->{
        try {
            TheadPrint.print("begin....");
            Thread.sleep(1000);
            latch.countDown();
            TheadPrint.print("end....");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }).start();

    new Thread(()->{
        try {
            TheadPrint.print("begin....");
            Thread.sleep(2000);
            latch.countDown();
            TheadPrint.print("end....");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }).start();

    new Thread(()->{
        try {
            TheadPrint.print("begin....");
            Thread.sleep(1500);
            latch.countDown();
            TheadPrint.print("end....");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }).start();

    TheadPrint.print("main wait....");
    //等待三个线程全部都调用latch.countDown();
    latch.await();
    TheadPrint.print("main end....");
}

从结果可以看出,所有线程几乎同时开始,而主线程等到执行时间最长(2秒)的Thread-2调用完latch.countDown(),之后才继续执行

22:04:27:199 [Thread-0]: begin....
22:04:27:200 [main]: main wait....
22:04:27:199 [Thread-1]: begin....
22:04:27:206 [Thread-2]: begin....
22:04:28:202 [Thread-0]: end....
22:04:28:707 [Thread-2]: end....
22:04:29:203 [Thread-1]: end....
22:04:29:203 [main]: main end....

可以看到CountDownLatch的作用与Join十分相似,都是用于一个线程等待其他线程结束之后在执行。那为什么还需要CountDownLatch呢?

主要是因为在使用线程池中,线程一般是一直执行不会停止,所以这时候Join不太好使了,就需要我们用到CountDownLatch,CountDownLatch不要求线程一定结束,只要调用了countDown方法就可以

示例:

public static void main(String[] args) throws InterruptedException {

    //等待3个
    CountDownLatch latch = new CountDownLatch(3);
    //使用线程池
    ExecutorService executorService = Executors.newFixedThreadPool(3);

    executorService.submit(() -> {
        try {
            TheadPrint.print("begin....");
            Thread.sleep(1000);
            latch.countDown();
            TheadPrint.print("end....");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    });

    executorService.submit(() -> {
        try {
            TheadPrint.print("begin....");
            Thread.sleep(2000);
            latch.countDown();
            TheadPrint.print("end....");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    });

    executorService.submit(() -> {
        try {
            TheadPrint.print("begin....");
            Thread.sleep(1500);
            latch.countDown();
            TheadPrint.print("end....");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    });
    
    TheadPrint.print("main wait....");
    latch.await();
    TheadPrint.print("main end....");
}

可以看到结果和上面一致

22:15:49:367 [pool-1-thread-1]: begin....
22:15:49:372 [pool-1-thread-2]: begin....
22:15:49:373 [main]: main wait....
22:15:49:377 [pool-1-thread-3]: begin....
22:15:50:368 [pool-1-thread-1]: end....
22:15:50:877 [pool-1-thread-3]: end....
22:15:51:372 [pool-1-thread-2]: end....
22:15:51:372 [main]: main end....

另外如果需要返回值时就需要使用Future

六、CyclicBarrier

循环栅栏,用来进行线程协作,等待线程满足某个计数。构造时设置『计数个数』,每个线程执 行到某个需要“同步”的时刻调用 await() 方法进行等待,当等待的线程数满足『计数个数』时,继续执行

CyclicBarrier cb = new CyclicBarrier(2); // 个数为2时才会继续执行

new Thread(() -> {
    System.out.println("线程1开始.." + new Date());
    try {
        cb.await();// 当个数不足时,等待 
    } catch (InterruptedException | BrokenBarrierException e) {
        e.printStackTrace();
    }
    System.out.println("线程1继续向下运行..." + new Date());
}).start();

new Thread(() -> {
    System.out.println("线程2开始.." + new Date());
    try {
        Thread.sleep(2000);
    } catch (InterruptedException e) {
    }
    try {
        cb.await();// 2 秒后,线程个数够2,继续运行
    } catch (InterruptedException | BrokenBarrierException e) {
        e.printStackTrace();
    }
    System.out.println("线程2继续向下运行..." + new Date());
}).start();

注意: CyclicBarrier 与 CountDownLatch 的主要区别在于 CyclicBarrier 是可以重用的。 CyclicBarrier 可以被比 喻为『人满发车』