Skip to content

异步编程和并发

深入理解 Python 的异步编程模型,对比 JavaScript 的 async/await

📖 学习目标

  • 理解 Python 的异步编程模型
  • 掌握 async/await 语法
  • 理解协程的概念和使用
  • 学会使用 asyncio 库
  • 理解并发 vs 并行的区别
  • 了解 GIL(全局解释器锁)的影响
  • 掌握线程和进程的使用

预计学习时间:2-3 天
难度:⭐⭐⭐⭐

1. 异步编程基础

1.1 同步 vs 异步对比

python
import time

# 同步版本
def sync_fetch(url):
    """Simulate a blocking fetch of ``url``.

    Prints a progress line, blocks the calling thread for two seconds in
    place of a real network request, and returns a placeholder payload.
    """
    print(f"Fetching {url}...")
    time.sleep(2)  # blocking wait standing in for network latency
    payload = f"Data from {url}"
    return payload

def sync_main():
    """Fetch three URLs one after another and print the total wall time.

    Each call blocks until it returns, so the elapsed time is roughly the
    sum of the individual fetch durations.
    """
    start = time.time()

    # The returned payloads are not used; only the timing matters here.
    for endpoint in ("https://api1.com", "https://api2.com", "https://api3.com"):
        sync_fetch(endpoint)

    end = time.time()
    print(f"Sync took {end - start:.2f}s")  # about 6 seconds

# 异步版本
import asyncio

async def async_fetch(url):
    """Simulate a non-blocking fetch of ``url``.

    Prints a progress line, yields to the event loop for two seconds in
    place of a real network request, and returns a placeholder payload.
    """
    print(f"Fetching {url}...")
    await asyncio.sleep(2)  # suspends this coroutine only; other tasks keep running
    payload = f"Data from {url}"
    return payload

async def async_main():
    """Run three simulated fetches concurrently and print the wall time."""
    start = time.time()

    # gather() schedules all three coroutines and waits for every one of
    # them; the collected payloads are not used afterwards.
    endpoints = ("https://api1.com", "https://api2.com", "https://api3.com")
    await asyncio.gather(*(async_fetch(endpoint) for endpoint in endpoints))

    end = time.time()
    print(f"Async took {end - start:.2f}s")  # about 2 seconds

# 运行异步函数
# asyncio.run(async_main())

1.2 Python vs JavaScript 异步对比

python
# Python 异步
import asyncio

async def fetch_user(user_id):
    """Return a stub user record for ``user_id`` after a one-second async wait."""
    await asyncio.sleep(1)  # simulated lookup latency
    record = {"id": user_id, "name": "Alice"}
    return record

async def fetch_posts(user_id):
    """Return a stub list of posts after a one-second async wait.

    ``user_id`` is accepted for symmetry with ``fetch_user`` but is not used
    to build the result.
    """
    await asyncio.sleep(1)  # simulated lookup latency
    posts = [{"id": 1, "title": "Post 1"}]
    return posts

async def get_user_data(user_id):
    """Load the user and then the posts, one await after the other.

    The second request does not start until the first has finished.
    """
    bundle = {"user": await fetch_user(user_id)}
    bundle["posts"] = await fetch_posts(user_id)
    return bundle

async def get_user_data_parallel(user_id):
    """Load the user and the posts concurrently on the same event loop.

    Both coroutines are handed to ``asyncio.gather``, so their waits overlap.
    """
    pending = (fetch_user(user_id), fetch_posts(user_id))
    user, posts = await asyncio.gather(*pending)
    return {"user": user, "posts": posts}

# asyncio.run(get_user_data_parallel(1))
javascript
// JavaScript 异步(对比)
async function fetchUser(userId) {
  // Wait one second to stand in for a network round trip.
  const delay = new Promise((done) => {
    setTimeout(done, 1000);
  });
  await delay;
  return { id: userId, name: "Alice" };
}

async function fetchPosts(userId) {
  // Wait one second to stand in for a network round trip; userId is not
  // used to build the result.
  const delay = new Promise((done) => {
    setTimeout(done, 1000);
  });
  await delay;
  return [{ id: 1, title: "Post 1" }];
}

async function getUserData(userId) {
  // Sequential: the posts request starts only after the user has resolved.
  const result = {};
  result.user = await fetchUser(userId);
  result.posts = await fetchPosts(userId);
  return result;
}

async function getUserDataParallel(userId) {
  // Start both requests before awaiting so that their waits overlap.
  const userPromise = fetchUser(userId);
  const postsPromise = fetchPosts(userId);
  const [user, posts] = await Promise.all([userPromise, postsPromise]);
  return { user, posts };
}

关键差异

  • Python 使用 asyncio.gather(),JS 使用 Promise.all()
  • Python 调用 async 函数只会得到协程对象,必须通过 asyncio.run()(或在已运行的事件循环中 await)才会真正执行;JS 调用 async 函数会立即开始执行并返回 Promise
  • Python 的事件循环需要显式管理,JS 由运行时管理

2. 协程(Coroutines)

2.1 协程基础

python
import asyncio

# 定义协程函数(使用 async def)
async def say_hello():
    """Print "Hello", wait one second without blocking the loop, print "World"."""
    first, second = "Hello", "World"
    print(first)
    await asyncio.sleep(1)  # suspension point: control returns to the event loop
    print(second)

# Calling a coroutine function only builds a coroutine object; nothing runs yet.
coro = say_hello()
print(type(coro))  # <class 'coroutine'>

# Run that same coroutine object. Creating it and never awaiting it would
# trigger "RuntimeWarning: coroutine 'say_hello' was never awaited".
asyncio.run(coro)

# A coroutine must either be awaited or be run through the event loop
async def main():
    """Await say_hello() from inside another coroutine."""
    await say_hello()

asyncio.run(main())

2.2 协程的状态

python
import asyncio
import inspect

async def example_coro():
    """Print "Start", wait one second asynchronously, print "End", return "Done"."""
    outcome = "Done"
    print("Start")
    await asyncio.sleep(1)
    print("End")
    return outcome

# Create the coroutine object (its body has not started running).
coro = example_coro()

# Inspect the coroutine state
print(inspect.getcoroutinestate(coro))  # CORO_CREATED

# Once a coroutine has run to completion, or has been closed, its state is
# CORO_CLOSED. Close this one explicitly so that Python does not emit
# "RuntimeWarning: coroutine 'example_coro' was never awaited".
coro.close()

2.3 协程链

python
async def step1():
    """First pipeline stage: announce itself, wait one second, return "Result 1"."""
    print("Step 1")
    await asyncio.sleep(1)
    outcome = "Result 1"
    return outcome

async def step2(prev_result):
    """Second pipeline stage: log the previous result, wait one second, return "Result 2"."""
    print(f"Step 2 with {prev_result}")
    await asyncio.sleep(1)
    outcome = "Result 2"
    return outcome

async def step3(prev_result):
    """Last pipeline stage: log the previous result, wait one second, return "Final Result"."""
    print(f"Step 3 with {prev_result}")
    await asyncio.sleep(1)
    outcome = "Final Result"
    return outcome

async def pipeline():
    """Chain step1 -> step2 -> step3, feeding each result into the next stage.

    The stages run strictly one after another because every call is awaited
    before its result is passed on.
    """
    return await step3(await step2(await step1()))

# asyncio.run(pipeline())

3. asyncio 核心概念

3.1 事件循环(Event Loop)

python
import asyncio

# 方式1:使用 asyncio.run()(推荐,Python 3.7+)
async def main():
    """Print "Hello", wait one second on the event loop, then print "World"."""
    opening, closing = "Hello", "World"
    print(opening)
    await asyncio.sleep(1)
    print(closing)

asyncio.run(main())

# Option 2: manage the event loop by hand (for reference only).
# asyncio.run() above closes its loop and clears the current-loop slot, so
# asyncio.get_event_loop() would raise RuntimeError at this point. Create a
# fresh loop explicitly instead.
loop = asyncio.new_event_loop()
try:
    loop.run_until_complete(main())
finally:
    # Always release the loop's resources, even if main() raised.
    loop.close()

# 获取当前运行的事件循环
async def get_loop_example():
    """Print the event loop that is currently running this coroutine."""
    loop = asyncio.get_running_loop()  # only valid while a loop is running
    message = f"Running on loop: {loop}"
    print(message)

3.2 Task(任务)

python
import asyncio

async def task_func(name, delay):
    """Log start and finish around an async wait of ``delay`` seconds.

    Returns the string ``"Result from <name>"``.
    """
    print(f"Task {name} starting")
    await asyncio.sleep(delay)
    print(f"Task {name} done")
    outcome = f"Result from {name}"
    return outcome

async def main():
    """Start two tasks, then collect and print both results."""
    # create_task() schedules each coroutine on the running loop right away;
    # it does not wait for the coroutine to finish.
    task1, task2 = (
        asyncio.create_task(task_func("A", 2)),
        asyncio.create_task(task_func("B", 1)),
    )

    print("Tasks created")

    # Awaiting a Task yields its return value once it has completed.
    result1 = await task1
    result2 = await task2

    print(f"Results: {result1}, {result2}")

# asyncio.run(main())

# 批量创建和等待 Task
async def batch_tasks():
    """Create five tasks (delays 0 to 4 seconds) and return their results.

    The results come back from ``asyncio.gather`` in creation order.
    """
    tasks = []
    for i in range(5):
        tasks.append(asyncio.create_task(task_func(f"Task-{i}", i)))
    return await asyncio.gather(*tasks)

3.3 asyncio.gather() vs asyncio.wait()

python
import asyncio

async def task(n):
    """Wait ``n`` seconds on the event loop and then return ``n``."""
    delay = n
    await asyncio.sleep(delay)
    return n

async def gather_example():
    """Show that gather() returns results in argument order.

    The three tasks finish at different times, yet the printed list follows
    the order in which they were passed in.
    """
    delays = (2, 1, 3)
    results = await asyncio.gather(*(task(delay) for delay in delays))
    print(results)  # [2, 1, 3], the call order

async def wait_example():
    """Show asyncio.wait() with its default and FIRST_COMPLETED strategies."""
    tasks = [asyncio.create_task(task(delay)) for delay in (2, 1, 3)]

    # Default strategy: return once every task has finished. ``done`` is a
    # set, so the order of ``results`` is not the submission order.
    done, pending = await asyncio.wait(tasks)
    results = [finished.result() for finished in done]

    # Alternative strategy: return as soon as any one task has finished.
    # NOTE: the tasks have already completed by this point, so this second
    # call only illustrates the ``return_when`` argument.
    done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)

3.4 超时控制

python
import asyncio

async def slow_task():
    """Wait ten seconds on the event loop, then return "Done"."""
    outcome = "Done"
    await asyncio.sleep(10)
    return outcome

async def timeout_example():
    """Run slow_task() under a three-second limit and report a timeout."""
    # wait_for() cancels the wrapped awaitable once the timeout expires.
    limited = asyncio.wait_for(slow_task(), timeout=3.0)
    try:
        result = await limited
    except asyncio.TimeoutError:
        print("Task timed out!")

# 或使用上下文管理器
async def timeout_context():
    """Run slow_task() inside an ``asyncio.timeout`` block (Python 3.11+)."""
    deadline = asyncio.timeout(3.0)  # Python 3.11+
    try:
        async with deadline:
            await slow_task()
    except TimeoutError:
        print("Timed out!")

4. 异步 HTTP 请求

4.1 使用 httpx(推荐)

python
import httpx
import asyncio

async def fetch_url(url):
    """GET ``url`` with a short-lived ``httpx.AsyncClient`` and return the body text.

    The client is created for this single request and closed when the
    ``async with`` block exits.
    """
    async with httpx.AsyncClient() as client:
        return (await client.get(url)).text

async def fetch_multiple():
    """GET three GitHub API URLs concurrently and return their decoded JSON bodies.

    One client is shared by all requests. Results are returned in the same
    order as the URL list.
    """
    urls = [
        "https://api.github.com",
        "https://api.github.com/users/github",
        "https://api.github.com/repos/python/cpython"
    ]

    async with httpx.AsyncClient() as client:
        responses = await asyncio.gather(*(client.get(url) for url in urls))
        return [response.json() for response in responses]

# asyncio.run(fetch_multiple())

4.2 Dify 中的异步 HTTP 请求

python
# Dify 中实际使用的异步 HTTP 客户端
import httpx
from typing import Optional, Dict, Any

class AsyncHTTPClient:
    """Async HTTP client wrapper intended for use with ``async with``.

    The underlying ``httpx.AsyncClient`` is created in ``__aenter__`` and
    closed in ``__aexit__``; requests are only valid between those two calls.
    """

    def __init__(self, timeout: float = 30.0):
        """Store the request timeout; no client is created yet."""
        self.timeout = timeout
        # Set by __aenter__ and reset to None by __aexit__.
        self.client: Optional[httpx.AsyncClient] = None

    async def __aenter__(self):
        """Create the underlying client and return this wrapper."""
        self.client = httpx.AsyncClient(timeout=self.timeout)
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Close the underlying client, if any, and forget it."""
        if self.client is not None:
            try:
                await self.client.aclose()
            finally:
                # Drop the reference so that later calls fail with a clear
                # error instead of using a closed client.
                self.client = None

    async def post(
        self,
        url: str,
        data: Optional[Dict[str, Any]] = None,
        json: Optional[Dict[str, Any]] = None,
        headers: Optional[Dict[str, str]] = None
    ) -> Dict[str, Any]:
        """Send a POST request and return the decoded JSON response body.

        Args:
            url: Target URL.
            data: Optional payload, forwarded as ``data=``.
            json: Optional payload, forwarded as ``json=``.
            headers: Optional request headers.

        Raises:
            RuntimeError: If called outside an ``async with`` block, that is
                before ``__aenter__`` or after ``__aexit__``.
            httpx.HTTPStatusError: Raised by ``raise_for_status()`` for
                error responses.
        """
        if self.client is None:
            raise RuntimeError(
                "AsyncHTTPClient is not open; use it inside 'async with'"
            )
        response = await self.client.post(
            url,
            data=data,
            json=json,
            headers=headers
        )
        response.raise_for_status()
        return response.json()

# 使用
async def call_llm_api():
    """POST a one-message chat completion request and return the decoded JSON.

    The bearer token is the placeholder ``xxx`` and must be replaced with a
    real credential before this can succeed.
    """
    endpoint = "https://api.openai.com/v1/chat/completions"
    payload = {
        "model": "gpt-4",
        "messages": [{"role": "user", "content": "Hello"}]
    }
    auth_headers = {"Authorization": "Bearer xxx"}

    async with AsyncHTTPClient() as client:
        return await client.post(endpoint, json=payload, headers=auth_headers)

5. 异步生成器和迭代器

5.1 异步生成器

python
import asyncio

# 异步生成器(使用 async def 和 yield)
async def async_range(n):
    """Asynchronously yield 0, 1, ..., n - 1.

    Each value is preceded by a 0.1-second async wait that stands in for an
    asynchronous operation.
    """
    for value in range(n):
        await asyncio.sleep(0.1)  # simulated async work before each item
        yield value

async def consume_async_gen():
    """Print every value produced by ``async_range(5)`` using ``async for``."""
    numbers = async_range(5)
    async for num in numbers:
        print(num)

# asyncio.run(consume_async_gen())

# 流式响应示例(Dify 中的实际应用)
async def stream_llm_response(messages):
    """Yield ten placeholder chunks, "Chunk 0" to "Chunk 9", 0.1 seconds apart.

    ``messages`` is accepted but not used by this simulated stream.
    """
    i = 0
    while i < 10:
        await asyncio.sleep(0.1)  # simulated delay before each chunk
        yield f"Chunk {i}"
        i += 1

async def process_stream():
    """Print each streamed chunk as it arrives, without newlines."""
    stream = stream_llm_response([])
    async for chunk in stream:
        # flush=True makes each chunk visible immediately.
        print(chunk, end='', flush=True)

5.2 异步迭代器

python
class AsyncIterator:
    """Async iterator producing 0, 1, ..., n - 1 with a 0.1-second wait per item."""

    def __init__(self, n):
        """Remember the upper bound and start counting from zero."""
        self.current = 0
        self.n = n

    def __aiter__(self):
        """Return self: the object is its own async iterator."""
        return self

    async def __anext__(self):
        """Return the next value, or raise StopAsyncIteration when exhausted."""
        if self.current < self.n:
            await asyncio.sleep(0.1)
            value = self.current
            self.current = value + 1
            return value
        raise StopAsyncIteration

async def use_async_iterator():
    """Print every value produced by ``AsyncIterator(5)``."""
    source = AsyncIterator(5)
    async for num in source:
        print(num)

5.3 异步上下文管理器

python
class AsyncDatabaseConnection:
    """Async context manager that simulates opening and closing a DB connection."""

    async def __aenter__(self):
        """Simulate connecting (one-second wait) and return the connection value."""
        print("Connecting to database...")
        await asyncio.sleep(1)
        handle = "DB Connection"
        self.connection = handle
        return handle

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Simulate closing (half-second wait) and clear the stored connection."""
        print("Closing database connection...")
        await asyncio.sleep(0.5)
        self.connection = None

async def use_async_context():
    """Open the simulated connection, use it for one second, then let it close."""
    manager = AsyncDatabaseConnection()
    async with manager as conn:
        print(f"Using {conn}")
        await asyncio.sleep(1)

# asyncio.run(use_async_context())

# 使用 asynccontextmanager
from contextlib import asynccontextmanager

@asynccontextmanager
async def async_resource():
    """Async context manager built from a generator via ``asynccontextmanager``.

    Code before ``yield`` runs on entry and the ``finally`` block runs on
    exit, whether or not the ``async with`` body raised.
    """
    print("Acquiring resource")
    await asyncio.sleep(0.5)
    handle = "Resource"
    try:
        yield handle
    finally:
        print("Releasing resource")
        await asyncio.sleep(0.5)

async def use_decorator_context():
    """Acquire the decorator-based resource, print it, then release it."""
    manager = async_resource()
    async with manager as res:
        print(f"Using {res}")

6. 并发 vs 并行

6.1 概念区分

python
"""
并发(Concurrency):
- 多个任务交替执行
- 适合 I/O 密集型任务
- Python: asyncio, threading

并行(Parallelism):
- 多个任务同时执行
- 适合 CPU 密集型任务
- Python: multiprocessing
"""

import time
import asyncio
import threading
import multiprocessing

# I/O 密集型任务(适合异步/并发)
async def io_bound_task(n):
    """I/O-bound example: wait one second asynchronously, then return ``n * 2``."""
    await asyncio.sleep(1)  # simulated network request
    doubled = n * 2
    return doubled

# CPU 密集型任务(适合多进程/并行)
def cpu_bound_task(n):
    """CPU-bound example: add up the integers below 10**7 in a Python loop.

    ``n`` is accepted so the function fits ``map``-style callers, but it
    does not affect the result.
    """
    running_sum = 0
    for value in range(10**7):
        running_sum = running_sum + value
    return running_sum

# 测试不同方式
async def test_async():
    """Time ten concurrent I/O-bound tasks and print the elapsed seconds."""
    start = time.time()
    pending = [io_bound_task(i) for i in range(10)]
    results = await asyncio.gather(*pending)
    print(f"Async: {time.time() - start:.2f}s")

def test_threads():
    """Time four threads that each run the CPU-bound task.

    Under the GIL these threads cannot execute Python bytecode in parallel,
    which is what this measurement is meant to demonstrate.
    """
    start = time.time()
    threads = [
        threading.Thread(target=cpu_bound_task, args=(i,))
        for i in range(4)
    ]
    for worker in threads:
        worker.start()
    for worker in threads:
        worker.join()
    print(f"Threads: {time.time() - start:.2f}s")

def test_processes():
    """Time the CPU-bound task mapped over a pool of four worker processes."""
    start = time.time()
    pool = multiprocessing.Pool(4)
    with pool:
        # map() blocks until all four inputs have been processed.
        results = pool.map(cpu_bound_task, range(4))
    print(f"Processes: {time.time() - start:.2f}s")

6.2 GIL(全局解释器锁)

python
"""
GIL(Global Interpreter Lock):
- CPython 的特性,同一时刻只有一个线程执行 Python 字节码
- 影响:多线程不能真正并行执行 CPU 密集型任务
- 解决方案:
  1. 使用 multiprocessing(多进程)
  2. 使用 C 扩展(绕过 GIL)
  3. 使用其他 Python 实现(如 Jython, IronPython)
"""

import threading
import time

# Shared module-level counter incremented by the two functions below.
counter = 0
# Lock taken around each increment in increment_with_gil().
lock = threading.Lock()

def increment_with_gil():
    """Add one million to the global ``counter``, one locked increment at a time.

    The explicit ``lock`` (not the GIL) is what makes each read-modify-write
    safe; acquiring it on every iteration costs performance.
    """
    global counter
    remaining = 1000000
    while remaining:
        with lock:
            counter += 1
        remaining -= 1

def increment_no_protection():
    """Add one million to the global ``counter`` with no synchronisation.

    When several threads run this at once, updates can be lost because the
    increment is a separate read, add and write.
    """
    global counter
    remaining = 1000000
    while remaining:
        counter += 1  # not an atomic operation
        remaining -= 1

7. 实际应用场景

7.1 并发数据库查询

python
import asyncio
from typing import List, Dict, Any

# 假设使用异步数据库驱动
async def fetch_user(db, user_id: int) -> Dict[str, Any]:
    """Return a stub user record for ``user_id`` after a 0.1-second async wait.

    ``db`` is a placeholder for an async database handle and is not used.
    """
    # A real implementation would await a query on ``db`` here.
    await asyncio.sleep(0.1)
    record = {"id": user_id, "name": f"User {user_id}"}
    return record

async def fetch_posts(db, user_id: int) -> List[Dict[str, Any]]:
    """Return a single stub post owned by ``user_id`` after a 0.1-second async wait.

    ``db`` is a placeholder for an async database handle and is not used.
    """
    await asyncio.sleep(0.1)
    post = {"id": 1, "user_id": user_id, "title": "Post 1"}
    return [post]

async def fetch_comments(db, post_id: int) -> List[Dict[str, Any]]:
    """Return a single stub comment for ``post_id`` after a 0.1-second async wait.

    ``db`` is a placeholder for an async database handle and is not used.
    """
    await asyncio.sleep(0.1)
    comment = {"id": 1, "post_id": post_id, "content": "Comment 1"}
    return [comment]

async def get_user_with_data(user_id: int):
    """Load a user, their posts and each post's comments in two concurrent phases.

    Phase one fetches the user and the posts together. Phase two fetches the
    comments of every post together and attaches them under ``"comments"``.
    Returns ``{"user": ..., "posts": [...]}``.
    """
    db = None  # placeholder for a database connection

    # Phase 1: the user and the posts do not depend on each other.
    user, posts = await asyncio.gather(
        fetch_user(db, user_id),
        fetch_posts(db, user_id),
    )

    # Phase 2: one comment query per post, all in flight at once.
    comments_list = await asyncio.gather(
        *(fetch_comments(db, post["id"]) for post in posts)
    )

    # gather() preserves order, so index i of comments_list belongs to posts[i].
    for index, comments in enumerate(comments_list):
        posts[index]["comments"] = comments

    return {"user": user, "posts": posts}

7.2 Dify 中的异步应用

python
# Dify 中的实际异步应用场景

# 1. 流式模型调用
async def invoke_model_stream(model_config, messages):
    """Re-yield every chunk produced by ``model.astream(messages)``.

    NOTE(review): ``model`` is not defined in this snippet and
    ``model_config`` is unused. Presumably the model is meant to be built
    from ``model_config``; confirm against the real implementation.
    """
    stream = model.astream(messages)
    async for chunk in stream:
        yield chunk

# 2. 并发处理文档
async def process_documents_batch(documents):
    """Process every document concurrently and return the results in input order.

    ``process_single_document`` is defined elsewhere and is expected to be a
    coroutine function.
    """
    return await asyncio.gather(
        *(process_single_document(doc) for doc in documents)
    )

# 3. 异步向量化
async def batch_embedding(texts, batch_size=10):
    """Embed ``texts`` in concurrent batches and return one flat list.

    ``texts`` is split into consecutive slices of at most ``batch_size``
    items. Each slice is passed to ``embedding_model.aembed`` (defined
    elsewhere) and the per-batch results are concatenated in order.
    """
    batches = []
    for offset in range(0, len(texts), batch_size):
        batches.append(texts[offset:offset + batch_size])

    results = await asyncio.gather(
        *(embedding_model.aembed(batch) for batch in batches)
    )

    # Flatten the list of per-batch results into a single list.
    flattened = []
    for batch_result in results:
        flattened.extend(batch_result)
    return flattened

# 4. 异步工作流节点执行
async def execute_workflow_nodes(nodes):
    """Execute the independent workflow nodes concurrently and return their results.

    ``get_independent_nodes`` and ``execute_node`` are defined elsewhere;
    the latter is expected to be a coroutine function.
    """
    runnable = get_independent_nodes(nodes)
    return await asyncio.gather(*(execute_node(node) for node in runnable))

8. 实践项目

项目 1:异步网络爬虫

python
import asyncio
import httpx
from typing import List

async def fetch_page(client: httpx.AsyncClient, url: str) -> str:
    """GET ``url`` with the shared ``client`` and return the response body text."""
    return (await client.get(url)).text

async def crawl_website(urls: List[str]) -> List[str]:
    """Download all ``urls`` concurrently and return the page bodies in input order.

    A single ``httpx.AsyncClient`` is shared by every request and is closed
    when the ``async with`` block exits.
    """
    async with httpx.AsyncClient() as client:
        return await asyncio.gather(*(fetch_page(client, url) for url in urls))

# 使用
# Example pages to crawl; this is the input for crawl_website().
urls = [
    "https://example.com/page1",
    "https://example.com/page2",
    "https://example.com/page3"
]
# results = asyncio.run(crawl_website(urls))

项目 2:实现异步队列处理

python
import asyncio
from asyncio import Queue

async def producer(queue: Queue, n: int):
    """Put the integers 0 to n - 1 on ``queue``, then a single ``None`` sentinel.

    Each item is preceded by a 0.1-second async wait and followed by a
    progress line.
    """
    for item in range(n):
        await asyncio.sleep(0.1)
        await queue.put(item)
        print(f"Produced {item}")
    # None marks the end of the stream.
    await queue.put(None)

async def consumer(queue: Queue, name: str):
    """Consume items from ``queue`` until the ``None`` sentinel arrives.

    Each real item takes 0.2 seconds of simulated work and is then reported
    and marked done. On receiving the sentinel the consumer puts it back
    before exiting, so that every other consumer sharing the queue also sees
    it and stops. Without that, a single sentinel would stop only one
    consumer and the rest would block forever in ``queue.get()``.

    Note that one ``None`` therefore remains on the queue after all
    consumers have exited.

    Args:
        queue: Shared queue of work items, terminated by ``None``.
        name: Label used in the progress output.
    """
    while True:
        item = await queue.get()
        if item is None:
            queue.task_done()
            # Pass the end-of-stream marker on to the next consumer.
            await queue.put(None)
            break
        await asyncio.sleep(0.2)
        print(f"{name} consumed {item}")
        queue.task_done()

async def main():
    """Run one producer and two consumers on a shared queue until all finish.

    NOTE(review): the producer enqueues a single ``None`` sentinel while two
    consumers are running; confirm that both consumers can terminate.
    """
    queue = Queue()

    # The producer and both consumers run concurrently on the same loop.
    workers = [consumer(queue, label) for label in ("Consumer-1", "Consumer-2")]
    await asyncio.gather(producer(queue, 10), *workers)

# asyncio.run(main())

9. 思考题

  1. 为什么异步编程能提高 I/O 密集型任务的性能?
  2. Python 的 async/await 和 JavaScript 的有什么区别?
  3. 什么时候应该使用异步,什么时候使用多线程,什么时候使用多进程?
  4. GIL 对异步编程有影响吗?为什么?
  5. 如何在 Flask(同步框架)中使用异步代码?

10. 检查清单

  • [ ] 理解异步编程的概念和优势
  • [ ] 掌握 async/await 语法
  • [ ] 能够使用 asyncio 编写异步程序
  • [ ] 理解并发和并行的区别
  • [ ] 了解 GIL 的影响
  • [ ] 能够选择合适的并发方案

11. 下一步

12. 参考资源