java.util.concurrent
提供了很多对并发编程有用的工具类,其中BlockingQueue
能够实现线程安全的阻塞队列,这篇笔记我们能简单介绍阻塞队列的使用。
BlockingQueue(阻塞队列)能够高效安全的实现多线程“生产者-消费者”逻辑,典型的例子能够实现这样的功能:一个线程中的生产者将对象放入队列,如果队列满了则阻塞,另一个线程的消费者将对象从队列中取出,如果队列为空则阻塞,生产者和消费者都可能有多个。常见的阻塞队列实现有如下几种:
ArrayBlockingQueue:基于定长数组的阻塞队列实现,创建时需要指定队列容量,运行时不支持扩容。此外,JDK的实现中在存入和取出数据时使用了同一个锁对象,因此存取数据实际上是不能并发执行的。
LinkedBlockingQueue:基于链表的阻塞队列,创建时支持设定容量,默认没有容量限制(即最大值为Integer.MAX_VALUE
)。和ArrayBlockingQueue
不同的是LinkedBlockingQueue
中存取数据使用了不同的锁,因此存取数据可以并发执行;但链表相对于定长数据,插入和删除时会产生额外的开销,造成GC的压力。
DelayQueue:延迟队列,向队列存入的数据达到指定时间后才可以被取得,没有大小限制。
PriorityBlockingQueue:优先队列,支持根据数据的优先级排序取得数据。
LinkedBlockingDeque:基于链表的双端阻塞队列,可以从队列的两端插入或取出元素。
下面代码使用阻塞队列,实现了一个消费者和两个生产者的多线程关系,生产者每1000ms产生一个对象存入队列中,消费者每300ms消费一个对象,总共启动了2个生产者和1个消费者:
Producer.java
package com.gacfox.demo;
import java.util.concurrent.BlockingQueue;
public class Producer implements Runnable {
private BlockingQueue<Integer> blockingQueue;
public Producer(BlockingQueue<Integer> blockingQueue) {
this.blockingQueue = blockingQueue;
}
@Override
public void run() {
try {
while (true) {
Thread.sleep(1000);
blockingQueue.put(1);
System.out.println("producer:" + Thread.currentThread().getId() + " produce 1 data, queue size:" + blockingQueue.size());
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
Consumer.java
package com.gacfox.demo;
import java.util.concurrent.BlockingQueue;
public class Consumer implements Runnable {
private BlockingQueue<Integer> blockingQueue;
public Consumer(BlockingQueue<Integer> blockingQueue) {
this.blockingQueue = blockingQueue;
}
@Override
public void run() {
try {
while (true) {
Thread.sleep(300);
blockingQueue.take();
System.out.println("consumer:" + Thread.currentThread().getId() + " consume 1 data, queue size:" + blockingQueue.size());
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
Main.java
package com.gacfox.demo;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
public class Main {
public static void main(String[] args) {
BlockingQueue<Integer> blockingQueue = new ArrayBlockingQueue<>(5);
new Thread(new Producer(blockingQueue)).start();
new Thread(new Producer(blockingQueue)).start();
new Thread(new Consumer(blockingQueue)).start();
}
}
代码中,我们创建了2个生产者和1个消费者,这里其实消费速度快于生产速度,我们可以从程序运行的输出发现这一点。