Java中的阻塞队列

来自Wikioe
Eijux讨论 | 贡献2022年11月30日 (三) 11:14的版本 →‎阻塞队列
(差异) ←上一版本 | 最后版本 (差异) | 下一版本→ (差异)
跳到导航 跳到搜索



关于阻塞队列

Queue

Queue 接口与List、Set同一级别,都是继承了Collection接口。Queue的使用主要由两方面:

  1. 非阻塞队列:PriorityQueue、LinkedList(LinkedList是双向链表,它实现了Dequeue接口);
  2. 阻塞队列

阻塞队列

阻塞队列与普通队列的区别在于,当队列是空的时,从队列中获取元素的操作将会被阻塞,或者当队列是满时,往队列里添加元素的操作会被阻塞

使用阻塞队列的优点:在面对类似消费者-生产者的模型时,对当前线程产生阻塞,不必额外地实现同步策略以及线程间唤醒策略,这样提供了极大的方便性。

比如一个线程从一个空的阻塞队列中取元素,此时线程会被阻塞直到阻塞队列中有了元素。当队列中有元素后,被阻塞的线程会自动被唤醒(不需要我们编写代码去唤醒)。

几种主要的阻塞队列

Java 1.5之后,在java.util.concurrent包下提供了若干个阻塞队列,主要有以下几个:

  1. ArrayBlockingQueue:基于数组实现的一个阻塞队列,在创建对象时必须制定容量大小。
    并且可以指定公平性与非公平性(默认情况下为非公平),即不保证等待时间最长的队列最优先能够访问队列。
  2. LinkedBlockingQueue:基于链表实现的一个阻塞队列,创建对象时如果不指定容量大小,则默认大小为“Integer.MAX_VALUE”。
  3. PriorityBlockingQueue:按照元素的优先级对元素进行排序,按照优先级顺序出队,每次出队的元素都是优先级最高的元素。
    注意,此阻塞队列为无界阻塞队列,即容量没有上限(通过源码就可以知道,它没有容器满的信号标志),前面2种都是有界队列。
  4. DelayQueue:基于PriorityQueue,一种延时阻塞队列,DelayQueue中的元素只有当其指定的延迟时间到了,才能够从队列中获取到该元素。
    DelayQueue也是一个无界队列,因此往队列中插入数据的操作(生产者)永远不会被阻塞,而只有获取数据的操作(消费者)才会被阻塞。

阻塞队列 与 非阻塞队列

非阻塞队列的主要方法

  1. add(E e) : 将元素e插入到队列末尾,如果插入成功,则返回true;如果插入失败(即队列已满),则会抛出异常;
  2. offer(E e) :将元素e插入到队列末尾,如果插入成功,则返回true;如果插入失败(即队列已满),则返回false;
  3. remove() :移除队首元素,若移除成功,则返回true;如果移除失败(队列为空),则会抛出异常;
  4. poll() :移除并获取队首元素,若成功,则返回队首元素;否则返回null;
  5. peek() :获取队首元素,若成功,则返回队首元素;否则返回null

阻塞队列的主要方法

阻塞队列包括了以上非阻塞队列中方法(在阻塞队列中都进行了同步措施),并提供了另外的方法:

  1. put(E e) : 用来向队尾存入元素,如果队列满,则等待;
  2. take() : 用来从队首取元素,如果队列为空,则等待;
  3. offer(E e,long timeout, TimeUnit unit) : 用来向队尾存入元素,如果队列满,则等待一定的时间,当时间期限达到时,如果还没有插入成功,则返回false;否则返回true;
  4. poll(long timeout, TimeUnit unit) : 用来从队首取元素,如果队列空,则等待一定的时间,当时间期限达到时,如果取到,则返回null;否则返回取得的元素;

阻塞队列的实现原理

阻塞队列中,使用通知模式实现生产者和消费者之间高效率的进行通讯。
“所谓通知模式,就是当生产者往满的队列里添加元素时会阻塞住生产者,当消费者消费了一个队列中的元素后,会通知生产者当前队列可用。”


通过查看JDK源码发现ArrayBlockingQueue使用了Condition来实现:

 /** Main lock guarding all access */
final ReentrantLock lock;
/** Condition for waiting takes */
private final Condition notEmpty;
/** Condition for waiting puts */
private final Condition notFull;

public ArrayBlockingQueue(int capacity, boolean fair) {
    if (capacity <= 0)
        throw new IllegalArgumentException();
    this.items = new Object[capacity];
    lock = new ReentrantLock(fair);
    notEmpty = lock.newCondition();
    notFull =  lock.newCondition();
}

public ArrayBlockingQueue(int capacity, boolean fair,
                         Collection<? extends E> c) {
    this(capacity, fair);

    final ReentrantLock lock = this.lock;
    lock.lock(); // Lock only for visibility, not mutual exclusion
    try {
        int i = 0;
        try {
            for (E e : c) {
                checkNotNull(e);
                items[i++] = e;
            }
        } catch (ArrayIndexOutOfBoundsException ex) {
            throw new IllegalArgumentException();
        }
        count = i;
        putIndex = (i == capacity) ? 0 : i;
    } finally {
        lock.unlock();
    }
}