Basics

  1. Coroutines are one way to implement concurrent programming
  2. The thread / multi-process model is the classic approach to concurrency problems
    • C10K - thread/process context switches eat up a large share of resources
  3. Nginx Event loop
    • Start a single scheduler and let it decide which task runs at any given moment
    • Avoids the overhead of multithreading: spawning threads, managing threads, synchronization locks, and so on
    • Compared with Apache, supports more concurrent connections with fewer resources
  4. Callback hell - JavaScript
    • Inherits the strengths of the Event loop, and adds the async / await syntactic sugar, resolving the tension between performance and readability
    • Coroutines began to gain traction, with people trying Node.js for back-end development
  5. Python 3.7 ships async / await built on top of asyncio

Synchronous

A simple implementation

import time


def crawl_page(url):
    print('crawling {}'.format(url))
    sleep_time = int(url.split('_')[-1])
    time.sleep(sleep_time)  # blocking sleep: nothing else can run in the meantime
    print('OK {}'.format(url))


def main(urls):
    for url in urls:
        crawl_page(url)


main(['url_1', 'url_2', 'url_3', 'url_4'])  # sequential: ~10 s in total

async / await - synchronous execution implemented with an asynchronous interface

import asyncio


async def crawl_page(url):
    print('crawling {}'.format(url))
    sleep_time = int(url.split('_')[-1])
    await asyncio.sleep(sleep_time)
    print('OK {}'.format(url))


async def main(urls):
    for url in urls:
        c = crawl_page(url)
        print(type(c))  # <class 'coroutine'>, won't be executed
        await c  # execute the coroutine


asyncio.run(main(['url_1', 'url_2', 'url_3', 'url_4']))  # still ~10 s: each coroutine is awaited before the next starts
  1. asyncio - contains most of the machinery needed for coroutines
  2. async - declares an asynchronous function
    • Calling an async function returns a coroutine object without running its body
  3. Executing a coroutine
    • Call it with await
      • Behaves like a regular Python call: execution blocks here
      • Control enters the awaited coroutine and returns only after it finishes
    • asyncio.create_task - create a task
    • asyncio.run - triggers execution
      • Introduced in Python 3.7; simplifies coroutine programming so you never have to define or manage the Event loop yourself (the sketch below shows what it replaces)
      • Convention - use asyncio.run(main()) as the program's entry point, and call asyncio.run only once during the program's lifetime
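
For reference, a minimal sketch (not from the original notes) of the boilerplate asyncio.run hides - driving the Event loop manually:

import asyncio


async def main():
    await asyncio.sleep(1)


# manual style: create, run, and close the event loop yourself
loop = asyncio.new_event_loop()
try:
    loop.run_until_complete(main())
finally:
    loop.close()

# Python 3.7+ equivalent, in one call:
# asyncio.run(main())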

Concurrency

asyncio.create_task

import asyncio


async def crawl_page(url):
    print('crawling {}'.format(url))
    sleep_time = int(url.split('_')[-1])
    await asyncio.sleep(sleep_time)
    print('OK {}'.format(url))


async def main(urls):
    # each Task is scheduled on the event loop as soon as it is created
    tasks = [asyncio.create_task(crawl_page(url)) for url in urls]
    for task in tasks:  # wait for all tasks to complete
        await task


asyncio.run(main(['url_1', 'url_2', 'url_3', 'url_4']))  # ~4 s total: tasks run concurrently

asyncio.gather

import asyncio


async def crawl_page(url):
    print('crawling {}'.format(url))
    sleep_time = int(url.split('_')[-1])
    await asyncio.sleep(sleep_time)
    print('OK {}'.format(url))


async def main(urls):
    tasks = [asyncio.create_task(crawl_page(url)) for url in urls]
    await asyncio.gather(*tasks)  # return a future aggregating results from the given coroutines/futures


asyncio.run(main(['url_1', 'url_2', 'url_3', 'url_4']))
  1. *tasks unpacks the list, turning its items into positional arguments
  2. **dict unpacks the dictionary, turning its items into keyword arguments (see the sketch below)
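
A quick illustration of both unpacking forms, independent of asyncio (the function and values are made up for the example):

def request(method, url, timeout=10):
    print(method, url, timeout)


args = ['GET', 'url_1']
kwargs = {'timeout': 5}

request(*args)            # GET url_1 10
request(*args, **kwargs)  # GET url_1 5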

Coroutine runtime

import asyncio


async def worker_1():
    print('worker_1 start')
    await asyncio.sleep(1)
    print('worker_1 done')


async def worker_2():
    print('worker_2 start')
    await asyncio.sleep(2)
    print('worker_2 done')


async def main():
    print('before await')
    await worker_1()
    print('awaited worker_1')
    await worker_2()
    print('awaited worker_2')


asyncio.run(main())

# Output:
# before await
# worker_1 start
# worker_1 done
# awaited worker_1
# worker_2 start
# worker_2 done
# awaited worker_2

import asyncio


async def worker_1():
    print('worker_1 start')
    await asyncio.sleep(1)
    print('worker_1 done')


async def worker_2():
    print('worker_2 start')
    await asyncio.sleep(2)
    print('worker_2 done')


async def main():
    task1 = asyncio.create_task(worker_1())
    task2 = asyncio.create_task(worker_2())
    print('before await')
    await task1
    print('awaited worker_1')
    await task2
    print('awaited worker_2')


asyncio.run(main())

# Output:
# before await
# worker_1 start
# worker_2 start
# worker_1 done
# awaited worker_1
# worker_2 done
# awaited worker_2

Detailed walkthrough

  1. asyncio.run(main()) - the program enters main() and the event loop starts
  2. asyncio.create_task - task1 and task2 are created and enter the event loop, waiting to run
  3. await task1
    • The user chooses to yield from the current main task; the event scheduler starts running worker_1 (tasks only yield at explicit await points - see the sketch after this list)
    • worker_1 runs until it reaches await asyncio.sleep(1) and yields from the current task
    • The event scheduler then starts running worker_2
  4. worker_2 runs until it reaches await asyncio.sleep(2) and yields from the current task
  5. Everything up to this point takes somewhere between 1 ms and 10 ms; the event scheduler now pauses, since nothing is runnable
  6. After 1 second, worker_1's sleep completes; the event scheduler hands control back to task1, and once task1 finishes it exits the event loop
  7. await task1 completes; the event scheduler passes control back to the main task, which then waits at await task2
  8. After 2 seconds, worker_2's sleep completes; the event scheduler hands control back to task2, and once task2 finishes it exits the event loop
  9. The main task prints awaited worker_2; all coroutine tasks are done and the event loop ends
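
The crucial point in step 3 is that a task only ever yields at an explicit await. A minimal sketch (not from the original notes) using await asyncio.sleep(0), which yields to the scheduler without actually sleeping:

import asyncio


async def ticker(name):
    for i in range(3):
        print(name, i)
        await asyncio.sleep(0)  # explicit yield point: let other tasks run


async def main():
    await asyncio.gather(ticker('a'), ticker('b'))


asyncio.run(main())

# Output interleaves: a 0, b 0, a 1, b 1, a 2, b 2
# Without the await, each ticker would run to completion before the other starts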

Exceptions + timeouts

import asyncio


async def worker_1():
    await asyncio.sleep(1)
    return 1


async def worker_2():
    await asyncio.sleep(2)
    return 2 / 0


async def worker_3():
    await asyncio.sleep(3)
    return 3


async def main():
    task_1 = asyncio.create_task(worker_1())
    task_2 = asyncio.create_task(worker_2())
    task_3 = asyncio.create_task(worker_3())

    await asyncio.sleep(2)
    task_3.cancel()  # cancel task_3 before its 3-second sleep completes

    res = await asyncio.gather(task_1, task_2, task_3, return_exceptions=True)
    print(res)  # [1, ZeroDivisionError('division by zero'), CancelledError()]


asyncio.run(main())
  1. return_exceptions=False (the default) - the first error propagates up to the caller and must be caught with try / except, as shown in the sketch below
  2. if the exception escapes main(), the event loop shuts down and all tasks that have not yet finished are cancelled
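
A minimal sketch (reusing worker_2 from above; the rest is an assumption for illustration) of both halves of this section's title - catching a propagated exception, and enforcing a time limit with asyncio.wait_for:

import asyncio


async def worker_2():
    await asyncio.sleep(2)
    return 2 / 0


async def main():
    # return_exceptions defaults to False: the first error propagates here
    try:
        await asyncio.gather(worker_2())
    except ZeroDivisionError as e:
        print('caught:', e)

    # asyncio.wait_for puts a hard time limit on any awaitable
    try:
        await asyncio.wait_for(asyncio.sleep(10), timeout=1)
    except asyncio.TimeoutError:
        print('timed out after 1 s')


asyncio.run(main())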

Producer-consumer

import asyncio
import random


async def consumer(queue, identity):
    while True:
        val = await queue.get()
        print('{} get a val: {}'.format(identity, val))
        await asyncio.sleep(1)


async def producer(queue, identity):
    for i in range(5):
        val = random.randint(1, 10)
        await queue.put(val)
        print('{} put a val: {}'.format(identity, val))
        await asyncio.sleep(1)


async def main():
    queue = asyncio.Queue()  # golang-channel-like

    consumer_1 = asyncio.create_task(consumer(queue, 'consumer_1'))
    consumer_2 = asyncio.create_task(consumer(queue, 'consumer_2'))

    producer_1 = asyncio.create_task(producer(queue, 'producer_1'))
    producer_2 = asyncio.create_task(producer(queue, 'producer_2'))

    await asyncio.sleep(10)
    # the consumers loop forever, so they must be cancelled explicitly
    consumer_1.cancel()
    consumer_2.cancel()

    await asyncio.gather(consumer_1, consumer_2, producer_1, producer_2, return_exceptions=True)


asyncio.run(main())
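
The fixed await asyncio.sleep(10) above is a blunt shutdown signal. A common alternative (a sketch, not from the original notes) is queue.join() plus task_done(): the main coroutine waits until every queued item has been processed, then cancels the consumers:

import asyncio
import random


async def consumer(queue, identity):
    while True:
        val = await queue.get()
        print('{} get a val: {}'.format(identity, val))
        queue.task_done()  # mark this item as fully processed


async def producer(queue, identity):
    for i in range(5):
        await queue.put(random.randint(1, 10))


async def main():
    queue = asyncio.Queue()
    consumers = [asyncio.create_task(consumer(queue, 'consumer_{}'.format(i))) for i in range(2)]
    await asyncio.gather(producer(queue, 'producer_1'), producer(queue, 'producer_2'))
    await queue.join()  # returns once task_done() has been called for every item
    for c in consumers:
        c.cancel()


asyncio.run(main())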

Summary

  1. Coroutines vs threads
    • Coroutines run in a single thread
    • With coroutines, the user decides where to yield control and switch to the next task
  2. Coroutine code is more concise and clear
    • async / await + create_task - fully adequate for small-to-medium concurrency needs
  3. A deep understanding of the event loop is essential