异步编程

传统的多线程同步编程模型中函数调用是阻塞(Blocking)的,即一行代码执行完后才能执行下一行,对于IO密集型应用,阻塞会导致浪费大量线程资源,程序效率低下。异步编程则是专为解决这类问题而设计的,异步编程中并发控制通常由事件循环实现,底层则使用操作系统提供的epoll(Linux)、IOCP(Windows)或kqueue(macOS)异步IO机制,异步编程模型下能以少量的线程即可实现高并发的IO密集型程序。

之前生成器章节我们已经介绍过,Python中早期版本就提供了基于生成器的协程机制,但没有提供真正的并发编程实现,并发通常是使用线程模拟或基于其它Native库实现的,不过生成器式协程是后续版本中的异步并发模型的基础,Python3.5后引入了原生的异步并发编程async/await关键字和asyncio库。这篇笔记我们简单介绍如何使用Python实现原生异步并发编程。

协程函数

Python中,协程函数使用async def定义,而等待协程执行完成使用await关键字,下面是一个例子。

async def foo():
    print('Start foo')
    await asyncio.sleep(1)
    print('End foo')

asyncio.sleep(1)本身也是一个协程,即异步版本的“睡眠函数”,我们使用await等待其消耗1秒的时间。

asyncio.run() 执行协程

如何启动协程函数呢?下面代码中,foo()是一个异步协程函数,它会在进入时打印一条信息,然后使用await暂停协程等待1秒,最后再打印一条信息。我们使用asyncio.run()调用协程函数,它会等待协程执行完成。

import asyncio


async def foo():
    print('Start foo')
    await asyncio.sleep(1)
    print('End foo')


if __name__ == '__main__':
    asyncio.run(foo())

asyncio.run()asyncio库中提供的一个函数,用来运行最高层级的协程并且管理事件循环的生命周期,它简化了异步程序的启动和结束过程。上面代码中,asyncio.run()虽然从调用者的角度看是“阻塞”的,它等待某个任务执行完成才会继续向下执行,但它内部其实会创建事件循环并调度和运行协程,这和直接阻塞式的同步调用是有区别的。

asyncio.gather() 并发调度协程

为了更直观的观察什么是异步并发,我们可以并发调度多个协程,这需要使用asyncio.gather(),下面是一个例子。

import asyncio
import threading


async def task(name):
    print(f'[{threading.get_ident()}] Start {name}')
    await asyncio.sleep(1)
    print(f'[{threading.get_ident()}] End {name}')


async def foo():
    print(f'[{threading.get_ident()}] creating tasks')
    await asyncio.gather(task('A'), task('B'), task('C'))


if __name__ == '__main__':
    print(f'[{threading.get_ident()}] Start')
    asyncio.run(foo())

asyncio.gather()函数用来并发执行多个协程,并等待它们全部执行完成。运行上面代码后我们可以看到3个协程是并发执行的,程序总共耗时1秒左右(而非3秒),此外我们还打印了线程号,我们可以看到3个协程的线程号是相同的,这正是异步并发模式的设计初衷,用单线程实现并发控制。

此外,asyncio.gather()也支持读取协程的返回值,下面是一个例子。

async def foo():
    print(f'[{threading.get_ident()}] creating tasks')
    results = await asyncio.gather(task('A'), task('B'), task('C'))
    print(f'[{threading.get_ident()}] results: {results}')

手动创建任务(Task)

除了直接使用asyncio.run()asyncio.gather()启动协程,asyncio库也支持手动创建任务并启动,这种方式适合需要更细粒度控制任务的场景,比如需要提前创建任务对象等情况。

async def foo():
    print(f'[{threading.get_ident()}] creating tasks')
    t1 = asyncio.create_task(task('A'))
    t2 = asyncio.create_task(task('B'))
    t3 = asyncio.create_task(task('C'))
    print(f'[{threading.get_ident()}] executing tasks')
    await asyncio.gather(t1, t2, t3)

asyncio.create_task()函数会基于协程函数创建Task任务对象,任务是对协程的封装,可以用于细粒度的并发控制。上面代码中我们创建了3个任务,然后并发启动了它们。

asyncio.wait_for() 超时控制

asyncio.wait_for()可以给协程指定一个超时时间,如果协程执行超时则中断协程,抛出asyncio.TimeoutError,下面是一个例子。

import asyncio
import threading


async def task(name):
    print(f'[{threading.get_ident()}] Start {name}')
    await asyncio.sleep(3)
    print(f'[{threading.get_ident()}] End {name}')
    return name


async def foo():
    try:
        await asyncio.wait_for(task('A'), timeout=1)
    except asyncio.TimeoutError:
        print('timeout!')


if __name__ == '__main__':
    asyncio.run(foo())

异步生成器

有时我们的协程需要一边运行一边流式的返回数据,这可以使用async for和异步生成器实现,下面是一个例子。

import asyncio


async def stream_data():
    s = 'Hello, world!'
    for c in s:
        yield c
        await asyncio.sleep(0.3)


async def foo():
    async for c in stream_data():
        print(c)


if __name__ == '__main__':
    asyncio.run(foo())

代码中,我们的协程内使用了yield逐个返回字符串中的字符,运行后,我们可以看到字符逐个打印。

异步上下文管理器

Python中,普通的同步函数可以使用with语法管理上下文资源,然而涉及到异步函数后,由于异步函数的调度执行顺序与普通函数不同,此时我们需要使用async with。例如aiofiles读取文件时,我们使用如下方式管理异步上下文。

import aiofiles
import asyncio


async def read_file():
    async with aiofiles.open('example.txt', 'r', encoding='utf-8') as f:
        content = await f.read()
        print(content)


asyncio.run(read_file())

实际上,普通with的上下文管理器用的是同步的__enter__()__exit__()方法,它们会阻塞事件循环;而async with使用的是异步的__aenter__()__aexit__()方法,它们理解为前者的异步版本,不会阻塞事件循环。

封装同步阻塞操作

实际开发中,当你真正决定使用异步编程模型时,通常很快就会发现一个巨大的问题:一旦采用异步编程模型就要求我们程序中所有同步阻塞操作替换为异步版本。例如读写文件需要使用aiofiles,HTTP客户端也要换成aiohttp等。因为如果你在异步协程中使用了同步阻塞操作,事件循环也会被阻塞,程序的并发性能会急剧下降。但有时我们需要用到的一个库又没有异步版本,此时怎么办呢?答案是使用asyncio库的run_in_executor(),将同步操作放到线程池中统一调度执行。

import asyncio
import time


def blocking_io():
    time.sleep(1)
    print('Done!')


async def foo():
    loop = asyncio.get_running_loop()
    await loop.run_in_executor(None, blocking_io)


if __name__ == '__main__':
    asyncio.run(foo())

代码中,asyncio.get_running_loop()用于获取当前的事件循环,run_in_executor()函数用于将同步阻塞函数放入线程池,由当前的事件循环调度执行。run_in_executor()的第一个参数是线程池,传入None表示使用默认线程池(通常是ThreadPoolExecutor),第二个参数是阻塞函数,后面也可以传入可变参数作为阻塞函数的参数。

作者:Gacfox
版权声明:本网站为非盈利性质,文章如非特殊说明均为原创,版权遵循知识共享协议CC BY-NC-ND 4.0进行授权,转载必须署名,禁止用于商业目的或演绎修改后转载。