阻塞队列

当我们要创建ThreadPoolExecutor的时候需要传进来一个类型为BlockingQueue的参数,它就是阻塞队列,在这一篇文章里我们会介绍阻塞队列的定义、种类、实现原理以及应用。

阻塞队列,是一个队列,而在一个阻塞队列在数据结构中所起的作用大致如下图所示

阻塞队列

当阻塞队列是空时,从队列中获取元素的操作将会被阻塞

当阻塞队列时满时,往队列里添加元素的操作将会被阻塞

阻塞队列有什么好处

在多线程领域:所谓阻塞,在某些情况下,会挂起线程(即阻塞),一旦条件满足,被挂起的线程又会自动被唤醒

ArrayBlockingQueue:由数狙结构組成的有界阻塞队列。

LinkedBlockingQueue:由链表构狙成的有界(但大小默人値カInteger.MAX_ _VALUE)阻塞队列

SynchronousQueue:不存储元素的阻塞队列,也即単个元素的队列

PriorityBlockingQueue:支持优先级排序的无界阻塞叺列

DelayQueue:使用优先级队列实现的延迟无界阻塞臥列

linkedTransferQueue:由链表结构組成的无界阻塞队列

LinkedBlockingDeque:由链表结构組成的双向阻塞队列

阻塞队列提供了四种处理方法:

方法类型 抛出异常 特殊值 阻塞 超时
插入 add(e) offer(e) put(e) offer(e,time,unit)
移除 remove() poll() take() poll(time,unit)
检查 element() peek() 不可用 不可用
  • 抛出异常:是指当阻塞队列满时候,再往队列里插入元素,会抛出 IllegalStateException(“Queue full”) 异常。当队列为空时,从队列里获取元素时会抛出 NoSuchElementException 异常 。
  • 返回特殊值:插入方法会返回是否成功,成功则返回 true。移除方法,则是从队列里拿出一个元素,如果没有则返回 null
  • 一直阻塞:当阻塞队列满时,如果生产者线程往队列里 put 元素,队列会一直阻塞生产者线程,直到拿到数据,或者响应中断退出。当队列空时,消费者线程试图从队列里 take 元素,队列也会阻塞消费者线程,直到队列可用。
  • 超时退出:当阻塞队列满时,队列会阻塞生产者线程一段时间,如果超过一定的时间,生产者线程就会退出。

线程通信之生产者消费者阻塞队列

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class ProdConsumer_BlockQueue {
public static void main(String[] args) throws InterruptedException {
MyResource myResource = new MyResource(new ArrayBlockingQueue<>(10));
new Thread(()->{
System.out.println(Thread.currentThread().getName() + "\t生产线程启动");
try {
myResource.myProd();

} catch (InterruptedException e) {
e.printStackTrace();
}
}, "Prod").start();

new Thread(()->{
System.out.println(Thread.currentThread().getName() + "\t消费线程启动");
try {
myResource.myConsumer();

} catch (InterruptedException e) {
e.printStackTrace();
}
}, "Consumer").start();

TimeUnit.SECONDS.sleep(5);

System.out.println(Thread.currentThread().getName() + "\t 主线程叫停FLAG=false,生产动作结束");
myResource.close();
}
}
class MyResource{
private volatile boolean FLAG = true;//默认开启,进行生产+消费
private AtomicInteger atomicInteger = new AtomicInteger();
BlockingQueue<String> blockingQueue = null;

public MyResource(BlockingQueue<String> blockingQueue) {
this.blockingQueue = blockingQueue;
System.out.println(blockingQueue.getClass().getName());
}

public void myProd() throws InterruptedException {
String data;
boolean retValue;
while(FLAG){
data = atomicInteger.incrementAndGet() + "";
retValue = blockingQueue.offer(data, 2L, TimeUnit.SECONDS);
if(retValue){
System.out.println(Thread.currentThread().getName() + "\t 插入队列"+data+"成功");
}else{
System.out.println(Thread.currentThread().getName() + "\t 插入队列"+data+"失败");
}
TimeUnit.SECONDS.sleep(1);
}
System.out.println(Thread.currentThread().getName() + "/ FLAG = false,生产结束");
}

public void myConsumer() throws InterruptedException{
String result;
while(FLAG){
result = blockingQueue.poll(2L, TimeUnit.SECONDS);
if(null == result || result.equalsIgnoreCase("")){
FLAG = false;
System.out.println(Thread.currentThread().getName() + "\t 超过2秒没有消费成功,消费退出");
return;
}
System.out.println(Thread.currentThread().getName() + "\t消费队列" + result + "成功");
}
}

public void close(){
FLAG =false;
}
}
// 输出
Prod 生产线程启动
Consumer 消费线程启动
Prod 插入队列1成功
Consumer 消费队列1成功
Consumer 消费队列2成功
Prod 插入队列2成功
Consumer 消费队列3成功
Prod 插入队列3成功
Prod 插入队列4成功
Consumer 消费队列4成功
Prod 插入队列5成功
Consumer 消费队列5成功
main 主线程叫停FLAG=false,生产动作结束
Prod FLAG = false,生产结束
Consumer 超过2秒没有消费成功,消费退出
0%