流动python - 写port扫描仪和各种并发尝试(多线程多进程geventfutures)

it2025-08-10  7

port扫描仪的原理非常easy。没有什么比操作更socket,能够connect它认为,port打开。

import socket def scan(port): s = socket.socket() if s.connect_ex(('localhost', port)) == 0: print port, 'open' s.close() if __name__ == '__main__': map(scan,range(1,65536)) 这样一个最简单的端口扫描器出来了。

等等喂,半天都没反应,那是由于socket是堵塞的,每次连接要等非常久才超时。

我们自己给它加上的超时。

s.settimeout(0.1)

再跑一遍,感觉快多了。

多线程版本号

import socket import threading def scan(port): s = socket.socket() s.settimeout(0.1) if s.connect_ex(('localhost', port)) == 0: print port, 'open' s.close() if __name__ == '__main__': threads = [threading.Thread(target=scan, args=(i,)) for i in xrange(1,65536)] map(lambda x:x.start(),threads) 执行一下,哇,好快,快到抛出错误了。thread.error: can't start new thread。

想一下,这个进程开启了65535个线程,有两种可能。一种是超过最大线程数了,一种是超过最大socket句柄数了。在linux能够通过ulimit来改动。

假设不改动最大限制,怎么用多线程不报错呢?

加个queue,变成生产者-消费者模式,开固定线程。

多线程+队列版本号

import socket import threading from Queue import Queue def scan(port): s = socket.socket() s.settimeout(0.1) if s.connect_ex(('localhost', port)) == 0: print port, 'open' s.close() def worker(): while not q.empty(): port = q.get() try: scan(port) finally: q.task_done() if __name__ == '__main__': q = Queue() map(q.put,xrange(1,65535)) threads = [threading.Thread(target=worker) for i in xrange(500)] map(lambda x:x.start(),threads) q.join() 这里开500个线程,不停的从队列取任务来做。

multiprocessing+队列版本号

总不能开65535个进程吧?还是用生产者消费者模式

import multiprocessing def scan(port): s = socket.socket() s.settimeout(0.1) if s.connect_ex(('localhost', port)) == 0: print port, 'open' s.close() def worker(q): while not q.empty(): port = q.get() try: scan(port) finally: q.task_done() if __name__ == '__main__': q = multiprocessing.JoinableQueue() map(q.put,xrange(1,65535)) jobs = [multiprocessing.Process(target=worker, args=(q,)) for i in xrange(100)] map(lambda x:x.start(),jobs) 注意这里把队列作为一个參数传入到worker中去,由于是process safe的queue。不然会报错。

还实用的是JoinableQueue()。顾名思义就是能够join()的。

gevent的spawn版本号

from gevent import monkey; monkey.patch_all(); import gevent import socket ... if __name__ == '__main__': threads = [gevent.spawn(scan, i) for i in xrange(1,65536)] gevent.joinall(threads) 注意monkey patch必须在被patch的东西之前import,不然会Exception KeyError.比方不能先import threading,再monkey patch.

gevent的Pool版本号

from gevent import monkey; monkey.patch_all(); import socket from gevent.pool import Pool ... if __name__ == '__main__': pool = Pool(500) pool.map(scan,xrange(1,65536)) pool.join()

concurrent.futures版本号

import socket from Queue import Queue from concurrent.futures import ThreadPoolExecutor ... if __name__ == '__main__': q = Queue() map(q.put,xrange(1,65536)) with ThreadPoolExecutor(max_workers=500) as executor: for i in range(500): executor.submit(worker,q)

版权声明:本文博客原创文章。博客,未经同意,不得转载。

转载于:https://www.cnblogs.com/bhlsheji/p/4737644.html

相关资源:数据结构—成绩单生成器
最新回复(0)