1、生产者消费者模型
生产者 —— 生产数据的人
消费者 —— 消费数据的人
生产者消费者模型:供销数据不平衡的现象。
1 import time 2 import random 3 from multiprocessing import Process, Queue 4 5 def consumer(q): 6 while True: 7 obj = q.get() 8 print(f'消费了一个数据{obj}') 9 time.sleep(random.randint(1, 3)) 10 11 if __name__ == "__main__": 12 q = Queue() 13 Process(target=consumer, args=(q,)).start() 14 for i in range(10): 15 time.sleep(random.randint(1, 5)) 16 q.put(f"food{i}") 17 print(f'生产了一个数据food{i}') 生产者消费者速度不一致 D:\Python36\python.exe E:/Python/草稿纸.py 生产了一个数据food0 消费了一个数据food0 生产了一个数据food1 消费了一个数据food1 生产了一个数据food2 消费了一个数据food2 生产了一个数据food3 消费了一个数据food3 生产了一个数据food4 消费了一个数据food4 生产了一个数据food5 消费了一个数据food5 生产了一个数据food6 生产了一个数据food7 消费了一个数据food6 生产了一个数据food8 生产了一个数据food9 消费了一个数据food7 消费了一个数据food8 消费了一个数据food9 结果(阻塞)1 import time 2 import random 3 from multiprocessing import Process, Queue 4 5 def consumer(name, q): 6 while True: 7 obj = q.get() 8 print(f'{name}吃了一个{obj}') 9 time.sleep(random.randint(1, 3)) 10 11 def producer(name, food, q): 12 for i in range(10): 13 time.sleep(random.randint(1, 5)) 14 q.put(f'{name}生产的{food}{i}') 15 print(f'{name}生产了一个数据{food}{i}') 16 17 if __name__ == '__main__': 18 q = Queue() 19 Process(target=consumer, args=('alex', q)).start() 20 Process(target=producer, args=('yuan', '泔水', q)).start() 21 Process(target=producer, args=('egon', '骨头', q)).start() 供大于求 D:\Python36\python.exe E:/Python/草稿纸.py egon生产了一个数据骨头0 alex吃了一个egon生产的骨头0 yuan生产了一个数据泔水0 egon生产了一个数据骨头1 alex吃了一个yuan生产的泔水0 alex吃了一个egon生产的骨头1 yuan生产了一个数据泔水1 egon生产了一个数据骨头2 alex吃了一个yuan生产的泔水1 egon生产了一个数据骨头3 yuan生产了一个数据泔水2 alex吃了一个egon生产的骨头2 alex吃了一个egon生产的骨头3 egon生产了一个数据骨头4 alex吃了一个yuan生产的泔水2 yuan生产了一个数据泔水3 alex吃了一个egon生产的骨头4 alex吃了一个yuan生产的泔水3 yuan生产了一个数据泔水4 egon生产了一个数据骨头5 yuan生产了一个数据泔水5 alex吃了一个yuan生产的泔水4 yuan生产了一个数据泔水6 egon生产了一个数据骨头6 alex吃了一个egon生产的骨头5 yuan生产了一个数据泔水7 alex吃了一个yuan生产的泔水5 yuan生产了一个数据泔水8 alex吃了一个yuan生产的泔水6 egon生产了一个数据骨头7 alex吃了一个egon生产的骨头6 yuan生产了一个数据泔水9 alex吃了一个yuan生产的泔水7 egon生产了一个数据骨头8 alex吃了一个yuan生产的泔水8 egon生产了一个数据骨头9 alex吃了一个egon生产的骨头7 alex吃了一个yuan生产的泔水9 alex吃了一个egon生产的骨头8 alex吃了一个egon生产的骨头9 结果(consumer结束不了)
1 import time 2 import random 3 from multiprocessing import Process, Queue 4 5 6 def consumer(name, q): 7 while True: 8 obj = q.get() # 阻塞 9 if obj is None: 10 break 11 print(f'{name}吃了一个{obj}') 12 time.sleep(random.randint(1, 3)) 13 14 15 def producer(name, food, q): 16 for i in range(10): 17 time.sleep(random.randint(1, 5)) 18 q.put(f'{name}生产的{food}{i}') 19 print(f'{name}生产了一个{food}{i}') 20 21 22 if __name__ == '__main__': 23 q = Queue() 24 Process(target=consumer, args=('alex', q)).start() 25 Process(target=consumer, args=('wusir', q)).start() 26 p1 = Process(target=producer, args=('yuan', '泔水', q)) 27 p1.start() 28 p2 = Process(target=producer, args=('egon', '骨头', q)) 29 p2.start() 30 p1.join() 31 p2.join() 32 q.put(None) 33 q.put(None) 数据被消费完就结束进程 D:\Python36\python.exe E:/Python/草稿纸.py egon生产了一个骨头0 alex吃了一个egon生产的骨头0 yuan生产了一个泔水0 wusir吃了一个yuan生产的泔水0 yuan生产了一个泔水1 alex吃了一个yuan生产的泔水1 egon生产了一个骨头1 yuan生产了一个泔水2 alex吃了一个egon生产的骨头1 wusir吃了一个yuan生产的泔水2 egon生产了一个骨头2 alex吃了一个egon生产的骨头2 yuan生产了一个泔水3 wusir吃了一个yuan生产的泔水3 egon生产了一个骨头3 wusir吃了一个egon生产的骨头3 yuan生产了一个泔水4 wusir吃了一个yuan生产的泔水4 egon生产了一个骨头4 alex吃了一个egon生产的骨头4 yuan生产了一个泔水5 wusir吃了一个yuan生产的泔水5 egon生产了一个骨头5 alex吃了一个egon生产的骨头5 egon生产了一个骨头6 wusir吃了一个egon生产的骨头6 yuan生产了一个泔水6 alex吃了一个yuan生产的泔水6 yuan生产了一个泔水7 alex吃了一个yuan生产的泔水7 egon生产了一个骨头7 wusir吃了一个egon生产的骨头7 yuan生产了一个泔水8 alex吃了一个yuan生产的泔水8 yuan生产了一个泔水9 wusir吃了一个yuan生产的泔水9 egon生产了一个骨头8 alex吃了一个egon生产的骨头8 egon生产了一个骨头9 wusir吃了一个egon生产的骨头9 Process finished with exit code 0 结果
q.join() 对这个队列进行阻塞,这个队列中的所有值被取走,且执行了task_done.
1 import time 2 from multiprocessing import Process, JoinableQueue 3 4 def consumer(q): 5 while True: 6 print(q.get()) 7 time.sleep(0.3) 8 q.task_done() # 通知队列一个数据已经被处理完了 9 10 if __name__ == "__main__": 11 q = JoinableQueue() 12 c = Process(target=consumer, args=(q,)) 13 c.daemon = True 14 c.start() 15 for i in range(10): 16 q.put(i) # 10个数据 17 q.join() # join表示所有的数据都被取走且被处理完才结束阻塞 18 print('所有数据都被处理完了') JoinableQueue D:\Python36\python.exe E:/Python/草稿纸.py 0 1 2 3 4 5 6 7 8 9 所有数据都被处理完了 Process finished with exit code 0 结果1 import time 2 import random 3 from multiprocessing import Process, JoinableQueue 4 5 def consumer(name, q): 6 while True: 7 obj = q.get() # 阻塞 8 print(f'{name}吃了一个{obj}') 9 time.sleep(random.randint(1, 3)) 10 q.task_done() 11 12 def producer(name, food, q): 13 for i in range(10): 14 time.sleep(random.randint(1, 5)) 15 q.put(f'{name}生产的{food}{i}') 16 print(f'{name}生产了一个{food}{i}') 17 18 if __name__ == "__main__": 19 q = JoinableQueue() 20 c1 = Process(target=consumer, args=('alex', q)) 21 c2 = Process(target=consumer, args=('wusir', q)) 22 c1.daemon = True 23 c2.daemon = True 24 p1 = Process(target=producer, args=('yuan', '泔水', q)) 25 p2 = Process(target=producer, args=('egon', '骨头', q)) 26 c1.start() 27 c2.start() 28 p1.start() 29 p2.start() 30 p1.join() 31 p2.join() 32 q.join() 协调生产者与消费者之间的供给关系 D:\Python36\python.exe E:/Python/草稿纸.py egon生产了一个骨头0 alex吃了一个egon生产的骨头0 yuan生产了一个泔水0 wusir吃了一个yuan生产的泔水0 egon生产了一个骨头1 wusir吃了一个egon生产的骨头1 egon生产了一个骨头2 alex吃了一个egon生产的骨头2 yuan生产了一个泔水1 wusir吃了一个yuan生产的泔水1 egon生产了一个骨头3 alex吃了一个egon生产的骨头3 yuan生产了一个泔水2 wusir吃了一个yuan生产的泔水2 egon生产了一个骨头4 alex吃了一个egon生产的骨头4 yuan生产了一个泔水3 wusir吃了一个yuan生产的泔水3 egon生产了一个骨头5 alex吃了一个egon生产的骨头5 yuan生产了一个泔水4 wusir吃了一个yuan生产的泔水4 yuan生产了一个泔水5 egon生产了一个骨头6 alex吃了一个yuan生产的泔水5 wusir吃了一个egon生产的骨头6 egon生产了一个骨头7 alex吃了一个egon生产的骨头7 yuan生产了一个泔水6 wusir吃了一个yuan生产的泔水6 yuan生产了一个泔水7 alex吃了一个yuan生产的泔水7 egon生产了一个骨头8 wusir吃了一个egon生产的骨头8 yuan生产了一个泔水8 alex吃了一个yuan生产的泔水8 egon生产了一个骨头9 wusir吃了一个egon生产的骨头9 yuan生产了一个泔水9 alex吃了一个yuan生产的泔水9 Process finished with exit code 0 结果
队列:
维护了一个先进先出的顺序.
且保证了数据在进程之间的安全.
2、管道
管道在数据管理上是不安全的
队列的实现机制就是 管道 + 锁
1 from multiprocessing import Pipe, Process 2 3 # 左发右 4 lp, rp = Pipe() 5 lp.send([1, 2, 3]) 6 print(rp.recv()) 7 # 右发左 8 rp.send('aa') 9 print(lp.recv()) 管道通信 D:\Python36\python.exe E:/Python/草稿纸.py [1, 2, 3] aa Process finished with exit code 0 结果1 from multiprocessing import Pipe, Process 2 3 def consumer(lp, rp): 4 lp.close() 5 while True: 6 try: 7 print(rp.recv()) 8 except EOFError: 9 break 10 11 if __name__ == '__main__': 12 lp, rp = Pipe() 13 Process(target=consumer, args=(lp, rp)).start() 14 Process(target=consumer, args=(lp, rp)).start() 15 Process(target=consumer, args=(lp, rp)).start() 16 Process(target=consumer, args=(lp, rp)).start() 17 Process(target=consumer, args=(lp, rp)).start() 18 rp.close() 19 for i in range(500): 20 lp.send(f'food{i}') 21 lp.close() 示例 D:\Python36\python.exe E:/Python/草稿纸.py food0 food1 food2 food3 food4 food5 food6 food7 food8 food9 food10 food11 food12 food13 food14 food15 food16 food17 food18 food19 food20 food21 food22 food23 food24 food25 food26 food27 food28 food29 food30 food31 food32 food33 food34 food35 food36 food37 food38 food39 food40 food41 food42 food43 food44 food45 food46 food47 food48 food49 food50 food51 food52 food53 food54 food55 food56 food57 food58 food59 food60 food61 food62 food63 food64 food65 food66 food67 food68 food69 food70 food71 food72 food73 food74 food75 food76 food77 food78 food79 food80 food81 food82 food83 food84 food85 food86 food87 food88 food89 food90 food91 food92 food93 food94 food95 food96 food97 food98 food99 food100 food101 food102 food103 food104 food105 food106 food107 food108 food109 food110 food111 food112 food113 food114 food115 food116 food117 food118 food119 food120 food121 food122 food123 food124 food125 food126 food127 food128 food129 food130 food131 food132 food133 food134 food135 food136 food137 food138 food139 food140 food141 food142 food143 food144 food145 food146 food147 food148 food149 food150 food151 food152 food153 food154 food155 food156 food157 food158 food159 food160 food161 food162 food163 food164 food165 food166 food167 food168 food169 food170 food171 food172 food173 food174 food175 food176 food177 food178 food179 food180 food181 food182 food183 food184 food185 food186 food187 food188 food189 food190 food191 food192 food193 food194 food195 food196 food197 food198 food199 food200 food201 food202 food203 food204 food205 food206 food207 food208 food209 food210 food211 food212 food213 food214 food215 food216 food217 food218 food219 food220 food221 food222 food223 food224 food225 food226 food227 food228 food229 food230 food231 food232 food233 food234 food235 food236 food237 food238 food239 food240 food241 food242 food243 food244 food245 food246 food247 food248 food249 food250 food251 food252 food253 food254 food255 food256 food257 food258 food259 food260 food261 food262 food263 food264 food265 food266 food267 food268 food269 food270 food271 food272 food273 food274 food275 food276 food277 food278 food279 food280 food281 food282 food283 food284 food285 food286 food287 food288 food289 food290 food291 food292 food293 food294 food295 food296 food297 food298 food299 food300 food301 food302 food303 food304 food305 food306 food307 food308 food309 food310 food311 food312 food313 food314 food315 food316 food317 food318 food319 food320 food321 food322 food323 food324 food325 food326 food327 food328 food329 food330 food331 food332 food333 food334 food335 food336 food337 food338 food339 food340 food341 food342 food343 food344 food345 food346 food347 food348 food349 food350 food351 food352 food353 food354 food355 food356 food357 food358 food359 food360 food361 food362 food363 food364 food365 food366 food367 food368 food369 food370 food371 food372 food373 food374 food375 food376 food377 food378 food379 food380 food381 food382 food383 food384 food385 food386 food387 food388 food389 food390 food391 food392 food393 food394 food395 food396 food397 food398 food399 food400 food401 food402 food403 food404 food405 food406 food407 food408 food409 food410 food411 food412 food413 food414 food415 food416 food417 food418 food419 food420 food421 food422 food423 food424 food425 food426 food427 food428 food429 food430 food431 food432 food433 food434 food435 food436 food437 food438 food439 food440 food441 food442 food443 food444 food445 food446 food447 food448 food449 food450 food451 food452 food453 food454 food455 food456 food457 food458 food459 food460 food461 food462 food463 food464 food465 food466 food467 food468 food469 food470 food471 food472 food474 food475 food473 food476 food477 food478 food479 food481 food482 food483 food484 food485 food486 food487 food488 food489 food490 food491 food492 food493 food494 food495 food496 food497 food498 food499 food480 Process finished with exit code 0 结果
3、数据共享
普通模式:
1 from multiprocessing import Manager, Process, Lock 2 3 4 def work(d, lock): 5 with lock: # 不加锁而操作共享的数据,肯定会出现数据错乱 6 d['count'] -= 1 7 8 9 if __name__ == '__main__': 10 lock = Lock() 11 m = Manager() 12 dic = m.dict({'count': 100}) 13 p_l = [] 14 for i in range(100): 15 p = Process(target=work, args=(dic, lock)) 16 p_l.append(p) 17 p.start() 18 for p in p_l: 19 p.join() 20 print(dic) 示例 D:\Python36\python.exe E:/Python/草稿纸.py {'count': 0} Process finished with exit code 0 结果
上下文管理模式:
1 from multiprocessing import Manager, Process, Lock 2 3 4 def work(d, lock): 5 with lock: # 不加锁而操作共享的数据,肯定会出现数据错乱 6 d['count'] -= 1 7 8 9 if __name__ == '__main__': 10 lock = Lock() 11 with Manager() as m: 12 dic = m.dict({'count': 100}) 13 p_l = [] 14 for i in range(100): 15 p = Process(target=work, args=(dic, lock)) 16 p_l.append(p) 17 p.start() 18 for p in p_l: 19 p.join() 20 print(dic) 示例 D:\Python36\python.exe E:/Python/草稿纸.py {'count': 0} Process finished with exit code 0 结果
4、进程池
多进程和进程池的对比:
对于纯计算型的代码,使用进程池更好 ——真理
对于高IO的代码,直接使用多进程更好 —— 相对论
结论:进程池比起多进程来说,节省了开启进程回收进程的时间,给操作系统调度进程降低了难度
使用进程池提交任务:
apply # 同步提交任务,没有多进程的优势
apply_async # 异步提交任务,常用,可以通过get方法获取返回值
close # 关闭进程池,阻止往池中添加新的任务
join # join依赖close,一个进程必须先close再join
map # 接收一个任务函数,和一个iterable。节省了for循环和close、join,是一种简便的写法。
apply_async和map相比,操作复杂,但是可以通过get方法获取返回值。
1 import os 2 def wahaha(num): 3 print('子',os.getpid()) 4 return num ** num 5 6 def call(argv): # 回调函数用的是主进程的资源 7 print(os.getpid()) 8 print(argv) 9 10 if __name__ == '__main__': 11 print('主', os.getpid()) 12 p = Pool(5) 13 p.apply_async(func=wahaha, args=(50,), callback=call) 14 p.close() 15 p.join() 进程池 D:\Python36\python.exe E:/Python/草稿纸.py 主 13636 子 13636 13636 8881784197001252323389053344726562500000000000000000000000000000000000000000000000000 Process finished with exit code 0 结果
转载于:https://www.cnblogs.com/ZN-225/p/9179112.html