BlockingQueue和ArrayBlockingQueue队列

it2022-05-05  131

BlockingQueue用法:

通常用于在线程上生存另一个线程消耗的对象

生成线程将继续生成新对象并将它们插入队列,知道队列达到它包含内容的上限,理论上这个上限取决于你的硬盘有多大,当队列达到上限时,会进入阻塞状态。此时,当你尝试插入新对象时会阻止生成线程,它一直被阻塞,直到消费线程将一个对象从队列中取出。

消费线程不断将对象从阻塞队列中取出并处理它们,如果消费线程试图将对象从空队列中取出,则消耗线程将被阻塞,直到生成线程将对象放入队列

 抛出异常特殊价值块超时插入add(o)offer(o)put()offer(o,timeout,timeunit)去掉remove(o)poll()take()poll(timeout,timeunit)检查element()peek()  

BlockingQueue是一个接口

1.主程序:开启两个线程,一个往队列里面添加数据,一个从队列中读取数据,一旦队列里有新数据产生,立马读取

package aesean; import java.util.Scanner; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import aesean.queue.Consumer; import aesean.queue.Producer; /** * Created by wrs on 2019/7/18,16:57 * projectName: Testz * packageName: aesean */ public class BlockingQueueExample { public static void main(String[] args) throws InterruptedException { BlockingQueueExample testClient = new BlockingQueueExample(); BlockingQueue queue = new ArrayBlockingQueue(1024); Producer producer = new Producer(queue); Consumer consumer = new Consumer(queue); new Thread(producer).start(); new Thread(consumer).start(); Thread.sleep(4000); //开始命令 System.out.println("开始任务: "); Scanner scanner = new Scanner(System.in); while (true) { System.out.print("请输入命令:"); int order = scanner.nextInt(); switch (order) { case 1: producer.setData1("tom","jack"); break; case 2: producer.setData2("two"); break; default: System.out.println("无法识别的命令"); break; } } } }

2.写的线程方法:

package aesean.queue; import java.util.concurrent.BlockingQueue; /** * Created by wrs on 2019/7/18,16:57 * projectName: Testz * packageName: aesean.queue */ public class Producer implements Runnable{ protected BlockingQueue queue = null; public Producer(BlockingQueue queue) { this.queue = queue; } public void run() { try { queue.put("1"); Thread.sleep(1000); queue.put("2"); Thread.sleep(2000); queue.put("3"); } catch (InterruptedException e) { e.printStackTrace(); } } public void setData1(String content1,String content2) { try { queue.put(content1); Thread.sleep(1000); queue.put(content2); Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } public void setData2(String content) { try { queue.put(content); Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } }

3.读的线程

package aesean.queue; import java.util.concurrent.BlockingQueue; import sun.rmi.runtime.Log; /** * Created by wrs on 2019/7/18,16:58 * projectName: Testz * packageName: aesean.queue */ public class Consumer implements Runnable{ private String TAG = "Consumer"; protected BlockingQueue queue = null; public Consumer(BlockingQueue queue) { this.queue = queue; } public void run() { while (true) { try { System.out.println(queue.take()); } catch (InterruptedException e) { e.printStackTrace(); } } } }

看看运行的效果图: 实现的效果是,每当我往队列里面新增数据的时候  都通过读的线程,把队列里面的数据取出来,这样再也不用担心消息发送失败了,存到队列里面 定时发送

end

 


最新回复(0)