我正在参与「启航计划」,今天在外面吃饭,手机里就来了短信。的运营小伙伴邀请我成为定向作者,很侥幸能让运营的小伙伴找到,那这个系列我就先发一些爬虫的实战的文章吧,不知道能不能过审。
第一次宣布实战类型的爬虫文章,假如有那里不明白或许出现bug的能够找我私信,欢迎大家在下面谈论,能够给出我更好的建议,欢迎大家纠正.
网站链接放在这儿了鬼吹灯
主要是以协程为主来爬取小说得章节内容,协程爬取不懂得小伙伴能够先关注我一手,后续会整理理论的知识放在专栏里
整体思路
- 得到鬼吹灯页面的源码
- 解析源码得到每一个章节的url
- 得到书名,这个书名经过切片得到
- 经过url得到一个页面的内容
- 运用并发履行多个使命下载
代码实现
导包
import asyncio
import os
from aiohttp import ClientSession
import requests
import aiofiles
from bs4 import BeautifulSoup
得到页面源码的办法
参数是传入的url
回来出页面的源码
def get_page_source(url):
"""
获取页面源码的办法
:param url: 传入的url
:return: 回来的是页面的源码
"""
headers={
'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/109.0.0.0 Safari/537.36 Edg/109.0.1518.78'
}
res = requests.get(url,headers=headers)
data = res.content.decode()
return data
解析页面得到url
在这儿,我运用了调集来存储每一个章节的url,运用xpath来得到章节url,我个人是比较喜欢运用xpath,在这儿给出另一种写法,运用的是的beautifulSoup
在页面F12检查,咱们找到的是div下的ul下的li下的a标签的属性href
写法一:运用xpath
def parse_page_source(html):
"""
对页面进行解析,得到咱们每一个章节的url
:param html: 传入的页面源码
:return: 回来的是一个带有一切章节url的调集
"""
book_list=[]
tree = etree.HTML(html)
mulu_list = tree.xpath('//div[@class="mulu-list quanji"]')
for mulu in mulu_list:
# 抓取整个页面下章节的url
a = mulu.xpath('./ul/li/a/@href')
# 注意这儿必定要在for循环里增加调集
book_list.append(a)
return book_list
写法二:bs4
def parse_page_source(html):
"""
对页面进行解析,得到咱们每一个章节的url
:param html: 传入的页面源码
:return: 回来的是一个带有一切章节url的调集
"""
book_list = []
soup = BeautifulSoup(html, 'html.parser')
a_list = soup.find_all('div', attrs={'class': 'mulu-list quanji'})
for a in a_list:
a_list = a.find_all('a')
for href in a_list:
chapter_url = href['href']
book_list.append(chapter_url)
return book_list
得到和章节名
经过传入的章节url,进行切片以下面一个链接为例
https://www.51shucheng.net/daomu/guichuideng/jingjuegucheng/2464.html
咱们按照/切分就有了[‘https:’, ”, ‘www.51shucheng.net’, ‘daomu’, ‘guichuideng’, ‘jingjuegucheng’, ‘2464.html’] 然后咱们取倒数第二个元素就有了b_name
def get_book_name(chapter_url):
"""
得到称号,为了后续下载好分辨
:param chapter_url:
:return:
"""
book_chapter_name = chapter_url.split('/')[-2]
return book_chapter_name
下载一个章节的内容
运用xpath来写
async def aio_download_one_content(chapter_url, single):
"""
下载一个章节内容
:param chapter_url: 传入得到的章节url
:param single: 运用async with single便是10个并发
:return:
"""
c_name = get_book_name(chapter_url)
for i in range(10):
try:
async with single:
async with ClientSession() as session:
async with session.get(chapter_url) as res:
# 得到章节内容的页面的源码
page_source = await res.content.read()
tree = etree.HTML(page_source)
# 章节称号
base_title = tree.xpath('//h1/text()')[0]
if(':'in base_title):
number = base_title.split(':')[0]
con = base_title.split(':')[1]
title = number+con
else:
title=base_title
# 章节内容
content = tree.xpath('//div[@class="neirong"]/p/text()')
chapter_content = '\n'.join(content)
if not os.path.exists(f'{book_name}/{c_name}'):
os.makedirs(f'{book_name}/{c_name}')
async with aiofiles.open(f'{book_name}/{c_name}/{title}.txt', mode="w",
encoding='utf-8') as f:
await f.write(chapter_content)
print(chapter_url, "下载结束!")
return ""
except Exception as e:
print(e)
print(chapter_url, "下载失利!, 从头下载. ")
return chapter_url
这段代码单独拿出来是因为有的章节称号是这样的<<第19章 : 考古队>>,这样的数据是不对的,放在文件里无法命名,这就导致了后续能写入文件的只有章节名没有:的内容,所以我对第一次筛选出的数据进行切片,假如遇到:就把前面和后面的数据切出来在组合,假如没遇到就让一个新的变量来接收base_title
# 章节称号
base_title = tree.xpath('//h1/text()')[0]
if(':'in base_title):
number = base_title.split(':')[0]
con = base_title.split(':')[1]
title = number+con
else:
title=base_title
运用bs4来写
async def aio_download_one(chapter_url, signal):
"""
下载一个章节内容
:param chapter_url: 传入得到的章节url
:param single: 运用async with single便是10个并发
:return:
"""
c_name = get_book_name(chapter_url)
for c in range(10):
try:
async with signal:
async with aiohttp.ClientSession() as session:
async with session.get(chapter_url) as resp:
# 得到章节内容的页面的源码
page_source = await resp.text()
soup = BeautifulSoup(page_source, 'html.parser')
# 章节称号
base_title = soup.find('h1').text
if (':' in base_title):
number = base_title.split(':')[0]
con = base_title.split(':')[1]
title = number + con
else:
title = base_title
# 章节内容
p_content = soup.find('div', attrs={'class': 'neirong'}).find_all('p')
content = [p.text + '\n' for p in p_content]
chapter_content = '\n'.join(content)
if not os.path.exists(f'{book_name}/{c_name}'):
os.makedirs(f'{book_name}/{c_name}')
async with aiofiles.open(f'{book_name}/{c_name}/{title}.txt', mode="w",
encoding='utf-8') as f:
await f.write(chapter_content)
print(chapter_url, "下载结束!")
return ""
except Exception as e:
print(e)
print(chapter_url, "下载失利!, 从头下载. ")
return chapter_url
协程下载
async def aio_download(url_list):
# 创建一个使命列表
tasks = []
# 设置最多10个使命并行运作
semaphore = asyncio.Semaphore(10)
for h in url_list:
tasks.append(asyncio.create_task(aio_download_one_content(h, semaphore)))
await asyncio.wait(tasks)
主函数运转
主函数运转就没什么可说的了,这儿注意一点便是最终不要loop.close(),这样的话会导致你还没有爬取完数据,loop.close()就会关闭,状况如下,还剩一点就爬完了,成果报错了
if __name__ == '__main__':
url = 'https://www.51shucheng.net/daomu/guichuideng'
book_name = '鬼吹灯'
if not os.path.exists(book_name):
os.makedirs(book_name)
source = get_page_source(url)
href_list = parse_page_source(source)
loop = asyncio.get_event_loop()
loop.run_until_complete(aio_download(href_list))
完结效果图
我就不逐个截图了
总结
为什么我在这儿比对了xpath和bs4两种代码,小伙伴能够仔细看一下,在xpath中,我想拿到数据,找到它,大量的运用了//这种,这样的话就会从源码内全局检索,这就导致了我想爬取文章内容会很慢,有些时分还会超时导致报错.所以咱们运用xpath的时分,想让他的速度进步,最好有一个指定的点 到了再//
能够理解为下面这种状况
def getList():
url = 'https://www.xiurenba.cc/XiuRen/'
response = requests.get(url, headers=headers)
data = response.c
ontent.decode()
# print(data)
tree = etree.HTML(data)
li_list = tree.xpath('//ul[@class="update_area_lists cl"]/li')
return li_list
def get_single_url():
li_list = getList()
for li in li_list:
single_url = 'https://www.xiurenba.cc' + li.xpath('./a/@href')[0]
还有便是遇到了特殊符号要把它干掉,或许替换掉,这样就能够正常爬取数据
假如有小伙伴想要直接拿取源码的话,能够顺着代码实现一步步粘贴过去