Java之阻塞队列和消息队列 您所在的位置:网站首页 java消息通知设计 Java之阻塞队列和消息队列

Java之阻塞队列和消息队列

2023-05-24 16:54| 来源: 网络整理| 查看: 265

目录

一.上节复习

1.什么是单列模式

2.饿汉模式

3.懒汉模式

二.阻塞队列

1.什么是阻塞队列

三.消息队列

1.什么是消息队列

2.消息队列的作用

1.解耦

 2.削峰填谷

3.异步

四.JDK中的阻塞队列

1.常见的阻塞队列

 2.向阻塞队列中放入元素---put()

3.向阻塞队列中拿出元素---take()

五.手动实现阻塞队列

1.普通队列的实现

2.堵塞队列的实现

六.实现生产者和消费者模型

1.消费速度大于生产速度

2.生产速度大于消费速度

3.虚假唤醒

一.上节复习

上节内容指路:Java之单例模式

1.什么是单列模式

单例模式是一种设计模式(设计模式:就是在特定的场景下,解决问题最优的方式,类似于棋谱),单例:顾名思义,全局只有一个实例对象

2.饿汉模式

饿汉模式:类加载的时候就完成初始化,DCL双重检查锁

public class Singleton { private static Singleton instance = new Singleton(); private Singleton() { } public static Singleton getInstance() { return instance; } }

 优点:书写简单,不容易出错

3.懒汉模式

懒汉模式:程序使用对象的时候才进行初始化

public class SingletonLazy { private static volatile SingletonLazy instance = null; private SingletonLazy() { } public static SingletonLazy getInstance() { if (instance == null) { //在获取单例对象的时候,判断是否已经被创建,没有创建则创建 synchronized (SingletonLazy.class) { if (instance == null) { instance = new SingletonLazy(); } } } return instance; } }

优点:避免了资源的浪费

二.阻塞队列 1.什么是阻塞队列

和之前学习过的队列一样,也是FIFO(先进先出).

入队元素时,先判断队列是否满了,如何满了就阻塞(等待),直到队列中有空余空间再入队.

出队元素时,先判断队列是否为空,如果空了就阻塞(等待),直到队列中有元素使再出队.

实例:包饺子:分为擀饺子皮和包饺子两个操作

当放饺子皮的盘子满了,擀饺子皮的人停止擀皮(等待)--入队列操作,等待有空间了再工作

当放饺子皮的盘子空了,包饺子的人就停止包饺子(等待)---出队列操作,等待有饺子皮再工作

这种模式叫做生产者消费者模型

三.消息队列 1.什么是消息队列

消息队列本质上就是阻塞队列,在此基础上为放入阻塞队列的消息打上了标签

为不同的消息进行了分组的操作

在基础数据结构上,做了一些针对应用场景的优化和实现,那么我们把这样的框架和软件,称为“中间件”

消息队列就是中间件

2.消息队列的作用 1.解耦

  一个良好的程序应该是"高内聚,低耦合"的.

  高内聚:将功能强相关的代码写在一块,方便维护

  低耦合:两个相关的模块尽可以能把依赖的部分降低到最小,不要让两个系统产生强依赖

实例:当我们订单支付,订单系统会对支付系统发起请求支付的指令,调用支付系统的相关命令,这样子就是一个高耦合的例子,如果支付系统崩溃,会直接对订单系统产生影响,支付系统修改相关的代码,订单系统也要发生修改

 这个时候我们维护一个消息队列,可以对两个系统进行解耦操作,即使支付系统崩溃了,订单系统也能进行正常的工作,代码修改也不会牵一发而动全身

 2.削峰填谷

峰和谷是指消息的密集程度

例如现实中的三峡大坝 

汛期:蓄水,防止下游遭遇洪峰的冲击   削峰

旱期:把存的水进行灌溉使用    填谷

例如在双十一等流量很大的时间点,我们使用消息队列可以存储订单的信息,然后慢慢的处理订单的信息,过了时间点流量慢慢小的时候,我们依旧可以处理订单信息.没必要在大流量的时候一下子处理所有的订单,这样可能会造成服务器崩溃.

3.异步

同步:请求方必须死等对方的响应才能开始下一步操作.

异步:请求方发出请求之后,可以进行其他的操作,没必须等待对方的响应才开始操作.

比如订单系统对支付系统发出请求之后,没必须死等支付系统成功的响应就才开始其他订单的操作,而是进行其他订单的操作,同时等待这个订单的相应结果.

四.JDK中的阻塞队列 1.常见的阻塞队列 LinkedBlockingQueue   基于链表ArrayBlockingQueue     基于数组PriorityBlockingQueue   基于优先级队列

在创建的时候可以指定队列的大小

 2.向阻塞队列中放入元素---put()

会抛出InterruptedException异常,阻塞队列中专用的入队的方法,不能使用offer()和add()

public class Demo01_BlockingQueue { public static void main(String[] args) throws InterruptedException { //定义一个阻塞队列 BlockingQueue queue=new LinkedBlockingQueue(3); //使用put()方法,不能使用add()和offer() queue.put(1); queue.put(2); queue.put(3); System.out.println("此时插入了三个元素"); System.out.println(queue); queue.put(4); System.out.println("此时插入了四个元素"); } }

打印的结果:

 因为我们指定的阻塞队列的大小为3,当插入第四个元素的时候就会进入到阻塞等待的状态

3.向阻塞队列中拿出元素---take()

会抛出InterruptedException异常,阻塞队列中专用的出队的方法,不能使用poll()

public class Demo01_BlockingQueue { public static void main(String[] args) throws InterruptedException { //定义一个阻塞队列 BlockingQueue queue = new LinkedBlockingQueue(3); //使用put()方法,不能使用add()和offer() queue.put(1); queue.put(2); queue.put(3); System.out.println("此时插入了三个元素"); System.out.println(queue); // queue.put(4); // System.out.println("此时插入了四个元素"); //一定要使用take()方法,不能使用poll()方法 System.out.println(queue.take()); System.out.println(queue.take()); System.out.println(queue.take()); //进入阻塞等待状态 System.out.println(queue.take()); } }

打印结果:

获取第四个元素的时候队列为空,因此进入阻塞等待的状态.

五.手动实现阻塞队列 1.普通队列的实现

在实现阻塞队列之前,我们现在实现普通队列

public class MyBlockingQueue { private int[] element; //队首下标 private int head; //队尾下标 private int tail; //元素个数 private int size; public MyBlockingQueue() { this(3); } public MyBlockingQueue(int capacity) { element = new int[capacity]; } /** * 入队一个元素 * * @param val */ public void put(int val) { if (size >= element.length) { return; } //向队尾插入元素 element[tail] = val; //向后移动 tail = (tail + 1) % element.length; size++; } /** * 出队一个元素 * * @return */ public int take() { if (size == 0) { return -1; } int val = element[head]; head = (head + 1) % element.length; size--; return val; } } 2.堵塞队列的实现

1.之前实现一个普通队列,底层用到了两种数据结构,一个是链表,一个是循环数组

2.阻塞队列就是在普通的队列上加入了阻塞等待(wait())和唤醒操作(notify()),与synchronized相关

确定synchronized的范围,如果一个对象需要new出来使用,锁对象一般是this,其他情况具体分析

1.(线程1)当执行入队(put)操作时,判断阻塞队列满了,执行wait()操作,进入阻塞等待操作,当之后(别的线程)执行了出队列操作完成时,队列此时不满,这个时候唤醒队列.(线程1)继续完成put操作

2.(线程1)当执行出队(take)操作时,判断阻塞队列为空,执行wait()操作,进入阻塞等待操作,当之后(别的线程)执行了入队列操作完成时,队列此时不空,这个时候唤醒队列.(线程1)继续完成take操作

public class MyBlockingQueue { private int[] element; //队首下标 private int head; //队尾下标 private int tail; //元素个数 private int size; public MyBlockingQueue() { this(3); } public MyBlockingQueue(int capacity) { element = new int[capacity]; } /** * 入队一个元素 * * @param val */ public void put(int val) throws InterruptedException { //加入修改的范围加锁 synchronized (this) { if (size >= element.length) { this.wait(); } //向队尾插入元素 element[tail] = val; //向后移动 tail = (tail + 1) % element.length; size++; //做唤醒操作 this.notifyAll(); } } /** * 出队一个元素 * * @return */ public int take() throws InterruptedException { synchronized (this) { if (size == 0) { this.wait(); } int val = element[head]; head = (head + 1) % element.length; size--; //唤醒操作 this.notifyAll(); return val; } } } 六.实现生产者和消费者模型

我们使用两个线程,一个线程模仿生产者,向阻塞队列中放元素,一个线程模仿消费者,从阻塞队列中取元素.

1.消费速度大于生产速度 public class Demo3_ProducerConsumer { static MyBlockingQueue queue = new MyBlockingQueue(3); public static void main(String[] args) { Thread producer = new Thread(() -> { int num = 1; while (true) { //生产一条消息 try { queue.put(num); System.out.println("生产者生产了消息" + num); num++; TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { throw new RuntimeException(e); } } }); //启动生产者 producer.start(); Thread consumer = new Thread(() -> { while (true) { //消费一条消息 try { int num = queue.take(); System.out.println("消费者消费了消息" + num); } catch (InterruptedException e) { throw new RuntimeException(e); } } }); consumer.start(); } }

结果打印:

2.生产速度大于消费速度 public class Demo3_ProducerConsumer { static MyBlockingQueue queue = new MyBlockingQueue(10); public static void main(String[] args) throws InterruptedException { Thread producer = new Thread(() -> { int num = 1; while (true) { //生产一条消息 try { queue.put(num); System.out.println("生产者生产了消息" + num); num++; } catch (InterruptedException e) { throw new RuntimeException(e); } } }); //启动生产者 producer.start(); Thread consumer = new Thread(() -> { while (true) { //消费一条消息 try { int num = queue.take(); System.out.println("消费者消费了消息" + num); TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { throw new RuntimeException(e); } } }); //消费者后启动,让阻塞队列满 TimeUnit.SECONDS.sleep(2); consumer.start(); } }

打印的结果:

3.虚假唤醒

这是Java官方文档中给出虚假唤醒的定义

翻译成中文为: 线程也可以在没有被通知、中断或超时的情况下唤醒,这就是所谓的虚假唤醒。虽然这种情况在实践中很少发生,但应用程序必须通过测试本应导致线程被唤醒的条件,并在条件不满足时继续等待来防止这种情况的发生。换句话说,等待应该总是发生在循环中,就像下面这样:

所以在实践中wait条件的判断要加while循环.

阻塞队列中put和take方法的优化,并且多线程环境下共享变量要加voliatile,最终的阻塞队列的代码为:

public class MyBlockingQueue { private volatile int[] element; //队首下标 private volatile int head; //队尾下标 private volatile int tail; //元素个数 private volatile int size=0; public MyBlockingQueue() { this(3); } public MyBlockingQueue(int capacity) { element = new int[capacity]; } /** * 入队一个元素 * * @param val */ public void put(int val) throws InterruptedException { //加入修改的范围加锁 synchronized (this) { while (size >= element.length) { this.wait(); } //向队尾插入元素 element[tail] = val; //向后移动 tail = (tail + 1) % element.length; size++; //做唤醒操作 this.notifyAll(); } } /** * 出队一个元素 * * @return */ public int take() throws InterruptedException { synchronized (this) { while (size == 0) { this.wait(); } int val = element[head]; head = (head + 1) % element.length; size--; //唤醒操作 this.notifyAll(); return val; } } }



【本文地址】

公司简介

联系我们

今日新闻

    推荐新闻

    专题文章
      CopyRight 2018-2019 实验室设备网 版权所有