# Asynchronous Programming and Concurrency

A deep dive into Python's asynchronous programming model, compared with JavaScript's async/await.

## 📖 Learning Objectives

- Understand Python's asynchronous programming model
- Master the async/await syntax
- Understand what coroutines are and how to use them
- Learn to use the asyncio library
- Understand the difference between concurrency and parallelism
- Understand the impact of the GIL (Global Interpreter Lock)
- Learn to work with threads and processes

Estimated study time: 2-3 days
Difficulty: ⭐⭐⭐⭐
## 1. Asynchronous Programming Basics

### 1.1 Synchronous vs Asynchronous

```python
import time

# Synchronous version
def sync_fetch(url):
    """Fetch data synchronously (blocking)."""
    print(f"Fetching {url}...")
    time.sleep(2)  # simulate a network request
    return f"Data from {url}"

def sync_main():
    """Run sequentially."""
    start = time.time()
    result1 = sync_fetch("https://api1.com")
    result2 = sync_fetch("https://api2.com")
    result3 = sync_fetch("https://api3.com")
    end = time.time()
    print(f"Sync took {end - start:.2f}s")  # about 6 seconds

# Asynchronous version
import asyncio

async def async_fetch(url):
    """Fetch data asynchronously (non-blocking)."""
    print(f"Fetching {url}...")
    await asyncio.sleep(2)  # simulate an async network request
    return f"Data from {url}"

async def async_main():
    """Run the requests concurrently."""
    start = time.time()
    # Run the three requests concurrently
    results = await asyncio.gather(
        async_fetch("https://api1.com"),
        async_fetch("https://api2.com"),
        async_fetch("https://api3.com")
    )
    end = time.time()
    print(f"Async took {end - start:.2f}s")  # about 2 seconds

# Run the async entry point
# asyncio.run(async_main())
```

### 1.2 Python vs JavaScript Async Comparison

```python
# Python async
import asyncio

async def fetch_user(user_id):
    await asyncio.sleep(1)
    return {"id": user_id, "name": "Alice"}

async def fetch_posts(user_id):
    await asyncio.sleep(1)
    return [{"id": 1, "title": "Post 1"}]

async def get_user_data(user_id):
    """Sequential execution."""
    user = await fetch_user(user_id)
    posts = await fetch_posts(user_id)
    return {"user": user, "posts": posts}

async def get_user_data_parallel(user_id):
    """Concurrent execution."""
    user, posts = await asyncio.gather(
        fetch_user(user_id),
        fetch_posts(user_id)
    )
    return {"user": user, "posts": posts}

# asyncio.run(get_user_data_parallel(1))
```

```javascript
// JavaScript async (for comparison)
async function fetchUser(userId) {
  await new Promise(resolve => setTimeout(resolve, 1000));
  return { id: userId, name: "Alice" };
}

async function fetchPosts(userId) {
  await new Promise(resolve => setTimeout(resolve, 1000));
  return [{ id: 1, title: "Post 1" }];
}

async function getUserData(userId) {
  // Sequential execution
  const user = await fetchUser(userId);
  const posts = await fetchPosts(userId);
  return { user, posts };
}

async function getUserDataParallel(userId) {
  // Concurrent execution
  const [user, posts] = await Promise.all([
    fetchUser(userId),
    fetchPosts(userId)
  ]);
  return { user, posts };
}
```

Key differences:

- Python uses `asyncio.gather()`; JS uses `Promise.all()`
- Python needs `asyncio.run()` to start execution; JS can `await` directly (inside an async function)
- Python's event loop must be managed explicitly; in JS the runtime manages it
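To make the last two points concrete, here is a minimal sketch: in Python, calling an async function only creates a coroutine object and nothing runs until an event loop drives it, whereas a JS async function starts executing as soon as it is called.

```python
import asyncio

async def main():
    msg = await asyncio.sleep(0, result="hello")
    print(msg)

main()               # only creates a coroutine object; nothing runs, and Python
                     # warns "coroutine 'main' was never awaited" when it is discarded
asyncio.run(main())  # an event loop must drive the coroutine explicitly
```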
## 2. Coroutines

### 2.1 Coroutine Basics

```python
import asyncio

# Define a coroutine function with async def
async def say_hello():
    """This is a coroutine function."""
    print("Hello")
    await asyncio.sleep(1)
    print("World")

# Calling a coroutine function returns a coroutine object (it does not run yet)
coro = say_hello()
print(type(coro))  # <class 'coroutine'>
coro.close()       # close the unused coroutine to avoid a "never awaited" warning

# Run a coroutine
asyncio.run(say_hello())

# A coroutine must be awaited or driven by an event loop
async def main():
    await say_hello()

asyncio.run(main())
```

### 2.2 Coroutine States

```python
import asyncio
import inspect

async def example_coro():
    print("Start")
    await asyncio.sleep(1)
    print("End")
    return "Done"

# Create a coroutine object
coro = example_coro()

# Inspect the coroutine's state
print(inspect.getcoroutinestate(coro))  # CORO_CREATED
# After the coroutine has run to completion, the state becomes CORO_CLOSED
```
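A minimal sketch of the full lifecycle: a coroutine starts as `CORO_CREATED`, reports `CORO_SUSPENDED` while it is parked at an `await`, and ends as `CORO_CLOSED` once it finishes.

```python
import asyncio
import inspect

async def work():
    await asyncio.sleep(0.1)

async def main():
    coro = work()
    print(inspect.getcoroutinestate(coro))  # CORO_CREATED
    task = asyncio.create_task(coro)
    await asyncio.sleep(0)                  # let the task run up to its first await
    print(inspect.getcoroutinestate(coro))  # CORO_SUSPENDED
    await task
    print(inspect.getcoroutinestate(coro))  # CORO_CLOSED

asyncio.run(main())
```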
### 2.3 Chaining Coroutines

```python
import asyncio

async def step1():
    print("Step 1")
    await asyncio.sleep(1)
    return "Result 1"

async def step2(prev_result):
    print(f"Step 2 with {prev_result}")
    await asyncio.sleep(1)
    return "Result 2"

async def step3(prev_result):
    print(f"Step 3 with {prev_result}")
    await asyncio.sleep(1)
    return "Final Result"

async def pipeline():
    """Chain coroutines: each step awaits the previous one."""
    result1 = await step1()
    result2 = await step2(result1)
    result3 = await step3(result2)
    return result3

# asyncio.run(pipeline())
```

## 3. asyncio Core Concepts

### 3.1 The Event Loop

```python
import asyncio

# Option 1: asyncio.run() (recommended, Python 3.7+)
async def main():
    print("Hello")
    await asyncio.sleep(1)
    print("World")

asyncio.run(main())

# Option 2: manage the event loop manually (for awareness only).
# Note: calling asyncio.get_event_loop() with no running loop is deprecated
# in newer Python versions, so create a fresh loop explicitly.
loop = asyncio.new_event_loop()
try:
    loop.run_until_complete(main())
finally:
    loop.close()

# Get the currently running event loop (from inside a coroutine)
async def get_loop_example():
    loop = asyncio.get_running_loop()
    print(f"Running on loop: {loop}")
```

### 3.2 Tasks

```python
import asyncio

async def task_func(name, delay):
    print(f"Task {name} starting")
    await asyncio.sleep(delay)
    print(f"Task {name} done")
    return f"Result from {name}"

async def main():
    # Create Tasks
    task1 = asyncio.create_task(task_func("A", 2))
    task2 = asyncio.create_task(task_func("B", 1))

    # Tasks are scheduled immediately (create_task does not wait)
    print("Tasks created")

    # Wait for the Tasks to finish
    result1 = await task1
    result2 = await task2
    print(f"Results: {result1}, {result2}")

# asyncio.run(main())

# Create and await Tasks in bulk
async def batch_tasks():
    tasks = [
        asyncio.create_task(task_func(f"Task-{i}", i))
        for i in range(5)
    ]
    results = await asyncio.gather(*tasks)
    return results
```
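On Python 3.11+, `asyncio.TaskGroup` is a structured alternative to manual `create_task` bookkeeping; a minimal sketch (if any task raises, the group cancels the remaining ones):

```python
import asyncio

async def task_func(name, delay):
    await asyncio.sleep(delay)
    return f"Result from {name}"

async def main():
    # Python 3.11+: all tasks are awaited when the block exits
    async with asyncio.TaskGroup() as tg:
        t1 = tg.create_task(task_func("A", 2))
        t2 = tg.create_task(task_func("B", 1))
    print(t1.result(), t2.result())

# asyncio.run(main())
```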
### 3.3 asyncio.gather() vs asyncio.wait()

```python
import asyncio

async def task(n):
    await asyncio.sleep(n)
    return n

async def gather_example():
    """gather: returns results in call order."""
    results = await asyncio.gather(
        task(2),
        task(1),
        task(3)
    )
    print(results)  # [2, 1, 3]: call order, not completion order

async def wait_all_example():
    """wait: gives you control over the waiting strategy."""
    tasks = [
        asyncio.create_task(task(2)),
        asyncio.create_task(task(1)),
        asyncio.create_task(task(3))
    ]
    # Wait until all tasks are done ("done" is an unordered set)
    done, pending = await asyncio.wait(tasks)
    results = [t.result() for t in done]

async def wait_first_example():
    """wait with FIRST_COMPLETED: return as soon as one task finishes."""
    tasks = [
        asyncio.create_task(task(2)),
        asyncio.create_task(task(1)),
        asyncio.create_task(task(3))
    ]
    done, pending = await asyncio.wait(
        tasks,
        return_when=asyncio.FIRST_COMPLETED
    )
```
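A related tool worth knowing alongside `gather()` and `wait()` (a small sketch, not part of the comparison above): `asyncio.as_completed()` lets you process each result as soon as it is ready.

```python
import asyncio

async def task(n):
    await asyncio.sleep(n)
    return n

async def as_completed_example():
    for fut in asyncio.as_completed([task(3), task(1), task(2)]):
        result = await fut
        print(result)  # prints 1, 2, 3 (completion order, not call order)

# asyncio.run(as_completed_example())
```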
### 3.4 Timeouts

```python
import asyncio

async def slow_task():
    await asyncio.sleep(10)
    return "Done"

async def timeout_example():
    try:
        # Give the task 3 seconds to finish
        result = await asyncio.wait_for(slow_task(), timeout=3.0)
    except asyncio.TimeoutError:
        print("Task timed out!")

# Or use the context manager
async def timeout_context():
    try:
        async with asyncio.timeout(3.0):  # Python 3.11+
            await slow_task()
    except TimeoutError:
        print("Timed out!")
```
## 4. Async HTTP Requests

### 4.1 Using httpx (recommended)

```python
import httpx
import asyncio

async def fetch_url(url):
    """Fetch the content of a URL asynchronously."""
    async with httpx.AsyncClient() as client:
        response = await client.get(url)
        return response.text

async def fetch_multiple():
    """Fetch several URLs concurrently."""
    urls = [
        "https://api.github.com",
        "https://api.github.com/users/github",
        "https://api.github.com/repos/python/cpython"
    ]
    async with httpx.AsyncClient() as client:
        tasks = [client.get(url) for url in urls]
        responses = await asyncio.gather(*tasks)
        return [r.json() for r in responses]

# asyncio.run(fetch_multiple())
```
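When fetching many URLs it is common to cap the number of requests in flight. A minimal sketch using `asyncio.Semaphore`; the function names and the limit of 5 are illustrative choices, not part of the original example.

```python
import asyncio
import httpx

async def fetch_limited(client, url, sem):
    async with sem:                      # at most `limit` requests in flight
        response = await client.get(url)
        return response.status_code

async def fetch_many(urls, limit=5):
    sem = asyncio.Semaphore(limit)
    async with httpx.AsyncClient() as client:
        tasks = [fetch_limited(client, url, sem) for url in urls]
        return await asyncio.gather(*tasks)
```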
### 4.2 Async HTTP Requests in Dify

```python
# The kind of async HTTP client used in Dify
import httpx
from typing import Optional, Dict, Any

class AsyncHTTPClient:
    """Asynchronous HTTP client."""

    def __init__(self, timeout: float = 30.0):
        self.timeout = timeout
        self.client: Optional[httpx.AsyncClient] = None

    async def __aenter__(self):
        """Enter the async context manager."""
        self.client = httpx.AsyncClient(timeout=self.timeout)
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Exit the async context manager."""
        if self.client:
            await self.client.aclose()

    async def post(
        self,
        url: str,
        data: Optional[Dict[str, Any]] = None,
        json: Optional[Dict[str, Any]] = None,
        headers: Optional[Dict[str, str]] = None
    ) -> Dict[str, Any]:
        """Send an async POST request (must be used inside the context manager)."""
        assert self.client is not None, "use 'async with AsyncHTTPClient() as client'"
        response = await self.client.post(
            url,
            data=data,
            json=json,
            headers=headers
        )
        response.raise_for_status()
        return response.json()

# Usage
async def call_llm_api():
    async with AsyncHTTPClient() as client:
        result = await client.post(
            "https://api.openai.com/v1/chat/completions",
            json={
                "model": "gpt-4",
                "messages": [{"role": "user", "content": "Hello"}]
            },
            headers={"Authorization": "Bearer xxx"}
        )
        return result
```

## 5. Async Generators and Iterators

### 5.1 Async Generators

```python
import asyncio

# An async generator (async def + yield)
async def async_range(n):
    """Asynchronous generator."""
    for i in range(n):
        await asyncio.sleep(0.1)  # simulate an async operation
        yield i

async def consume_async_gen():
    """Iterate over an async generator with async for."""
    async for num in async_range(5):
        print(num)

# asyncio.run(consume_async_gen())

# Streaming responses (as used in Dify)
async def stream_llm_response(messages):
    """Yield an LLM response chunk by chunk."""
    for i in range(10):
        await asyncio.sleep(0.1)
        chunk = f"Chunk {i}"
        yield chunk

async def process_stream():
    """Consume a streaming response."""
    async for chunk in stream_llm_response([]):
        print(chunk, end='', flush=True)
```

### 5.2 Async Iterators

```python
import asyncio

class AsyncIterator:
    """An asynchronous iterator."""

    def __init__(self, n):
        self.n = n
        self.current = 0

    def __aiter__(self):
        return self

    async def __anext__(self):
        if self.current >= self.n:
            raise StopAsyncIteration
        await asyncio.sleep(0.1)
        self.current += 1
        return self.current - 1

async def use_async_iterator():
    async for num in AsyncIterator(5):
        print(num)
```

### 5.3 Async Context Managers

```python
import asyncio

class AsyncDatabaseConnection:
    """An async context manager."""

    async def __aenter__(self):
        """Asynchronous enter."""
        print("Connecting to database...")
        await asyncio.sleep(1)
        self.connection = "DB Connection"
        return self.connection

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Asynchronous exit."""
        print("Closing database connection...")
        await asyncio.sleep(0.5)
        self.connection = None

async def use_async_context():
    async with AsyncDatabaseConnection() as conn:
        print(f"Using {conn}")
        await asyncio.sleep(1)

# asyncio.run(use_async_context())

# Using asynccontextmanager
from contextlib import asynccontextmanager

@asynccontextmanager
async def async_resource():
    """Create an async context manager with a decorator."""
    print("Acquiring resource")
    await asyncio.sleep(0.5)
    resource = "Resource"
    try:
        yield resource
    finally:
        print("Releasing resource")
        await asyncio.sleep(0.5)

async def use_decorator_context():
    async with async_resource() as res:
        print(f"Using {res}")
```

## 6. Concurrency vs Parallelism

### 6.1 The Concepts

```python
"""
并发(Concurrency):
- 多个任务交替执行
- 适合 I/O 密集型任务
- Python: asyncio, threading
并行(Parallelism):
- 多个任务同时执行
- 适合 CPU 密集型任务
- Python: multiprocessing
"""
import time
import asyncio
import threading
import multiprocessing
# I/O 密集型任务(适合异步/并发)
async def io_bound_task(n):
"""I/O 密集型:大量等待时间"""
await asyncio.sleep(1) # 模拟网络请求
return n * 2
# CPU 密集型任务(适合多进程/并行)
def cpu_bound_task(n):
"""CPU 密集型:大量计算"""
total = 0
for i in range(10**7):
total += i
return total
# 测试不同方式
async def test_async():
"""异步方式(适合 I/O 密集型)"""
start = time.time()
results = await asyncio.gather(*[io_bound_task(i) for i in range(10)])
print(f"Async: {time.time() - start:.2f}s")
def test_threads():
"""线程方式(GIL 限制,不适合 CPU 密集型)"""
start = time.time()
threads = []
for i in range(4):
t = threading.Thread(target=cpu_bound_task, args=(i,))
threads.append(t)
t.start()
for t in threads:
t.join()
print(f"Threads: {time.time() - start:.2f}s")
def test_processes():
"""多进程方式(适合 CPU 密集型)"""
start = time.time()
with multiprocessing.Pool(4) as pool:
results = pool.map(cpu_bound_task, range(4))
print(f"Processes: {time.time() - start:.2f}s")6.2 GIL(全局解释器锁)
### 6.2 The GIL (Global Interpreter Lock)

```python
"""
GIL (Global Interpreter Lock):
- A CPython implementation detail: only one thread executes Python bytecode at a time
- Consequence: threads cannot truly parallelize CPU-bound work
- Workarounds:
  1. multiprocessing (separate processes)
  2. C extensions that release the GIL
  3. Other Python implementations (e.g. Jython, IronPython)
"""
import threading
import time

counter = 0
lock = threading.Lock()

def increment_with_lock():
    """Correct thanks to the explicit lock, but the lock serializes the work."""
    global counter
    for _ in range(1000000):
        with lock:
            counter += 1

def increment_no_protection():
    """No protection: subject to race conditions."""
    global counter
    for _ in range(1000000):
        counter += 1  # not an atomic operation
```
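The GIL is released while a thread blocks on I/O (or in `time.sleep`), which is why threads still speed up I/O-bound work even though they cannot parallelize CPU-bound work. A minimal sketch using `concurrent.futures`; `blocking_io` is an illustrative stand-in for a blocking request.

```python
import time
from concurrent.futures import ThreadPoolExecutor

def blocking_io(n):
    time.sleep(1)   # the GIL is released while the thread is blocked
    return n

start = time.time()
with ThreadPoolExecutor(max_workers=4) as pool:
    results = list(pool.map(blocking_io, range(4)))
print(f"4 blocking calls took ~{time.time() - start:.1f}s")  # about 1s, not 4s
```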
## 7. Practical Application Scenarios

### 7.1 Concurrent Database Queries

```python
import asyncio
from typing import List, Dict, Any

# Assume an async database driver
async def fetch_user(db, user_id: int) -> Dict[str, Any]:
    """Fetch a user asynchronously."""
    # await db.execute(...)
    await asyncio.sleep(0.1)
    return {"id": user_id, "name": f"User {user_id}"}

async def fetch_posts(db, user_id: int) -> List[Dict[str, Any]]:
    """Fetch posts asynchronously."""
    await asyncio.sleep(0.1)
    return [{"id": 1, "user_id": user_id, "title": "Post 1"}]

async def fetch_comments(db, post_id: int) -> List[Dict[str, Any]]:
    """Fetch comments asynchronously."""
    await asyncio.sleep(0.1)
    return [{"id": 1, "post_id": post_id, "content": "Comment 1"}]

async def get_user_with_data(user_id: int):
    """Fetch all of a user's data concurrently."""
    db = None  # database connection

    # Fetch the user and their posts concurrently
    user, posts = await asyncio.gather(
        fetch_user(db, user_id),
        fetch_posts(db, user_id)
    )

    # Fetch comments for all posts concurrently
    comment_tasks = [
        fetch_comments(db, post["id"])
        for post in posts
    ]
    comments_list = await asyncio.gather(*comment_tasks)

    # Assemble the result
    for post, comments in zip(posts, comments_list):
        post["comments"] = comments

    return {
        "user": user,
        "posts": posts
    }
```

### 7.2 Async Use Cases in Dify

```python
# Real async scenarios in Dify (schematic: the helpers referenced here are assumed to exist)

# 1. Streaming model invocation
async def invoke_model_stream(model_config, messages):
    """Stream a model response asynchronously."""
    async for chunk in model.astream(messages):
        yield chunk

# 2. Processing documents concurrently
async def process_documents_batch(documents):
    """Process multiple documents concurrently."""
    tasks = [
        process_single_document(doc)
        for doc in documents
    ]
    return await asyncio.gather(*tasks)

# 3. Async embedding
async def batch_embedding(texts, batch_size=10):
    """Embed texts asynchronously in batches."""
    batches = [
        texts[i:i+batch_size]
        for i in range(0, len(texts), batch_size)
    ]
    tasks = [
        embedding_model.aembed(batch)
        for batch in batches
    ]
    results = await asyncio.gather(*tasks)
    # Flatten the results
    return [emb for batch in results for emb in batch]

# 4. Async workflow node execution
async def execute_workflow_nodes(nodes):
    """Run independent workflow nodes concurrently."""
    independent_nodes = get_independent_nodes(nodes)
    tasks = [
        execute_node(node)
        for node in independent_nodes
    ]
    return await asyncio.gather(*tasks)
```

## 8. Practice Projects

### Project 1: An Async Web Crawler

```python
import asyncio
import httpx
from typing import List

async def fetch_page(client: httpx.AsyncClient, url: str) -> str:
    """Fetch a single page."""
    response = await client.get(url)
    return response.text

async def crawl_website(urls: List[str]) -> List[str]:
    """Crawl multiple pages concurrently."""
    async with httpx.AsyncClient() as client:
        tasks = [fetch_page(client, url) for url in urls]
        return await asyncio.gather(*tasks)

# Usage
urls = [
    "https://example.com/page1",
    "https://example.com/page2",
    "https://example.com/page3"
]
# results = asyncio.run(crawl_website(urls))
```
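In a real crawler some requests will fail. A small sketch of one way to handle that (`safe_fetch` and `crawl_tolerant` are hypothetical names): `return_exceptions=True` keeps one bad URL from aborting the whole batch.

```python
import asyncio
import httpx

async def safe_fetch(client, url):
    response = await client.get(url, timeout=10.0)
    response.raise_for_status()
    return response.text

async def crawl_tolerant(urls):
    async with httpx.AsyncClient() as client:
        results = await asyncio.gather(
            *(safe_fetch(client, url) for url in urls),
            return_exceptions=True,   # collect exceptions instead of aborting everything
        )
    pages = [r for r in results if not isinstance(r, Exception)]
    errors = [r for r in results if isinstance(r, Exception)]
    return pages, errors
```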
### Project 2: An Async Queue Pipeline

```python
import asyncio
from asyncio import Queue

async def producer(queue: Queue, n: int, num_consumers: int = 1):
    """Producer."""
    for i in range(n):
        await asyncio.sleep(0.1)
        await queue.put(i)
        print(f"Produced {i}")
    # One stop signal per consumer, otherwise the extra consumers wait forever
    for _ in range(num_consumers):
        await queue.put(None)

async def consumer(queue: Queue, name: str):
    """Consumer."""
    while True:
        item = await queue.get()
        if item is None:
            queue.task_done()
            break
        await asyncio.sleep(0.2)
        print(f"{name} consumed {item}")
        queue.task_done()

async def main():
    queue = Queue()
    # Run the producer and consumers together
    await asyncio.gather(
        producer(queue, 10, num_consumers=2),
        consumer(queue, "Consumer-1"),
        consumer(queue, "Consumer-2")
    )

# asyncio.run(main())
```

## 9. Questions to Think About
- Why does asynchronous programming improve the performance of I/O-bound tasks?
- How does Python's async/await differ from JavaScript's?
- When should you use async, when threads, and when processes?
- Does the GIL affect asynchronous programming? Why or why not?
- How can you use async code inside Flask (a synchronous framework)?
## 10. Checklist

- [ ] Understand the idea and benefits of asynchronous programming
- [ ] Master the async/await syntax
- [ ] Can write asynchronous programs with asyncio
- [ ] Understand the difference between concurrency and parallelism
- [ ] Understand the impact of the GIL
- [ ] Can choose an appropriate concurrency approach