Python的线程池实现

it2022-05-05  202

Python的线程池实现 # -*- coding: utf-8 -*- #Python的线程池实现 import Queue import threading import sys import time import urllib #替我们工作的线程池中的线程 class MyThread(threading.Thread): def __init__(self, workQueue, resultQueue,timeout=30, **kwargs): threading.Thread.__init__(self, kwargs=kwargs) #线程在结束前等待任务队列多长时间 self.timeout = timeout self.setDaemon(True) self.workQueue = workQueue self.resultQueue = resultQueue self.start() def run(self): while True: try: #从工作队列中获取一个任务 callable, args, kwargs = self.workQueue.get(timeout=self.timeout) #我们要执行的任务 res = callable(args, kwargs) #报任务返回的结果放在结果队列中 self.resultQueue.put(res+" | "+self.getName()) except Queue.Empty: #任务队列空的时候结束此线程 break except : print sys.exc_info() raise class ThreadPool: def __init__(self, num_of_threads=10): self.workQueue = Queue.Queue() self.resultQueue = Queue.Queue() self.threads = [] self.__createThreadPool(num_of_threads) def __createThreadPool(self, num_of_threads): for i in range(num_of_threads): thread = MyThread(self.workQueue, self.resultQueue) self.threads.append(thread) def wait_for_complete(self): #等待所有线程完成。 while len(self.threads): thread = self.threads.pop() #等待线程结束 if thread.isAlive(): #判断线程是否还存活来决定是否调用join thread.join() def add_job(self, callable, *args, **kwargs): self.workQueue.put((callable,args,kwargs)) def test_job(id, sleep = 0.001): html = "" try: time.sleep(1) conn = urllib.urlopen('http://www.baidu.com/') html = conn.read(20) except: print sys.exc_info() return html def test(): print 'start testing' tp = ThreadPool(10) for i in range(50): time.sleep(0.2) tp.add_job(test_job, i, i*0.001) tp.wait_for_complete() #处理结果 print 'result Queue\'s length == %d '% tp.resultQueue.qsize() while tp.resultQueue.qsize(): print tp.resultQueue.get() print 'end testing' if __name__ == '__main__': test() import sys IS_PY2 = sys.version_info < (3, 0) if IS_PY2: from Queue import Queue else: from queue import Queue from threading import Thread class Worker(Thread): """ Thread executing tasks from a given tasks queue """ def __init__(self, tasks): Thread.__init__(self) self.tasks = tasks self.daemon = True self.start() def run(self): while True: func, args, kargs = self.tasks.get() try: func(*args, **kargs) except Exception as e: # An exception happened in this thread print(e) finally: # Mark this task as done, whether an exception happened or not self.tasks.task_done() class ThreadPool: """ Pool of threads consuming tasks from a queue """ def __init__(self, num_threads): self.tasks = Queue(num_threads) for _ in range(num_threads): Worker(self.tasks) def add_task(self, func, *args, **kargs): """ Add a task to the queue """ self.tasks.put((func, args, kargs)) def map(self, func, args_list): """ Add a list of tasks to the queue """ for args in args_list: self.add_task(func, args) def wait_completion(self): """ Wait for completion of all the tasks in the queue """ self.tasks.join() if __name__ == "__main__": from random import randrange from time import sleep # Function to be executed in a thread def wait_delay(d): print("sleeping for (%d)sec" % d) sleep(d) # Generate random delays delays = [randrange(3, 7) for i in range(50)] # Instantiate a thread pool with 5 worker threads pool = ThreadPool(5) # Add the jobs in bulk to the thread pool. Alternatively you could use # `pool.add_task` to add single jobs. The code will block here, which # makes it possible to cancel the thread pool with an exception when # the currently running batch of workers is finished. pool.map(wait_delay, delays) pool.wait_completion() posted on 2017-06-14 16:39 北京涛子 阅读( ...) 评论( ...) 编辑 收藏

转载于:https://www.cnblogs.com/liujitao79/p/7009666.html


最新回复(0)