python多线程生产者消费者模型

it2022-05-05  120

最近写了生产者消费者实现的六种途径,可能有些地方不合适。

方法1:

#!/usr/bin/python2 #coding: UTF-8 ''' 函数式 threading标准库 lock ''' import time import threading def consumer(name): global q while True: lock.acquire() if len(q) != 0: res = q.pop() print('消费者%s 准备开吃第%s个馒头'%(name,res)) else: #供不应求 print('稍等') lock.release() time.sleep(1) def producer(name, total): global q global mantou while True: lock.acquire() if(len(q) < int(total)): mantou=mantou+1 q.append(mantou) print('生产者%s 生产了第%s个馒头'%(name,mantou)) else: #供大于求 print('装馒头的篮子满了') lock.release() time.sleep(1) lock = threading.Lock() mantou = 0 #馒头 q = [] #装馒头的篮子 def main(): total = input('输入馒头篮子的最大容量:') producers = ['p1','p2','p3','p4'] #生产者名字 consumers = ['c1','c2','c3'] #消费者名字 for p in producers: pThread = threading.Thread(target=producer,args=(p,total)) pThread.start() for p in consumers: cThread = threading.Thread(target=consumer,args=(p,)) cThread.start() if __name__ == '__main__': main()

方法2:

#!/usr/bin/python2 #coding:UTF-8 ''' 类式 threading标准库 Condition--条件变量--减少CPU资源消耗 ''' import threading import time class Producer(threading.Thread): '''生产者类--继承自Thread类''' def __init__(self, name, maxnum): threading.Thread.__init__(self) self.name = name self.maxnum = maxnum def run(self): global mantou while True: con.acquire() #加锁,准备操作共享变量 if len(task) < int(self.maxnum): #当目前的馒头个数小于最大量时 mantou += 1 task.append(mantou) print('生产者%s生产了第%s个馒头'%(self.name,mantou)) con.notify() con.release() else: #供大于求,阻塞等候 print('供大于求,稍后生产') con.notify_all() #唤醒消费者 con.wait() time.sleep(2) class Consumer(threading.Thread): '''消费者类--继承自Thread类''' def __init__(self, name): threading.Thread.__init__(self) self.name = name def run(self): global mantou while True: con.acquire() if len(task) == 0: #供不应求,阻塞等候 print('消费者%s稍等'%self.name) con.notify_all() #唤醒生产者 con.wait() else: print('消费者%s吃了第%s个馒头'%(self.name,task.pop())) con.notify() con.release() time.sleep(2) con = threading.Condition() #条件变量 mantou = 0 #馒头 task=[] #存放馒头 def main(): maxnum = input('请输入馒头最大数量个数:') #某一时间最多有多少馒头 for i in range(3): Producer(i+1,maxnum).start() for i in range(3): Consumer(i+1).start() if __name__ == '__main__': main()

方法3:

#!/usr/bin/python3 #coding:UTF-8 ''' 函数式 threading标准库 队列queue + lock ''' import queue import threading import time def producer(name): global mantou global maxnum while True: if q.full(): print("生产的馒头太多了。。。。。") q.join() # 阻塞,直到队列为空 else: lock.acquire() mantou += 1 print("生产者%s放入了第%s个馒头"%(name,mantou)) q.put("%s" %mantou) lock.release() time.sleep(1) def consumer(name): while True: if q.empty(): print("消费者稍等。。。。") else: m = q.get() print("消费者%s吃了第 %s个馒头" % (name, m)) q.task_done() #任务完成,通知join方法,取了一个数据了 time.sleep(1) mantou = 0 #馒头 q = queue.Queue(maxsize=5) #某一时刻生产的馒头数目最多有5个,先进先出队列 lock = threading.Lock() def main(): for i in range(4): #启动若干个生产者线程 p = threading.Thread(target=producer, args=(i+1,)) p.start() for i in range(3): #启动若干个消费者线程 c = threading.Thread(target=consumer, args=(i+1,)) c.start() if __name__ == '__main__': main()

方法4:

#!/usr/bin/python3 #coding:UTF-8 ''' 函数式 threading标准库 事件驱动模型 ''' import threading import time def producer(name): global mantou while True: event.wait() #当收到消费者的信号时,开始做馒头 lock.acquire() mantou += 1 basket.append(mantou) print('生产者%s做了第%s个馒头'%(name,mantou)) lock.release() event.set() #设置标志位,通知消费者吃馒头 time.sleep(0.5) def consumer(name): while True: if len(basket) > 0: print('消费者%s吃了第%s个馒头'%(name,basket.pop())) event.clear() else: print('稍等,馒头不够了。。。。') if event.isSet(): #当馒头不够时只用通知一次生产者,否则会影响消费者 pass else: event.set() #设置标志位,通知生产者做馒头 event.wait() #收到生产者的信号时开始吃馒头 time.sleep(1) event = threading.Event() #事件对象 lock = threading.Lock() basket = [] #装馒头的篮子 mantou = 0 #馒头 def main(): for i in range(3): p1 = threading.Thread(target=producer,args=(i,)) p1.start() for i in range(3): c1 = threading.Thread(target=consumer, args=(i,)) c1.start() if __name__ =='__main__': main()

方法5:

#!/usr/bin/python3 #coding: UTF-8 ''' 函数式 threading标准库 事件驱动模型 ''' import threading import time def producer(name): global mantou while True: if psem.acquire(): mutex.acquire() mantou += 1 basket.append(mantou) print('生产者%s生产了第%s个馒头'%(name,mantou)) mutex.release() psem.release() else: print('没有灶台了,稍等。。。。') time.sleep(0.3) def consumer(name): while True: if csem.acquire(): mutex.acquire() if len(basket) > 0: print('消费者%s吃了第%s个馒头'%(name,basket.pop())) else: print('馒头还没有做好,稍等。。。。') mutex.release() csem.release() else: print('店里没有位置了。。。。') time.sleep(0.3) mutex = threading.Semaphore(1) #互斥信号量,同一时间只能有一个线程访问临界资源 psem = threading.Semaphore(5) #同步信号量,同一时间只能有5个生产者线程工作 csem = threading.Semaphore(5) #同步信号量,同一时间只能服务5个消费者 basket = [] #装馒头的篮子 mantou = 0 #馒头 def main(): for i in range(10): p = threading.Thread(target = producer, args=(i+1,)) p.start() for i in range(10): c = threading.Thread(target = consumer, args=(i+1,)) c.start() if __name__ == '__main__': main()

方法6:

#!/usr/bin/python3 #coding:UTF-8 ''' 函数式 协程 ''' import gevent def producer(name): global mantou while True: mantou += 1 basket.append(mantou) print('生产者%s生产了第%s个馒头'%(name,mantou)) gevent.sleep(1) #切换到其他协程 def consumer(name): while True: if len(basket) > 0: print('消费者%s吃了第%s个馒头'%(name, basket.pop())) gevent.sleep(1) #切换到其他协程 else: print('馒头还没做好,稍等。。。。') gevent.sleep(1) #切换到其他协程 mantou = 0 #馒头 basket =[] #装馒头的篮子 def main(): threads = [] for i in range(3): #创建了3个生产者协程 g1 = gevent.spawn(producer, i+1) threads.append(g1) for i in range(3): #创建3个消费者协程 g2 = gevent.spawn(consumer, i+1) threads.append(g2) gevent.joinall(threads) if __name__ == '__main__': main()

 


最新回复(0)