「Java并发编程」用 信号量(Semaphore 实现一个消息池(含代码)

发布时间:2019-09-15 17:39:25访问人数:作者:

欲穷千里目,更上一层楼。—唐


这句诗的意思是:想看到更远更广阔的景物,你就要再上一层楼。想学到更多更深的知识,你就要比原来更努力。


PS: 如果觉得本文有用的话,请帮忙点赞,留言评论支持一下哦,您的支持是我较大的动力!谢谢啦~


Semaphore,计数信号量, 用来控制同时访问某个特定资源的线程数量 ,需要我们设定它的较大访问数量。 Semaphore 管理着一组虚拟许可,许可的初始数量可以通过构造函数来指定。在执行操作时可以首先获取许可,并在使用后释放许可。如果没有许可,那么获取操作将阻塞直到有可用的许可。


Semaphore 可以用于实现一个资源池,也可以将任何一个容器变成一个有界的阻塞容器,他在限制资源访问量上有很大的用处。


Semaphore 的核心方法


首先,我们先来看它的两个构造函数。


** * Creates a {@code Semaphore} with the given number of * permits and nonfair fairness setting. *public Semaphore(int permits { sync = new NonfairSync(permits}** * Creates a {@code Semaphore} with the given number of * permits and the given fairness setting. *public Semaphore(int permits, boolean fair { sync = fair ? new FairSync(permits : new NonfairSync(permits}<pre>


参数 permits 表示许可数量,即同时允许多少个线程访问。参数 fair 表示公平性,即等待越久越先获取到许可。


其次,再来看一下它获取和释放许可的方法,信号量的核心用法就是下面这些。


获取一个许可public void acquire( throws InterruptedException { }获取permits个许可public void acquire(int permits throws InterruptedException { } 释放一个许可public void release( { }  释放permits个许可public void release(int permits { } 尝试获取一个许可,若获取成功,则立即返回true,若获取失败,则立即返回falsepublic boolean tryAcquire( { } 尝试获取一个许可,若在指定的时间内获取成功,则立即返回true,否则则立即返回false public boolean tryAcquire(long timeout, TimeUnit unit throws InterruptedException { }  尝试获取permits个许可,若获取成功,则立即返回true,若获取失败,则立即返回falsepublic boolean tryAcquire(int permits { }尝试获取permits个许可,若在指定的时间内获取成功,则立即返回true,否则则立即返回falsepublic boolean tryAcquire(int permits, long timeout, TimeUnit unit throws InterruptedException { }<pre>


使用场景


前面说过,Semaphore 可以用于实现一个资源池。所以,我们用它来实现一个固定数量的消息池,只允许固定数量的线程同时访问。


这个例子中消息池的数量为 3 个,信号量的许可数量也设置为 3 个,即用 Semaphore 来控制较多同时只能有三个线程使用,其中消息可以循环使用。


如果已有三个线程已经获取到了消息,那么其他线程获取消息的时候将会阻塞,直到有线程释放消息,它才能获取到。获取消息和释放消息通过 Semaphore 的 acqure( 和 release( 方法进行控制,其中 Semaphore 的许可数量不应大于消息池的较大数量。


import java.util.concurrent.Semaphorepublic class SemaphoreTest{表示消息池可用消息只有5个private static final int MAX_POOL_SIZE = 3获取消息的客户端的线程数量private static final int CLIENT_SIZE = 6 消息数组,存放所有消息private static Message messages = new Message[MAX_POOL_SIZE] 信号量,许可数量为消息的较大可用数量private static Semaphore semaphore = new Semaphore(MAX_POOL_SIZE初始化消息数组static void init( { for(int i = 0 i < MAX_POOL_SIZE i++ { messages[i] = new Message( }}同步方法,获取可用的消息static synchronized Message obtain( { Message msg = null for(int i = 0 i < MAX_POOL_SIZE i++ { if(messages[i].getFlag( == false { msg = messages[i] msg.setId(i msg.setFlag(true return msg } } return msg}同步方法,把用完的消息放回消息池static synchronized boolean release(Message msg { if(msg.getFlag( == true { msg.setFlag(false msg.setId(-1 return true } return false}用信号量控制能获取消息的数目static Message obtainMsg( throws InterruptedException { semaphore.acquire( return obtain(}成功释放消息的同时释放信号量static void releaseMsg(Message msg { System.out.print(Thread.currentThread(.getName( + " ***Release msg id*** = "+ msg.getId( + "n" if(release(msg { semaphore.release( }}public static void main(String args {  初始化 init(  创建子线程,获取消息 for(int i = 0 i < CLIENT_SIZE i++ { new Thread(new Runnable( { @Override public void run( { try { 获取消息 Message msg = obtainMsg( System.out.print(Thread.currentThread(.getName( + " Obtain msg id = "+ msg.getId( + "n" 假装耗时操作 Thread.sleep(1000 释放消息 releaseMsg(msg } catch (InterruptedException e { e.printStackTrace( } }}.start( }}声明一个消息类static class Message { private int id 表示每个消息的id private boolean flag 表示消息是否可用 public Message( { this.id = -1 this.flag = false }  public void setId(int id { this.id = id }  public void setFlag(boolean b { this.flag = b }  public int getId( { return this.id }  public boolean getFlag( { return this.flag }}}<pre>


执行结果:


这里写图片描述


从这个结果看,线程 1,线程 2,线程 0 先获取到消息;接着线程 1 和 2 释放消息;释放消息后,那么此时消息池又有两个空闲消息,所以,线程 3 和线程 5 获取了消息;


紧接着线程 0 释放消息,线程 4 立马获取了消息。。。


这程序的执行结果和我们预期的流程一样。 需要注意的点,Semaphore 是线程安全的。在这个例子中,不可能同时有 4 个线程能同时获取到消息。


注意


既然 Semaphore 是线程安全的,为什么上面两个方法需要添加同步?


static synchronized boolean release(Message msgstatic synchronized Message obtain(<pre>


这里我们不能混淆概念,Semaphore 的线程安全是指同时只能有三个线程进入,即 acquire( 和 release( 必定线程安全。 然而获取到许可后的操作不保证线程安全,所以这里加同步锁是为了确保获取消息的过程是安全的。


另外一点需要注意,为什么下面两个方法不需要使用同步锁?


static void releaseMsg(Message msgstatic Message obtainMsg( throws InterruptedException<pre>


细心的朋友可能已经知道,这里加上同步的话,会产生死锁。假如此时 acquire( 发生阻塞,那么obtainMsg( 一直持有同步锁,而 releaseMsg( 的时候必须等待同步锁的释放,这时必定陷入死锁,一直死等,然而没什么软用。


这里不需要加同步锁,是因为我们要确保安全的内容是 获取许可集 后的数据安全,和释放许可集之前的数据安全。


本文完结,如果觉得有帮助,请关注我哦,谢谢啦~


赞+1

华夕网络 版权所有 Copyright © 2012-2018 www.jswlgs.com All Rights Reserved .   备案号:沪ICP备15005556号-3    网站地图    代理商查询

  • QQ
  • 电话
  • 首页
  • 留言
  • 返回顶部
  • zxly.png