谈到线程池,不得不谈到生产者-消费者模式,谈到生产者-消费者,就不得不谈到对应的数据结构,谈到对应的数据结构不得不言BlockingQueue。
顾名思义,BlockingQueue翻译为阻塞队列。队列无非两种操作:入队和出队。而针对于入队出队的边界值的不同,分为几个方法:
抛出异常
特殊值
阻塞
超时
插入
add(e)
offer(e)
put(e)
offer(e, time, unit)
移除
remove()
poll()
take()
poll(time, unit)
检查
element()
peek()
不可用
不可用
测试代码:
public class QueueTest {
public static void main(String args[]) {
final BlockingQueue queue = new ArrayBlockingQueue(5);
init(queue);
System.out.println("queue.size=" + queue.size() + ", top element:" + queue.element());
// queue.add("f"); //1. add方法:队满加入抛异常
/*boolean bool = queue.offer("f"); //2. offer方法,队满加入会返回:false
System.out.println("queue.size=" + queue.size() + ", 入队结果:" + bool);*/
/* try {
queue.put("f"); //3.put方法,队满put会阻塞,下边的systemout方法不会执行,直至有消费者take出去
} catch (InterruptedException e) {
e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
}
System.out.println("queue.size=" + queue.size() + ", put结束:");*/
Thread thread1 = new Thread(new Runnable() {
public void run() {
boolean bool = false;
try {
bool = queue.offer("f", 5, TimeUnit.SECONDS); //5.offer带时间参数:当队满时,如果等待一定时间内还是满的就返回false,如果在这个期间队有空间了就可以放入一个元素,返回
System.out.println("queue.size=" + queue.size() + ", 入队结果:" + bool);
} catch (InterruptedException e) {
e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
}
}
});
thread1.start();
//我们开一个消费者线程做出队操作,以便上述的offer可以正常加入
Thread thread = new Thread(new Runnable() {
public void run() {
while (queue.size() > 0) {
try {
String str = (String) queue.take();
System.out.println("queue.size=" + queue.size() + ", 出队结果:" + str);
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
}
}
}
});
thread.start();
}
private static void init(BlockingQueue queue) {
queue.add("a");
queue.add("b");
queue.add("c");
queue.add("d");
queue.add("e");
}
}
add:入队,如果队列满会抛出异常。
Exception in thread "main" java.lang.IllegalStateException: Queue full
at java.util.AbstractQueue.add(AbstractQueue.java:71)
at java.util.concurrent.ArrayBlockingQueue.add(ArrayBlockingQueue.java:209)
源码:
public boolean add(E e) {
if (offer(e))
return true; //如果加入成功,返回true
else
throw new IllegalStateException("Queue full"); //如果添加失败,抛异常
}
Offer:入队,如果队满会返回false
boolean bool = queue.offer("f"); //offer方法会返回:false
源码:
public boolean offer(E e) {
if (e == null) throw new NullPointerException();
final ReentrantLock lock = this.lock;
lock.lock();
try {
if (count == items.length)
return false; //返回失败
else {
insert(e);
return true; //返回成功
}
} finally {
lock.unlock();
}
}
Put:入队,如果队满会阻塞
try {
queue.put("f");
} catch (InterruptedException e) {
e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
}
System.out.println("queue.size=" + queue.size() + ", put结束:" );
源码:
public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
final E[] items = this.items;
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
try {
while (count == items.length)
notFull.await(); //阻塞在这儿
} catch (InterruptedException ie) {
notFull.signal(); // propagate to non-interrupted thread
throw ie;
}
insert(e);
} finally {
lock.unlock();
}
}
Offer:带时间的,会有一个缓冲时间,若超过此期间插入失败则失败。
queue.size=5, top element:a
queue.size=4, 出队结果:a
queue.size=5, 入队结果:true
queue.size=4, 出队结果:b
queue.size=3, 出队结果:c
queue.size=2, 出队结果:d
queue.size=1, 出队结果:e
queue.size=0, 出队结果:f
源码:
public boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException {
if (e == null) throw new NullPointerException();
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
for (;;) { //轮询
if (count != items.length) {
//如果有其他线程消费元素,队列不满了,才可以插入元素
insert(e);
return true;
}
if (nanos <= 0)//如果到了约定时间没有插入成功,返回false
return false;
try {
nanos = notFull.awaitNanos(nanos); //轮询减时间
} catch (InterruptedException ie) {
notFull.signal(); // propagate to non-interrupted thread
throw ie;
}
}
} finally {
lock.unlock();
}
}
出队的几种形式与入队对应,实例略。
从上述源码也可以看出,BlockingQueque还有几个特点:
1. 不接受null值。
2. 线程安全,方法都用了Lock
3. 最最精华的部分:生产者-消费者模型。
实例:
public class BlockingQueueTest {
private static AtomicInteger count = new AtomicInteger(0);
private static AtomicInteger countCreate = new AtomicInteger(0);
public static void main(String args[]) {
//定义一个阻塞队列,存放文件信息
BlockingQueue fileQueue = new ArrayBlockingQueue(5);
String path = "F:\\Song";
File root = new File(path);
//生产者线程去遍历文件,放入队列
FileCrawler fileCrawler = new FileCrawler(fileQueue, root);
//消费者线程去遍历队列,取出文件
Indexer indexer = new Indexer(fileQueue);
//开启几个生产者线程开始遍历文件
for (File file : root.listFiles()) {
new Thread(new FileCrawler(fileQueue, file)).start();
}
//开启7个消费者者线程开始取出文件
for (int i = 0; i < 7; i++) {
new Thread(new Indexer(fileQueue)).start();
}
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
}
System.out.println("生产者生产:" + countCreate.get());
System.out.println("消费者取到:" + count.get());
}
static class FileCrawler implements Runnable {
private final BlockingQueue fileQueue;
private final File root;
FileCrawler(BlockingQueue fileQueue, File root) {
this.fileQueue = fileQueue;
this.root = root;
}
public void run() {
try {
System.out.println("生产者开始生产:" + fileQueue.size());
crawl(root);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
private void crawl(File root) throws InterruptedException {
File[] files = root.listFiles();
if (files != null) {
for (File file : files) {
if (file.isDirectory()) {
crawl(file);
} else {
fileQueue.put(file); //put
countCreate.incrementAndGet();
}
}
}
}
}
static class Indexer implements Runnable {
private final BlockingQueue fileQueue;
Indexer(BlockingQueue fileQueue) {
this.fileQueue = fileQueue;
}
public void run() {
while (true) {
try {
System.out.println("消费者开始消费:" + fileQueue.size());
File file = (File) fileQueue.take(); //take
count.incrementAndGet();
System.out.println(file.getName());
} catch (InterruptedException e) {
e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
}
}
}
}
}
三、阻塞队列的实现类
基本的任务排队方式有三种:
有界:ArrayBlockingQueue: 一个由数组支持的有界阻塞队列。
无界:LinkedBlockingQueue: 一个基于已链接节点的、范围任意的blocking queue。
同步移交:SynchronousQueue: 同步队列,put和take串行执行。生产者对其的插入操作必须等待消费者的移除操作,反之亦然。同步队列类似于信道,它非常适合传递性设计,在这种设计中,在一个线程中运行的对象要将某些信息、事件或任务传递给在另一个线程中运行的对象,它就必须与该对象同步。
synchronousQueue的思想:
参考:http://ifeve.com/java-synchronousqueue/
实例:
public class SynchroNousQueueTest {
public static void main(String args[]) {
// final SynchronousQueue synchronousQueue = new SynchronousQueue();
SynchroNousQueueTest synchroNousQueueTest = new SynchroNousQueueTest();
final MyShnchronouseQueue<String> synchronousQueue = synchroNousQueueTest.new MyShnchronouseQueue<String>();
//1。开启一个生产者线程
Thread threadPut = new Thread(new Runnable() {
public void run() {
try {
for (int i = 0; i < 10; i++) {
synchronousQueue.put(i + "");
System.out.println("synchronousQueue,insert element:" + i);
}
} catch (InterruptedException e) {
e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
}
}
});
//2。开启一个消费者线程
Thread threadTask = new Thread(new Runnable() {
public void run() {
try {
for (int i = 0; i < 10; i++) {
synchronousQueue.take();
System.out.println("synchronousQueue,output element:" + i);
}
} catch (InterruptedException e) {
e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
}
}
});
threadPut.start();
threadTask.start();
}
class MyShnchronouseQueue<E> {
Lock lock = new ReentrantLock();
Condition isFull = lock.newCondition();
Condition isEmpty = lock.newCondition();
boolean flag = false; //同步开关
E item = null; //只有一个元素
public void put(E e) throws InterruptedException {
lock.lock();
try {
while (flag) { // 当开关为true时,put阻塞,一直await
isEmpty.await();
}
//当开关为false之后,改为true,item设值,唤醒消费者消费
flag = true;
item = e;
isFull.signalAll();
} catch (Exception e1) {
e1.printStackTrace();
} finally {
lock.unlock();
}
}
public synchronized E take() throws InterruptedException {
lock.lock();
try {
while (!flag) { // 当开关为false时,take阻塞,一直await
isFull.await();
}
//当开关为true之后,改为false,获取item的值,唤醒生产者生产
flag = false;
E e = item;
item = null;
isEmpty.signalAll();
return e;
} catch (Exception e1) {
e1.printStackTrace();
} finally {
lock.unlock();
}
return null;
}
}
}
结果:
synchronousQueue,insert element:0
synchronousQueue,output element:0
synchronousQueue,output element:1
synchronousQueue,insert element:1
synchronousQueue,insert element:2
synchronousQueue,output element:2
synchronousQueue,insert element:3
synchronousQueue,output element:3
synchronousQueue,insert element:4
synchronousQueue,output element:4
synchronousQueue,insert element:5
synchronousQueue,output element:5
synchronousQueue,insert element:6
synchronousQueue,output element:6
synchronousQueue,insert element:7
synchronousQueue,output element:7
synchronousQueue,insert element:8
synchronousQueue,output element:8
synchronousQueue,insert element:9
synchronousQueue,output element:9
四、线程池的选择:
根据这些队列的不同特性,我们的线程池也定义了不同的类别:
单一线程池:可以看到corePoolSize=1
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
固定大小线程池:corePoolSize和maximumPoolSize固定,
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
无界线程池:maximumPoolSize为Integer.MAX_VALUE
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
前两者默认情况下将使用一个无界的LinkedBlockingQueue。如果所有工作者线程都处于忙碌状态,那么任务将在队列中等候。如果任务持续快速地到达,并且超过了线程池处理他们的速度,那么队列将无限制地增加。
一种更稳妥的资源管理策略是使用有界队列,例如ArrayBlockingQueue、有界的LinkedBlockingQueue、PriorityBlockingQueue。有界队列有助于避免资源耗尽的情况发生,但它又带来了新的问题:当队列填满后,新的任务该怎么办?(这就需要一些饱和策略)在使用有界队列工作时,队列的大小与线程池的大小必须一起调节。如果线程池较小而队列较大,那么有助于减少内存使用量,降低CPU的使用率,同时可以减少上下文切换,但付出的代价是可能会限制吞吐量。
对于非常大的或者无界的线程池,可以通过使用SynchronousQueue来避免任务排队,以及直接将任务从生产者移交给工作者线程。SynchronousQueue不是一个真正的队列,而是一种在线程之间进行移交的机制。要将一个元素放入SynchronousQueue中,必须有另一个线程正在等待接受这个元素。如果没有线程正在等待,并且线程池的当前大小小于最大值,那么ThreadPoolExecutor将创建一个新的线程,否则根据饱和策略,这个任务将被拒绝。使用直接移交将更高效,因为任务会直接移交给执行它的线程,而不是被首先放在队列中,然后由工作者线程从队列中提取该任务。只有当线程池是无界或者可以拒绝任务时,SynchronousQueue才有实际价值。
对于Executor,newCachedThreadPool工厂方法是一种很好的默认选择。它能提供比固定大小的线程池更好的排队性能。当需要限制当前任务的数量以满足资源管理需求时,那么可以选择固定大小的线程池,就像在接受网络客户请求的服务器应用程序中,如果不进行限制,那么狠容易发生过载问题。
从另一个维度来看:cpu密集型任务,由于cpu使用率一直很高,这时的线程不宜过多,建议配置尽可能小的线程,如配置Ncpu+1个线程的线程池。IO密集型任务由于线程并不是一直在执行任务,IO比较频繁,所以可以配置较多的线程,如2*Ncpu。
转载于:https://www.cnblogs.com/qi-dian-ao/p/8467963.html
相关资源:数据结构—成绩单生成器