Python 异步编程的 9 个级别

对于初学者来说,异步编程似乎既复杂又令人生畏。

但它却是你可以添加到 Python 工具包中的最强大的工具之一。

想象一下,你编写的代码在等待响应时永远不会闲置–你的程序会变得更快、反应更灵敏,并能同时处理多个任务。

在本文中,我将带你逐步了解从基础到高级并发技术的 9 个层次。无论您是刚接触 async 还是希望提高自己的技能,本指南都将为您提供实用的知识和示例,帮助您掌握 async Python 编程。

第 0 层:了解异步编程的必要性

考虑一个从多个网站获取数据的脚本。在同步编程中,每个请求都会阻塞程序,直到请求完成:

import requests
import time

# The urls list could be much longer
urls = ["http://example.com",
        "http://example.org",
        "http://example.net/",]

start_time = time.time()

for url in urls:
    response = requests.get(url)
    print(response.status_code)

print(f"Sync code cost {time.time() - start_time:.2f} seconds")
# Sync code cost 0.64 seconds

上述代码逐个同步处理了 3 个 URL–按顺序处理每个 URL 并转到下一个,直到当前 URL 处理完毕。

总共耗时 0.64 秒。

看起来可以接受?

试想一下,10 个 URL 每个需要等待 3 秒钟–最终需要 30 秒的处理时间。更不用说 100 个 URL、100000 个 URL 等等。程序将非常耗时。

 

元素周期表

这种情况,即所谓的 I/O 受限情况,就是异步编程的表演时间。

import aiohttp
import asyncio
import time

async def fetch_url(url):
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            print(f"Status: {response.status}")

async def main():
    urls = ["http://example.com",
            "http://example.org",
            "http://example.net/",]
    start_time = time.time()
    await asyncio.gather(*(fetch_url(url) for url in urls))
    print(f"Async code cost {time.time() - start_time:.2f} seconds")

asyncio.run(main())
# Async code cost 0.22 seconds

以上是实现相同任务的异步版本代码。它只花了 0.22 秒!

为什么以及如何做到这一点?

因为通过使用异步编码技术,可以同时运行多个任务(在本例中,同时触发所有请求),从而大幅缩短等待时间。

如果你还不能完全理解上面的异步代码,不用担心,现在让我们深入学习 Python 异步技术。

第 1 层:了解事件循环

Python 异步编程的核心是事件循环。

我们可以将它视为主调度器,在不暂停整个程序的情况下协调任务的执行,从而使异步魔法发挥作用。

在引擎盖下,事件循环使非阻塞执行成为可能,这意味着当一个任务在等待时(例如等待耗时的 I/O 操作),其他任务可以继续运行。

非阻塞是异步编程的核心优势。它与同步编程中的阻塞操作形成鲜明对比,在同步编程中,整个程序必须等待一个任务完成后才能进入下一个任务。而这正是不必要的时间成本的根源。

Python 的 asyncio 模块提供了一种实现事件循环的简单方法。事件循环管理每个 coroutine(coroutine 是一种特殊函数,可以暂停和恢复,从而实现无阻塞操作)的执行和恢复时间,确保无阻塞操作。

空谈误国,让我们看看代码吧:

import asyncio

async def task_1():
    print("Starting task 1")
    await asyncio.sleep(2) # simulate a slow I/O operation
    print("Task 1 done")

async def task_2():
    print("Starting task 2")
    await asyncio.sleep(1) # simulate another slow I/O operation
    print("Task 2 done")

async def main():
    await asyncio.gather(task_1(), task_2())

asyncio.run(main())
# Starting task 1
# Starting task 2
# Task 2 done
# Task 1 done

在上述代码中,asyncio.run(main()) 启动了事件循环,该循环管理着 coroutines task_1task_2。事件循环同时运行它们,允许task_2task_1 完成之前启动并完成。

这就是为什么异步程序要快得多的原因。事件循环从不等待任何人。如果有慢速操作,它就会开始做其他事情,然后再回来处理慢速操作。它会一直为你努力工作)

但是,事件循环如何知道何时停止当前的例程并跳转到另一个例程呢?

这就是 async await 关键字。

第二级:熟练使用 async 和 await

在异步世界中,有两个 Python 关键字随处可见–async await

  • async 用于定义一个 coroutine,它是一个可以暂停和恢复而不阻塞其他操作的函数。当你用 async 定义一个函数时,就意味着它可以使用 await 将控制权交还给事件循环。
  • await 用于在 async 函数中暂停执行,直到被等待的 coroutine 完成。使用 await 时,它会告诉事件循环当前的例行程序正在等待某个结果,从而允许事件循环在此期间运行其他例行程序。

因此,有了 async await 关键字,事件循环就能知道如何正确处理 coroutine。

事件循环通过暂停当前程序并继续执行其他任务来协调等待调用。这确保了对多个例行程序的有效管理,使异步编程成为处理 I/O 绑定和高延迟操作的理想选择。从本质上讲,async 定义了可以发生异步行为的位置,而 await 则规定了何时应将控制权交还给事件循环,从而实现高效的多任务处理。

第 3 级:使用 asyncio

像大师一样管理正则表达式 你已经看过前面使用 asyncio 模块的示例。从本质上讲,它是 Python 中实现异步程序的核心模块。

让我们回到之前的例子,进行更深入的探索:

import asyncio

async def task_1():
    print("Task 1 started")
    await asyncio.sleep(2)
    print("Task 1 finished")
    return "Result 1"

async def task_2():
    print("Task 2 started")
    await asyncio.sleep(1)
    print("Task 2 finished")
    return "Result 2"

async def main():
    await asyncio.gather(task_1(), task_2())

asyncio.run(main())
# Task 1 started
# Task 2 started
# Task 2 finished
# Task 1 finished

asyncio.gather() 是并发运行多个任务(coroutines)的常用方法。

它简单明了,但如果需要对每个任务进行更多单独控制。可以通过 asyncio.create_task() 对它们进行显式管理:

import asyncio

async def task_1():
    print("Task 1 started")
    await asyncio.sleep(2)
    print("Task 1 finished")
    return "Result 1"

async def task_2():
    print("Task 2 started")
    await asyncio.sleep(1)
    print("Task 2 finished")
    return "Result 2"

async def main():
    t1 = asyncio.create_task(task_1())
    t2 = asyncio.create_task(task_2())

    # Wait for both tasks to finish
    await t1
    await t2

asyncio.run(main())
# Task 1 started
# Task 2 started
# Task 2 finished
# Task 1 finished

这两种方法的结果是一样的。不过,在以第二种方式执行例行程序时,可以应用更多的个性化控制,例如在任务 1 完成前取消它。

第 4 层:轻松取消过长的异步任务

等待每个 coroutine 结束并不一定是最好的解决方案。在某些情况下,你可能想直接取消过长的任务。可以直观地通过 cancel() 方法来实现:

import asyncio

async def task_1():
    print("Task 1 started")
    try:
        await asyncio.sleep(2)
    except asyncio.CancelledError:
        print("Task 1 was cancelled")
        raise
    print("Task 1 finished")
    return "Result 1"

async def task_2():
    print("Task 2 started")
    await asyncio.sleep(1)
    print("Task 2 finished")
    return "Result 2"

async def main():
    t1 = asyncio.create_task(task_1())
    t2 = asyncio.create_task(task_2())

    # Wait for Task 2 to finish
    await t2

    # Cancel Task 1 before it finishes
    t1.cancel()

    # Wait for Task 1 to handle the cancellation
    try:
        await t1
    except asyncio.CancelledError:
        print("Handled cancellation of Task 1")

asyncio.run(main())
# Task 1 started
# Task 2 started
# Task 2 finished
# Task 1 was cancelled
# Handled cancellation of Task 1

如上图所示,任务 1 还未完成就被取消了。

注意:任务 1 是在 asyncio.create_task(task_1()) 方法执行后立即启动的。await t1 是为了完成任务 1,因此调用 t1.cancel() 的正确时间是在这两个命令之间。

第 5 层:使用 asyncio.wait_for() 处理超时任务

也许在某些情况下取消一个 coroutine 太粗暴了,我们应该给每个 coroutine 足够的等待时间来处理它,以免它变得太长。

是的,超时限制正是我们所需要的,asyncio.wait_for() 方法允许设置一个完成 coroutine 的最大时间限制。

import asyncio

async def slow_task():
    await asyncio.sleep(5)
    return "Task finished"

async def main():
    try:
        result = await asyncio.wait_for(slow_task(), timeout=2)
        print(result)
    except asyncio.TimeoutError:
        print("Task timed out!")

asyncio.run(main())
# Task timed out!

如上例所示,我们应用了 asyncio.wait_for() 方法,并将最长等待时间设置为 2 秒。慢速任务由于耗时 5 秒而超时。

第 6 层:使用 asyncio.Semaphore 限制并发性,防止资源过载

异步程序并不总是完美无缺的。有时甚至很危险。

比如,如果你同时运行太多的例程,相对资源就会被过度使用,你的服务器就会卡住。

这就是你需要了解 asyncio.Semaphore 对象的原因。它有助于限制同时运行的并发任务数量,这在访问共享资源或无法处理无限制同时请求的外部服务时尤为重要。

例如,下面的程序就应用了 asyncio.Semaphore() 技巧,限制最多只能有 5 个程序同时运行:

import asyncio

semaphore = asyncio.Semaphore(5)

async def limited_task(n):
    async with semaphore:
        print(f'Task {n} started')
        await asyncio.sleep(1)
        print(f'Task {n} finished')

async def main():
    tasks = [limited_task(i) for i in range(10)]
    await asyncio.gather(*tasks)

asyncio.run(main())
# Task 0 started
# Task 1 started
# Task 2 started
# Task 3 started
# Task 4 started
# Task 0 finished
# Task 1 finished
# Task 2 finished
# Task 3 finished
# Task 4 finished
# Task 5 started
# Task 6 started
# Task 7 started
# Task 8 started
# Task 9 started
# Task 5 finished
# Task 6 finished
# Task 7 finished
# Task 8 finished
# Task 9 finished

根据结果,我们可以清楚地看到,前 5 个任务先同时执行,然后再执行后 5 个任务。

这种对资源使用的控制看似微妙,却能在大规模生产环境中避免意外问题的发生。

第七层:异步 Python 代码的错误处理

正确的错误处理是代码健壮的关键。

在大多数情况下,异步程序使用与同步代码相同的错误处理方法。

只是对于某些异常,您需要了解并使用特定的异步版本错误对象,如 asyncio.CancelledError。(有关 async Python 错误的完整列表请参见此处。)

除了特定的错误外,还有一个错误易发点值得一提:

当使用 asyncio.create_task() 并发运行任务时,我们应该注意错误处理策略。由于任务是独立运行的,因此必须在任务内部或通过等待任务并在任务结束后捕获异常来处理异常。

在生产中,我们可能不知道任务是否有 try-catch。因此,最佳做法是将 await task放在 try-catch 结构中,如下所示:

import asyncio

async def task_1():
    print("Task 1 started")
    await asyncio.sleep(2)
    print("Task 1 finished")
    return "Result 1"

async def main():
    t1 = asyncio.create_task(task_1())
    t1.cancel()
    try:
        await t1
    except asyncio.CancelledError:
        print("Handled cancellation of Task 1")

asyncio.run(main())
# Handled cancellation of Task 1

第 8 层次:异步队列: 更快的生产者-消费者模式

队列通常用于生产者-消费者模式。它充当了生产者和消费者之间的缓冲区,有助于将他们的操作分离开来,并在一个整洁的数据结构中管理通信。

Python 有一个内置的队列实现 queue.Queue,它还提供了一个异步版本 asyncio.Queue

它们都应用了队列的思想。不过,异步版本使产生过程和消耗过程都是异步的,这可以大大提高代码的性能。

例如,下面的代码段采用了异步队列,并用它来处理生产和消费:

import asyncio

async def producer(queue):
    for i in range(3):
        print(f"Producing {i}")
        await queue.put(i)
        await asyncio.sleep(2)  # simulate a delay for a time-consuming process

async def consumer(queue):
    while True:
        item = await queue.get()
        print(f"Consuming {item}")
        queue.task_done()

async def main():
    queue = asyncio.Queue()
    prod = asyncio.create_task(producer(queue))
    cons = asyncio.create_task(consumer(queue))

    await asyncio.gather(prod)
    await queue.join()
    cons.cancel()

asyncio.run(main())
# Producing 0
# Consuming 0
# Producing 1
# Consuming 1
# Producing 2
# Consuming 2

它演示了一个典型的生产者-消费者场景,其中一个例行程序(生产者)生产项目,另一个例行程序(消费者)消费项目,两者同时运行。

现在,让我们深入了解使用异步队列的要点:

  • 使用 queue = asyncio.Queue() 创建队列。
  • 生产者和消费者任务都是通过 asyncio.create_task() 同时创建和启动的。
  • await asyncio.gather(prod) 确保主程序等待生产者完成所有项目的生产。
  • await queue.join() 阻止主函数运行,直到队列中的所有项目都已处理完毕。它确保程序在退出前等待消费者消费完所有生产的项目。
  • 消费者每次处理一个项目时,都会使用 queue.task_done() 向队列发出信号,表明该项目已完全处理完毕。这样,queue.join() 才能最终解除阻塞。
  • 当队列为空且所有任务都完成后,cons.cancel() 会取消消费者任务,停止无限循环。

结论

恭喜你读完了这本内容全面的指南,并不是每个人都能投入 10 多分钟完全读懂并学会一种技术。

通过学习 Python 异步编程的九个层次,您已经深入了解了如何在各种场景中实现非阻塞并发任务–从基本的异步函数到更高级的生产者-消费者模式。

无论您是在刮擦(scraping)数据,还是在构建可扩展的并发任务,您都可以在 Python 中轻松实现。

阅读余下内容
 

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注


京ICP备12002735号