Java之阻塞队列和消息队列 | 您所在的位置:网站首页 › java消息通知设计 › Java之阻塞队列和消息队列 |
目录 一.上节复习 1.什么是单列模式 2.饿汉模式 3.懒汉模式 二.阻塞队列 1.什么是阻塞队列 三.消息队列 1.什么是消息队列 2.消息队列的作用 1.解耦 2.削峰填谷 3.异步 四.JDK中的阻塞队列 1.常见的阻塞队列 2.向阻塞队列中放入元素---put() 3.向阻塞队列中拿出元素---take() 五.手动实现阻塞队列 1.普通队列的实现 2.堵塞队列的实现 六.实现生产者和消费者模型 1.消费速度大于生产速度 2.生产速度大于消费速度 3.虚假唤醒 一.上节复习上节内容指路:Java之单例模式 1.什么是单列模式单例模式是一种设计模式(设计模式:就是在特定的场景下,解决问题最优的方式,类似于棋谱),单例:顾名思义,全局只有一个实例对象 2.饿汉模式饿汉模式:类加载的时候就完成初始化,DCL双重检查锁 public class Singleton { private static Singleton instance = new Singleton(); private Singleton() { } public static Singleton getInstance() { return instance; } }优点:书写简单,不容易出错 3.懒汉模式懒汉模式:程序使用对象的时候才进行初始化 public class SingletonLazy { private static volatile SingletonLazy instance = null; private SingletonLazy() { } public static SingletonLazy getInstance() { if (instance == null) { //在获取单例对象的时候,判断是否已经被创建,没有创建则创建 synchronized (SingletonLazy.class) { if (instance == null) { instance = new SingletonLazy(); } } } return instance; } }优点:避免了资源的浪费 二.阻塞队列 1.什么是阻塞队列和之前学习过的队列一样,也是FIFO(先进先出). 入队元素时,先判断队列是否满了,如何满了就阻塞(等待),直到队列中有空余空间再入队. 出队元素时,先判断队列是否为空,如果空了就阻塞(等待),直到队列中有元素使再出队. 实例:包饺子:分为擀饺子皮和包饺子两个操作 当放饺子皮的盘子满了,擀饺子皮的人停止擀皮(等待)--入队列操作,等待有空间了再工作 当放饺子皮的盘子空了,包饺子的人就停止包饺子(等待)---出队列操作,等待有饺子皮再工作 这种模式叫做生产者消费者模型 三.消息队列 1.什么是消息队列消息队列本质上就是阻塞队列,在此基础上为放入阻塞队列的消息打上了标签 为不同的消息进行了分组的操作 在基础数据结构上,做了一些针对应用场景的优化和实现,那么我们把这样的框架和软件,称为“中间件” 消息队列就是中间件 2.消息队列的作用 1.解耦一个良好的程序应该是"高内聚,低耦合"的. 高内聚:将功能强相关的代码写在一块,方便维护 低耦合:两个相关的模块尽可以能把依赖的部分降低到最小,不要让两个系统产生强依赖 实例:当我们订单支付,订单系统会对支付系统发起请求支付的指令,调用支付系统的相关命令,这样子就是一个高耦合的例子,如果支付系统崩溃,会直接对订单系统产生影响,支付系统修改相关的代码,订单系统也要发生修改 这个时候我们维护一个消息队列,可以对两个系统进行解耦操作,即使支付系统崩溃了,订单系统也能进行正常的工作,代码修改也不会牵一发而动全身 2.削峰填谷峰和谷是指消息的密集程度 例如现实中的三峡大坝 汛期:蓄水,防止下游遭遇洪峰的冲击 削峰 旱期:把存的水进行灌溉使用 填谷 例如在双十一等流量很大的时间点,我们使用消息队列可以存储订单的信息,然后慢慢的处理订单的信息,过了时间点流量慢慢小的时候,我们依旧可以处理订单信息.没必要在大流量的时候一下子处理所有的订单,这样可能会造成服务器崩溃. 3.异步同步:请求方必须死等对方的响应才能开始下一步操作. 异步:请求方发出请求之后,可以进行其他的操作,没必须等待对方的响应才开始操作. 比如订单系统对支付系统发出请求之后,没必须死等支付系统成功的响应就才开始其他订单的操作,而是进行其他订单的操作,同时等待这个订单的相应结果. 四.JDK中的阻塞队列 1.常见的阻塞队列 LinkedBlockingQueue 基于链表ArrayBlockingQueue 基于数组PriorityBlockingQueue 基于优先级队列在创建的时候可以指定队列的大小 2.向阻塞队列中放入元素---put()会抛出InterruptedException异常,阻塞队列中专用的入队的方法,不能使用offer()和add() public class Demo01_BlockingQueue { public static void main(String[] args) throws InterruptedException { //定义一个阻塞队列 BlockingQueue queue=new LinkedBlockingQueue(3); //使用put()方法,不能使用add()和offer() queue.put(1); queue.put(2); queue.put(3); System.out.println("此时插入了三个元素"); System.out.println(queue); queue.put(4); System.out.println("此时插入了四个元素"); } }打印的结果: 因为我们指定的阻塞队列的大小为3,当插入第四个元素的时候就会进入到阻塞等待的状态 3.向阻塞队列中拿出元素---take()会抛出InterruptedException异常,阻塞队列中专用的出队的方法,不能使用poll() public class Demo01_BlockingQueue { public static void main(String[] args) throws InterruptedException { //定义一个阻塞队列 BlockingQueue queue = new LinkedBlockingQueue(3); //使用put()方法,不能使用add()和offer() queue.put(1); queue.put(2); queue.put(3); System.out.println("此时插入了三个元素"); System.out.println(queue); // queue.put(4); // System.out.println("此时插入了四个元素"); //一定要使用take()方法,不能使用poll()方法 System.out.println(queue.take()); System.out.println(queue.take()); System.out.println(queue.take()); //进入阻塞等待状态 System.out.println(queue.take()); } }打印结果: 获取第四个元素的时候队列为空,因此进入阻塞等待的状态. 五.手动实现阻塞队列 1.普通队列的实现在实现阻塞队列之前,我们现在实现普通队列 public class MyBlockingQueue { private int[] element; //队首下标 private int head; //队尾下标 private int tail; //元素个数 private int size; public MyBlockingQueue() { this(3); } public MyBlockingQueue(int capacity) { element = new int[capacity]; } /** * 入队一个元素 * * @param val */ public void put(int val) { if (size >= element.length) { return; } //向队尾插入元素 element[tail] = val; //向后移动 tail = (tail + 1) % element.length; size++; } /** * 出队一个元素 * * @return */ public int take() { if (size == 0) { return -1; } int val = element[head]; head = (head + 1) % element.length; size--; return val; } } 2.堵塞队列的实现1.之前实现一个普通队列,底层用到了两种数据结构,一个是链表,一个是循环数组 2.阻塞队列就是在普通的队列上加入了阻塞等待(wait())和唤醒操作(notify()),与synchronized相关 确定synchronized的范围,如果一个对象需要new出来使用,锁对象一般是this,其他情况具体分析 1.(线程1)当执行入队(put)操作时,判断阻塞队列满了,执行wait()操作,进入阻塞等待操作,当之后(别的线程)执行了出队列操作完成时,队列此时不满,这个时候唤醒队列.(线程1)继续完成put操作 2.(线程1)当执行出队(take)操作时,判断阻塞队列为空,执行wait()操作,进入阻塞等待操作,当之后(别的线程)执行了入队列操作完成时,队列此时不空,这个时候唤醒队列.(线程1)继续完成take操作 public class MyBlockingQueue { private int[] element; //队首下标 private int head; //队尾下标 private int tail; //元素个数 private int size; public MyBlockingQueue() { this(3); } public MyBlockingQueue(int capacity) { element = new int[capacity]; } /** * 入队一个元素 * * @param val */ public void put(int val) throws InterruptedException { //加入修改的范围加锁 synchronized (this) { if (size >= element.length) { this.wait(); } //向队尾插入元素 element[tail] = val; //向后移动 tail = (tail + 1) % element.length; size++; //做唤醒操作 this.notifyAll(); } } /** * 出队一个元素 * * @return */ public int take() throws InterruptedException { synchronized (this) { if (size == 0) { this.wait(); } int val = element[head]; head = (head + 1) % element.length; size--; //唤醒操作 this.notifyAll(); return val; } } } 六.实现生产者和消费者模型我们使用两个线程,一个线程模仿生产者,向阻塞队列中放元素,一个线程模仿消费者,从阻塞队列中取元素. 1.消费速度大于生产速度 public class Demo3_ProducerConsumer { static MyBlockingQueue queue = new MyBlockingQueue(3); public static void main(String[] args) { Thread producer = new Thread(() -> { int num = 1; while (true) { //生产一条消息 try { queue.put(num); System.out.println("生产者生产了消息" + num); num++; TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { throw new RuntimeException(e); } } }); //启动生产者 producer.start(); Thread consumer = new Thread(() -> { while (true) { //消费一条消息 try { int num = queue.take(); System.out.println("消费者消费了消息" + num); } catch (InterruptedException e) { throw new RuntimeException(e); } } }); consumer.start(); } }结果打印: 2.生产速度大于消费速度 public class Demo3_ProducerConsumer { static MyBlockingQueue queue = new MyBlockingQueue(10); public static void main(String[] args) throws InterruptedException { Thread producer = new Thread(() -> { int num = 1; while (true) { //生产一条消息 try { queue.put(num); System.out.println("生产者生产了消息" + num); num++; } catch (InterruptedException e) { throw new RuntimeException(e); } } }); //启动生产者 producer.start(); Thread consumer = new Thread(() -> { while (true) { //消费一条消息 try { int num = queue.take(); System.out.println("消费者消费了消息" + num); TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { throw new RuntimeException(e); } } }); //消费者后启动,让阻塞队列满 TimeUnit.SECONDS.sleep(2); consumer.start(); } }打印的结果: 3.虚假唤醒这是Java官方文档中给出虚假唤醒的定义 翻译成中文为: 线程也可以在没有被通知、中断或超时的情况下唤醒,这就是所谓的虚假唤醒。虽然这种情况在实践中很少发生,但应用程序必须通过测试本应导致线程被唤醒的条件,并在条件不满足时继续等待来防止这种情况的发生。换句话说,等待应该总是发生在循环中,就像下面这样: 所以在实践中wait条件的判断要加while循环. 阻塞队列中put和take方法的优化,并且多线程环境下共享变量要加voliatile,最终的阻塞队列的代码为: public class MyBlockingQueue { private volatile int[] element; //队首下标 private volatile int head; //队尾下标 private volatile int tail; //元素个数 private volatile int size=0; public MyBlockingQueue() { this(3); } public MyBlockingQueue(int capacity) { element = new int[capacity]; } /** * 入队一个元素 * * @param val */ public void put(int val) throws InterruptedException { //加入修改的范围加锁 synchronized (this) { while (size >= element.length) { this.wait(); } //向队尾插入元素 element[tail] = val; //向后移动 tail = (tail + 1) % element.length; size++; //做唤醒操作 this.notifyAll(); } } /** * 出队一个元素 * * @return */ public int take() throws InterruptedException { synchronized (this) { while (size == 0) { this.wait(); } int val = element[head]; head = (head + 1) % element.length; size--; //唤醒操作 this.notifyAll(); return val; } } } |
CopyRight 2018-2019 实验室设备网 版权所有 |