本文大纲翻译自 medium.com/@superfastp…
Python Asyncio 让咱们能够进行基于协程(coroutine)的异步编程。但从python3.4(2014)开端引进,到python3.7的成熟,asyncio其完成已推出许多年了。尽管如此,异步编程仍然是python中最吸引人但又最令人懊丧的领域。为什么呢?因为它是在是太难上手了,尤其对没有js等自带async的编程语言开发经历的人来说。关于python开发者来说,asyncio不是一个添加到python的特性,而是一个完全新的编程范式(paradigm),这需求咱们从头架构咱们的程序。正是这种不同的思维让开端学习asyncio变的极度懊丧,乃至憎恶asyncio。本文将带你快速上手,趟过这条河。
基本概念
协程
前面咱们提到了Asyncio基于协程,咱们先简略了解一下协程(Coroutine)的概念,协程往往被称为轻量级线程、纤程,许多编程语言都具有这个概念,例如go、js包括比较新的java21。咱们知道线程之间切换是比较耗时的,需求从用户态切换到内核态,而协程并不涉线程切换,自己维护上下文,一个线程能够维护多个协程。 可是协程虽好,可是单线程内仍只要一个协程能够获取履行权,关于cpu密集型使命,它乃至可能会更慢,所以协程首要是解决的是io相关的问题。
事情循环 event loop
asyncio 运用了单线程的事情循环行列来履行协程。整个流程便是,将使命提交到事情行列中,会有一个履行线程不停的从行列中获取并履行使命,当遇到需求io的操作时,会指派给操作体系监控状况,然后继续获取使命,直到一切使命都履行完结。当然,履行使命期间,也会发生新的使命,包括io完结也是使命。 事情循环行列中最重要的是封装了一个对操作体系供给的io多路复用器,不同操作体系完结不同,linux的epoll,windows的iocp等等。简略来说便是,当咱们遇到io时(拜访文件体系、网络恳求等),咱们能够把等候io完结的使命交给操作体系,而等候io这期间,咱们能够不让线程挂起,去干更多的事情,这关于有着紧箍咒(大局解说锁GIL)的python来说,对错常有用的。
1.怎么界说、创立、运转、切换协程
咱们运用 async def
来界说一个协程,这儿能够把async def看作是def的扩张,专门用来界说协程。
一般的def界说的function运转之后会发生一个对应的回来值,而async def运转之后,会回来一个coroutine。
coroutine必须在event loop 内运转,咱们能够经过asyncio.run()运转coroutine。
# define a coroutine
async def custom_coro():
print('Hello there')
# create a coroutine
coro = custom_coro()
# 运转custom_coro()之后回来的是一个coroutine
type(coro)
# run a coroutine in the event loop
asyncio.run(coro)
咱们怎么从一个协程切换到另一个协程呢?咱们能够运用await
关键字。
async def custom_coro():
# 中止当前协程,然后运转other_coro
await other_coro()
当协程遇到await 润饰的函数时,会中止,直到await的协程履行完毕,然后会到当前状况继续继续履行。
2.task与coroutine
task是coroutine的一个封装,相关于coroutine,task具有检查状况、暂停使命等更丰厚的功能。通常来说,咱们最好将协程转成task,许多当地现在支撑coroutine,实践上是python悄悄帮你转为task,将来的某个版别可能不再支撑。
# create and schedule a task
task = asyncio.create_task(other_coro())
在运用asyncio.create_task()后,会将协程封装成一个task,而且会组织它进入事情循环,而且只要机会,就让它立刻履行。
这个调用也回来了一个task object,经过这个回来值,咱们能够回去回来值成果、检查使命状况等等。
咱们能够运用await
来暂停当前线程获取成果,也可根据task的方法来获取成果。
# 等候,直到task履行完,并获取回来值
value=await task
# check task,假如task没有完结直接获取result会抛反常
if task.done():
value=task.result()
下面给一个完整比方:运用coroutine创立一个task,然后过一段时刻后再获取成果,我为大家标出了履行次序,以便更好的了解。
# example of scheduling an async task
import asyncio
# coroutine to perform some useful task
async def task_coro():
#3. report a message
print('The task is running...')
#4. suspend and sleep for a moment
await asyncio.sleep(1)
#6. report a message
print('The task done')
#7. return a result
return 'The answer is 100'
# main coroutine
async def main():
#1. run a task independently
task = asyncio.create_task(task_coro())
#2. suspend a moment, allow the scheduled task to run
await asyncio.sleep(0)
#5. wait for the async task to complete
await task
#8. report the return value
print(f'Got: {task.result()}')
# create the coroutine and run it in the event loop
asyncio.run(main())
这儿又个需求注意的点:从1.2.3步咱们能够看到task创立以后并没有直接履行(默认情况下),而是在await asyncio.sleep(0)
后main协程中止之后,task_coro才开端履行。
3.多个协程怎么并行
咱们能够在aysncio程序内并行的运转多个程序,这关于并行下载多个文件或许高并发场景下都很有用。
咱们能够运用asyncio.gather()来运转协程,它能够承受多个coroutine或许task,然后回来一个asyncio.Future。Future是Task的父类,代表着未来将会有一个成果,Task是一个为了包裹协程的Future。
future = asyncio.gather(coro1(), coro2() coro3())
Future 与Task Coroutine相同,他们的目标都能够被await润饰的目标,都是完结了__await__。在这儿咱们能够await future 来完结等候一切协程或许使命,最终会回来一个迭代器,获取每个协程的运转成果,次序与添加的次序相同。
下面的比方中,准备了100个协程,每个协程随机sleep 0-1秒,一切协程并发履行。
# example of running many coroutines concurrently
import random
import asyncio
import time
# coroutine to perform some useful task
async def task_coro(arg) -> str:
# generate a random value between 0 and 1
value = random.random()
# suspend and sleep for a moment
await asyncio.sleep(value)
# report the argument and value
return f'Task {arg} done after {value} seconds'
# main coroutine
async def main():
# create many coroutines
coros = [task_coro(i) for i in range(100)]
# suspend and run all coroutines
now=time.time()
results=await asyncio.gather(*coros)
print(f"total taken:{time.time()-now}")
for er in results:
print(er)
# create the coroutine and run it in the event loop
asyncio.run(main())
能够看到,总共花费不到1秒就都履行完毕,然后循环输出了,每个协程的回来值。
total taken:0.9943249225616455
Task 0 done after 0.2671076110393307 seconds
Task 1 done after 0.514333336910391 seconds
...
4.怎么等候多个tasks
假如咱们不是仅仅想等候一切task都完结,而是有更杂乱的需求,第一个出现反常就暂停等等操作,咱们能够运用asyncio.wait()。 经过函数的界说,咱们能够看到咱们能够设置超时时刻和何时回来,默认时全部完结时ALL_COMPLETED回来,也能够是第一个完结FIRST_COMPLETED,或许是第一个反常回来FIRST_EXCEPTION。
(function) def wait(
fs: Iterable[Awaitable[_T@wait]],
*,
timeout: float | None = None,
return_when: str = "ALL_COMPLETED")
继续运用上一个的场景进行多个task的等候,只不过这次咱们咱们运用asyncio.wait(),获取第一个完结的成果。
# SuperFastPython.com
# example of waiting for a collection of tasks
import random
import asyncio
# coroutine to perform some useful task
async def task_coro(arg):
# generate a random value between 0 and 1
value = random.random()
# suspend and sleep for a moment
await asyncio.sleep(value)
# return a value unique for this task
return arg * value
# main coroutine
async def main():
# create and schedule many independent tasks
tasks = [asyncio.create_task(task_coro(i)) for i in range(100)]
# suspend and wait for the first task to complete
done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
# report the result from the first task
task = done.pop()
print(f'First task got: {task.result()}')
# create the coroutine and run it in the event loop
asyncio.run(main())
5. 按使命完结次序处理成果
当使命完结时,咱们能够处理或许提交成果。与前面按照提交使命次序回来成果不同,当某些使命很快,某些使命完结很慢,假如等候一切成果,将会浪费更多的时刻。咱们能够经过asyncio.as_complete()
按照使命完结次序来处理成果。
asyncio.as_complete()
承受一个awaitable
集合,比方一个List[Task],回来一个迭代器,每次迭代将会yield 一个task,当运用await时,等一个现已完结的task后回来成果。
# get a generator that yields awaitables in completion order then iterate
for task in asyncio.as_completed(tasks):
# 获取第一个现已完结的使命
result = await task
6. 怎么完结协程之间同享数据
协程之间数据同享咱们能够经过asyncio.Queue
,它是一个先进先出、协程安全的行列,咱们无需忧虑Queue的添加、获取的竞争问题。
创立一个行列,能够经过添加maxsize参数设置上限
queue = asyncio.Queue()
咱们能够经过put()来添加数据到行列中,同时会回来一个协程。事实上,咱们put时,必定要await
润饰,因为假如Queue达到上限的时分会阻塞住。相同的获取数据时也需求运用await润饰get()。
# add and retrieve
await queue.put(item)
item = await queue.get()
下面咱们运用asyncio.Queue
来写一个经典的生产者顾客程序。生产者往行列里放十个元素,每放进去一个,顾客就获取一个打印出来,直到最终消费到None为止。
from random import random
import asyncio
# coroutine to generate work
async def producer(queue):
print('Producer: Running')
# generate work
for i in range(10):
value = random()
await asyncio.sleep(value)
await queue.put(value)
await queue.put(None)
print('Producer: Done')
async def main():
# create the shared queue
queue = asyncio.Queue()
# run the consumer as an independent task
asyncio.create_task(producer(queue))
# consume items from the queue until a None is seen
while value:=await queue.get():
# report the value
print(f'Got: {value}')
asyncio.run(main())
7. 怎么写NIO程序
咱们能够经过asyncio.open_connection()
来创立一个TCP client。调用之后会回来一个协程,等候协程完结之后将会回来一个读流StreamReader和一个写流StreamWriter。
# 假如需求ssl,则需求设置ssl=True
reader, writer = await asyncio.open_connection('www.baidu.com', 443, ssl=True)
咱们能够运用encode()将string转为bytes,然后运用将数据写入socket,然后运用await writer.drain()
来等候一切的数据现已发送出去。
# encode string data to byte data
byte_data = string_data.encode()
# write byte data
writer.write(bytes_data)
# wait for data to be transmitted
await writer.drain()
read是与write相反的操作。
# read byte data
byte_data = await reader.read()
# decode bytes to strings
string_data = byte_data.decode()
下面咱们进行一下实战,以NIO的方式拜访百度主页。
import asyncio
async def fetch_baidu_homepage():
host = 'www.baidu.com'
port = 80
# 构建HTTP恳求
request = (
f"GET / HTTP/1.1rn"
f"Host: {host}rn"
"Connection: closern"
"rn"
)
# 运用asyncio.open_connection树立TCP衔接
reader, writer = await asyncio.open_connection(host, port)
# 发送HTTP恳求
writer.write(request.encode())
# 读取响应
response = b""
while True:
data = await reader.read(1024)
if not data:
break
response += data
# 关闭衔接
writer.close()
print(response.decode())
if __name__ == '__main__':
asyncio.run(fetch_baidu_homepage())
最终
这儿简略介绍了asyncio首要的一些操作和概念,实践过程中往往更加杂乱,或许会有一些结构对其进行了封装。总之,经过这些比方,咱们能对python的异步编程有必定的了解,开端向异步编程的国际迈出开端的第一步。