【Java】BlockingQueue
简介
BlockingQueue
也是java.util.concurrent下的主要用来控制线程同步的工具。
他是一个接口,提供以下方法:
public interface BlockingQueue<E> extends Queue<E> {
boolean add(E arg0);
boolean offer(E arg0);
void put(E arg0) throws InterruptedException;
boolean offer(E arg0, long arg1, TimeUnit arg3) throws InterruptedException;
E take() throws InterruptedException;
E poll(long arg0, TimeUnit arg2) throws InterruptedException;
int remainingCapacity();
boolean remove(Object arg0);
boolean contains(Object arg0);
int drainTo(Collection<? super E> arg0);
int drainTo(Collection<? super E> arg0, int arg1);
}
主要的方法是:put
、take
一对阻塞存取;add
、poll
一对非阻塞存取。
接口方法
插入
boolean add(anObject)
:把anObject加到BlockingQueue里,即如果BlockingQueue可以容纳,则返回true,否则抛出异常。boolean offer(anObject)
:表示如果可能的话,将anObject加到BlockingQueue里,即如果BlockingQueue可以容纳,则返回true,否则返回false。void put(anObject)
:把anObject加到BlockingQueue里,如果BlockQueue没有空间,则调用此方法的线程被阻断直到BlockingQueue里面有空间再继续,有阻塞,放不进去就等待。
读取
E poll(time)
:取走BlockingQueue里排在首位的对象,若不能立即取出,则可以等time参数规定的时间,再取不到时返回null。E take()
:取走BlockingQueue里排在首位的对象,若BlockingQueue为空,阻断进入等待状态直到BlockingQueue有新的对象被加入为止。这种方式会一直阻塞,取不到就一直等。
其他
int remainingCapacity()
:返回队列剩余的容量,在队列插入和获取的时候,不要瞎搞。数据可能不准,不能保证数据的准确性。boolean remove(Object o)
:从队列移除元素,如果存在,即移除一个或者更多,队列改变了返回true。public boolean contains(Object o)
:查看队列是否存在这个元素,存在返回true。int drainTo(Collection<? super E> c)
:移除此队列中所有可用的元素,并将它们添加到给定collection中即取出所有个数放到集合中。int drainTo(Collection<? super E> c, int maxElements)
:和上面方法类似,区别在于,指定了移动的数量,即取出指定个数放到集合中。
ArrayBlockingQueue
一个由数组支持的有界阻塞队列,规定大小的BlockingQueue
。其构造函数必须带一个int
参数来指明其大小。其所含的对象是以FIFO
(先入先出)顺序排序的。
LinkedBlockingQueue
大小不定的BlockingQueue
,若其构造函数带一个规定大小的参数,生成的BlockingQueue
有大小限制;若不带大小参数,所生成的BlockingQueue
的大小由Integer.MAX_VALUE
来决定。其所含的对象是以FIFO
(先入先出)顺序排序的。
LinkedBlockingQueue
可以指定容量,也可以不指定,不指定的话,默认最大是Integer.MAX_VALUE
,其中主要用到put
和take
方法,put
方法在队列满的时候会阻塞直到有队列成员被消费,take
方法在队列空的时候会阻塞,直到有队列成员被放进来。
ArrayBlockingQueue和LinkedBlockingQueue区别
LinkedBlockingQueue
和ArrayBlockingQueue
比较起来,它们背后所用的数据结构不一样,导致LinkedBlockingQueue
的数据吞吐量要大于ArrayBlockingQueue
,但在线程数量很大时其性能的可预见性低于ArrayBlockingQueue
。
代码示例
生产者
import java.util.concurrent.BlockingQueue;
public class BlockingQueueProducer implements Runnable {
BlockingQueue<String> queue;
public BlockingQueueProducer() {
}
public BlockingQueueProducer(BlockingQueue<String> queue) {
this.queue = queue;
}
@Override
public void run() {
Thread currentThread = Thread.currentThread();
System.out.println("生产者线程:" + currentThread.getName() + "开始生产");
String temp = currentThread.getName();
try {
Thread.sleep(3 * 1000);
// 如果队列是满的话,会阻塞当前线程
queue.put(temp);
System.out.println("生产者线程:" + currentThread.getName() + "放入队列");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
消费者
import java.util.concurrent.BlockingQueue;
public class BlockingQueueConsumer implements Runnable {
BlockingQueue<String> queue;
public BlockingQueueConsumer() {
}
public BlockingQueueConsumer(BlockingQueue<String> queue) {
this.queue = queue;
}
@Override
public void run() {
Thread currentThread = Thread.currentThread();
System.out.println("消费者线程:" + currentThread.getName() + "开始消费");
String temp = null;
try {
Thread.sleep(5 * 1000);
// 如果队列为空,会阻塞当前线程
temp = queue.take();
System.out.println("消费者线程:" + currentThread.getName() + " get a product:" + temp);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
测试类
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
public class BlockingQueueTest {
public static void main(String[] args) {
// 创建ArrayBlockingQueue
BlockingQueue<String> queue = new ArrayBlockingQueue<>(2);
// 创建LinkedBlockingQueue
//BlockingQueue<String> queue = new LinkedBlockingQueue<String>(2);
BlockingQueueProducer producer = new BlockingQueueProducer(queue);
BlockingQueueConsumer consumer = new BlockingQueueConsumer(queue);
for (int i = 0; i < 5; i++) {
Thread thread = new Thread(producer, "product" + (i + 1));
thread.start();
}
for (int i = 0; i < 5; i++) {
Thread thread = new Thread(consumer, "consumer" + (i + 1));
thread.start();
}
}
}
输出结果
生产者线程:product1开始生产
生产者线程:product2开始生产
生产者线程:product3开始生产
生产者线程:product4开始生产
生产者线程:product5开始生产
消费者线程:consumer3开始消费
消费者线程:consumer4开始消费
消费者线程:consumer2开始消费
消费者线程:consumer1开始消费
消费者线程:consumer5开始消费
生产者线程:product1放入队列
生产者线程:product3放入队列
消费者线程:consumer3 get a product:product1
生产者线程:product2放入队列
消费者线程:consumer2 get a product:product3
生产者线程:product4放入队列
消费者线程:consumer4 get a product:product2
生产者线程:product5放入队列
消费者线程:consumer1 get a product:product4
消费者线程:consumer5 get a product:product5
评论区