阻塞式队列的实现方式

首先,阻塞队列( BlockingQueue )是 Java util.concurrent 包下重要的数据结构。当阻塞队列进行插入数据时,如果队列已满,线程将会阻塞等待直到队列非满;从阻塞队列取数据时,如果队列已空,线程将会阻塞等待直到队列非空。本文与 java 官方的阻塞式队列不同,自己以另一种方式(线程 wait / notifyAll )实现(原理相似),本案 demo 遵循 FIFO (先进先出)原则,只写出了 put (阻塞)/ take (阻塞)方法,官方实现还含有 add / remove (失败抛出异常)、offer / poll (失败时返回值不同)、offer / poll (重载 增加 timeout 阻塞时间限制,并返回值是否成功)在这里都没有写出。除此之外 lock / condition 方式实现也附在文中最下方以供对比。本案 demo 使用 Android UI 但是只需懂得 java 基本语法,不会影响理解。

相关应用:生产者消费者模式

为能够解决绝大多数并发问题,该模式通过平衡生产线程和消费线程的工作能力来提高程序的整体处理数据的速度。当然,这不是这次要讲的重点,如果有兴趣可以去了解阻塞式队列处理高并发问题。

注意:

在等待 Condition 时,允许发生“虚假唤醒”,这通常作为对基础平台语义的让步。对于大多数应用程序,这带来的实际影响很小,因为 Condition 应该总是在一个循环中被等待,并测试正被等待的状态声明。某个实现可以随意移除可能的虚假唤醒,但建议应用程序程序员总是假定这些虚假唤醒可能发生,因此总是在一个循环中(如 while )等待。
三种形式的条件等待(可中断、不可中断和超时)在一些平台上的实现以及它们的性能特征可能会有所不同。尤其是它可能很难提供这些特性和维护特定语义,比如排序保证。更进一步地说,中断线程实际挂起的能力在所有平台上并不是总是可行的。因此,并不要求某个实现为所有三种形式的等待定义完全相同的保证或语义,也不要求其支持中断线程的实际挂起。要求实现清楚地记录每个等待方法提供的语义和保证,在某个实现不支持中断线程的挂起时,它必须遵从此接口中定义的中断语义。由于中断通常意味着取消,而又通常很少进行中断检查,因此实现可以先于普通方法的返回来对中断进行响应。即使出现在另一个操作后的中断可能会释放线程锁时也是如此。实现应记录此行为。

demo 代码如下:

package com.example.test.test;

import android.os.Bundle;
import android.support.v7.app.AppCompatActivity;
import android.util.Log;
import android.view.View;
import android.widget.TextView;

import java.util.LinkedList;
import java.util.Queue;

public class QueueActivity extends AppCompatActivity implements View.OnClickListener {
    private static final String TAG = "QueueActivity";

    public int limit = 5; // 队列最大元素数

    public Queue<Object> queue = new LinkedList<>();

    private int putSleepCount = 0; // 入队 阻塞数量统计
    private int takeSleepCount = 0; // 出队 阻塞数量统计

    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_queue);

        findViewById(R.id.bt_put).setOnClickListener(this);
        findViewById(R.id.bt_take).setOnClickListener(this);
    }

    private int put = -1; // 模拟每次入队的元素 0、1、2(依次增长)

    @Override
    public void onClick(View view) {
        int id = view.getId();
        if (id == R.id.bt_put) {
            put++;
            Log.e(TAG, "put: " + put);
            new Thread() {
                @Override
                public void run() {
                    put(put);
                }
            }.start();
            return;
        }
        if (id == R.id.bt_take) {
            new Thread() {
                @Override
                public void run() {
                    Object take = take();
                    Log.e(TAG, "take: " + take);
                }
            }.start();
        }
    }

    /**
     * 入队 如果试图的操作无法立即执行,该方法调用将会发生阻塞,直到能够执行
     */
    private synchronized void put(Object obj) {
        while (queue.size() == limit) {
            try {
                putSleepCount++;
                Log.e(TAG, "putSleepCount: " + putSleepCount + "");
                wait();
                putSleepCount--;
                Log.e(TAG, "putSleepCount: " + putSleepCount + "");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        queue.add(obj);
        //如果添加一个元素后集合大小为1则应该唤醒阻塞的线程是否需要该元素出队(如果有阻塞)
        if (queue.size() == 1) {
            notifyAll();
        }
    }

    /**
     * 出队 如果试图的操作无法立即执行,该方法调用将会发生阻塞,直到能够执行
     */
    private synchronized Object take() {
        while (queue.size() == 0) {
            try {
                takeSleepCount++;
                Log.e(TAG, "takeSleepCount: " + takeSleepCount + "");
                wait();
                takeSleepCount--;
                Log.e(TAG, "takeSleepCount: " + takeSleepCount + "");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        try {
            return queue.remove();
        } finally {
            //如果取出一个元素后集合大小=limit-1则应该唤醒阻塞的线程入队(如果有阻塞)
            if (queue.size() == limit - 1) {
                notifyAll();
            }
        }
    }
}

测试一: take 阻塞

(调用一次 take 方法)
12-11 21:05:26.745 18459-15228/com.example.test.test E/QueueActivity: takeSleepCount: 1
因为队列为空 所以调用被阻塞 线程 wait
(接着调用一次 put 方法)
12-11 21:06:30.945 18459-18459/com.example.test.test E/QueueActivity: put: 0
12-11 21:06:30.945 18459-15228/com.example.test.test E/QueueActivity: takeSleepCount: 0
12-11 21:06:30.945 18459-15228/com.example.test.test E/QueueActivity: take: 0
从中可以看出 put 添加一个元素时会紧接着被取出
在多次阻塞 take 后 调用 put 仍可以有序取出元素 Log太多就不放了,可以自行测试

测试二: put 阻塞

(调用六次 put 方法) (demo limit值为5)
12-11 21:09:21.205 18459-18459/com.example.test.test E/QueueActivity: put: 0
12-11 21:09:23.115 18459-18459/com.example.test.test E/QueueActivity: put: 1
12-11 21:09:23.625 18459-18459/com.example.test.test E/QueueActivity: put: 2
12-11 21:09:24.085 18459-18459/com.example.test.test E/QueueActivity: put: 3
12-11 21:09:24.475 18459-18459/com.example.test.test E/QueueActivity: put: 4
12-11 21:09:24.835 18459-18459/com.example.test.test E/QueueActivity: put: 5
12-11 21:09:24.835 18459-18855/com.example.test.test E/QueueActivity: putSleepCount: 1    
(接着调用一次 take 方法)
12-11 21:10:07.265 18459-18855/com.example.test.test E/QueueActivity: putSleepCount: 0
12-11 21:10:07.265 18459-19469/com.example.test.test E/QueueActivity: take: 0
从中可以看出 在达到 limit 值后 put 操作会被阻塞,而之后的 take 操作会唤醒所有的阻塞线程再次判断并加入队列

附:
使用 lock / condition 实现部分代码如下(JDK 1.7 ArrayBlockingQueue):

//构造方法
public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
//创建数组
this.items = new Object[capacity];
//创建锁和阻塞条件
lock = new ReentrantLock(fair);
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}
//添加元素的方法
public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == items.length)
notFull.await();
//如果队列不满就入队
enqueue(e);
} finally {
lock.unlock();
}
}
//入队的方法
private void enqueue(E x) {
final Object[] items = this.items;
items[putIndex] = x;
if (++putIndex == items.length)
putIndex = 0;
count++;
notEmpty.signal();
}
//移除元素的方法
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0)
notEmpty.await();
return dequeue();
} finally {
lock.unlock();
}
}
//出队的方法
private E dequeue() {
final Object[] items = this.items;
@SuppressWarnings(“unchecked”)
E x = (E) items[takeIndex];
items[takeIndex] = null;
if (++takeIndex == items.length)
takeIndex = 0;
count–;
if (itrs != null)
itrs.elementDequeued();
notFull.signal();
return x;

这样做的优点是调用 notEmpty.signalAll()只会唤醒 notEmpty.await()下的线程,notFull 同理,但是通常不可能同时存在两种条件(入队和出队同时)阻塞。在 Condition 中,用 await()替换 wait(),用 signal()替换 notify(),用 signalAll()替换 notifyAll(),这里注意,Condition 是被绑定到 Lock 上的,要创建一个 Lock 的 Condition 必须用 newCondition()方法。
当然本文并不是为了比较两种实现方法的优劣,而且只要理解第一种就很容易理解第二种,本文只是通过 demo 让大家理解阻塞式队列的原理从而知道为什么使用它,或能对阻塞式队列多一点了解。