阻塞队列

java.util.concurrent提供了很多对并发编程有用的工具类,其中BlockingQueue能够实现线程安全的阻塞队列,这篇笔记我们能简单介绍阻塞队列的使用。

阻塞队列的概念

BlockingQueue(阻塞队列)能够高效安全的实现多线程“生产者-消费者”逻辑,典型的例子能够实现这样的功能:一个线程中的生产者将对象放入队列,如果队列满了则阻塞,另一个线程的消费者将对象从队列中取出,如果队列为空则阻塞,生产者和消费者都可能有多个。常见的阻塞队列实现有如下几种:

ArrayBlockingQueue:基于定长数组的阻塞队列实现,创建时需要指定队列容量,运行时不支持扩容。此外,JDK的实现中在存入和取出数据时使用了同一个锁对象,因此存取数据实际上是不能并发执行的。

LinkedBlockingQueue:基于链表的阻塞队列,创建时支持设定容量,默认没有容量限制(即最大值为Integer.MAX_VALUE)。和ArrayBlockingQueue不同的是LinkedBlockingQueue中存取数据使用了不同的锁,因此存取数据可以并发执行;但链表相对于定长数据,插入和删除时会产生额外的开销,造成GC的压力。

DelayQueue:延迟队列,向队列存入的数据达到指定时间后才可以被取得,没有大小限制。

PriorityBlockingQueue:优先队列,支持根据数据的优先级排序取得数据。

LinkedBlockingDeque:基于链表的双端阻塞队列,可以从队列的两端插入或取出元素。

阻塞队列使用实例

下面代码使用阻塞队列,实现了一个消费者和两个生产者的多线程关系,生产者每1000ms产生一个对象存入队列中,消费者每300ms消费一个对象,总共启动了2个生产者和1个消费者:

Producer.java

package com.gacfox.demo;

import java.util.concurrent.BlockingQueue;

public class Producer implements Runnable {
    private BlockingQueue<Integer> blockingQueue;

    public Producer(BlockingQueue<Integer> blockingQueue) {
        this.blockingQueue = blockingQueue;
    }

    @Override
    public void run() {
        try {
            while (true) {
                Thread.sleep(1000);
                blockingQueue.put(1);
                System.out.println("producer:" + Thread.currentThread().getId() + " produce 1 data, queue size:" + blockingQueue.size());
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

Consumer.java

package com.gacfox.demo;

import java.util.concurrent.BlockingQueue;

public class Consumer implements Runnable {
    private BlockingQueue<Integer> blockingQueue;

    public Consumer(BlockingQueue<Integer> blockingQueue) {
        this.blockingQueue = blockingQueue;
    }

    @Override
    public void run() {
        try {
            while (true) {
                Thread.sleep(300);
                blockingQueue.take();
                System.out.println("consumer:" + Thread.currentThread().getId() + " consume 1 data, queue size:" + blockingQueue.size());
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

Main.java

package com.gacfox.demo;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class Main {
    public static void main(String[] args) {
        BlockingQueue<Integer> blockingQueue = new ArrayBlockingQueue<>(5);
        new Thread(new Producer(blockingQueue)).start();
        new Thread(new Producer(blockingQueue)).start();
        new Thread(new Consumer(blockingQueue)).start();
    }
}

代码中,我们创建了2个生产者和1个消费者,这里其实消费速度快于生产速度,我们可以从程序运行的输出发现这一点。

作者:Gacfox
版权声明:本网站为非盈利性质,文章如非特殊说明均为原创,版权遵循知识共享协议CC BY-NC-ND 4.0进行授权,转载必须署名,禁止用于商业目的或演绎修改后转载。
Copyright © 2017-2024 Gacfox All Rights Reserved.
Build with NextJS | Sitemap