侧边栏壁纸
博主头像
DJ's Blog博主等级

行动起来,活在当下

  • 累计撰写 133 篇文章
  • 累计创建 51 个标签
  • 累计收到 0 条评论

目 录CONTENT

文章目录

【Java】BlockingQueue

Administrator
2022-04-04 / 0 评论 / 0 点赞 / 113 阅读 / 6308 字

【Java】BlockingQueue

简介

BlockingQueue也是java.util.concurrent下的主要用来控制线程同步的工具。
他是一个接口,提供以下方法:

public interface BlockingQueue<E> extends Queue<E> {
	boolean add(E arg0);
	boolean offer(E arg0);
	void put(E arg0) throws InterruptedException;
	boolean offer(E arg0, long arg1, TimeUnit arg3) throws InterruptedException;
	E take() throws InterruptedException;
	E poll(long arg0, TimeUnit arg2) throws InterruptedException;
	int remainingCapacity();
	boolean remove(Object arg0);
	boolean contains(Object arg0);
	int drainTo(Collection<? super E> arg0);
	int drainTo(Collection<? super E> arg0, int arg1);
}

主要的方法是:puttake一对阻塞存取;addpoll一对非阻塞存取。

接口方法

插入

  1. boolean add(anObject):把anObject加到BlockingQueue里,即如果BlockingQueue可以容纳,则返回true,否则抛出异常。
  2. boolean offer(anObject):表示如果可能的话,将anObject加到BlockingQueue里,即如果BlockingQueue可以容纳,则返回true,否则返回false。
  3. void put(anObject):把anObject加到BlockingQueue里,如果BlockQueue没有空间,则调用此方法的线程被阻断直到BlockingQueue里面有空间再继续,有阻塞,放不进去就等待。

读取

  1. E poll(time):取走BlockingQueue里排在首位的对象,若不能立即取出,则可以等time参数规定的时间,再取不到时返回null。
  2. E take():取走BlockingQueue里排在首位的对象,若BlockingQueue为空,阻断进入等待状态直到BlockingQueue有新的对象被加入为止。这种方式会一直阻塞,取不到就一直等。

其他

  1. int remainingCapacity():返回队列剩余的容量,在队列插入和获取的时候,不要瞎搞。数据可能不准,不能保证数据的准确性。
  2. boolean remove(Object o):从队列移除元素,如果存在,即移除一个或者更多,队列改变了返回true。
  3. public boolean contains(Object o):查看队列是否存在这个元素,存在返回true。
  4. int drainTo(Collection<? super E> c):移除此队列中所有可用的元素,并将它们添加到给定collection中即取出所有个数放到集合中。
  5. int drainTo(Collection<? super E> c, int maxElements):和上面方法类似,区别在于,指定了移动的数量,即取出指定个数放到集合中。

ArrayBlockingQueue

一个由数组支持的有界阻塞队列,规定大小的BlockingQueue。其构造函数必须带一个int参数来指明其大小。其所含的对象是以FIFO(先入先出)顺序排序的。

LinkedBlockingQueue

大小不定的BlockingQueue,若其构造函数带一个规定大小的参数,生成的BlockingQueue有大小限制;若不带大小参数,所生成的BlockingQueue的大小由Integer.MAX_VALUE来决定。其所含的对象是以FIFO(先入先出)顺序排序的。
LinkedBlockingQueue可以指定容量,也可以不指定,不指定的话,默认最大是Integer.MAX_VALUE,其中主要用到puttake方法,put方法在队列满的时候会阻塞直到有队列成员被消费,take方法在队列空的时候会阻塞,直到有队列成员被放进来。

ArrayBlockingQueue和LinkedBlockingQueue区别

LinkedBlockingQueueArrayBlockingQueue比较起来,它们背后所用的数据结构不一样,导致LinkedBlockingQueue的数据吞吐量要大于ArrayBlockingQueue,但在线程数量很大时其性能的可预见性低于ArrayBlockingQueue

代码示例

生产者

import java.util.concurrent.BlockingQueue;
public class BlockingQueueProducer implements Runnable {
    BlockingQueue<String> queue;
    public BlockingQueueProducer() {
    }
    public BlockingQueueProducer(BlockingQueue<String> queue) {
        this.queue = queue;
    }
    @Override
    public void run() {
        Thread currentThread = Thread.currentThread();
        System.out.println("生产者线程:" + currentThread.getName() + "开始生产");
        String temp = currentThread.getName();
        try {
            Thread.sleep(3 * 1000);
            // 如果队列是满的话,会阻塞当前线程
            queue.put(temp);
            System.out.println("生产者线程:" + currentThread.getName() + "放入队列");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

消费者

import java.util.concurrent.BlockingQueue;
public class BlockingQueueConsumer implements Runnable {
    BlockingQueue<String> queue;
    public BlockingQueueConsumer() {
    }
    public BlockingQueueConsumer(BlockingQueue<String> queue) {
        this.queue = queue;
    }
    @Override
    public void run() {
        Thread currentThread = Thread.currentThread();
        System.out.println("消费者线程:" + currentThread.getName() + "开始消费");
        String temp = null;
        try {
            Thread.sleep(5 * 1000);
            // 如果队列为空,会阻塞当前线程
            temp = queue.take();
            System.out.println("消费者线程:" + currentThread.getName() + " get a product:" + temp);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

测试类

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
public class BlockingQueueTest {
    public static void main(String[] args) {
        // 创建ArrayBlockingQueue
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(2);
        // 创建LinkedBlockingQueue
        //BlockingQueue<String> queue = new LinkedBlockingQueue<String>(2);
        BlockingQueueProducer producer = new BlockingQueueProducer(queue);
        BlockingQueueConsumer consumer = new BlockingQueueConsumer(queue);
        for (int i = 0; i < 5; i++) {
            Thread thread = new Thread(producer, "product" + (i + 1));
            thread.start();
        }
        for (int i = 0; i < 5; i++) {
            Thread thread = new Thread(consumer, "consumer" + (i + 1));
            thread.start();
        }
    }
}

输出结果

生产者线程:product1开始生产
生产者线程:product2开始生产
生产者线程:product3开始生产
生产者线程:product4开始生产
生产者线程:product5开始生产
消费者线程:consumer3开始消费
消费者线程:consumer4开始消费
消费者线程:consumer2开始消费
消费者线程:consumer1开始消费
消费者线程:consumer5开始消费
生产者线程:product1放入队列
生产者线程:product3放入队列
消费者线程:consumer3 get a product:product1
生产者线程:product2放入队列
消费者线程:consumer2 get a product:product3
生产者线程:product4放入队列
消费者线程:consumer4 get a product:product2
生产者线程:product5放入队列
消费者线程:consumer1 get a product:product4
消费者线程:consumer5 get a product:product5
0

评论区