首先声明,本文不是完全由自己编写完成,参考了网上的例子并自己实践后整理而成,并且添加了自己实际的体会和注解。文末会附件上参考的链接,尊重别人的劳作。
http://www.cnblogs.com/yyyg/p/5602753.html
线程和进程是对计算机而言的,任何编程语言都只是对线程和进程的操作而已。因此不同编程语言中线程和进程的概念从根本上来说都是一样的,下面介绍下大家耳熟能详的进程和线程的概念和区别:
>什么是线程 线程是操作系统能够进行运算调度的最小单位。它被包含在进程之中,是进程中的实际运作单位。一条线程指的是进程中一个单一顺序的控制流,一个进程中可以并发多个线程,每条线程并行执行不同的任务。一个线程是一个execution context(执行上下文),即一个cpu执行时所需要的一串指令。 >线程的工作方式 假设你正在读一本书,没有读完,你想休息一下,但是你想在回来时恢复到当时读的具体进度。有一个方法就是记下页数、行数与字数这三个数值,这些数值就是execution context。如果你的室友在你休息的时候,使用相同的方法读这本书。你和她只需要这三个数字记下来就可以在交替的时间共同阅读这本书了。 线程的工作方式与此类似。CPU会给你一个在同一时间能够做多个运算的幻觉,实际上它在每个运算上只花了极少的时间,本质上CPU同一时刻只干了一件事。它能这样做就是因为它有每个运算的execution context。就像你能够和你朋友共享同一本书一样,多任务也能共享同一块CPU。 >什么是进程 进程是程序分配资源的最小单位,一个程序的执行实例就是一个进程。每一个进程提供执行程序所需的所有资源。(进程本质上是资源的集合)。一个进程有一个虚拟的地址空间、可执行的代码、操作系统的接口、安全的上下文(记录启动该进程的用户和权限等等)、唯一的进程ID、环境变量、优先级类、最小和最大的工作空间(内存空间),还要有至少一个线程。 每一个进程启动时都会最先产生一个线程,即主线程。然后主线程会再创建其他的子线程。 与进程相关的资源包括: ---内存页(同一个进程中的所有线程共享同一个内存空间) ---文件描述符(e.g. open sockets) ---安全凭证(e.g.启动该进程的用户ID) 进程与线程区别 1.同一个进程中的线程共享同一内存空间,但是进程之间是独立的。 2.同一个进程中的所有线程的数据是共享的(进程通讯),进程之间的数据是独立的。 3.对主线程的修改可能会影响其他线程的行为,但是父进程的修改(除了删除以外)不会影响其他子进程。 4.线程是一个上下文的执行指令,而进程则是与运算相关的一簇资源。 5.同一个进程的线程之间可以直接通信,但是进程之间的交流需要借助中间代理来实现。 6.创建新的线程很容易,但是创建新的进程需要对父进程做一次复制。 7.一个线程可以操作同一进程的其他线程,但是进程只能操作其子进程。 8.线程启动速度快,进程启动速度慢(但是两者运行速度没有可比性)。threading模块是Python3中的多线程的相关模块,在详细的介绍这个模块之前先了解threading.py源码中开头的一段注释内容:
# Note regarding PEP 8 compliant names # This threading model was originally inspired by Java, and inherited # the convention of camelCase function and method names from that # language. Those original names are not in any imminent danger of # being deprecated (even for Py3k),so this module provides them as an # alias for the PEP 8 compliant names # Note that using the new PEP 8 compliant names facilitates substitution # with the multiprocessing module, which doesn't provide the old # Java inspired names.可以看出Python3的threading模块实际和Java的Thread非常的类似。如果知道Java中的Thread相关的操作,那么对于理解Python3的多线程和多进程会很有帮助。后面介绍Python3的多线程的创建,销毁,通信等内容Python3中创建一个线程是通过threading.Thread()类创建的,下面是这个类的部分源码定义:
##threading.py 部分源码#threading模块全部属性,方法,子类 __all__ = ['get_ident', 'active_count', 'Condition', 'current_thread', 'enumerate', 'main_thread', 'TIMEOUT_MAX', 'Event', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore', 'Thread', 'Barrier', 'BrokenBarrierError', 'Timer', 'ThreadError', 'setprofile', 'settrace', 'local', 'stack_size'] #threading.Thread()子类的定义和初始化函数。class Thread: def __init__(self, group=None, target=None, name=None, args=(), kwargs=None, *, daemon=None):其中最重要的2个参数就是 target和args。这两个参数用于传递运行的函数和运行函数的参数下面列出Thread()子类的部分源码,用于了解常用的方法:
class Thread: def __init__(self, group=None, target=None, name=None, args=(), kwargs=None, *, daemon=None):def _reset_internal_locks(self, is_alive):def __repr__(self): def start(self): def run(self): def _bootstrap(self):def _bootstrap_inner(self): def _stop(self):def _delete(self): def join(self, timeout=None):def _wait_for_tstate_lock(self, block=True, timeout=-1): @property def name(self): @name.setter def name(self, name): @property def ident(self): def is_alive(self): @property def daemon(self): @daemon.setter def daemon(self, daemonic):Python3 通过两个标准库 _thread 和 threading 提供对线程的支持。
_thread 提供了低级别的、原始的线程以及一个简单的锁,它相比于 threading 模块的功能还是比较有限的。
threading 模块除了包含 _thread 模块中的所有方法外,还提供的其他方法:
threading.currentThread(): 返回当前的线程变量。threading.enumerate(): 返回一个包含正在运行的线程的list。正在运行指线程启动后、结束前,不包括启动前和终止后的线程。threading.activeCount(): 返回正在运行的线程数量,与len(threading.enumerate())有相同的结果。除了使用方法外,线程模块同样提供了Thread类来处理线程,Thread类提供了以下方法:
run(): 用以表示线程活动的方法。start():启动线程活动。join([time]): 等待至线程中止。这阻塞调用线程直至线程的join() 方法被调用中止-正常退出或者抛出未处理的异常-或者是可选的超时发生。isAlive(): 返回线程是否活动的。getName(): 返回线程名。setName(): 设置线程名。线程的常用方法:
方法注释start()线程准备就绪,等待CPU调度,启动线程的唯一方法,参考javasetName()为线程设置名称getName()获取线程名称setDaemon(True)设置为守护线程join()逐个执行每个线程,执行完毕后继续往下执行run()线程被cpu调度后自动执行线程对象的run方法,如果想自定义线程类,直接重写run方法就行了
方法1:普通方式
import threading import time def testRun(n): print("Thread Task", n) time.sleep(1) print('2s') time.sleep(1) print('1s') time.sleep(1) print('0s') time.sleep(1) T1 = threading.Thread(target=testRun, args=("T1",)) T2 = threading.Thread(target=testRun, args=("T2",)) T1.start() T2.start()输出:Thread Task T1Thread Task T22s2s1s1s0s0s 可以看出2个线程之间是并行运行的方法2:继承threading.Thread并重写run方法
import threading import time class MyThread(threading.Thread): def __init__(self, n): super(MyThread, self).__init__() # 重构run函数必须要写 self.n = n def run(self): print("task", self.n) time.sleep(1) print('2s') time.sleep(1) print('1s') time.sleep(1) print('0s') time.sleep(1) if __name__ == "__main__": t1 = MyThread("t1") t2 = MyThread("t2") t1.start() t2.start()输出:task t1 task t2 2s 2s 1s 1s 0s 0s
time.sleep()的时候是不会占用cpu的,在sleep的时候操作系统会把线程暂时挂起。Thread.join() #等此线程执行完后,再执行其他线程或主线程threading.current_thread() #输出当前线程
import threading import time def run(n): print("task", n, threading.current_thread()) # 输出当前的线程 time.sleep(1) print('3s') time.sleep(1) print('2s') time.sleep(1) print('1s') strat_time = time.time() t_obj = [] # 定义列表用于存放子线程实例 for i in range(3): t = threading.Thread(target=run, args=("t-%s" % i,)) t.start() t_obj.append(t) for tmp in t_obj: tmp.join() # 为每个子线程添加join之后,主线程就会等这些子线程执行完之后再执行。 print("cost:", time.time() - strat_time) # 主线程 print(threading.current_thread()) # 输出当前线程输出:task t-0 <Thread(Thread-1, started 10564)>task t-1 <Thread(Thread-2, started 13928)>task t-2 <Thread(Thread-3, started 13692)>3s3s3s2s2s2s1s1s1scost: 3.0080878734588623<_MainThread(MainThread, started 11676)>
可以看出3个线程每一个线程sleep 3秒,如果sleep计算CPU时间的话,总耗时应该是9秒。但是sleep的时候实际是不计入CPU时间的,所以实际耗时是3个线程总共3秒左右。实际运行的默认线程是主线程:MainThreadthreading.active_count():统计当前活跃的线程总数。
实例1:
import threading import time def run(n): print("task", n) time.sleep(1) #此时子线程停1s for i in range(3): t = threading.Thread(target=run, args=("t-%s" % i,)) t.start() time.sleep(0.5) #主线程停0.5秒 print(threading.active_count()) #输出当前活跃的线程数 输出: task t-0 task t-1 task t-2 4解释:由于主线程运行速度要比子线程运行的速度快很多。所以当主线程执行 threading.active_count()的时候,其他3个子线程任然在运行,所以3+1=4.
实例2:
import threading import time def run(n): print("task", n) time.sleep(1) #此时子线程停1s for i in range(3): t = threading.Thread(target=run, args=("t-%s" % i,)) t.start() time.sleep(1) #主线程停1秒 print(threading.active_count()) #输出当前活跃的线程数 输出: task t-0 task t-1 task t-2 1解释:唯一的区别就是主线程sleep时间变长,所以当主线程运行threading.active_count()的时候,其他3个子线程都已经执行完毕,此时活跃线程就只有主线程,所以线程数为1.setDaemon(True)把所有的子线程都变成了主线程的守护线程,因此当主进程结束后,子线程也会随之结束。所以当主线程结束后,整个程序就退出了。
import threading import time def run(n): print("task", n) time.sleep(1) #此时子线程停1s print('3') time.sleep(1) print('2') time.sleep(1) print('1') for i in range(3): t = threading.Thread(target=run, args=("t-%s" % i,)) t.setDaemon(True) #把子线程设置为守护线程,必须在start()之前设置 t.start() time.sleep(0.5) #主线程停0.5秒 print(threading.active_count()) #输出活跃的线程数 输出: task t-0 task t-1 task t-2 4
在非python环境中,单核情况下,同时只能有一个任务执行。多核时可以支持多个线程同时执行。但是在python中,无论有多少核,同时只能执行一个线程。究其原因,这就是由于GIL的存在导致的。
GIL的全称是Global Interpreter Lock(全局解释器锁),来源是python设计之初的考虑,为了数据安全所做的决定。某个线程想要执行,必须先拿到GIL,我们可以把GIL看作是“通行证”,并且在一个python进程中,GIL只有一个。拿不到通行证的线程,就不允许进入CPU执行。GIL只在cpython中才有,因为cpython调用的是c语言的原生线程,所以他不能直接操作cpu,只能利用GIL保证同一时间只能有一个线程拿到数据。而在pypy和jpython中是没有GIL的。Python多线程的工作过程:
python在使用多线程的时候,调用的是c语言的原生线程。
1-拿到公共数据2-申请gil3-python解释器调用os原生线程4-os操作cpu执行运算5-当该线程执行时间到后,无论运算是否已经执行完,gil都被要求释放6-进而由其他进程重复上面的过程7-等其他进程执行完后,又会切换到之前的线程(从他记录的上下文继续执行)
整个过程是每个线程执行自己的运算,当执行时间到就进行切换(context switch)。
python针对不同类型的代码执行效率也是不同的:
1、CPU密集型代码(各种循环处理、计算等等),在这种情况下,由于计算工作多,ticks计数很快就会达到阈值,然后触发GIL的释放与再竞争(多个线程来回切换当然是需要消耗资源的),所以python下的多线程对CPU密集型代码并不友好。2、IO密集型代码(文件处理、网络爬虫等涉及文件读写的操作),多线程能够有效提升效率(单线程下有IO操作会进行IO等待,造成不必要的时间浪费,而开启多线程能在线程A等待时,自动切换到线程B,可以不浪费CPU的资源,从而能提升程序执行效率)。所以python的多线程对IO密集型代码比较友好。
使用建议?
python下想要充分利用多核CPU,就用多进程。因为每个进程有各自独立的GIL,互不干扰,这样就可以真正意义上的并行执行,在python中,多进程的执行效率优于多线程(仅仅针对多核CPU而言)。
GIL在python中的版本差异:
1、在python2.x里,GIL的释放逻辑是当前线程遇见IO操作或者ticks计数达到100时进行释放。(ticks可以看作是python自身的一个计数器,专门做用于GIL,每次释放后归零,这个计数可以通过sys.setcheckinterval 来调整)。而每次释放GIL锁,线程进行锁竞争、切换线程,会消耗资源。并且由于GIL锁存在,python里一个进程永远只能同时执行一个线程(拿到GIL的线程才能执行),这就是为什么在多核CPU上,python的多线程效率并不高。2、在python3.x中,GIL不使用ticks计数,改为使用计时器(执行时间达到阈值后,当前线程释放GIL),这样对CPU密集型程序更加友好,但依然没有解决GIL导致的同一时间只能执行一个线程的问题,所以效率依然不尽如人意。
多线程环境下,最容易出现的问题就是线程安全的问题。Java语言,Python语言都会有这个问题。究其原因就是多线程对共享变量的访问和修改是并行的。由于线程之间是进行随机调度,并且每个线程可能只执行n条执行之后,当多个线程同时修改同一条数据时可能会出现脏数据,所以,出现了线程锁,即同一时刻允许一个线程执行操作。线程锁用于锁定资源,你可以定义多个锁, 像下面的代码, 当你需要独占某一资源时,任何一个锁都可以锁这个资源,就好比你用不同的锁都可以把相同的一个门锁住是一个道理。由于线程之间是进行随机调度,如果有多个线程同时操作一个对象,如果没有很好地保护该对象,会造成程序结果的不可预期,我们也称此为“线程不安全”。
实际:模拟线程不安全
#实测:在python2.7、mac os下,运行以下代码可能会产生脏数据。Python3未复现线程不安全的状态。
import threading import time def run(n): global num num += 1 num = 0 t_obj = [] for i in range(20000): t = threading.Thread(target=run, args=("t-%s" % i,)) t.start() t_obj.append(t) for t in t_obj: t.join() print "num:", num预期结果:20000.如果结果不是20000就说明出现了线程不安全。为了方式上面情况的发生,就出现了互斥锁(Lock)
import threading def run(n): lock.acquire() #获取锁 global num num += 1 lock.release() #释放锁 lock = threading.Lock() #实例化一个锁对象 num = 0 t_obj = [] for i in range(20000): t = threading.Thread(target=run, args=("t-%s" % i,)) t.start() t_obj.append(t) for t in t_obj: t.join() print("num:", num)RLcok类的用法和Lock类一模一样,但它支持嵌套,在多个锁没有释放的时候一般会使用使用RLcok类。
import threading import time gl_num = 0 lock2 = threading.RLock() def Func(): lock2.acquire() global gl_num gl_num += 1 time.sleep(1) print(gl_num) lock2.release() for i in range(10): t = threading.Thread(target=Func) t.start() 输出: 1-10互斥锁同时只允许一个线程更改数据,而Semaphore是同时允许一定数量的线程更改数据 ,比如3个坑,那最多只允许3个人,后面的人只能等里面有人出来了才能再进去。
import threading import time def run(n): semaphore.acquire() #加锁 time.sleep(1) print("run the thread:%s\n" % n) semaphore.release() #释放 num = 0 semaphore = threading.BoundedSemaphore(5) # 最多允许5个线程同时运行 for i in range(22): t = threading.Thread(target=run, args=("t-%s" % i,)) t.start() while threading.active_count() != 1: pass # print threading.active_count() else: print('-----all threads done-----') 输出: 5个5个的输出 run the thread:t-2 run the thread:t-3 run the thread:t-0 run the thread:t-1 run the thread:t-4 run the thread:t-5 run the thread:t-6 run the thread:t-7 run the thread:t-9 run the thread:t-8 run the thread:t-10 run the thread:t-11 run the thread:t-13 run the thread:t-12 run the thread:t-14 run the thread:t-16 run the thread:t-17 run the thread:t-15 run the thread:t-18 run the thread:t-19 run the thread:t-20 run the thread:t-21 -----all threads done-----python线程的事件用于主线程控制其他线程的执行,事件是一个简单的线程同步对象,其主要提供以下几个方法:
方法注释clear将flag设置为“False”set将flag设置为“True”is_set判断是否设置了flagwait会一直监听flag,如果没有检测到flag就一直处于阻塞状态
事件处理的机制:全局定义了一个“Flag”,当flag值为“False”,那么event.wait()就会阻塞,当flag值为“True”,那么event.wait()便不再阻塞。
#利用Event类模拟红绿灯 import threading import time event = threading.Event() def lighter(): count = 0 event.set() #初始值为绿灯 while True: if 5 < count <=10 : event.clear() # 红灯,清除标志位 print("\33[41;1mred light is on...\033[0m") elif count > 10: event.set() # 绿灯,设置标志位 count = 0 else: print("\33[42;1mgreen light is on...\033[0m") time.sleep(1) count += 1 def car(name): while True: if event.is_set(): #判断是否设置了标志位 print("[%s] running..."%name) time.sleep(1) else: print("[%s] sees red light,waiting..."%name) event.wait() print("[%s] green light is on,start going..."%name) light = threading.Thread(target=lighter,) light.start() car = threading.Thread(target=car,args=("MINI",)) car.start()输出:green light is on...[MINI] running...green light is on...[MINI] running...[MINI] running...green light is on...[MINI] running...green light is on...green light is on...[MINI] running...green light is on...[MINI] running...[MINI] running...red light is on...[MINI] sees red light,waiting...red light is on...red light is on...
multiprocessing(包)模块是Python3中的多进程的相关模块,在详细的介绍这个模块之前先了解multiprocessing包中__init__.py文件源码中开头的一段注释内容:
# This package is intended to duplicate the functionality (and much of # the API) of threading.py but uses processes instead of threads. A # subpackage 'multiprocessing.dummy' has the same API but is a simple # wrapper for 'threading'.可以看出多进程实际是模仿多线程实现的,只不过用进程代替了线程而已,因此了解了多线程以后对于我们了解多进程有很大的帮助,并且multiprocessing.dumy包中也提供了一个简单的多线程操作类。Python3中创建一个多线程是通过 multiprocessing.Process创建的,下面查看一下Process类的源码定义:
from multiprocessing.pool import Pool class Process(object): def __init__(self, group=None, target=None, name=None, args=(), kwargs={}): def run(self): def start(self): def terminate(self): def join(self, timeout=None): def is_alive(self):多进程
import multiprocessing import time def run(name): time.sleep(2) print('hello', name) if __name__ == '__main__': for i in range(10): # 起了10个进程 p = multiprocessing.Process(target=run, args=('msc%s' % i,)) p.start()输出:会短时间内输出全部10个进程。join所完成的工作就是线程、进程同步,即进程、线程任务结束之后,进入阻塞状态,一直等待其他的子线程,进程执行结束之后,线程,进程在终止
例子1:进程例子
import multiprocessing import time def run(name): time.sleep(2) print('hello', name) if __name__ == '__main__': for i in range(10): # 起了10个进程 p = multiprocessing.Process(target=run, args=('msc%s' % i,)) p.start() p.join()输出:以此输出hello msc0-9.解释:这个例子就是在创建多进程的例子上,加了join方法,此时多进程之间必须等待当前进程结束后在执行其他进程例子2:线程
import threading import time def run(): time.sleep(2) print('当前线程的名字是: ', threading.current_thread().name) time.sleep(2) if __name__ == '__main__': start_time = time.time() print('这是主线程:', threading.current_thread().name) thread_list = [] for i in range(5): t = threading.Thread(target=run) thread_list.append(t) for t in thread_list: t.setDaemon(True) t.start() for t in thread_list: t.join() print('主线程结束了!' , threading.current_thread().name) print('一共用时:', time.time()-start_time)输出:这是主线程: MainThread当前线程的名字是: Thread-3当前线程的名字是: Thread-5当前线程的名字是: Thread-2当前线程的名字是: Thread-1当前线程的名字是: Thread-4主线程结束了! MainThread一共用时: 4.032119035720825
解释:join方法等待当前线程结束后才运行主线程的计算和打印操作
详情可以参考:Python多线程与多线程中join()的用法
os.getpid():获取当前进程的idos.getppid():获取当前进程的父进程id
from multiprocessing import Process import os def info(title): print(title) print('module name:', __name__) print('parent process:', os.getppid()) # 获取父进程id print('process id:', os.getpid()) # 获取自己的进程id print("\n\n") def f(name): info('\033[31;1mfunction f\033[0m') print('hello', name) if __name__ == '__main__': info('\033[32;1mmain process line\033[0m') p = Process(target=f, args=('bob',)) p.start() p.join() 输出: main process line module name: __main__ parent process: 936 process id: 18260 function f module name: __mp_main__ parent process: 18260 process id: 14464 hello bob下面是这3个进程的关系:pycharm进程(id=936,通过Windows的jps命令查询)----》当前程序的主进程(id=18260)---》函数启动的进程(id=14464)
from multiprocessing import Process,Queue
##大写的Queue是进程队列; queue是线程队列 ##大写的Queue需要从multiprocessing导入由于进程之间数据是不共享的,所以不会出现多线程GIL带来的问题。多进程之间的通信通过Queue()或Pipe()来实现:
例子1-Queue()实现进程通信
from multiprocessing import Process, Queue def f(q): q.put([42, None, 'hello']) if __name__ == '__main__': q = Queue() p = Process(target=f, args=(q,)) p.start() print(q.get()) # prints "[42, None, 'hello']" p.join()## 通过子线程put进去数据,然后在主线程get出内容,表明线程之间数据是可以共享的。
例子2-Pipe()实现进程通信
Pipe的本质是进程之间的数据传递,而不是数据共享,这和socket有点像。pipe()返回两个连接对象分别表示管道的两端,每端都有send()和recv()方法。如果两个进程试图在同一时间的同一端进行读取和写入那么,这可能会损坏管道中的数据。
from multiprocessing import Process, Pipe def f(conn): conn.send([42, None, 'hello']) conn.close() if __name__ == '__main__': parent_conn, child_conn = Pipe() p = Process(target=f, args=(child_conn,)) p.start() print(parent_conn.recv()) # prints "[42, None, 'hello']" p.join()下面的几个例子是对进程和线程通信的实践:
例子3:
import queue from multiprocessing import Process def f(): q.put([66,None,'hello']) #这里的q属于主进程 if __name__ == '__main__': q = queue.Queue() #主进程起的q p = Process(target=f,) ## 在主进程中来定义子进程;如果在主进程中启动了子进程,那么主进程和子进程之间内存是独立的。 ## 因为内存独立,子进程p是无法访问主进程def f()中的q的。 p.start() print (q.get()) p.join() 输出: Process Process-1: Traceback (most recent call last): File "D:\Python36\lib\multiprocessing\process.py", line 249, in _bootstrap self.run() File "D:\Python36\lib\multiprocessing\process.py", line 93, in run self._target(*self._args, **self._kwargs) File "D:\WorkSpace\Python3\cekai\process4.py", line 6, in f q.put([66, None, 'hello']) # 这里的q属于主进程 NameError: name 'q' is not defined##可以看到已经报错,这是因为子进程不能访问主进程的q
例子4:
import queue from multiprocessing import Process def f(qq): qq.put([66, None, 'hello']) if __name__ == '__main__': q = queue.Queue() ##线程的Queque p = Process(target=f, args=(q,)) # 将父进程的线程q传给子进程 p.start() print(q.get()) p.join() 输出: Traceback (most recent call last): File "D:/WorkSpace/Python3/cekai/process4.py", line 13, in <module> p.start() File "D:\Python36\lib\multiprocessing\process.py", line 105, in start self._popen = self._Popen(self) File "D:\Python36\lib\multiprocessing\context.py", line 223, in _Popen return _default_context.get_context().Process._Popen(process_obj) File "D:\Python36\lib\multiprocessing\context.py", line 322, in _Popen return Popen(process_obj) File "D:\Python36\lib\multiprocessing\popen_spawn_win32.py", line 65, in __init__ reduction.dump(process_obj, to_child) File "D:\Python36\lib\multiprocessing\reduction.py", line 60, in dump ForkingPickler(file, protocol).dump(obj) TypeError: can't pickle _thread.lock objects ## 这是因为我们将线程的q传给另一个进程,这是不可以的,线程只属于当前进程,不能传给其他进程。 ## 如果想将q传给子进程,那么必须将进程q传进去,而不是线程q。例子5:
from multiprocessing import Process, Queue ##大写的Queue是进程队列; queue是线程队列 ##大写的Queue需要从multiprocessing导入 def f(qq): qq.put([66, None, 'hello']) if __name__ == '__main__': q = Queue()##进程q p = Process(target=f, args=(q,)) # 将父进程q传给子进程 p.start() print(q.get()) # 父进程去get子进程的内容 p.join() 输出: [66, None, 'hello']父进程可以get子进程put进去的内容了;从表面上看感觉是两个进程共享了数据,其实不然。
现在已经实现了进程间的通讯。父进程将q传给子进程,其实是克隆了一份q给子进程,此时子进程就多了一个q进程队列; 但是父进程又为什么能够get子进程put进去的数据呢,这是因为当前两个进程在内存空间依然是独立的,只不过子进程put的数据 通过pickle序列化放到内存中一个中间的位置,然后父进程从这个中间的位置取到数据(而不是从子进程中取的数据)。 所以进程间的通讯不是共享数据,而是一个数据的传递。 例子6: from multiprocessing import Process, Pipe def f(conn): conn.send([66, None, 'hello from child1']) conn.send([66, None, 'hello from child2']) #发送两次数据 conn.close() if __name__ == '__main__': parent_conn, child_conn = Pipe() p = Process(target=f, args=(child_conn,)) p.start() print(parent_conn.recv()) p.join() 输出: [66, None, 'hello from child1']## 可以看到这端只接收到了一次数据(发送和接收必须配对)
例子7:
from multiprocessing import Process, Pipe def f(conn): conn.send([66, None, 'hello from child1']) conn.send([66, None, 'hello from child2']) conn.close() if __name__ == '__main__': parent_conn, child_conn = Pipe() p = Process(target=f, args=(child_conn,)) p.start() print(parent_conn.recv()) print(parent_conn.recv()) # 第二次接收数据 p.join() 输出: [66, None, 'hello from child1'] [66, None, 'hello from child2']##对端发送几次,这端就需要接收几次
例子8:
from multiprocessing import Process, Pipe def f(conn): conn.send([66, None, 'hello from child1']) conn.send([66, None, 'hello from child2']) # 发送两次数据 conn.close() if __name__ == '__main__': parent_conn, child_conn = Pipe() p = Process(target=f, args=(child_conn,)) p.start() print(parent_conn.recv()) print(parent_conn.recv()) print(parent_conn.recv()) # 对端发送两次,本段接收三次 p.join() 输出:程序一直运行并输出 [66, None, 'hello from child1'] [66, None, 'hello from child2']## 程序卡主了,除非对端在发送一次数据。
例子9:
from multiprocessing import Process, Pipe def f(conn): conn.send([66, None, 'hello from child1']) conn.send([66, None, 'hello from child2']) # 发送两次数据 print(conn.recv()) # 接收数据 conn.close() if __name__ == '__main__': parent_conn, child_conn = Pipe() p = Process(target=f, args=(child_conn,)) p.start() print(parent_conn.recv()) print(parent_conn.recv()) parent_conn.send("data from parent_conn") # 发送数据 p.join() 输出: [66, None, 'hello from child1'] [66, None, 'hello from child2'] data from parent_conn##通过管道实现了相互发送接收数据(实现了数据传递)
通过Manager可实现进程间数据的共享。Manager()返回的manager对象会通过一个服务进程,来使其他进程通过代理的方式操作python对象。manager对象支持 list, dict, Namespace, Lock, RLock, Semaphore, BoundedSemaphore, Condition, Event, Barrier, Queue, Value ,Array.
例子1:
from multiprocessing import Process, Manager import os def f(d, l, n): d[1] = '1' # 放入key和value到空字典中 d['2'] = 2 d[0.25] = None l.append(n) # 将每个进程的n值放入列表中;每个进程的n值都不同。 print(l) if __name__ == '__main__': with Manager() as manager: # 做一个别名,此时manager就相当于Manager() d = manager.dict() # 生成一个可在多个进程之间传递和共享的字典 l = manager.list(range(5)) # 生成一个可在多个进程之间传递和共享的列表;通过range(5)给列表中生成5个数据 p_list = [] for i in range(10): # 生成10个进程 p = Process(target=f, args=(d, l, i)) # 将字典和列表传给每个进程,每个进程可以进行修改 p.start() p_list.append(p) # 将每个进程放入空列表中 for res in p_list: res.join() print(d) # 所有进程都执行完毕后打印字典 print(l) # 所有进程都执行完毕后打印列表 输出: [0, 1, 2, 3, 4, 2] [0, 1, 2, 3, 4, 2, 6] [0, 1, 2, 3, 4, 2, 6, 3] [0, 1, 2, 3, 4, 2, 6, 3, 1] [0, 1, 2, 3, 4, 2, 6, 3, 1, 0] [0, 1, 2, 3, 4, 2, 6, 3, 1, 0, 7] [0, 1, 2, 3, 4, 2, 6, 3, 1, 0, 7, 8, 9] [0, 1, 2, 3, 4, 2, 6, 3, 1, 0, 7, 8, 9] [0, 1, 2, 3, 4, 2, 6, 3, 1, 0, 7, 8, 9, 4] [0, 1, 2, 3, 4, 2, 6, 3, 1, 0, 7, 8, 9, 4, 5]#第十个进程把每个进程添加的n值都加入到列表 {1: '1', '2': 2, 0.25: None} #最后打印的字典 [0, 1, 2, 3, 4, 2, 6, 3, 1, 0, 7, 8, 9, 4, 5]#列表生成的时候自动加入了0-4这5个数;然后每个进程又把各自的n值加入列表
例子2:
from multiprocessing import Process, Manager import os def f(d, l): d[os.getpid()] = os.getpid() l.append(os.getpid()) print(l) if __name__ == '__main__': with Manager() as manager: d = manager.dict() # 对字典做个调整,也将pid加入到字典中 l = manager.list(range(5)) p_list = [] for i in range(10): p = Process(target=f, args=(d, l)) p.start() p_list.append(p) for res in p_list: res.join() print(d) print(l) 输出: [0, 1, 2, 3, 4, 17200] [0, 1, 2, 3, 4, 17200, 18664] [0, 1, 2, 3, 4, 17200, 18664, 18500] [0, 1, 2, 3, 4, 17200, 18664, 18500, 15264] [0, 1, 2, 3, 4, 17200, 18664, 18500, 15264, 18588] [0, 1, 2, 3, 4, 17200, 18664, 18500, 15264, 18588, 18880] [0, 1, 2, 3, 4, 17200, 18664, 18500, 15264, 18588, 18880, 18932] [0, 1, 2, 3, 4, 17200, 18664, 18500, 15264, 18588, 18880, 18932, 19104] [0, 1, 2, 3, 4, 17200, 18664, 18500, 15264, 18588, 18880, 18932, 19104, 17296] [0, 1, 2, 3, 4, 17200, 18664, 18500, 15264, 18588, 18880, 18932, 19104, 17296, 19084] {17200: 17200, 18664: 18664, 18500: 18500, 15264: 15264, 18588: 18588, 18880: 18880, 18932: 18932, 19104: 19104, 17296: 17296, 19084: 19084} [0, 1, 2, 3, 4, 17200, 18664, 18500, 15264, 18588, 18880, 18932, 19104, 17296, 19084] ##现在我们看到可以实现进程间的数据共享、修改和传递。 ##Manager()自带锁,会控制进程之间同一时间修改数据; ##字典和列表的数据不是一份,而是因为10个进程,所以有10个字典和10个列表。每个进程修改后,都会copy给其他进程,其他进程可以对最新的数据进行修改,所以数据不会被修改乱。可以看到一共10个进程,并不是连续的,说明执行进程的时候说不准先执行哪个进程。进程之间数据是独立的,这里我们为什么又要加锁呢,这是因为所有进程使用同一个屏幕来输出数据。比如 我们现在输出的数据是 hello world x,在输出的过程中很有可能其中一个进程还没输出完(比如只输出了hello wo),另一个进程就执行输出了(可能会在屏幕上看到hello wohello world0201的现象)。所以需要通过锁来控制同一时间只能有一个进程输出数据到屏幕。
执行多进程,子进程会从主进程复制一份完整数据,1个、10个进程可能还没什么感觉,但是如果有100或1000,甚至更多个进程的时候开销就会特别大,就会明显感觉到多进程执行有卡顿现象。
进程池可以设定同一时间有多少个进程可以在CPU上运行。
from multiprocessing import Process, Pool import time, os def Foo(i): time.sleep(2) print("in process", os.getpid()) # 打印进程id return i + 100 def Bar(arg): print('-->exec done:', arg) if __name__ == '__main__': ##这行代码用途是如果主动执行该代码的.py文件,则该代码下面的代码可以被执行;如果该.py模块被导入到其他模块中,从其他模块执行该.py模块,则该行下面的代码不会被执行。 有些时候可以用这种方式用于测试,在该行代码下面写一些测试代码。。 pool = Pool(5) # 同时只能放入5个进程 for i in range(10): # 创建10个进程,但是因为pool的限制,只有放入进程池中的5个进程才会被执行(),其他的被挂起了,如果进程池中其中有两个进程执行完了,就会补进2个进程进去。 pool.apply(func=Foo, args=(i,)) # pool.apply用来将进程放入pool print('end') # 执行完毕 pool.close() # 允许pool中的进程关闭(close必须在join前面,可以理解close相当于一个开关吧) pool.join() # 进程池中进程执行完毕后再关闭,如果注释,那么程序直接关闭。 输出:一个一个的输出 in process 14776 in process 18180 in process 13384 in process 19092 in process 16248 in process 14776 in process 18180 in process 13384 in process 19092 in process 16248 end##可以看到通过串行的方式将结果打印出来,这是因为我们使用的是pool.apply。 pool.apply就是通过串行的方式来执行。
例子2:
from multiprocessing import Process, Pool import time, os def Foo(i): time.sleep(2) print("in process", os.getpid()) return i + 100 def Bar(arg): print('-->exec done:', arg) if __name__ == '__main__': pool = Pool(5) for i in range(10): pool.apply_async(func=Foo, args=(i,)) ## 使用pool.apply_async就可以并行了 print('end') pool.close() # pool.join() 注释掉 输出: end## 只执行了print('end')代码,其他进程的结果没有看到,这是因为其他进程还没有执行完成,主进程pool.close()就执行完了,close以后所有其他进程也不会在执行了。
## 要想其他进程执行完成后在关闭,必须使用pool.join() 例子3: from multiprocessing import Process, Pool import time, os def Foo(i): time.sleep(2) print("in process", os.getpid()) return i + 100 def Bar(arg): print('-->exec done:', arg) if __name__ == '__main__': pool = Pool(5) for i in range(10): pool.apply_async(func=Foo, args=(i,)) print('end') pool.close() pool.join() 输出: end in process 13336 in process 15692 in process 19284 in process 17236 in process 17900in process 19284 in process 15692 in process 13336 in process 17236 in process 17900##从执行结果来看,5个 5个的被打印出来。
回调可以用于当执行完代码后做一些后续操作,比如查看完命令后,通过回调进行备份;或者执行完什么动作后,做个日志等。
from multiprocessing import Process, Pool import time, os def Foo(i): time.sleep(2) print("in process", os.getpid()) return i + 100 def Bar(arg): print('-->exec done:', arg, os.getpid()) if __name__ == '__main__': pool = Pool(5) print("主进程:", os.getpid()) # 打印主进程id for i in range(10): pool.apply_async(func=Foo, args=(i,), callback=Bar) ##callback叫做回调,就是当执行完了func=Foo后,才会执行callback=Bar(每个进程执行完了后都会执行回调)。 ## 回调可以用于当执行完代码后做一些后续操作,比如查看完命令后,通过回调进行备份;或者执行完什么动作后,做个日志等。 ## 备份、写日志等在子进程中也可以执行,但是为什么要用回调呢! 这是因为如果用子进程,有10个子进程就得连接数据库十次,而使用回调的话是用主进程连接数据库,所以只连接一次就可以了,这样写能大大提高运行效率。 ##通过主进程建立数据库的连接的话,因为在同一个进程中只能在数据库建立一次连接,所以即使是多次被子进程回调,也不会重复建立连接的,因为数据库会限制同一个进程最大连接数,这都是有数据库设置的。 print('end') pool.close() pool.join()输出: 主进程: 17384 end in process 19168 -->exec done: 100 17384 in process 17468 in process 11568 in process 18772 -->exec done: 103 17384 -->exec done: 101 17384 -->exec done: 102 17384 in process 18300 -->exec done: 104 17384 in process 19168 -->exec done: 105 17384 in process 11568 in process 18772 in process 17468 -->exec done: 106 17384 -->exec done: 108 17384 -->exec done: 107 17384 in process 18300 -->exec done: 109 17384线程和进程的操作是由程序触发系统接口,最后的执行者是系统,它本质上是操作系统提供的功能。而协程的操作则是程序员指定的,在python中通过yield,人为的实现并发处理。协程存在的意义:对于多线程应用,CPU通过切片的方式来切换线程间的执行,线程切换时需要耗时。协程,则只使用一个线程,分解一个线程成为多个“微线程”,在一个线程中规定某个代码块的执行顺序。协程的适用场景:当程序中存在大量不需要CPU的操作时(IO)。常用第三方模块gevent和greenlet。(本质上,gevent是对greenlet的高级封装,因此一般用它就行,这是一个相当高效的模块。)
首先安装这2个模块
例子1-greenlet
from greenlet import greenlet def test1(): print(12) gr2.switch() print(34) gr2.switch() def test2(): print(56) gr1.switch() print(78) gr1 = greenlet(test1) gr2 = greenlet(test2) gr1.switch()输出:12563478
例子2-grevent
from gevent import monkey; monkey.patch_all() import gevent import requests def f(url): print('GET: %s' % url) resp = requests.get(url) data = resp.text print('%d bytes received from %s.' % (len(data), url)) gevent.joinall([ gevent.spawn(f, 'https://www.python.org/'), gevent.spawn(f, 'https://www.yahoo.com/'), gevent.spawn(f, 'https://github.com/'), ]) 输出: GET: https://www.python.org/ GET: https://www.yahoo.com/ GET: https://github.com/ 48868 bytes received from https://www.python.org/. 484789 bytes received from https://www.yahoo.com/. 59564 bytes received from https://github.com/.通过joinall将任务f和它的参数进行统一调度,实现单线程中的协程。代码封装层次很高,实际使用只需要了解它的几个主要方法即可。
参考文章:
http://www.cnblogs.com/whatisfantasy/p/6440585.html?utm_source=itdadao&utm_medium=referral
https://www.cnblogs.com/cnkai/p/7504980.html
转载于:https://www.cnblogs.com/forfreewill/articles/9318249.html