草庐IT

python异步编程小抄

小餐包 2023-10-11 原文

基础

异步调用的原理,是在单个线程中通过切换任务(就像单线程的函数切换,负担很小,性能很好)来达到并发的效果。相较于线程这种比较重的并发方式,异步调用可以大大提升I/O密集型任务的执行效率,达到非常高的并发量。python中异步调用的最小单位是协程,随着异步编程在python3各个版本中不断的迭代,使用异步编程变得越来越简单,因此,我们有必要好好掌握python中异步编程相关的知识。

两个关键字

async:用来声明一个协程函数(async def),与普通函数不同的是,调用协程函数并不会返回其运行的结果,而是返回一个协程对象(coroutine)。协程对象需要注册到事件循环(event loop)中才能被调度执行到。

await:用来异步等待一个协程的返回,只能在协程函数里使用await时意味着将函数控制权交还给event loop。举个例子,当我们在g()函数内部遇到await f()时,就会将暂时挂起g()的执行直到 f()返回,与此同时,将CPU的执行权让出给event loop中的其他函数继续执行。

awaitable:就像for关键字用于iterable对象, await关键字用于awaitable对象。最常见的两个awaitable对象就是原生的协程对象以及使用asyncio.create_task()方法创建的asyncio.Task对象。值得注意的是,你并不总是需要await一个Task如果你不需要取消或者等待协程运行的结果的话。

例子

Fluent Python 2nd Edition(神作,忍不住再次安利)这本书的示例代码为例:

import asyncio
import socket
from keyword import kwlist

MAX_KEYWORD_LEN = 4 


async def probe(domain: str) -> tuple[str, bool]: 
    loop = asyncio.get_running_loop() 
    try:
        await loop.getaddrinfo(domain, None) 
    except socket.gaierror:
        return (domain, False)
    return (domain, True)

async def main() -> None: 
     names = (kw for kw in kwlist if len(kw) <= MAX_KEYWORD_LEN) 
     domains = (f'{name}.dev'.lower() for name in names) 
     coros = [probe(domain) for domain in domains] 
     for coro in asyncio.as_completed(coros): 
     domain, found = await coro 
     mark = '+' if found else ' '
     print(f'{mark} {domain}')
    
    
if __name__ == '__main__':
    asyncio.run(main()) 

作者是这么描述协程的工作流程的,注意加粗部分:

Using the syntax await loop.getaddrinfo(…) avoids blocking because await suspends the current coroutine object—for example, probe('if.dev'). A new coroutine object is created, getaddrinfo('if.dev', None), it starts the low-level addrinfo query and yields control back to the event loop, which can drive other pending coroutine objects, such as probe('or.dev'). When the event loop gets a response for the getaddrinfo('if.dev', None) query, that specific coroutine object resumes and returns control back to the probe('if.dev')—which was suspended at await—and can now handle a possible exception and return the result tuple.

这里注意一下英文中SuspendPending的差异:

Suspend:一个事情已经开始了, 不过现在要停了(可能是暂时地)。 Classes have been suspended for the holidays.

Pending:一个事情还没开始, 因为还在等其他东西。 This project is still pending for approval.

常用对象

Future:

Future对象是用来模仿concurrent.futures包中的Future对象的,除了一小部分API有差异外,他们的API基本上兼容。Future对象代表一个任务的结果,注意这里的结果可以是未执行的结果或者时一个执行异常。源代码中是这样描述这个对象的:

class Future(object):
    """
    This class is *almost* compatible with concurrent.futures.Future.
    
        Differences:
    
        - result() and exception() do not take a timeout argument and
          raise an exception when the future isn't done yet.
    
        - Callbacks registered with add_done_callback() are always called
          via the event loop's call_soon_threadsafe().
    
        - This class is not compatible with the wait() and as_completed()
          methods in the concurrent.futures package.
    """

Task:

一个和Future对象类似的协程对象,非线程安全,查看源代码可以看到TaskFuture的子类,因此 Future对象不一定是一个Task对象, 但Task对象一定是个Future对象。

class Task(Future):
    """ A coroutine wrapped in a Future. """

Task对象在创建时就会注册到事件循环中。

EventLoop:

管理和分配不同Task的执行,Task需要注册到EventLoo以后才能被调度执行到。你可以把它看成是某个监控着协程空闲、可执行等运行状态,并且能根据某个协程等待的事件变为可执行时唤醒这些空闲的协程的While True的循环Loop是可插拔(替换)的,也就是说,你可以自己实现一个事件循环来代替的默认的事件循环,比如Linux系统上非常著名的uvloop,使用下面代码即可替换EventLoop实现:

import asyncio
import uvloop
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())

uvloop是基于libuv库(nodejs)使用Cython编写的,拥有比自带的event loop更高的性能,遗憾的是你只能在*nixpython3.5+的环境中使用它。

常用方法

asyncio.run()

在python3.7中引入的新方法,会自动创建event loop并且执行run_until_complete,同时在执行结束时会自动关闭event loop,在引入该方法之前,你可能需要使用如下代码来执行一个简单的协程:

loop = asyncio.get_event_loop()
try:
    loop.run_until_complete(main())
finally:
    loop.close()

如果你需要对未完成的任务执行cancel()方法,那么还需要在另外写一些代码来处理它们。而asyncio.run()方法将这些代码范式进行了封装,使得调用协程变得不需要再写这些模板式的代码了。

asyncio.gather(*aws, loop=None, return_exceptions=False)

这个方法将协程(准确的说是awaitable对象,因此也可以是future对象)集合统一放到一个future对象里面,并且将协程的结果统一在一个列表中返回。如果所有可等待对象都成功完成,结果将是一个由所有返回值聚合而成的列表。结果值的顺序与 aws 中可等待对象的顺序一致。

asyncio.ensure_futureasyncio.create_task

asyncio.ensure_future虽然名字里带了future,但通常它返回的对象是一个Task对象(除非传入的obj对象本身就是一个Future对象),这是一个有点反直觉且经常容易混淆的点,看下面的例子:

import asyncio


async def foo():
    print("before foo await")
    await asyncio.sleep(1)
    print("after foo await")
    return "foo"


async def bar():
    print("before bar await")
    await asyncio.sleep(1)
    print("after bar await")
    return "bar"


async def popo():
    print("before popo await")
    await asyncio.sleep(1)
    print("after popo await")
    return "popo"


async def set_after(fut, delay, value):
    # Sleep for *delay* seconds.
    await asyncio.sleep(delay)
    # Set *value* as a result of *fut* Future.
    fut.set_result(value)


async def main():
    print("running main")
    task1 = asyncio.create_task(foo())
    task2 = asyncio.create_task(bar())
    fut1 = asyncio.ensure_future(popo())
    loop = asyncio.get_running_loop()
    fut2 = loop.create_future()
    loop.create_task(
        set_after(fut2, 1, '... world'))
    print(isinstance(task1, asyncio.Future))
    print(isinstance(fut1, asyncio.Task))
    print(isinstance(fut2, asyncio.Task))
    print(isinstance(fut2, asyncio.Future))
    await task1
    await task2
    await fut1
    await fut2
    print("exiting main")


asyncio.run(main())

输出如下, 注意第三行和第四行的输出:

running main
True
True
False
True
before foo await
before bar await
before popo await
after foo await
after popo await
after bar await
exiting main

因此,python 3.7 及之后版本都推荐使用asyncio.create_task方法,这个方法限制了传入的对象必须是一个协程对象。

asyncio.get_running_loopasyncio.get_event_loop

asyncio.get_running_loop函数是在Python 3.7中添加,在协程内部使用以便获取运行着的事件循环的函数,当事件循环不存在时,这个函数可能会返回RuntimeError。它的实现相较于asyncio.get_event_loop (可能会按需开始一个新的事件循环)更加简单和快速.

其他常用类

Asyncio.Queue

对于并发编程,经常需要使用队列来将负载分配到多个任务上,比如经典的生产者-消费者模式,asyncio包同样提了Queue对象来满足这类需求,参考官方的代码示例:

import asyncio
import random
import time


async def worker(name, queue):
    while True:
        # Get a "work item" out of the queue.
        sleep_for = await queue.get()

        # Sleep for the "sleep_for" seconds.
        await asyncio.sleep(sleep_for)

        # Notify the queue that the "work item" has been processed.
        queue.task_done()

        print(f'{name} has slept for {sleep_for:.2f} seconds')


async def main():
    # Create a queue that we will use to store our "workload".
    queue = asyncio.Queue()

    # Generate random timings and put them into the queue.
    total_sleep_time = 0
    for _ in range(20):
        sleep_for = random.uniform(0.05, 1.0)
        total_sleep_time += sleep_for
        queue.put_nowait(sleep_for)

    # Create three worker tasks to process the queue concurrently.
    tasks = []
    for i in range(3):
        task = asyncio.create_task(worker(f'worker-{i}', queue))
        tasks.append(task)

    # Wait until the queue is fully processed.
    started_at = time.monotonic()
    await queue.join()
    total_slept_for = time.monotonic() - started_at

    # Cancel our worker tasks.
    for task in tasks:
        task.cancel()
    # Wait until all worker tasks are cancelled.
    await asyncio.gather(*tasks, return_exceptions=True)

    print('====')
    print(f'3 workers slept in parallel for {total_slept_for:.2f} seconds')
    print(f'total expected sleep time: {total_sleep_time:.2f} seconds')


asyncio.run(main())

其中的几个方法需要单独说明一下:

put_nowait(item): 将item放到队列中,非阻塞性操作,当队列满时,将抛出QueueFull的异常;

put(item):将item放到队列中,阻塞性操作, 当队列满时,将一直等待直到有空位;

get_nowait(): 从队列中获取一个item,非阻塞性操作,当队列为空时,将抛出QueueEmpty的异常。

get(): 从队列中获取一个item,阻塞性操作,当队列为空时,将一直等待直到有item可用;

task_done(): 该方法通常由消费者处理,用来表示从队列获取的任务已经完成。对于每一个通过get()从队列获取的任务,调用该方法会告知队列任务处理完成;

join(): 阻塞直到队列中的所有item都已经被处理过。每个item添加到队列中时,未完成任务的计数就会增加;当这些任务调用task_done()时,这个计数就会减少;当未完成任务计数为0时,join()将不再阻塞。

Asyncio.Semaphore

异步编程处理IO密集型的任务时具有很好的性能,但有时我们也会希望限制一下并发量,这时候就可以使用信号量来达到这个目的。基本用法可以参考官方的代码示例:

sem = asyncio.Semaphore(10)

# ... later
async with sem:
    # work with shared resource

有关python异步编程小抄的更多相关文章

  1. python - 如何使用 Ruby 或 Python 创建一系列高音调和低音调的蜂鸣声? - 2

    关闭。这个问题是opinion-based.它目前不接受答案。想要改进这个问题?更新问题,以便editingthispost可以用事实和引用来回答它.关闭4年前。Improvethisquestion我想在固定时间创建一系列低音和高音调的哔哔声。例如:在150毫秒时发出高音调的蜂鸣声在151毫秒时发出低音调的蜂鸣声200毫秒时发出低音调的蜂鸣声250毫秒的高音调蜂鸣声有没有办法在Ruby或Python中做到这一点?我真的不在乎输出编码是什么(.wav、.mp3、.ogg等等),但我确实想创建一个输出文件。

  2. ruby-on-rails - 如何在 ruby​​ 中使用两个参数异步运行 exe? - 2

    exe应该在我打开页面时运行。异步进程需要运行。有什么方法可以在ruby​​中使用两个参数异步运行exe吗?我已经尝试过ruby​​命令-system()、exec()但它正在等待过程完成。我需要用参数启动exe,无需等待进程完成是否有任何ruby​​gems会支持我的问题? 最佳答案 您可以使用Process.spawn和Process.wait2:pid=Process.spawn'your.exe','--option'#Later...pid,status=Process.wait2pid您的程序将作为解释器的子进程执行。除

  3. ruby - 寻找通过阅读代码确定编程语言的ruby gem? - 2

    几个月前,我读了一篇关于ruby​​gem的博客文章,它可以通过阅读代码本身来确定编程语言。对于我的生活,我不记得博客或gem的名称。谷歌搜索“ruby编程语言猜测”及其变体也无济于事。有人碰巧知道相关gem的名称吗? 最佳答案 是这个吗:http://github.com/chrislo/sourceclassifier/tree/master 关于ruby-寻找通过阅读代码确定编程语言的rubygem?,我们在StackOverflow上找到一个类似的问题:

  4. Python 相当于 Perl/Ruby ||= - 2

    这个问题在这里已经有了答案:关闭10年前。PossibleDuplicate:Pythonconditionalassignmentoperator对于这样一个简单的问题表示歉意,但是谷歌搜索||=并不是很有帮助;)Python中是否有与Ruby和Perl中的||=语句等效的语句?例如:foo="hey"foo||="what"#assignfooifit'sundefined#fooisstill"hey"bar||="yeah"#baris"yeah"另外,类似这样的东西的通用术语是什么?条件分配是我的第一个猜测,但Wikipediapage跟我想的不太一样。

  5. java - 什么相当于 ruby​​ 的 rack 或 python 的 Java wsgi? - 2

    什么是ruby​​的rack或python的Java的wsgi?还有一个路由库。 最佳答案 来自Python标准PEP333:Bycontrast,althoughJavahasjustasmanywebapplicationframeworksavailable,Java's"servlet"APImakesitpossibleforapplicationswrittenwithanyJavawebapplicationframeworktoruninanywebserverthatsupportstheservletAPI.ht

  6. 华为OD机试用Python实现 -【明明的随机数】 2023Q1A - 2

    华为OD机试题本篇题目:明明的随机数题目输入描述输出描述:示例1输入输出说明代码编写思路最近更新的博客华为od2023|什么是华为od,od薪资待遇,od机试题清单华为OD机试真题大全,用Python解华为机试题|机试宝典【华为OD机试】全流程解析+经验分享,题型分享,防作弊指南华为o

  7. python - 如何读取 MIDI 文件、更改其乐器并将其写回? - 2

    我想解析一个已经存在的.mid文件,改变它的乐器,例如从“acousticgrandpiano”到“violin”,然后将它保存回去或作为另一个.mid文件。根据我在文档中看到的内容,该乐器通过program_change或patch_change指令进行了更改,但我找不到任何在已经存在的MIDI文件中执行此操作的库.他们似乎都只支持从头开始创建的MIDI文件。 最佳答案 MIDIpackage会为您完成此操作,但具体方法取决于midi文件的原始内容。一个MIDI文件由一个或多个音轨组成,每个音轨是十六个channel中任何一个上的

  8. 网络编程套接字 - 2

    网络编程套接字网络编程基础知识理解源`IP`地址和目的`IP`地址理解源MAC地址和目的MAC地址认识端口号理解端口号和进程ID理解源端口号和目的端口号认识`TCP`协议认识`UDP`协议网络字节序socket编程接口`sockaddr``UDP`网络程序服务器端代码逻辑:需要用到的接口服务器端代码`udp`客户端代码逻辑`udp`客户端代码`TCP`网络程序服务器代码逻辑多个版本服务器单进程版本多进程版本多线程版本线程池版本服务器端代码客户端代码逻辑客户端代码TCP协议通讯流程TCP协议的客户端/服务器程序流程三次握手(建立连接)数据传输四次挥手(断开连接)TCP和UDP对比网络编程基础知识

  9. 「Python|Selenium|场景案例」如何定位iframe中的元素? - 2

    本文主要介绍在使用Selenium进行自动化测试或者任务时,对于使用了iframe的页面,如何定位iframe中的元素文章目录场景描述解决方案具体代码场景描述当我们在使用Selenium进行自动化测试的时候,可能会遇到一些界面或者窗体是使用HTML的iframe标签进行承载的。对于iframe中的标签,如果直接查找是无法找到的,会抛出没有找到元素的异常。比如近在咫尺的例子就是,CSDN的登录窗体就是使用的iframe,大家可以尝试通过F12开发者模式查看到的tag_name,class_name,id或者xpath来定位中的页面元素,会抛出NoSuchElementException异常。解决

  10. python ffmpeg 使用 pyav 转换 一组图像 到 视频 - 2

    2022/8/4更新支持加入水印水印必须包含透明图像,并且水印图像大小要等于原图像的大小pythonconvert_image_to_video.py-f30-mwatermark.pngim_dirout.mkv2022/6/21更新让命令行参数更加易用新的命令行使用方法pythonconvert_image_to_video.py-f30im_dirout.mkvFFMPEG命令行转换一组JPG图像到视频时,是将这组图像视为MJPG流。我需要转换一组PNG图像到视频,FFMPEG就不认了。pyav内置了ffmpeg库,不需要系统带有ffmpeg工具因此我使用ffmpeg的python包装p

随机推荐