顶点小说进阶(多进程+协程)
建议:
看之前可以先看我之前发布的文章(异步优化与数据入库: 顶点小说爬虫进阶实战)
这篇文章基于上篇文章:进行了多进程处理,大大加快了爬取速度
案例:顶点小说完善(多进程)
优化思路:
- 导包:from multiprocessing import Pool
- 对于每一页的所有小说采用一个进程,建立进程池,for循环处向进程池添加任务(对于每一页的所有小说的处理封装成一个方法作为任务添加到进程池)
import asyncio
import logging
import time
import requests
from lxml import etree
import aiohttp
import aiomysql
from aiohttp import ContentTypeError
from multiprocessing import PoolCONCURRENCY = 4logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s : %(message)s')class Spider(object):def __init__(self):# 方便设置头部信息、代理IP、cookie信息等self.session = None# 设置协程数量self.semaphore = asyncio.Semaphore(CONCURRENCY)# 限制协程的并发数:# 如果并发数没有达到限制: 那么async with semaphore会瞬间执行完成,进入里面的正式代码中# 如果并发数已经达到了限制,那么其他的协程对象会阻塞在asyn with semaphore这个地方,直到正在运行的某个协程对象完成了,退出了,才会放行一个新的协程对象去替换掉这个已经完成的协程对象# 初始化数据库连接池async def init_pool(self):self.pool = await aiomysql.create_pool(host="127.0.0.1",port=3306,user="root",password="123456",db=f"dingdian",autocommit=True # Ensure autocommit is set to True for aiomysql)# 在 aiomysql.create_pool 方法中,不需要显式传递 loop 参数。aiomysql 会自动使用当前的事件循环(即默认的全局事件循环)。# 关闭数据库连接池async def close_pool(self):if self.pool:self.pool.close()await self.pool.wait_closed()# 获取url源码async def scrape_api(self, url):# 设置协程数量async with self.semaphore:logging.info(f"scraping {url}")try:async with self.session.get(url) as response:# 控制爬取(或请求)的速率,以避免对目标服务器造成过多的负荷或请求频率过高而被封禁或限制访问。await asyncio.sleep(1)# 在异步环境中,可能需要使用 response.content.read() 或 await response.text() 来获取文本内容。return await response.text()except ContentTypeError as e: # aiohttp 的 ContentTypeError 异常: 请求内容类型错误 或者 响应内容类型错误# exc_info=True 参数将导致 logging 模块记录完整的异常信息,包括栈跟踪,这对于调试非常有用。logging.error(f'error occurred while scraping {url}', exc_info=True)# 获取小说分类urlasync def get_type(self):url = "https://www.cdbxs.com/sort/"source = await self.scrape_api(url)href_lists = etree.HTML(source).xpath('//ul[@class="nav"]/li/a/@href')[2:-4]type_lists = []for href in href_lists:type_lists.append(f"{url}{href.split('/')[2]}/1/")# print(type_lists)return type_lists# 获取最大页async def get_max_page(self, first_page_url):source = await self.scrape_api(first_page_url)# print(source)max_page = etree.HTML(source).xpath('//a[13]/text()')return max_page# 获取小说列表页信息async def get_book_info(self, every_page_url):source = await self.scrape_api(every_page_url)book_lists = []lis = etree.HTML(source).xpath("//ul[@class='txt-list txt-list-row5']/li")for li in lis:book_id_url = li.xpath("span[@class='s2']/a/@href")[0]book_id = book_id_url.split('/')[3]# 书名book_name = li.xpath("span[@class='s2']/a/text()")[0]# 最新章节new_chapter = li.xpath("span[@class='s3']/a/text()")[0]# 作者author = li.xpath("span[@class='s4']/text()")[0]# 更新时间update_time = li.xpath("span[@class='s5']/text()")[0]source = await self.scrape_api(f"https://www.cdbxs.com{book_id_url}")# 字数font_num = etree.HTML(source).xpath("//p[6]/span/text()")[0]# 摘要summary = etree.HTML(source).xpath("//div[@class='desc xs-hidden']/text()")[0]# 以元组添加至 book_lists# print((book_id, book_name, new_chapter, author, update_time, font_num, summary))book_lists.append((book_id, book_name, new_chapter, author, update_time, font_num, summary))return book_lists# 获取章节urlsasync def get_chapter_urls(self, chapter_list_url):source = await self.scrape_api(chapter_list_url)# 章节urlchapter_urls = map(lambda x: "https://www.cdbxs.com" + x, etree.HTML(source).xpath("//div[@class='section-box'][2]/ul[@class='section-list fix']/li/a/@href | //div[@class='section-box'][1]/ul[@class='section-list fix']/li/a/@href"))return chapter_urls# 获取章节详情信息async def get_chapter_info(self, chapter_url):source = await self.scrape_api(chapter_url)# 标题title = etree.HTML(source).xpath("//h1[@class='title']/text()")# 正文content = ''.join(etree.HTML(source).xpath("//div[@id='nb_content']/dd//text()"))if title:return f'\'{title[0]}\'', f'\'{content}\''else:return '', f'\'{content}\''# 入库async def save_to_mysql(self, table_name, table_column_str, table_info_str):async with self.pool.acquire() as conn:async with conn.cursor() as cursor:sql = f'insert into {table_name}({table_column_str}) values{table_info_str}'# 执行SQL语句await cursor.execute(sql)await conn.commit()async def main(self):# headersglobal poolheaders = {"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/126.0.0.0 Safari/537.36 Edg/126.0.0.0"}# 建立异步请求需要的session(主要加header头信息以及代理,cookie等头信息)self.session = aiohttp.ClientSession(headers=headers)# 获取小说分类urltype_lists = await self.get_type()# 分类url默认为第一页for first_page_url in type_lists:# 获取带分类的url的前半截type_url = first_page_url.split('1')[0]# 获取此分类下最大页max_page = await self.get_max_page(first_page_url)# 生成此分类下每一页urlfor every_page in range(1, int(max_page[0]) + 1):every_page_url = f"{type_url}{every_page}/"# 获取小说列表页信息book_info_lists = await self.get_book_info(every_page_url)# 创建进程池pool = Pool(16)for book_info in book_info_lists:# 多进程抓取每本小说pool.apply_async(await self.run(book_info))# 关闭进程池,即停止接受新的任务。pool.close()# 等待所有的子进程执行结束。它会阻塞主进程,直到进程池中所有的任务都被执行完毕,然后才会继续执行主进程后面的代码。# 调用 join() 方法之前,应该先调用 close() 方法来确保不会再有新的任务被提交进来。pool.join()# 关闭连接池self.close_pool()# 关闭连接await self.session.close()# run方法: 抓取每一本小说的所有章节async def run(self, book_info):print(f"爬取小说:{book_info[1]}...")# 初始化数据库连接池await self.init_pool()# 入库小说信息await self.save_to_mysql('books','book_id, book_name, new_chapter, author, update_time, font_num, summary',book_info)# 获取章节urlsbook_id = book_info[0]chapter_urls = await self.get_chapter_urls(f"https://www.cdbxs.com/booklist/b/{book_id}/1")# 多协程抓取小说各个章节# 生成scrape_detail任务列表scrape_detail_tasks = [asyncio.ensure_future(self.get_chapter_info(chapter_url)) for chapter_url inchapter_urls]# 并发执行任务,获取结果chapter_details = list(await asyncio.gather(*scrape_detail_tasks)) # await asyncio.gather(*scrape_detail_tasks生成元组# 入库# 1.添加book_id 到 chapter_detailfor i in range(len(chapter_details)):chapter_detail = list(chapter_details[i])chapter_detail.append(book_id)chapter_detail = tuple(chapter_detail)chapter_details[i] = chapter_detail# 2.保存至数据库[await self.save_to_mysql('chapters', 'chapter_name,chapter_content, bid',chapter_detail) for chapter_detail in chapter_details]
if __name__ == '__main__':# 开始时间start_time = time.time()# 初始化Spiderspider = Spider()# 创建事件循环池loop = asyncio.get_event_loop()# 注册loop.run_until_complete(spider.main())# 结束时间end_time = time.time()logging.info(f'total time: {end_time - start_time}')
后续发布爬虫更多精致内容(按某培训机构爬虫课程顺序发布,欢迎关注后续发布)