asyncio标准库7 Producerconsumer

it2022-05-05  196

asyncio标准库7 Producer/consumer

使用asyncio.Queue

import asyncio import random async def produce(queue, n): for x in range(1, n + 1): # produce an item print('producing {}/{}'.format(x, n)) # simulate i/o operation using sleep await asyncio.sleep(random.random()) item = str(x) # put the item in the queue await queue.put(item) # indicate the producer is done await queue.put(None) async def consume(queue): while True: # wait for an item from the producer item = await queue.get() if item is None: # the producer emits None to indicate that it is done break # process the item print('consuming item {}...'.format(item)) # simulate i/o operation using sleep await asyncio.sleep(random.random()) loop = asyncio.get_event_loop() queue = asyncio.Queue(loop=loop) # 队列初始化 producer_coro = produce(queue, 10) consumer_coro = consume(queue) loop.run_until_complete(asyncio.gather(producer_coro, consumer_coro)) loop.close()

使用Queue.task_done 和 Queue.join:

import asyncio import random async def produce(queue, n): for x in range(n): # produce an item print('producing {}/{}'.format(x, n)) # simulate i/o operation using sleep await asyncio.sleep(random.random()) item = str(x) # put the item in the queue await queue.put(item) async def consume(queue): while True: # wait for an item from the producer item = await queue.get() # process the item print('consuming {}...'.format(item)) # simulate i/o operation using sleep await asyncio.sleep(random.random()) # Notify the queue that the item has been processed queue.task_done() async def run(n): queue = asyncio.Queue() # schedule the consumer consumer = asyncio.ensure_future(consume(queue)) # run the producer and wait for completion await produce(queue, n) # wait until the consumer has processed all items await queue.join() # the consumer is still awaiting for an item, cancel it consumer.cancel() loop = asyncio.get_event_loop() loop.run_until_complete(run(10)) loop.close() posted on 2018-03-19 17:15 北京涛子 阅读( ...) 评论( ...) 编辑 收藏

转载于:https://www.cnblogs.com/liujitao79/p/8603396.html


最新回复(0)