协程

这一章来介绍一下Python最鲜为人知的, (表面上看起来)最无用的特性,也就是协程(Coroutine).
本章的notebook文件见这里.
协程是一种流程控制工具, 可以把它看作一种轻量级的线程,但是它的中断和继续都是由程序控制(而非系统阻塞),因此效率更高.

   这么说可能有些难以理解,下面就一步步深入了解协程. ## 从生成器到协程 在Python2.5以后,生成器API中加入了`send`方法, 该方法可以发送数据并将该数据作为生成器函数中`yield`表达式的值. 自此, 生成器就可以用作协程. 下面介绍一个最简单的协程用法:
1
2
3
4
def simple_coroutine():
print('开始协程')
x = yield
print('协程接收到: ', x)
1
2
my_coro = simple_coroutine()
my_coro
<generator object simple_coroutine at 0x000001EC20B4C468>
1
next(my_coro)
开始协程
1
my_coro.send(58)
协程接收到:  58


---------------------------------------------------------------------------

StopIteration                             Traceback (most recent call last)

<ipython-input-6-fe8fce2fd396> in <module>()
----> 1 my_coro.send(58)


StopIteration: 

上述代码给出了一个用生成器实现的协程, 和之前见过的生成器最大的不同在于yield出现在了右边,而且关键字后面没有表达式. 这说明协程会从调用方接收数据.

定义好该协程后, 我们调用函数得到一个生成器对象my_coro, 因为一开始生成器还没有启动,所以我们需要先调用next函数. 然后执行完打印”开始协程”后, 流程来到yield处, 并在收到send函数发送的数据(58)后执行. 接着协程恢复, 一直运行到下一个yield表达式, 或者到末尾中止(抛出StopIteration).
协程有四种不同的状态(当前状态可以用inspect.getgeneratorstate来获取):

  • GEN_CREATED 等待开始执行
  • GEN_RUNNING 解释器正在执行(仅出现在多线程应用中)
  • GEN_SUSPENDEDyield表达式出暂停
  • GEN_CLOSED 执行结束

仅当协程处于暂停状态时才能调用send方法, 唯一的例外是当协程未激活(等待开始时),可以调用.send(None)来激活它, 其效果等同于.next(my_coro).

下面用一个产出多个值的例子来更好的理解协程:

1
2
3
4
5
6
7
8
9
def simple_coro2(a):
print('开始协程, a=', a)
b = yield a
print('协程接收到: b=', b)
c = yield a + b
print('协程接收到: c=', c)

my_coro2 = simple_coro2(14)
from inspect import getgeneratorstate
1
getgeneratorstate(my_coro2) # 协程处于未启动状态  
'GEN_CREATED'
1
next(my_coro2) # 激活协程, 运行到yield a处, 注意此处产出了a的值并返回
开始协程, a= 14


14
1
getgeneratorstate(my_coro2) # 协程处于暂停状态  
'GEN_SUSPENDED'
1
2
3
 # 发送28给咱定的协程, 计算yield表达式并将28赋给b
# 接着运行到下一个yield, 计算a+b, 然后暂停
my_coro2.send(28)
协程接收到: b= 28


42
1
2
# 过程类似上一步,不过这里最后协程中止,抛出异常  
my_coro2.send(99)
协程接收到: c= 99


---------------------------------------------------------------------------

StopIteration                             Traceback (most recent call last)

<ipython-input-13-627eb114cd58> in <module>()
      1 # 过程类似上一步,不过这里最后协程中止,抛出异常
----> 2 my_coro2.send(99)


StopIteration: 
1
getgeneratorstate(my_coro2) # 终止状态 
'GEN_CLOSED'

上面的代码最难懂的是b = yield a这行, 这里右边的代码yield a会在赋值之前先执行, 然后b的值在下一次激活协程时再设定(而不是直接赋值为a). 这里的代码不是非常直观, 需要一段时间才能习惯.

我们可以注意到,在上述代码中,每行yield表达式既是上一阶段的结束, 又是下一阶段的开始.

使用协程计算移动平均值

下面我们用一个更复杂的示例来说明协程的行为. 这里我们希望用它来计算移动平均值:

1
2
3
4
5
6
7
8
9
def averager():
total = 0.0
count = 0
average = None
while True:
term = yield average
total += term
count += 1
average = total/count

这里的协程用来一个无限循环while True来计算移动平均值, 那么它会一直接受值然后生成结果, 除非调用方调用了close方法来关闭它. 使用协程的好处是不用再用闭包来保存上下文. 用法如下:

1
2
coro_avg = averager()
next(coro_avg)
1
coro_avg.send(10) 
10.0
1
coro_avg.send(30)
20.0
1
coro_avg.send(5)
15.0

由于在启动协程前,我们需要用next预先激活(prime)它, 这一步容易忘记,因此我们可以用一个特殊的装饰器来避免遗漏.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
from functools import wraps

def coroutine(func):
@wraps(func)
def primer(*args, **kwargs):
gen =func(*args, **kwargs)
next(gen)
return gen
return primer

@coroutine
def averager():
total = 0.0
count = 0
average = None
while True:
term = yield average
total += term
count += 1
average = total/count
1
2
coro_avg2 = averager()
getgeneratorstate(coro_avg2)
'GEN_SUSPENDED'

上面的输出指明了,协程已经准备好可以接受值了,这是因为我们用装饰器使其在声明后就进行了预激.

终止协程和处理异常

协程中未处理的异常会向上回溯, 传给next或是send方法的调用者:

1
2
coro_avg = averager()
coro_avg.send(40)
40.0

1
coro_avg.send('miao')

TypeError                                 Traceback (most recent call last)

<ipython-input-25-a8ca1776997a> in <module>()
----> 1 coro_avg.send('miao')


<ipython-input-21-93a336033508> in averager()
     16     while True:
     17         term = yield average
---> 18         total += term
     19         count += 1
     20         average = total/count


TypeError: unsupported operand type(s) for +=: 'float' and 'str'

1
coro_avg.send(10)

StopIteration                             Traceback (most recent call last)

<ipython-input-27-2808c6331bad> in <module>()
----> 1 coro_avg.send(10)


StopIteration: 

上面我们故意传入了一个字符串,引发了异常. 由于协程内没有处理异常,所以协程会终止,再次重新激活协程会抛出异常.

其实这就给出了一种终止协程的方法: 发送某个哨符值让协程退出. 常用的特殊值有NoneEllipsis.
我们也可以用两种方法,显式地把异常发给协程:

  • generator.throw
     致使生成器在暂停的 yield 表达式处抛出指定的异常。如果生成器处理了抛出的异 常,代码会向前执行到下一个 yield 表达式,而产出的值会成为调用 generator.throw 方法得到的返回值。如果生成器没有处理抛出的异常,异常会向上冒泡,传到调用方的上下文中。
  • generator.close
     致使生成器在暂停的 yield 表达式处抛出 GeneratorExit 异常。如果生成器没有处 理这个异常,或者抛出了 StopIteration 异常(通常是指运行到结尾),调用方不会报错。如果收到 GeneratorExit 异常,生成器一定不能产出值,否则解释器会抛出 RuntimeError 异常。生成器抛出的其他异常会向上冒泡,传给调用方。

下面举例说明一下这两个函数的用法.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
class DemoException(Exception):
"""为演示构建的异常类"""

def demo_exc_handling():
print('-> 协程开始')
while True:
try:
x = yield
except DemoException:
print('*** 处理异常类. 继续执行...')
else:
print('-> 协程接收到 {!r}'.format(x))
raise RuntimeError('这一行应该永远不会被执行.')

对于这样一个协程,正常的调用流程是这样的.

1
2
exc_coro = demo_exc_handling()
next(exc_coro)
-> 协程开始
1
exc_coro.send(5)
-> 协程接收到 5
1
exc_coro.send(10)
-> 协程接收到 10
1
exc_coro.close()
1
getgeneratorstate(exc_coro) # 协程状态为已关闭
'GEN_CLOSED'

现在我们尝试传入异常,看一下协程如何处理.

1
2
exc_coro = demo_exc_handling()
next(exc_coro)
-> 协程开始
1
exc_coro.send(5)
-> 协程接收到 5
1
exc_coro.throw(DemoException)
*** 处理异常类. 继续执行...
1
getgeneratorstate(exc_coro)
'GEN_SUSPENDED'

可以看到这里协程对DemoException做了处理,然后继续运行, 处于GET_SUSPENDED等待状态.

但是当传入的异常没有处理时,协程就会自动停止,变为关闭状态, 如:

1
2
3
exc_coro = demo_exc_handling()
next(exc_coro)
exc_coro.send(5)
-> 协程开始
-> 协程接收到 5

1
exc_coro.throw(ZeroDivisionError)

ZeroDivisionError                         Traceback (most recent call last)

<ipython-input-39-ab264ab92c46> in <module>()
----> 1 exc_coro.throw(ZeroDivisionError)


<ipython-input-28-152a963a63ce> in demo_exc_handling()
      6     while True:
      7         try:
----> 8             x = yield
      9         except DemoException:
     10             print('*** 处理异常类. 继续执行...')


ZeroDivisionError: 
1
getgeneratorstate(exc_coro)
'GEN_CLOSED'

让协程返回值

这里我们用一个新版的移动平均值计算协程来说明 如何让协程返回值:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
from collections import namedtuple 

Result = namedtuple('Result', 'count average')

def averager():
total = 0.0
count = 0
average = None
while True:
term = yield
if term is None:
break # <1> 加条件判断来终止协程以返回值
total += term
count += 1
average = total/count
return Result(count, average)
1
2
coro_avg = averager()
next(coro_avg)
1
coro_avg.send(10)
1
coro_avg.send(30)
1
coro_avg.send(6.5)

1
coro_avg.send(None)

StopIteration                             Traceback (most recent call last)

<ipython-input-46-af5e8de3e651> in <module>()
----> 1 coro_avg.send(None)


StopIteration: Result(count=3, average=15.5)

可以看到我们的返回值保存在了抛出的异常的value中,那么如何获取返回值呢?

1
2
3
4
5
6
7
8
9
10
coro_avg = averager()
next(coro_avg)
coro_avg.send(10)
coro_avg.send(30)
coro_avg.send(6.5)
try:
coro_avg.send(None)
except StopIteration as exc:
result = exc.value
result
Result(count=3, average=15.5)

这样我们弯弯绕绕就获得了协程的返回值, 下面我们来讨论yield from的结构.

使用yield from

yield from的主要功能是打开双向通道,将最外层的调用方和最内层的子生成器连接起来,这样二者可以直接发送和产出值, 还可以直接传入异常,而不用在位于中间的协程中添加大量处理异常的代码.

在介绍它之前, 我们先来说明一些专用的术语:

委派生成器 包含yield from <iterable>表达式的生成器函数
子生成器yield from表达式中<iterable>部分获取的生成器
调用方 调用委派生成器的客户端代码.

下面给出一个例子来说明yield from结构的用法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
from collections import namedtuple
Result = namedtuple('Result', 'count average')

# 子生成器
def averager():
total = 0.0
count = 0
average = None
while True:
term = yield # 将main中客户代码发送的值绑定到term上
if term is None: # 终止条件
break
total += term
count += 1
average = total/count
return Result(count, average) # 返回值成为yield from表达式的值

# 委派生成器
def grouper(results, key):
while True: # 循环中每次新建一个averager
# 这里grouper发送的值经由yield from处理并传给averager示例
# 然后grouper在yield from处暂停, 等待下一次客户端发来值
results[key] = yield from averager()

# 客户端代码 / 调用方
def main(data):
results = {}
for key, values in data.items():
group = grouper(results, key) # group为生成器对象
next(group) # 预激
for value in values:
# 发送value给grouper, 然后最终达到avergeer函数中
# term = yield那一行
group.send(value)
group.send(None) # 终止averger实例 至关重要的一行

report(results)

# 输出报告
def report(results):
for key, result in sorted(results.items()):
group, unit = key.split(';')
print('{:2} {:5} averaging {:.2f}{}'.format(
result.count, group, result.average, unit))

data = {
'girls;kg':
[40.9, 38.5, 44.3, 42.2, 45.2, 41.7, 44.5, 38.0, 40.6, 44.5],
'girls;m':
[1.6, 1.51, 1.4, 1.3, 1.41, 1.39, 1.33, 1.46, 1.45, 1.43],
'boys;kg':
[39.0, 40.8, 43.2, 40.8, 43.1, 38.6, 41.4, 40.6, 36.3],
'boys;m':
[1.38, 1.5, 1.32, 1.25, 1.37, 1.48, 1.25, 1.49, 1.46],
}
1
main(data)
 9 boys  averaging 40.42kg
 9 boys  averaging 1.39m
10 girls averaging 42.04kg
10 girls averaging 1.43m

上面的代码的作用是给data字典中的各个字段求平均值.
简要说明一下整个流程:

  1. main函数中,每次外层循环会新建一个grouper实例并赋给group,这里的group就是委派生成器.
  2. 接着,预激委派生成器,此时委派生成器进入while True循环, 调用子生成器averager后在yield from处暂停.
  3. 内层循环调用send把值发给avergaer, 同时group生成器仍然停在yield from那里.
  4. 整个内层循环结束,group生成器仍然停在yield from那里.
  5. 生成器send(None),然后结束averger实例,委派生成器进入循环下一次.
  6. 外层循环重新构建一个grouper实例, 然后绑定到group变量, 前一个实例被垃圾回收程序回收.

这里给出了yield from结构最简单的用法, 即只有一个委派生成器和一个子生成器. 委派生成器相当于管道, 因此我们可以将任意数量的委派生成器连接到一起: 一个委派生成器使用yield from调用一个子生成器, 而那个子生成器本身也是一个委派生成器, 以此类推, 只要链条最终使用yield表达式结束.