全网整合营销服务商

电脑端+手机端+微信端=数据同步管理

免费咨询热线:400-708-3566

详解Java阻塞队列(BlockingQueue)的实现原理

阻塞队列 (BlockingQueue)是Java util.concurrent包下重要的数据结构,BlockingQueue提供了线程安全的队列访问方式:当阻塞队列进行插入数据时,如果队列已满,线程将会阻塞等待直到队列非满;从阻塞队列取数据时,如果队列已空,线程将会阻塞等待直到队列非空。并发包下很多高级同步类的实现都是基于BlockingQueue实现的。

BlockingQueue 的操作方法

BlockingQueue 具有 4 组不同的方法用于插入、移除以及对队列中的元素进行检查。如果请求的操作不能得到立即执行的话,每个方法的表现也不同。这些方法如下:

四组不同的行为方式解释:

  1. 抛异常:如果试图的操作无法立即执行,抛一个异常。
  2. 特定值:如果试图的操作无法立即执行,返回一个特定的值(常常是 true / false)。
  3. 阻塞:如果试图的操作无法立即执行,该方法调用将会发生阻塞,直到能够执行。
  4. 超时:如果试图的操作无法立即执行,该方法调用将会发生阻塞,直到能够执行,但等待时间不会超过给定值。返回一个特定值以告知该操作是否成功(典型的是true / false)。

无法向一个 BlockingQueue 中插入 null。如果你试图插入 null,BlockingQueue 将会抛出一个 NullPointerException。

可以访问到 BlockingQueue 中的所有元素,而不仅仅是开始和结束的元素。比如说,你将一个对象放入队列之中以等待处理,但你的应用想要将其取消掉。那么你可以调用诸如 remove(o) 方法来将队列之中的特定对象进行移除。但是这么干效率并不高(译者注:基于队列的数据结构,获取除开始或结束位置的其他对象的效率不会太高),因此你尽量不要用这一类的方法,除非你确实不得不那么做。

BlockingQueue 的实现类

BlockingQueue 是个接口,你需要使用它的实现之一来使用BlockingQueue,Java.util.concurrent包下具有以下 BlockingQueue 接口的实现类:

  1. ArrayBlockingQueue:ArrayBlockingQueue 是一个有界的阻塞队列,其内部实现是将对象放到一个数组里。有界也就意味着,它不能够存储无限多数量的元素。它有一个同一时间能够存储元素数量的上限。你可以在对其初始化的时候设定这个上限,但之后就无法对这个上限进行修改了(译者注:因为它是基于数组实现的,也就具有数组的特性:一旦初始化,大小就无法修改)。
  2. DelayQueue:DelayQueue 对元素进行持有直到一个特定的延迟到期。注入其中的元素必须实现 java.util.concurrent.Delayed 接口。
  3. LinkedBlockingQueue:LinkedBlockingQueue 内部以一个链式结构(链接节点)对其元素进行存储。如果需要的话,这一链式结构可以选择一个上限。如果没有定义上限,将使用 Integer.MAX_VALUE 作为上限。
  4. PriorityBlockingQueue:PriorityBlockingQueue 是一个无界的并发队列。它使用了和类 java.util.PriorityQueue 一样的排序规则。你无法向这个队列中插入 null 值。所有插入到 PriorityBlockingQueue 的元素必须实现 java.lang.Comparable 接口。因此该队列中元素的排序就取决于你自己的 Comparable 实现。
  5. SynchronousQueue:SynchronousQueue 是一个特殊的队列,它的内部同时只能够容纳单个元素。如果该队列已有一元素的话,试图向队列中插入一个新元素的线程将会阻塞,直到另一个线程将该元素从队列中抽走。同样,如果该队列为空,试图向队列中抽取一个元素的线程将会阻塞,直到另一个线程向队列中插入了一条新的元素。据此,把这个类称作一个队列显然是夸大其词了。它更多像是一个汇合点。

使用例子:

阻塞队列的最长使用的例子就是生产者消费者模式,也是各种实现生产者消费者模式方式中首选的方式。使用者不用关心什么阻塞生产,什么时候阻塞消费,使用非常方便,代码如下:

package MyThread;

import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class BlockingQueueTest {
  //生产者
  public static class Producer implements Runnable{
    private final BlockingQueue<Integer> blockingQueue;
    private volatile boolean flag;
    private Random random;

    public Producer(BlockingQueue<Integer> blockingQueue) {
      this.blockingQueue = blockingQueue;
      flag=false;
      random=new Random();

    }
    public void run() {
      while(!flag){
        int info=random.nextInt(100);
        try {
          blockingQueue.put(info);
          System.out.println(Thread.currentThread().getName()+" produce "+info);
          Thread.sleep(50);
        } catch (InterruptedException e) {
          // TODO Auto-generated catch block
          e.printStackTrace();
        }        
      }
    }
    public void shutDown(){
      flag=true;
    }
  }
  //消费者
  public static class Consumer implements Runnable{
    private final BlockingQueue<Integer> blockingQueue;
    private volatile boolean flag;
    public Consumer(BlockingQueue<Integer> blockingQueue) {
      this.blockingQueue = blockingQueue;
    }
    public void run() {
      while(!flag){
        int info;
        try {
          info = blockingQueue.take();
          System.out.println(Thread.currentThread().getName()+" consumer "+info);
          Thread.sleep(50);
        } catch (InterruptedException e) {
          // TODO Auto-generated catch block
          e.printStackTrace();
        }        
      }
    }
    public void shutDown(){
      flag=true;
    }
  }
  public static void main(String[] args){
    BlockingQueue<Integer> blockingQueue = new LinkedBlockingQueue<Integer>(10);
    Producer producer=new Producer(blockingQueue);
    Consumer consumer=new Consumer(blockingQueue);
    //创建5个生产者,5个消费者
    for(int i=0;i<10;i++){
      if(i<5){
        new Thread(producer,"producer"+i).start();
      }else{
        new Thread(consumer,"consumer"+(i-5)).start();
      }
    }

    try {
      Thread.sleep(1000);
    } catch (InterruptedException e) {
      // TODO Auto-generated catch block
      e.printStackTrace();
    }
    producer.shutDown();
    consumer.shutDown();

  }
}

阻塞队列原理:

其实阻塞队列实现阻塞同步的方式很简单,使用的就是是lock锁的多条件(condition)阻塞控制。使用BlockingQueue封装了根据条件阻塞线程的过程,而我们就不用关心繁琐的await/signal操作了。

下面是Jdk 1.7中ArrayBlockingQueue部分代码:

public ArrayBlockingQueue(int capacity, boolean fair) {

    if (capacity <= 0)
      throw new IllegalArgumentException();
    //创建数组  
    this.items = new Object[capacity];
    //创建锁和阻塞条件
    lock = new ReentrantLock(fair);  
    notEmpty = lock.newCondition();
    notFull = lock.newCondition();
  }
//添加元素的方法
public void put(E e) throws InterruptedException {
    checkNotNull(e);
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
      while (count == items.length)
        notFull.await();
      //如果队列不满就入队
      enqueue(e);
    } finally {
      lock.unlock();
    }
  }
 //入队的方法
 private void enqueue(E x) {
    final Object[] items = this.items;
    items[putIndex] = x;
    if (++putIndex == items.length)
      putIndex = 0;
    count++;
    notEmpty.signal();
  }
 //移除元素的方法
 public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
      while (count == 0)
        notEmpty.await();
      return dequeue();
    } finally {
      lock.unlock();
    }
  }
 //出队的方法
 private E dequeue() {
    final Object[] items = this.items;
    @SuppressWarnings("unchecked")
    E x = (E) items[takeIndex];
    items[takeIndex] = null;
    if (++takeIndex == items.length)
      takeIndex = 0;
    count--;
    if (itrs != null)
      itrs.elementDequeued();
    notFull.signal();
    return x;

双端阻塞队列(BlockingDeque)

concurrent包下还提供双端阻塞队列(BlockingDeque),和BlockingQueue是类似的,只不过BlockingDeque提供从任意一端插入或者抽取元素的队列。

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持。


# Java阻塞队列  # java阻塞队列原理  # java实现阻塞队列  # 剖析Java中阻塞队列的实现原理及应用场景  # Java 阻塞队列详解及简单使用  # 详解Java中的阻塞队列  # Java并发编程之阻塞队列详解  # Java中使用阻塞队列控制线程集实例  # java阻塞队列实现原理及实例解析  # JavaSE多线程阻塞队列实现代码  # 将会  # 是一个  # 链式  # 移除  # 你可以  # 也就  # 数据结构  # 对其  # 定值  # 自己的  # 的是  # 都是  # 这一  # 是个  # 如果你  # 夸大其词  # 什么时候  # 已有  # 将其  # 而不 


相关文章: c++ stringstream用法详解_c++字符串与数字转换利器  怎么将XML数据可视化 D3.js加载XML  建站VPS选购需注意哪些关键参数?  宁波自助建站系统如何快速打造专业企业网站?  如何在万网开始建站?分步指南解析  杭州银行网站设计制作流程,杭州银行怎么开通认证方式?  家具网站制作软件,家具厂怎么跑业务?  SAX解析器是什么,它与DOM在处理大型XML文件时有何不同?  建站之星手机一键生成:多端自适应+小程序开发快速建站指南  免费视频制作网站,更新又快又好的免费电影网站?  如何选择适配移动端的WAP自助建站平台?  网站海报制作教学视频教程,有什么免费的高清可商用图片网站,用于海报设计?  在线ppt制作网站有哪些软件,如何把网页的内容做成ppt?  实现点击下箭头变上箭头来回切换的两种方法【推荐】  如何用花生壳三步快速搭建专属网站?  一键网站制作软件,义乌购一件代发流程?  微信小程序 input输入框控件详解及实例(多种示例)  香港服务器网站卡顿?如何解决网络延迟与负载问题?  广州网站制作的公司,现在专门做网站的公司有没有哪几家是比较好的,性价比高,模板也多的?  建站VPS能否同时实现高效与安全翻墙?  网站制作难吗安全吗,做一个网站需要多久时间?  手机怎么制作网站教程步骤,手机怎么做自己的网页链接?  南平网站制作公司,2025年南平市事业单位报名时间?  如何选择PHP开源工具快速搭建网站?  建站之星安装失败:服务器环境不兼容?  早安海报制作网站推荐大全,企业早安海报怎么每天更换?  建站ABC备案流程中有哪些关键注意事项?  如何在Golang中处理模块冲突_解决依赖版本不兼容问题  存储型VPS适合搭建中小型网站吗?  建站之星如何快速更换网站模板?  如何用IIS7快速搭建并优化网站站点?  电影网站制作价格表,那些提供免费电影的网站,他们是怎么盈利的?  网站制作哪家好,cc、.co、.cm哪个域名更适合做网站?  网站规划与制作是什么,电子商务网站系统规划的内容及步骤是什么?  ,南京靠谱的征婚网站?  安云自助建站系统如何快速提升SEO排名?  东莞专业网站制作公司有哪些,东莞招聘网站哪个好?  nginx修改上传文件大小限制的方法  专业型网站制作公司有哪些,我设计专业的,谁给推荐几个设计师兼职类的网站?  公司网站设计制作厂家,怎么创建自己的一个网站?  制作ppt免费网站有哪些,有哪些比较好的ppt模板下载网站?  如何选择美橙互联多站合一建站方案?  如何在香港免费服务器上快速搭建网站?  网站制作公司排行榜,抖音怎样做个人官方网站  网站建设制作、微信公众号,公明人民医院怎么在网上预约?  小建面朝正北,A点实际方位是否存在偏差?  如何设计高效校园网站?  如何快速搭建安全的FTP站点?  头像制作网站在线制作软件,dw网页背景图像怎么设置?  建站之星代理商如何保障技术支持与售后服务? 

您的项目需求

*请认真填写需求信息,我们会在24小时内与您取得联系。