整体架构设计
- 任务调度层:负责接收待爬取的网站列表,并将任务分配给不同的进程。可以采用任务队列的方式,将每个网站的爬取任务封装成一个任务对象放入队列。
- 进程层:启动多个进程,每个进程从任务队列中获取任务并执行。进程数量根据系统资源(如CPU核心数)来确定,以充分利用多核优势。
- 页面爬取层:每个进程内部使用异步编程来实现页面的高效爬取。利用异步I/O操作,在等待网络响应时可以切换到其他任务执行,提高资源利用率。
任务分配给不同进程
- 任务队列:使用
multiprocessing.Queue
来创建任务队列。任务调度层将每个网站的爬取任务(可以是一个包含网站URL和相关配置的对象)放入队列。
- 进程启动:通过
multiprocessing.Process
启动多个进程,每个进程在启动时连接到任务队列,不断从队列中获取任务并执行。例如:
import multiprocessing
def worker(task_queue):
while True:
task = task_queue.get()
if task is None:
break
# 执行爬取任务
crawl_task(task)
if __name__ == '__main__':
task_queue = multiprocessing.Queue()
num_processes = multiprocessing.cpu_count()
processes = []
for _ in range(num_processes):
p = multiprocessing.Process(target=worker, args=(task_queue,))
p.start()
processes.append(p)
# 添加任务到队列
for website in websites_to_crawl:
task_queue.put(website)
# 结束进程
for _ in range(num_processes):
task_queue.put(None)
for p in processes:
p.join()
异步操作在每个进程内实现
- 使用异步库:在Python中,可以使用
asyncio
库来实现异步操作。对于每个页面的请求,可以使用 aiohttp
库进行异步HTTP请求。例如:
import asyncio
import aiohttp
async def fetch(session, url):
async with session.get(url) as response:
return await response.text()
async def crawl_page(urls):
async with aiohttp.ClientSession() as session:
tasks = [fetch(session, url) for url in urls]
results = await asyncio.gather(*tasks)
return results
- 事件循环:在每个进程内部,创建一个事件循环来管理异步任务的执行。在
worker
函数中,可以这样使用:
def worker(task_queue):
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
while True:
task = task_queue.get()
if task is None:
break
urls = get_page_urls(task) # 获取该网站下所有页面的URL
result = loop.run_until_complete(crawl_page(urls))
# 处理爬取结果
process_result(result)
loop.close()
进程间通信和数据共享
- 进程间通信:
- 使用队列:除了任务队列用于分配任务,还可以使用
multiprocessing.Queue
来传递爬取的结果。每个进程将爬取到的数据放入结果队列,由主进程或其他专门的进程进行统一处理。
- 使用管道:
multiprocessing.Pipe
可以用于两个进程之间的直接通信,例如在主进程和某个特定进程之间传递控制信号。
- 数据共享:
- 共享内存:对于一些需要共享的只读数据(如配置信息),可以使用
multiprocessing.Value
和 multiprocessing.Array
来实现共享内存。例如,如果有一个全局的用户代理字符串需要所有进程使用:
from multiprocessing import Value
ua_string = Value('s', b'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36')
- **分布式存储**:对于大量的爬取数据,采用分布式存储方案,如Redis或数据库(如MySQL、MongoDB)。每个进程将数据直接存储到这些共享存储中,避免在进程间传递大量数据。