一、生成器
在Python中,生成器(Generator)是一种用于创建迭代器的简洁且高效的工具,能够按需生成值而非一次性加载所有数据到内存。举个例子:
g = (x * x for x in range(10))
上述代码定义了一个生成器g,它和列表生成式的用法非常相似,列表生成式:
L = [x * x for x in range(10)]
不同的是,生成器只是保存了一个“算法”,它并不会展开计算最后得到一个列表,生成器中的每一个值都是根据前一个值“推导”出来的,只有用到的时候才会计算。
1、创建生成器
第一种方式是上面提到的创建方式:g = (x * x for x in range(10))
,这种方式是生成器表达式,能够快速创建生成器表达式;
第二种方式是通过创建一个普通函数,然后配合yield
关键字创建,下面的代码创建了一个斐波那契数列生成器:
def fibonacci(n):
index, a, b = 0, 0, 1
while index < n:
yield a
a, b = b, a + b
index = index + 1
return 'done'
2、生成器迭代
第一种方法:next
函数。
比如对于g = (x * x for x in range(10))
生成器,使用next函数遍历代码如下:
g = (x * x for x in range(10))
if __name__ == '__main__':
print(next(g))
print(next(g))
print(next(g))
print(next(g))
print(next(g))
print(next(g))
print(next(g))
print(next(g))
print(next(g))
print(next(g))
print(next(g)) #第11次调用会报错
第11次调用开始报错:
0
1
4
9
16
25
36
49
64
81
Traceback (most recent call last):
File "D:\gitee_my\python-basic-study\02.aio\00.generator_demo.py", line 24, in <module>
print(next(g))
StopIteration
使用next函数迭代,每次调用next(g)
,就计算出g
的下一个元素的值,直到计算到最后一个元素,没有更多的元素时,抛出StopIteration
的错误。这种迭代方式着实过于变态,正确的方法是使用for
循环。
第二种方式: for循环
以斐波那契数列生成器为例:
def fibonacci(n):
index, a, b = 0, 0, 1
while index < n:
yield a
a, b = b, a + b
index = index + 1
return 'done'
遍历该生成器可使用for循环:
g1 = fibonacci(10)
for i in g1:
print(i)
使用def+yield的方式定义的生成器,每次执行到 yield
时暂停,返回当前值;下次调用时从暂停的位置继续执行。当函数执行完毕或遇到 return
语句时,抛出 StopIteration
异常,结束迭代;但是for循环方式会消化掉该异常,同时也无法获取return返回值,如果想获取return返回值,只能通过next函数的方式:
def next_iter():
g1 = fibonacci(6)
print(next(g1))
print(next(g1))
print(next(g1))
print(next(g1))
print(next(g1))
print(next(g1))
try:
print(next(g1)) # 第7次调用会报错
except StopIteration as e:
print("generator return value:", e.value)
输出:
0
1
1
2
3
5
generator return value: done
3、生成器接收值
yield
关键字不仅能返回值,还能接收值。先定义一个生成器:
def value_adjuster():
current = 0
while True:
received = yield current # 暂停并等待接收值
print(f"接收到的值:{received}")
if received is not None:
current = received # 更新内部状态
else:
current += 1 # 默认自增
gen = value_adjuster() #创建生成器实例
该生成器每次接收一个值,如果值不为空,就将其赋值给内部维护的current;否则current值自增。下次循环时将该值返回给调用方。
生成器在调用send方法前需要先初始化,初始化的方法有两种:
方法一:调用next函数初始化,next(gen)
方法二:发送空值,gen.send(None)
测试运行如下代码:
print(next(gen)) # 初始化生成器,输出 0
print(gen.send(10)) # 发送 10,生成器返回 10
print(next(gen)) # 继续执行,输出 11
print(gen.send(5)) # 发送 5,生成器返回 5
print(next(gen)) # 继续执行,输出 6
输出:
0
接收到的值:10
10
接收到的值:None
11
接收到的值:5
5
接收到的值:None
6
可以看到,send方法会向生成器发送一个值,之后接收生成器返回的值;如果没有向生成器发送值,那么yield会接收到None。所以,生成器可以连续的调用send方法:
print(gen.send(10)) # 发送 10,生成器返回 10
print(gen.send(5)) # 发送 5,生成器返回 5
二、协程
协程(Coroutine)是 Python 中实现异步编程的核心工具,用于编写高性能的 I/O 密集型代码。其核心思想是通过协作式多任务(非抢占式),在单线程内实现并发执行。
协程和线程有什么区别呢?
线程/进程:由操作系统调度,存在上下文切换开销,适合 CPU 密集型任务。
协程:由程序员控制切换点(如 await
),无上下文切换开销,适合 I/O 密集型任务(如网络请求、文件读写)。
1、生成器和协程
Python 3.4 前,Python对协程的支持是通过生成器实现的。可以说协程是生成器的一种高级应用,利用 yield
的暂停和参数传递特性实现控制流同步。
实际上上一章节的“生成器接收值”案例正是协程的一个简答案例。接下来以生产者消费者为例说明生成器协程的使用。
单生产者消费者
def consumer():
r = ''
while True:
n = yield r
if not n:
return
print('[CONSUMER] Consuming %s...' % n)
r = '200 OK'
def produce(c):
c.send(None)
n = 0
while n < 5:
n = n + 1
print('[PRODUCER] Producing %s...' % n)
r = c.send(n)
print('[PRODUCER] Consumer return: %s' % r)
c.close()
c = consumer()
produce(c)
if __name__ == '__main__':
pass
输出结果如下:
[PRODUCER] Producing 1...
[CONSUMER] Consuming 1...
[PRODUCER] Consumer return: 200 OK
[PRODUCER] Producing 2...
[CONSUMER] Consuming 2...
[PRODUCER] Consumer return: 200 OK
[PRODUCER] Producing 3...
[CONSUMER] Consuming 3...
[PRODUCER] Consumer return: 200 OK
[PRODUCER] Producing 4...
[CONSUMER] Consuming 4...
[PRODUCER] Consumer return: 200 OK
[PRODUCER] Producing 5...
[CONSUMER] Consuming 5...
[PRODUCER] Consumer return: 200 OK
这个案例比较简单,生产者生产出来发送给消费者,消费者消费完成返回生产者“200 OK”,生产者接收到消息后继续生产;由于只是一个生产者和一个消费者,所以整体过程比较清晰。唯一美中不足的是整个过程串行化,无法看到协程在并行处理中的特点。
接下来看看协程在多生产者多消费者模型中的表现。
多生产者多消费者
import random
import time
from collections import deque
def producer(name, queue, max_items):
"""生产者协程,生成数据并发送给队列"""
for i in range(max_items):
item = f"{name}-{i}"
print(f"生产者 {name} 生产了 {item}")
time.sleep(random.random() * 0.5) # 模拟生产时间
queue.append(item)
yield # 暂停,让出控制权
def consumer(name, queue):
"""消费者协程,从队列中消费数据"""
while True:
while queue:
item = queue.popleft()
print(f"消费者 {name} 消费了 {item}")
time.sleep(random.random() * 2) # 模拟消费时间
yield # 处理完一个项目后暂停
yield # 如果队列为空,也暂停
def scheduler(producers, consumers, max_items):
"""调度器,协调生产者和消费者的执行"""
producer_done = [False] * len(producers)
remaining_items = max_items * len(producers)
# 初始化所有协程
for p in producers:
next(p)
for c in consumers:
next(c)
# 主调度循环
while remaining_items > 0:
# 随机选择一个生产者
active_producer_idx = random.randint(0, len(producers) - 1)
if not producer_done[active_producer_idx]:
try:
producers[active_producer_idx].send(None)
except StopIteration:
producer_done[active_producer_idx] = True
# 随机选择一个消费者
active_consumer_idx = random.randint(0, len(consumers) - 1)
consumers[active_consumer_idx].send(None)
# 更新剩余项目计数
remaining_items = sum(not done for done in producer_done) * max_items
if __name__ == "__main__":
# 共享队列
queue = deque()
# 配置参数
num_producers = 3
num_consumers = 2
items_per_producer = 5
# 创建生产者和消费者协程
producers = [
producer(f"[PRODUCER]-{i}", queue, items_per_producer)
for i in range(num_producers)
]
consumers = [
consumer(f"[CONSUMER]-{i}", queue)
for i in range(num_consumers)
]
# 启动调度器
scheduler(producers, consumers, items_per_producer)
print("所有生产消费任务完成!")
上面的代码实际上是串行运行的,因为Python的生成器协程(非asyncio
)在单个线程中通过显式调度交替执行,而不是真正的并发。
调度器随机选择要执行的生产者或消费者,当队列为空时,消费者会暂停,当生产者完成所有项目后会停止。
2、asnyc/await 协程
生成器协程不能实现真正的并发,而且运行效率低,Python 3.5(2015年9月)首次支持async/await
语法,但此时协程仍需通过asyncio.coroutine
装饰器声明;Python 3.7+(2018年6月)进一步优化,协程可通过async def
直接声明(无需装饰器),并改进了事件循环(asyncio
库)和性能。
当前我的python版本是3.11,所以无须担心可以直接使用最新版本的协程语法。
基本语法
使用 async def
定义协程函数,内部通过 await
挂起执行:
async def fetch_data():
print("开始获取数据")
await asyncio.sleep(1) # 模拟 I/O 操作
print("数据获取完成")
return "data"
运行协程
协程必须通过事件循环运行:
import asyncio
async def fetch_data():
print("开始获取数据")
await asyncio.sleep(1) # 模拟 I/O 操作
print("数据获取完成")
return "data"
async def run():
result = await fetch_data()
print("结果:", result)
if __name__ == '__main__':
asyncio.run(run())
运行结果:
开始获取数据
(等待一秒)
数据获取完成
结果: data
多生产者多消费者
现在使用async/await版本的协程重新实现多生产者多消费者
import asyncio
from asyncio import Queue
async def producer(name, queue: Queue, max_items):
"""异步生产者"""
for i in range(max_items):
item = f"{name}-{i}"
print(f"生产者 {name} 生产了 {item}")
await queue.put(item)
await asyncio.sleep(1) # 模拟异步生产
async def consumer(name, queue: Queue):
"""异步消费者"""
while True:
item = await queue.get()
print(f"消费者 {name} 消费了 {item}")
queue.task_done()
await asyncio.sleep(1) # 模拟异步消费
async def run():
queue: Queue = asyncio.Queue(maxsize=3) # 限制队列大小实现背压
producers = [
asyncio.create_task(producer(f"P-{i}", queue, 5))
for i in range(3)
]
consumers = [
asyncio.create_task(consumer(f"C-{i}", queue))
for i in range(2)
]
await asyncio.gather(*producers) # 等待所有生产者完成
await queue.join() # 等待队列清空
for c in consumers: # 取消消费者任务
c.cancel()
if __name__ == '__main__':
asyncio.run(run())
可以看到这段代码比生成器版本的写成要精简的多,而且代码也更清晰。代码中使用了asyncio的很多方法,这之后再聊它们的作用。
上述代码可以尝试调整队列的大小以及生产者、消费者的数量以观察供大于求、供小于求等各种情况下的表现。
三、asyncio
asyncio
是 Python 用于编写并发代码的标准库,使用 async/await 语法。它特别适合 I/O 密集型和高并发的网络应用。
1、基本用法
运行协程(同步方法中运行协程):
async def main():
print('Hello')
await asyncio.sleep(1)
print('World')
# Python 3.7+
asyncio.run(main())
并发运行任务:
async def task(name, seconds):
print(f"{name} starting")
await asyncio.sleep(seconds)
print(f"{name} finished after {seconds}s")
async def main():
# 方式1: gather
await asyncio.gather(
task("A", 2),
task("B", 1),
task("C", 3)
)
# 方式2: create_task + await
t1 = asyncio.create_task(task("X", 2))
t2 = asyncio.create_task(task("Y", 1))
await t1
await t2
需要特别注意的是:await关键字调用必须在async块中。
2、异步生成器
之前说过可以使用生成器实现协程功能,实际上使用async的协程也可以搭配生成器一起实现异步生成器的功能:
async def async_gen(n):
for i in range(n):
await asyncio.sleep(0.5)
yield i
async def main():
async for item in async_gen(3):
print(item)
3、限制并发数
使用semaphore实现限制并发数:
async def worker(semaphore, name):
async with semaphore:
print(f"{name} started")
await asyncio.sleep(2)
print(f"{name} finished")
async def main():
semaphore = asyncio.Semaphore(3) # 最多3个并发
tasks = [
asyncio.create_task(worker(semaphore, f"Worker-{i}"))
for i in range(10)
]
await asyncio.gather(*tasks)
semaphore在Java中是一种共享锁,通过信号量机制实现并发数量控制,在python中也是类似的功能,详情可以查看:详解AQS七:深入理解信号量机制Semaphore
4、同步阻塞队列:Queue
通过queue: Queue = asyncio.Queue(maxsize=3)
可以创建一个长度为3的同步阻塞队列,它和java中的BlokingQueue
有异曲同工之妙。
put方法:将一个元素放入队列,如果队列已满,就阻塞等待队列空闲。
get方法:从队列中获取并移除一个元素,如果队列为空,就一直阻塞等待直到队列中有元素为止。
task_done方法:高速队列之前get方法取出的任务已经执行完成
join方法:等待队列中的任务全部执行完毕,该方法依赖于task_done方法的执行。
END.
注意:本文归作者所有,未经作者允许,不得转载