5、异步爬虫

爬虫快速入门 / 2021-06-02

在我们爬取数据时,经常会遇到I/O操作,比如下载数据、等待服务器响应等,此时我们可以使用异步爬虫实现高效率数据爬取,而不用原地等待结果的返回。

一、异步爬虫的方式

1.1、多线程/进程

优点

可为阻塞的操作单独开启线程或者进程,阻塞操作就可以异步执行。

缺点

不能无限制开启多线程、进程,这样会严重影响系统的正常运行。

1.2、线程/进程池:

优点

可以降低系统对进程和线程的创建、销毁的频率,从而低系统资源的开销。

缺点

池中线程或进程的数量是有上限。

1.3、同步异步的差距

在未使用异步的情况下,同样的任务需要6秒,而使用异步后仅需要2秒。

同步
import time


start_time = time.time()


def print_str(string):
    print('正在下载', string)
    time.sleep(2)  # 模拟下载延迟
    print(string, '下载完成')


test_list = ['sanxi', 'andy', 'hello']

for i in test_list:
    print_str(i)
end_time = time.time() - start_time
print(end_time)

# 执行结果
正在下载 sanxi
sanxi 下载完成
正在下载 andy
andy 下载完成
正在下载 hello
hello 下载完成
6.005851984024048
异步
from multiprocessing.dummy import Pool  # 导入线程池
import time


start_time = time.time()


def print_str(string):
    print('正在下载', string)
    time.sleep(2)
    print(string, '下载完成')


test_list = ['sanxi', 'andy', 'hello']

pool = Pool(3)  # 实例化生成Pool对象,参数为池大小,不填默认不限制。
pool.map(print_str, test_list)  # 
end_time = time.time() - start_time
print(end_time)

# 执行结果
正在下载 sanxi
正在下载 andy
正在下载 hello
andy 下载完成
sanxi 下载完成
hello 下载完成
2.0048611164093018

1.4、实例:异步爬取视频

异步和使用原则:异步只应该用在I/O操作,即有阻塞且耗时较长的操作,不能乱用!

我这里异步和同步实际花费时间是一样的,因为我带宽很小,无论你是异步还是同步,我带宽只有2M一秒,而这个视频是超过两兆,这意味着无论我同时开启多少个线程下载都没用,带宽就那么点。

from multiprocessing.dummy import Pool
import requests
from lxml import etree
import os
import time


if not os.path.exists('video'):
    os.mkdir('video')
url = 'https://www.2amok.com/integral.html'
headers = {
    'User-Agent': 'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/88.0.4324.182 Safari/537.36'
}

video_list = []
response = requests.get(url=url, headers=headers).text
link_list = etree.HTML(response).xpath('/html/body/div[8]/div[1]/div[@class="Recommended-main-li recent-height amok-vdeo"]')


for u in link_list:
    start_time = time.time()
    detail_url = u.xpath('./a/@href')[0]

    detail = requests.get(url=detail_url, headers=headers).text
    video_url = etree.HTML(detail).xpath('/html/body/div[6]/div[1]/div[3]/div/video/@src')[0]
    video_name = etree.HTML(detail).xpath('/html/body/div[6]/div[1]/div[3]/div/video/@alt')[0] + '.mp4'
    video_dict = {
        'name': video_name,
        'url': video_url
    }
    video_list.append(video_dict)


def get_video(dicts):
    start = time.time()
    video = requests.get(url=video_url, headers=headers).content
    with open(f'video/{video_name}', 'wb') as file:
        file.write(video)
        end = time.time() - start
        print(video_name, '下载完毕!耗时:', end)


start_time = time.time()
pool = Pool(3)
pool.map(get_video, video_list)
pool.close()
pool.join()
end_time = time.time() - start_time
print('总耗时:', end_time)

二、协程基础

针对爬虫,这里推荐单线程+异步协程的方式,来简单了解一下。

协程是Python 3.4+出现的,3.6后对协程的使用进行了简化封装,详情见Python官方文档:https://docs.python.org/zh-cn/3/library/asyncio-task.html。

2.1、协程

协程 通过 async/await 语法进行声明,是编写 asyncio 异步应用的推荐方式。

  • async:定义一个协程.
  • await:用来挂起阻塞方法的执行.

2.2、事件循环

get_event_loop,相当于一个无限循环,我们可以把一些函数注册到这个事件循环上,当满足某些条件的时候,函数就会被循环执行,跟Linux的crontab有点像。

import asyncio


async def print_str(name):
    print(name)
    print(666)

loop = asyncio.get_event_loop()  # 创建事件循环对象
loop.run_until_complete(print_str('sanxi'))  # 将该对象注册到loop中并启动,返回值即函数返回值

# 执行结果
sanxi
666

2.3、协程对象

这是Python官方3.9.4文档中所描述文档,非常容易理解。

  • 协程函数: 定义形式为 async def的函数;
  • 协程对象: 调用 协程函数 所返回的对象。

2.4、任务

任务是对协程对象的进一步封装,包含了任务的各个状态信息,被用来“并行地”调度协程,当一个协程通过 asyncio.create_task() 等函数被封装为一个 任务,该协程会被自动调度执行:

import asyncio


async def print_str():
    print('sanxi666')


loop = asyncio.get_event_loop()
task = loop.create_task(print_str())  # 创建任务
loop.run_until_complete(task)  # 执行

# 执行结果
sanxi666

2.5、Future 对象

Future 是一种特殊的 低层级 可等待对象,表示一个异步操作的 最终结果。当一个 Future 对象 被等待,这意味着协程将保持等待直到该 Future 对象在其他地方操作完毕。

因此它代表将来执行或还未执行的任务,和task没有本质区别!在 asyncio 中需要 Future 对象以便允许通过 async/await 使用基于回调的代码,但通常情况下 没有必要 在应用层级的代码中创建 Future 对象。

import asyncio


async def print_str():
    print('sanxi666')


loop = asyncio.get_event_loop()
task = asyncio.ensure_future(print_str())  # 等待执行
print(task)
loop.run_until_complete(task)  # 开始执行
print(task)

# 执行结果,第一条pending即future对象未执行,第三条finished执行完毕!
<Task pending name='Task-1' coro=<print_str() running at /home/sanxi/PycharmProjects/study/spider.py:4>>
sanxi666
<Task finished name='Task-1' coro=<print_str() done, defined at /home/sanxi/PycharmProjects/study/spider.py:4> result=None>

2.6、绑定回调

添加一个在 Future 完成 时运行的回调函数,调用 callback 时,Future 对象是它的唯一参数,调用这个方法时 Future 已经 完成 , 回调函数已被 loop.call_soon()调度。

它用在什么地方?

一个回调,它将在 Task 对象 完成 时被运行,此方法应该仅在低层级的基于回调的代码中使用。

import asyncio


async def print_str():
    print('sanxi666')


def callback(task):
    print(task.result)  # result返回值即任务对象中封装的协程对象所对应的函数返回值


loop = asyncio.get_event_loop()
task = asyncio.ensure_future(print_str())
task.add_done_callback(callback)  # 将回调函数绑定到任务对象中
loop.run_until_complete(task)

# 执行结果
sanxi666
<built-in method result of _asyncio.Task object at 0x7f738d5c2860>

2.7、多任务协程

import asyncio
import time


async def print_str(string):
    print(string, '正在跑步')
    # 当异步协程中出现同步(阻塞)操作,那么就无法实现异步;在asyncio中出现阻塞,需要手动挂起!
    await asyncio.sleep(2)  
    print(string, '运动完毕!')


start_time = time.time()
task_list = []
name_list = ['sanxi', 'andy', 'haha']
for name in name_list:
    task = asyncio.ensure_future(print_str(name))
    task_list.append(task)


loop = asyncio.get_event_loop()
# 将任务列表传递给wait,此处类似于join(),等待给定的future和协程完成。
loop.run_until_complete(asyncio.wait(task_list))  
print('结束时间:', time.time() - start_time)

# 执行结果
sanxi 正在跑步
andy 正在跑步
haha 正在跑步
sanxi 运动完毕!
andy 运动完毕!
haha 运动完毕!
结束时间: 2.0026283264160156

2.8、异步网络请求模块

一定要记得挂起I/O的任务,不然拿不到返回结果,如果是post请求,将get改成post即可,用法跟requests差不多,如果要设置IP代理,用proxy而不是requests的proxies,proxy后面直接跟字符串而不是字典。

import asyncioimport aiohttpurl_list = ['https://www.baidu.com/', 'https://github.com']async def get_page(url):    async with aiohttp.ClientSession() as session:        async with await session.get(url) as response:            page_text = await response.text()            print('下载完成')task_list = []for url in url_list:    task = asyncio.ensure_future(get_page(url))    task_list.append(task)loop = asyncio.get_event_loop()loop.run_until_complete(asyncio.wait(task_list))
世间微尘里 独爱茶酒中