系统分为四个模块:
考虑到master服务器是整个系统的核心部分,与各个组件都有关联,其效率一定要高,所以网络通信的方式采用异步IO+ 协程方式,即能简化编程,又能提高并发效率,master服务器要进行url的比对过程,要存储大量的爬取过的url,如果存放在磁盘上,要牵扯到大量的磁盘IO的操作,所以这里采用了redis数据库,将数据存放在内存上,提高速度。
solveServer是分布式存储的子节点,这里单个的solveServer节点之间不存在通信的问题,简化了编程,每个solve节点只需要与masterServer节点进行通信即可。
1、给定一个初始的url进行爬取 2、提取出爬取的网页中的url,发送给master,接收经过master处理后的url,迭代进行爬取
采用bs4获取网页的文本的内容采用jieba库提取网页的关键字与关键字在网页中的权重 (爬虫模块采用集群式的爬虫,每个单个的节点采用多线程的并发模式)1、获取用户输入的关键字,将关键字发送给masterServer 2、接收masterServer返回的结果,将结果反馈给用户 这里将webServer于文件存储模块分开,方便webServer的集群的设计,也方便了数据库的分布式部署。这里并没有将webServer设计为集群模式,我只是做了基本的功能实现,webSerber与数据的存储已经分开了,所以webServer的集群设计应该会很容易实现
master监听两个端口,一个负责接受web端的链接,一个负责接受solve端的链接
def createSockte(): serverSocket1 = socket.socket(socket.AF_INET, socket.SOCK_STREAM) serverSocket2 = socket.socket(socket.AF_INET, socket.SOCK_STREAM) host = socket.gethostname() port1 = 6666 serverSocket1.bind((host,port1)) serverSocket1.listen(5) port2 = 6667 serverSocket2.bind((host,port2)) serverSocket2.listen(5) return serverSocket1, serverSocket2master采用多协程的并发模式,简化编程,提高并发
def main(): while True: g1 = gevent.spawn(acceptConn, serverSocket1) g2 = gevent.spawn(acceptConn, serverSocket2) # gevent.sleep(1) g1.join() g2.join()在redis数据库中存储已经爬取过的url,
# 处理url是否重复 def dealSolveResoult(urls): # 查询是否重复 non_urls = [] for url in urls: if not Redis.Sismember(r, url, 1): non_urls.append(url) Redis.Add(r, url, 1) count = len(conn_solve) gap = (len(non_urls) // count) + 1 i = 0 for conn in conn_solve: res_pack, res_pro = Pack.packUrl(non_urls[i:i+gap]) i += gap send_str = res_pro + "#" len_str = str(len(send_str) + len(res_pack)) while len(len_str) < 10: len_str += "#" send_str = len_str + send_str conn.send(send_str.encode("gb2312")) conn.send(res_pack)计算网页的对用户的价值,按照价值的大小排序后返回
将网页中的url提取出来,并且提取出链接的说明文字,建立链接说明文字与链接之间的关系
# 获取网页中的超链接 def getUrl(db, soup, page): links = soup('a') urls = set() # words = [] for link in links: if 'href' in dict(link.attrs): url = urllib.parse.urljoin(page, link['href']) url = url.split('<')[0] url = url.split('#')[0] if url[:4] != 'http': continue urls.add(url) link = str(link) index_i = 0 index_j = 0 for i in range(len(link)): if link[i] == '>': index_i = i+1 if index_i != 0 and link[i] == '<': index_j = i break res = splitWord(link[index_i:index_j]) createLinkWordIndex(db, res, url) return urls负责发送查询的数据到masterServer 接受master的相应数据
def readData(conn): len_pack = conn.recv(10).decode("gb2312") len_pack = int(len_pack.split("#")[0]) li = list() while len_pack != 0: data = conn.recv(len_pack) if len(data) <= len_pack: li.append(data) len_pack -= len(data) resoult = bytes() for l in li: resoult += l resoult = resoult.decode("gb2312").split("#") resoult = Pack.unpackUrl(resoult[0], resoult[1].encode("gb2312")) return resoult def writeData(conn, sentence): len_str = str(len(sentence.encode("gb2312"))) while len(len_str) < 10: len_str += "#" send_str = len_str + str(sentence) print(send_str) conn.send(send_str.encode("gb2312"))主界面 查询结果界面