在我们爬取数据时,经常会遇到I/O操作,比如下载数据、等待服务器响应等,此时我们可以使用异步爬虫实现高效率数据爬取,而不用原地等待结果的返回。
一、异步爬虫的方式
1.1、多线程/进程
优点
可为阻塞的操作单独开启线程或者进程,阻塞操作就可以异步执行。
缺点
不能无限制开启多线程、进程,这样会严重影响系统的正常运行。
1.2、线程/进程池:
优点
可以降低系统对进程和线程的创建、销毁的频率,从而低系统资源的开销。
缺点
池中线程或进程的数量是有上限。
1.3、同步异步的差距
在未使用异步的情况下,同样的任务需要6秒,而使用异步后仅需要2秒。
同步
import time
start_time = time.time()
def print_str(string):
print('正在下载', string)
time.sleep(2) # 模拟下载延迟
print(string, '下载完成')
test_list = ['sanxi', 'andy', 'hello']
for i in test_list:
print_str(i)
end_time = time.time() - start_time
print(end_time)
# 执行结果
正在下载 sanxi
sanxi 下载完成
正在下载 andy
andy 下载完成
正在下载 hello
hello 下载完成
6.005851984024048
异步
from multiprocessing.dummy import Pool # 导入线程池
import time
start_time = time.time()
def print_str(string):
print('正在下载', string)
time.sleep(2)
print(string, '下载完成')
test_list = ['sanxi', 'andy', 'hello']
pool = Pool(3) # 实例化生成Pool对象,参数为池大小,不填默认不限制。
pool.map(print_str, test_list) #
end_time = time.time() - start_time
print(end_time)
# 执行结果
正在下载 sanxi
正在下载 andy
正在下载 hello
andy 下载完成
sanxi 下载完成
hello 下载完成
2.0048611164093018
1.4、实例:异步爬取视频
异步和使用原则:异步只应该用在I/O操作,即有阻塞且耗时较长的操作,不能乱用!
我这里异步和同步实际花费时间是一样的,因为我带宽很小,无论你是异步还是同步,我带宽只有2M一秒,而这个视频是超过两兆,这意味着无论我同时开启多少个线程下载都没用,带宽就那么点。
from multiprocessing.dummy import Pool
import requests
from lxml import etree
import os
import time
if not os.path.exists('video'):
os.mkdir('video')
url = 'https://www.2amok.com/integral.html'
headers = {
'User-Agent': 'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/88.0.4324.182 Safari/537.36'
}
video_list = []
response = requests.get(url=url, headers=headers).text
link_list = etree.HTML(response).xpath('/html/body/div[8]/div[1]/div[@class="Recommended-main-li recent-height amok-vdeo"]')
for u in link_list:
start_time = time.time()
detail_url = u.xpath('./a/@href')[0]
detail = requests.get(url=detail_url, headers=headers).text
video_url = etree.HTML(detail).xpath('/html/body/div[6]/div[1]/div[3]/div/video/@src')[0]
video_name = etree.HTML(detail).xpath('/html/body/div[6]/div[1]/div[3]/div/video/@alt')[0] + '.mp4'
video_dict = {
'name': video_name,
'url': video_url
}
video_list.append(video_dict)
def get_video(dicts):
start = time.time()
video = requests.get(url=video_url, headers=headers).content
with open(f'video/{video_name}', 'wb') as file:
file.write(video)
end = time.time() - start
print(video_name, '下载完毕!耗时:', end)
start_time = time.time()
pool = Pool(3)
pool.map(get_video, video_list)
pool.close()
pool.join()
end_time = time.time() - start_time
print('总耗时:', end_time)
二、协程基础
针对爬虫,这里推荐单线程+异步协程的方式,来简单了解一下。
协程是Python 3.4+出现的,3.6后对协程的使用进行了简化封装,详情见Python官方文档:https://docs.python.org/zh-cn/3/library/asyncio-task.html。
2.1、协程
协程 通过 async/await 语法进行声明,是编写 asyncio 异步应用的推荐方式。
- async:定义一个协程.
- await:用来挂起阻塞方法的执行.
2.2、事件循环
get_event_loop,相当于一个无限循环,我们可以把一些函数注册到这个事件循环上,当满足某些条件的时候,函数就会被循环执行,跟Linux的crontab有点像。
import asyncio
async def print_str(name):
print(name)
print(666)
loop = asyncio.get_event_loop() # 创建事件循环对象
loop.run_until_complete(print_str('sanxi')) # 将该对象注册到loop中并启动,返回值即函数返回值
# 执行结果
sanxi
666
2.3、协程对象
这是Python官方3.9.4文档中所描述文档,非常容易理解。
- 协程函数: 定义形式为
async def
的函数; - 协程对象: 调用 协程函数 所返回的对象。
2.4、任务
任务是对协程对象的进一步封装,包含了任务的各个状态信息,被用来“并行地”调度协程,当一个协程通过 asyncio.create_task()
等函数被封装为一个 任务,该协程会被自动调度执行:
import asyncio
async def print_str():
print('sanxi666')
loop = asyncio.get_event_loop()
task = loop.create_task(print_str()) # 创建任务
loop.run_until_complete(task) # 执行
# 执行结果
sanxi666
2.5、Future 对象
Future
是一种特殊的 低层级 可等待对象,表示一个异步操作的 最终结果。当一个 Future 对象 被等待,这意味着协程将保持等待直到该 Future 对象在其他地方操作完毕。
因此它代表将来执行或还未执行的任务,和task没有本质区别!在 asyncio 中需要 Future 对象以便允许通过 async/await 使用基于回调的代码,但通常情况下 没有必要 在应用层级的代码中创建 Future 对象。
import asyncio
async def print_str():
print('sanxi666')
loop = asyncio.get_event_loop()
task = asyncio.ensure_future(print_str()) # 等待执行
print(task)
loop.run_until_complete(task) # 开始执行
print(task)
# 执行结果,第一条pending即future对象未执行,第三条finished执行完毕!
<Task pending name='Task-1' coro=<print_str() running at /home/sanxi/PycharmProjects/study/spider.py:4>>
sanxi666
<Task finished name='Task-1' coro=<print_str() done, defined at /home/sanxi/PycharmProjects/study/spider.py:4> result=None>
2.6、绑定回调
添加一个在 Future 完成 时运行的回调函数,调用 callback 时,Future 对象是它的唯一参数,调用这个方法时 Future 已经 完成 , 回调函数已被 loop.call_soon()
调度。
它用在什么地方?
一个回调,它将在 Task 对象 完成 时被运行,此方法应该仅在低层级的基于回调的代码中使用。
import asyncio
async def print_str():
print('sanxi666')
def callback(task):
print(task.result) # result返回值即任务对象中封装的协程对象所对应的函数返回值
loop = asyncio.get_event_loop()
task = asyncio.ensure_future(print_str())
task.add_done_callback(callback) # 将回调函数绑定到任务对象中
loop.run_until_complete(task)
# 执行结果
sanxi666
<built-in method result of _asyncio.Task object at 0x7f738d5c2860>
2.7、多任务协程
import asyncio
import time
async def print_str(string):
print(string, '正在跑步')
# 当异步协程中出现同步(阻塞)操作,那么就无法实现异步;在asyncio中出现阻塞,需要手动挂起!
await asyncio.sleep(2)
print(string, '运动完毕!')
start_time = time.time()
task_list = []
name_list = ['sanxi', 'andy', 'haha']
for name in name_list:
task = asyncio.ensure_future(print_str(name))
task_list.append(task)
loop = asyncio.get_event_loop()
# 将任务列表传递给wait,此处类似于join(),等待给定的future和协程完成。
loop.run_until_complete(asyncio.wait(task_list))
print('结束时间:', time.time() - start_time)
# 执行结果
sanxi 正在跑步
andy 正在跑步
haha 正在跑步
sanxi 运动完毕!
andy 运动完毕!
haha 运动完毕!
结束时间: 2.0026283264160156
2.8、异步网络请求模块
一定要记得挂起I/O的任务,不然拿不到返回结果,如果是post请求,将get改成post即可,用法跟requests差不多,如果要设置IP代理,用proxy而不是requests的proxies,proxy后面直接跟字符串而不是字典。
import asyncioimport aiohttpurl_list = ['https://www.baidu.com/', 'https://github.com']async def get_page(url): async with aiohttp.ClientSession() as session: async with await session.get(url) as response: page_text = await response.text() print('下载完成')task_list = []for url in url_list: task = asyncio.ensure_future(get_page(url)) task_list.append(task)loop = asyncio.get_event_loop()loop.run_until_complete(asyncio.wait(task_list))