Java并发学习笔记(九):Semaphore、CountdownLatch、CyclicBarrier
JUC
四、Semaphore
1、基本使用
信号量,用来限制能同时访问共享资源的线程上限。
public static void main(String[] args) {
//创建Semaphore 对象,参数用户限定共享变量的个数
Semaphore semaphore = new Semaphore(3);
//10线程同时运行
for (int i = 0; i < 10; i++) {
new Thread(()->{
try {
// 获取许可
semaphore.acquire();
TheadPrint.print("run...");
Thread.sleep(1000);
TheadPrint.print("end...");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
// 释放许可
semaphore.release();
}
}).start();
}
}
从下面的结果可以看出,每个时刻最多有三个线程在运行,Semaphore 起到效果
07:35:15.485 c.TestSemaphore [Thread-2] - running...
07:35:15.485 c.TestSemaphore [Thread-1] - running...
07:35:15.485 c.TestSemaphore [Thread-0] - running...
07:35:16.490 c.TestSemaphore [Thread-2] - end...
07:35:16.490 c.TestSemaphore [Thread-0] - end...
07:35:16.490 c.TestSemaphore [Thread-1] - end...
07:35:16.490 c.TestSemaphore [Thread-3] - running...
07:35:16.490 c.TestSemaphore [Thread-5] - running...
07:35:16.490 c.TestSemaphore [Thread-4] - running...
07:35:17.490 c.TestSemaphore [Thread-5] - end...
07:35:17.490 c.TestSemaphore [Thread-4] - end...
07:35:17.490 c.TestSemaphore [Thread-3] - end...
07:35:17.490 c.TestSemaphore [Thread-6] - running...
07:35:17.490 c.TestSemaphore [Thread-7] - running...
07:35:17.490 c.TestSemaphore [Thread-9] - running...
07:35:18.491 c.TestSemaphore [Thread-6] - end...
07:35:18.491 c.TestSemaphore [Thread-7] - end...
07:35:18.491 c.TestSemaphore [Thread-9] - end...
07:35:18.491 c.TestSemaphore [Thread-8] - running...
07:35:19.492 c.TestSemaphore [Thread-8] - end...
2、semaphore应用
- 使用 Semaphore 限流,在访问高峰期时,让请求线程阻塞,高峰期过去再释放许可,当然它只适合限制单机 线程数量,并且仅是限制线程数,而不是限制资源数(例如连接数,请对比 Tomcat LimitLatch 的实现)
- 用 Semaphore 实现简单连接池,对比『享元模式』下的实现(用wait notify),性能和可读性显然更好, 注意下面的实现中线程数和数据库连接数是相等的
class Pool {
//定义连接池大小
private final int poolSize;
//连接对象数组
private Connection[] connections;
//定义连接状态数组 0:表示空闲,1:表示繁忙
private AtomicIntegerArray states;
//定义信号量
private Semaphore semaphore;
//构造方法初始化
public Pool(int poolSize) {
this.poolSize = poolSize;
semaphore = new Semaphore(poolSize);
this.connections = new Connection[poolSize];
this.states = new AtomicIntegerArray(new int[poolSize]);
for (int i = 0; i < poolSize; i++) {
connections[i] = new MockConnection("conn " + i);
}
}
//借连接
public Connection borrow() {
//首先获取许可,获取到许可才会继续向下执行
try {
semaphore.acquire();
} catch (InterruptedException e) {
e.printStackTrace();
}
//遍历连接状态数组,找出空闲的连接并返回
for (int i = 0; i < poolSize; i++) {
if (states.get(i) == 0) {
//为了先线程安全,需要使用CAS对连接状态进行设置
if (states.compareAndSet(i, 0, 1)) {
TheadPrint.print(" borrow " + connections[i]);
return connections[i];
}
}
}
return null;
}
//归还连接
public void free(Connection conn) {
for (int i = 0; i < poolSize; i++) {
if (connections[i] == conn) {
//由于此时只有一个线程持有connections[i],所以不会有线程安全问题
states.set(i, 0);
//归还之后,归还信号量
TheadPrint.print( " free " + conn);
semaphore.release();
break;
}
}
}
}
测试代码:
public static void main(String[] args) {
System.out.println(new SemaphoreConnectPoolTest().a);
//创建2两个连接
Pool pool = new Pool(2);
//创建5个线程使用链接
for (int i = 0; i < 5; i++) {
new Thread(() -> {
//创建连接
Connection conn = pool.borrow();
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
//释放连接
pool.free(conn);
}
}, "线程 " + i).start();
}
}
从下面的结果可以看出每个时刻都只有两个线程在使用连接
21:33:28:379 线程 1: borrow MockConnection{name='conn 1'}
21:33:28:381 线程 0: borrow MockConnection{name='conn 0'}
21:33:29:388 线程 0: free MockConnection{name='conn 0'}
21:33:29:390 线程 2: borrow MockConnection{name='conn 0'}
21:33:29:390 线程 1: free MockConnection{name='conn 1'}
21:33:29:391 线程 3: borrow MockConnection{name='conn 1'}
21:33:30:391 线程 2: free MockConnection{name='conn 0'}
21:33:30:391 线程 4: borrow MockConnection{name='conn 0'}
21:33:30:405 线程 3: free MockConnection{name='conn 1'}
21:33:31:392 线程 4: free MockConnection{name='conn 0'}
3、semaphore原理
Semaphore 有点像一个停车场,permits 就好像停车位数量,当线程获得了 permits 就像是获得了停车位,然后 停车场显示空余车位减一
刚开始,permits(state)为 3,这时 5 个线程来获取资源,他们都会调用到AQS的tryAcquireShared方法来尝试获取:
//semaphore.acquire();
public void acquire() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
//调用tryAcquireShared尝试获取
if (tryAcquireShared(arg) < 0)
//获取失败(< 0),会进入doAcquireSharedInterruptibly
doAcquireSharedInterruptibly(arg);
}
//调用tryAcquireShared最终会进入该方法中
final int nonfairTryAcquireShared(int acquires) {
for (;;) {
//获取当前state值
int available = getState();
//state--
int remaining = available - acquires;
//判断当前remaining的值
if (remaining < 0 ||
//如果remaining>=0,则会使用CAS设置state的值为remaining并返回
compareAndSetState(available, remaining))
//如果remaining<0,则不更改state直接返回
return remaining;
}
}

假设其中 Thread-1,Thread-2,Thread-4 cas 竞争成功,而 Thread-0 和 Thread-3 竞争失败,进入 AQS 队列 park 阻塞
//tryAcquireShared(arg) < 0会进入该方法,
//也就是Thread-0 和 Thread-3 会进入该方法
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
//将自己添加到阻塞队列
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
//进行休眠等待唤醒
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}

这时 Thread-4 释放了 permits,状态如下

接下来 Thread-0 竞争成功,permits 再次设置为 0,设置自己为 head 节点,断开原来的 head 节点,unpark 接 下来的 Thread-3 节点,但由于 permits 是 0,因此 Thread-3 在尝试不成功后再次进入 park 状态

五、CountdownLatch
用来进行线程同步协作,等待所有线程完成倒计时。
其中构造参数用来初始化等待计数值,await() 用来等待计数归零,countDown() 用来让计数减一
示例:
public static void main(String[] args) throws InterruptedException {
CountDownLatch latch = new CountDownLatch(3);
new Thread(()->{
try {
TheadPrint.print("begin....");
Thread.sleep(1000);
latch.countDown();
TheadPrint.print("end....");
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
new Thread(()->{
try {
TheadPrint.print("begin....");
Thread.sleep(2000);
latch.countDown();
TheadPrint.print("end....");
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
new Thread(()->{
try {
TheadPrint.print("begin....");
Thread.sleep(1500);
latch.countDown();
TheadPrint.print("end....");
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
TheadPrint.print("main wait....");
//等待三个线程全部都调用latch.countDown();
latch.await();
TheadPrint.print("main end....");
}
从结果可以看出,所有线程几乎同时开始,而主线程等到执行时间最长(2秒)的Thread-2调用完latch.countDown(),之后才继续执行
22:04:27:199 [Thread-0]: begin....
22:04:27:200 [main]: main wait....
22:04:27:199 [Thread-1]: begin....
22:04:27:206 [Thread-2]: begin....
22:04:28:202 [Thread-0]: end....
22:04:28:707 [Thread-2]: end....
22:04:29:203 [Thread-1]: end....
22:04:29:203 [main]: main end....
可以看到CountDownLatch的作用与Join十分相似,都是用于一个线程等待其他线程结束之后在执行。那为什么还需要CountDownLatch呢?
主要是因为在使用线程池中,线程一般是一直执行不会停止,所以这时候Join不太好使了,就需要我们用到CountDownLatch,CountDownLatch不要求线程一定结束,只要调用了countDown方法就可以
示例:
public static void main(String[] args) throws InterruptedException {
//等待3个
CountDownLatch latch = new CountDownLatch(3);
//使用线程池
ExecutorService executorService = Executors.newFixedThreadPool(3);
executorService.submit(() -> {
try {
TheadPrint.print("begin....");
Thread.sleep(1000);
latch.countDown();
TheadPrint.print("end....");
} catch (InterruptedException e) {
e.printStackTrace();
}
});
executorService.submit(() -> {
try {
TheadPrint.print("begin....");
Thread.sleep(2000);
latch.countDown();
TheadPrint.print("end....");
} catch (InterruptedException e) {
e.printStackTrace();
}
});
executorService.submit(() -> {
try {
TheadPrint.print("begin....");
Thread.sleep(1500);
latch.countDown();
TheadPrint.print("end....");
} catch (InterruptedException e) {
e.printStackTrace();
}
});
TheadPrint.print("main wait....");
latch.await();
TheadPrint.print("main end....");
}
可以看到结果和上面一致
22:15:49:367 [pool-1-thread-1]: begin....
22:15:49:372 [pool-1-thread-2]: begin....
22:15:49:373 [main]: main wait....
22:15:49:377 [pool-1-thread-3]: begin....
22:15:50:368 [pool-1-thread-1]: end....
22:15:50:877 [pool-1-thread-3]: end....
22:15:51:372 [pool-1-thread-2]: end....
22:15:51:372 [main]: main end....
另外如果需要返回值时就需要使用Future。
六、CyclicBarrier
循环栅栏,用来进行线程协作,等待线程满足某个计数。构造时设置『计数个数』,每个线程执 行到某个需要“同步”的时刻调用 await() 方法进行等待,当等待的线程数满足『计数个数』时,继续执行
CyclicBarrier cb = new CyclicBarrier(2); // 个数为2时才会继续执行
new Thread(() -> {
System.out.println("线程1开始.." + new Date());
try {
cb.await();// 当个数不足时,等待
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
System.out.println("线程1继续向下运行..." + new Date());
}).start();
new Thread(() -> {
System.out.println("线程2开始.." + new Date());
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
}
try {
cb.await();// 2 秒后,线程个数够2,继续运行
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
System.out.println("线程2继续向下运行..." + new Date());
}).start();
注意: CyclicBarrier 与 CountDownLatch 的主要区别在于 CyclicBarrier 是可以重用的。 CyclicBarrier 可以被比 喻为『人满发车』