Python基础:协程

Published on 2025-04-09 17:36 in 分类: 博客 with 狂盗一枝梅
分类: 博客

一、生成器

在Python中,生成器(Generator)是一种用于创建迭代器的简洁且高效的工具,能够按需生成值而非一次性加载所有数据到内存。举个例子:

g = (x * x for x in range(10))

上述代码定义了一个生成器g,它和列表生成式的用法非常相似,列表生成式:

L = [x * x for x in range(10)]

不同的是,生成器只是保存了一个“算法”,它并不会展开计算最后得到一个列表,生成器中的每一个值都是根据前一个值“推导”出来的,只有用到的时候才会计算。‌

1、创建生成器

第一种方式是上面提到的创建方式:g = (x * x for x in range(10)),这种方式是生成器表达式,能够快速创建生成器表达式;

第二种方式是通过创建一个普通函数,然后配合yield关键字创建,下面的代码创建了一个斐波那契数列生成器:

def fibonacci(n):
    index, a, b = 0, 0, 1
    while index < n:
        yield a
        a, b = b, a + b
        index = index + 1
    return 'done'

2、生成器迭代

第一种方法:next函数。

比如对于g = (x * x for x in range(10)) 生成器,使用next函数遍历代码如下:

g = (x * x for x in range(10))

if __name__ == '__main__':
    print(next(g))
    print(next(g))
    print(next(g))
    print(next(g))
    print(next(g))
    print(next(g))
    print(next(g))
    print(next(g))
    print(next(g))
    print(next(g))
    print(next(g)) #第11次调用会报错

第11次调用开始报错:

0
1
4
9
16
25
36
49
64
81
Traceback (most recent call last):
  File "D:\gitee_my\python-basic-study\02.aio\00.generator_demo.py", line 24, in <module>
    print(next(g))
StopIteration

使用next函数迭代,每次调用next(g),就计算出g的下一个元素的值,直到计算到最后一个元素,没有更多的元素时,抛出StopIteration的错误。这种迭代方式着实过于变态,正确的方法是使用for循环。

第二种方式: for循环

以斐波那契数列生成器为例:

def fibonacci(n):
    index, a, b = 0, 0, 1
    while index < n:
        yield a
        a, b = b, a + b
        index = index + 1
    return 'done'

遍历该生成器可使用for循环:

g1 = fibonacci(10)
    for i in g1:
        print(i)

使用def+yield的方式定义的生成器,每次执行到 yield 时暂停,返回当前值;下次调用时从暂停的位置继续执行。当函数执行完毕或遇到 return 语句时,抛出 StopIteration 异常,结束迭代;但是for循环方式会消化掉该异常,同时也无法获取return返回值,如果想获取return返回值,只能通过next函数的方式:

def next_iter():
    g1 = fibonacci(6)
    print(next(g1))
    print(next(g1))
    print(next(g1))
    print(next(g1))
    print(next(g1))
    print(next(g1))
    try:
        print(next(g1))  # 第7次调用会报错
    except StopIteration as e:
        print("generator return value:", e.value)

输出:

0
1
1
2
3
5
generator return value: done

3、生成器接收值

yield关键字不仅能返回值,还能接收值。先定义一个生成器:

def value_adjuster():
    current = 0
    while True:
        received = yield current  # 暂停并等待接收值
        print(f"接收到的值:{received}")
        if received is not None:
            current = received  # 更新内部状态
        else:
            current += 1  # 默认自增
            
gen = value_adjuster() #创建生成器实例

该生成器每次接收一个值,如果值不为空,就将其赋值给内部维护的current;否则current值自增。下次循环时将该值返回给调用方。

生成器在调用send方法前需要先初始化,初始化的方法有两种:

方法一:调用next函数初始化,next(gen)

方法二:发送空值,gen.send(None)

测试运行如下代码:

print(next(gen))  # 初始化生成器,输出 0
print(gen.send(10))  # 发送 10,生成器返回 10
print(next(gen))  # 继续执行,输出 11
print(gen.send(5))  # 发送 5,生成器返回 5
print(next(gen))  # 继续执行,输出 6

输出:

0
接收到的值:10
10
接收到的值:None
11
接收到的值:5
5
接收到的值:None
6

可以看到,send方法会向生成器发送一个值,之后接收生成器返回的值;如果没有向生成器发送值,那么yield会接收到None。所以,生成器可以连续的调用send方法:

print(gen.send(10))  # 发送 10,生成器返回 10
print(gen.send(5))  # 发送 5,生成器返回 5

二、协程

协程(Coroutine)是 Python 中实现‌异步编程‌的核心工具,用于编写高性能的 I/O 密集型代码。其核心思想是通过协作式多任务(非抢占式),在单线程内实现并发执行。

协程和线程有什么区别呢?

线程/进程‌:由操作系统调度,存在上下文切换开销,适合 CPU 密集型任务。

协程‌:由程序员控制切换点(如 await),无上下文切换开销,适合 I/O 密集型任务(如网络请求、文件读写)。

1、生成器和协程

Python 3.4 前,Python对协程的支持是通过生成器实现的。可以说协程是生成器的一种高级应用,利用 yield 的暂停和参数传递特性实现控制流同步。

实际上上一章节的“生成器接收值”案例正是协程的一个简答案例。接下来以生产者消费者为例说明生成器协程的使用。

单生产者消费者

def consumer():
    r = ''
    while True:
        n = yield r
        if not n:
            return
        print('[CONSUMER] Consuming %s...' % n)
        r = '200 OK'


def produce(c):
    c.send(None)
    n = 0
    while n < 5:
        n = n + 1
        print('[PRODUCER] Producing %s...' % n)
        r = c.send(n)
        print('[PRODUCER] Consumer return: %s' % r)
    c.close()


c = consumer()
produce(c)

if __name__ == '__main__':
    pass

输出结果如下:

[PRODUCER] Producing 1...
[CONSUMER] Consuming 1...
[PRODUCER] Consumer return: 200 OK
[PRODUCER] Producing 2...
[CONSUMER] Consuming 2...
[PRODUCER] Consumer return: 200 OK
[PRODUCER] Producing 3...
[CONSUMER] Consuming 3...
[PRODUCER] Consumer return: 200 OK
[PRODUCER] Producing 4...
[CONSUMER] Consuming 4...
[PRODUCER] Consumer return: 200 OK
[PRODUCER] Producing 5...
[CONSUMER] Consuming 5...
[PRODUCER] Consumer return: 200 OK

这个案例比较简单,生产者生产出来发送给消费者,消费者消费完成返回生产者“200 OK”,生产者接收到消息后继续生产;由于只是一个生产者和一个消费者,所以整体过程比较清晰。唯一美中不足的是整个过程串行化,无法看到协程在并行处理中的特点。

接下来看看协程在多生产者多消费者模型中的表现。

多生产者多消费者

import random
import time
from collections import deque


def producer(name, queue, max_items):
    """生产者协程,生成数据并发送给队列"""
    for i in range(max_items):
        item = f"{name}-{i}"
        print(f"生产者 {name} 生产了 {item}")
        time.sleep(random.random() * 0.5)  # 模拟生产时间
        queue.append(item)
        yield  # 暂停,让出控制权


def consumer(name, queue):
    """消费者协程,从队列中消费数据"""
    while True:
        while queue:
            item = queue.popleft()
            print(f"消费者 {name} 消费了 {item}")
            time.sleep(random.random() * 2)  # 模拟消费时间
            yield  # 处理完一个项目后暂停
        yield  # 如果队列为空,也暂停


def scheduler(producers, consumers, max_items):
    """调度器,协调生产者和消费者的执行"""
    producer_done = [False] * len(producers)
    remaining_items = max_items * len(producers)

    # 初始化所有协程
    for p in producers:
        next(p)
    for c in consumers:
        next(c)

    # 主调度循环
    while remaining_items > 0:
        # 随机选择一个生产者
        active_producer_idx = random.randint(0, len(producers) - 1)
        if not producer_done[active_producer_idx]:
            try:
                producers[active_producer_idx].send(None)
            except StopIteration:
                producer_done[active_producer_idx] = True

        # 随机选择一个消费者
        active_consumer_idx = random.randint(0, len(consumers) - 1)
        consumers[active_consumer_idx].send(None)

        # 更新剩余项目计数
        remaining_items = sum(not done for done in producer_done) * max_items


if __name__ == "__main__":
    # 共享队列
    queue = deque()

    # 配置参数
    num_producers = 3
    num_consumers = 2
    items_per_producer = 5

    # 创建生产者和消费者协程
    producers = [
        producer(f"[PRODUCER]-{i}", queue, items_per_producer)
        for i in range(num_producers)
    ]

    consumers = [
        consumer(f"[CONSUMER]-{i}", queue)
        for i in range(num_consumers)
    ]

    # 启动调度器
    scheduler(producers, consumers, items_per_producer)

    print("所有生产消费任务完成!")

上面的代码实际上是串行运行的,因为Python的生成器协程(非asyncio)在单个线程中通过显式调度交替执行,而不是真正的并发。

调度器随机选择要执行的生产者或消费者,当队列为空时,消费者会暂停,当生产者完成所有项目后会停止。

2、asnyc/await 协程

生成器协程不能实现真正的并发,而且运行效率低,Python 3.5(2015年9月)首次支持async/await语法,但此时协程仍需通过asyncio.coroutine装饰器声明;Python 3.7+(2018年6月)进一步优化,协程可通过async def直接声明(无需装饰器),并改进了事件循环(asyncio库)和性能。

当前我的python版本是3.11,所以无须担心可以直接使用最新版本的协程语法。

基本语法

使用 async def 定义协程函数,内部通过 await 挂起执行:

async def fetch_data():
    print("开始获取数据")
    await asyncio.sleep(1)  # 模拟 I/O 操作
    print("数据获取完成")
    return "data"

运行协程

协程必须通过‌事件循环‌运行:

import asyncio


async def fetch_data():
    print("开始获取数据")
    await asyncio.sleep(1)  # 模拟 I/O 操作
    print("数据获取完成")
    return "data"


async def run():
    result = await fetch_data()
    print("结果:", result)


if __name__ == '__main__':
    asyncio.run(run())

运行结果:

开始获取数据
(等待一秒)
数据获取完成
结果: data

多生产者多消费者

现在使用async/await版本的协程重新实现多生产者多消费者

import asyncio
from asyncio import Queue


async def producer(name, queue: Queue, max_items):
    """异步生产者"""
    for i in range(max_items):
        item = f"{name}-{i}"
        print(f"生产者 {name} 生产了 {item}")
        await queue.put(item)
        await asyncio.sleep(1)  # 模拟异步生产


async def consumer(name, queue: Queue):
    """异步消费者"""
    while True:
        item = await queue.get()
        print(f"消费者 {name} 消费了 {item}")
        queue.task_done()
        await asyncio.sleep(1)  # 模拟异步消费


async def run():
    queue: Queue = asyncio.Queue(maxsize=3)  # 限制队列大小实现背压
    producers = [
        asyncio.create_task(producer(f"P-{i}", queue, 5))
        for i in range(3)
    ]
    consumers = [
        asyncio.create_task(consumer(f"C-{i}", queue))
        for i in range(2)
    ]

    await asyncio.gather(*producers)  # 等待所有生产者完成
    await queue.join()  # 等待队列清空
    for c in consumers:  # 取消消费者任务
        c.cancel()


if __name__ == '__main__':
    asyncio.run(run())

可以看到这段代码比生成器版本的写成要精简的多,而且代码也更清晰。代码中使用了asyncio的很多方法,这之后再聊它们的作用。

上述代码可以尝试调整队列的大小以及生产者、消费者的数量以观察供大于求、供小于求等各种情况下的表现。

三、asyncio

asyncio 是 Python 用于编写并发代码的标准库,使用 async/await 语法。它特别适合 I/O 密集型和高并发的网络应用。

1、基本用法

运行协程(同步方法中运行协程):

async def main():
    print('Hello')
    await asyncio.sleep(1)
    print('World')

# Python 3.7+
asyncio.run(main())

并发运行任务:

async def task(name, seconds):
    print(f"{name} starting")
    await asyncio.sleep(seconds)
    print(f"{name} finished after {seconds}s")

async def main():
    # 方式1: gather
    await asyncio.gather(
        task("A", 2),
        task("B", 1),
        task("C", 3)
    )
    
    # 方式2: create_task + await
    t1 = asyncio.create_task(task("X", 2))
    t2 = asyncio.create_task(task("Y", 1))
    await t1
    await t2

需要特别注意的是:await关键字调用必须在async块中。

2、异步生成器

之前说过可以使用生成器实现协程功能,实际上使用async的协程也可以搭配生成器一起实现异步生成器的功能:

async def async_gen(n):
    for i in range(n):
        await asyncio.sleep(0.5)
        yield i

async def main():
    async for item in async_gen(3):
        print(item)

3、限制并发数

使用semaphore实现限制并发数:

async def worker(semaphore, name):
    async with semaphore:
        print(f"{name} started")
        await asyncio.sleep(2)
        print(f"{name} finished")

async def main():
    semaphore = asyncio.Semaphore(3)  # 最多3个并发
    tasks = [
        asyncio.create_task(worker(semaphore, f"Worker-{i}"))
        for i in range(10)
    ]
    await asyncio.gather(*tasks)

semaphore在Java中是一种共享锁,通过信号量机制实现并发数量控制,在python中也是类似的功能,详情可以查看:详解AQS七:深入理解信号量机制Semaphore

4、同步阻塞队列:Queue

通过queue: Queue = asyncio.Queue(maxsize=3)可以创建一个长度为3的同步阻塞队列,它和java中的BlokingQueue有异曲同工之妙。

put方法:将一个元素放入队列,如果队列已满,就阻塞等待队列空闲。

get方法:从队列中获取并移除一个元素,如果队列为空,就一直阻塞等待直到队列中有元素为止。

task_done方法:高速队列之前get方法取出的任务已经执行完成

join方法:等待队列中的任务全部执行完毕,该方法依赖于task_done方法的执行。


END.


#python #多线程编程 #协程
复制 复制成功