草庐IT

Python 原生协程和 send()

coder 2023-08-13 原文

基于生成器的协程有一个 send() 方法,它允许调用者和被调用者之间进行双向通信,并从调用者恢复生成的生成器协程。这是将生成器变成协程的功能。

虽然新的原生 async/await 协程为异步 I/O 提供了出色的支持,但我看不出如何使用它们获得与 send() 等效的功能。明确禁止在 async 函数中使用 yield,因此 native 协程只能使用 return 语句返回一次。虽然 await 表达式将新值带入协程,但这些值来自被调用者,而不是调用者,并且等待的调用每次都从头开始计算,而不是从它停止的地方计算。

有没有办法从中断处恢复返回的协程并可能发送新值? 我如何模仿 David Beazley 的 Curious Course on Coroutines and Concurrency 中的技术?使用原生协程?

我想到的一般代码模式是这样的

def myCoroutine():
  ...
  while True:
    ...
    ping = yield(pong)
    ...

在调用者中

while True:
  ...
  buzz = myCoroutineGen.send(bizz)
  ...

编辑

我接受了凯文的回答,但我注意到 PEP says

Coroutines are based on generators internally, thus they share the implementation. Similarly to generator objects, coroutines have throw() , send() and close() methods.

...

throw() , send() methods for coroutines are used to push values and raise errors into Future-like objects.

显然原生协程确实有一个send()?如果没有 yield 表达式如何在协程中接收值?

最佳答案

在完成了由 Beazley 开设的同一门(我必须说很棒)协程类(class)后,我问了自己一个非常相同的问题——如何调整代码以与 Python 3.5 中引入的原生协程一起工作?

事实证明,可以通过对代码进行相对较小的更改来完成。我会假设读者熟悉类(class) Material ,并将采取 pyos4.py作为基础的版本 - 第一个支持“系统调用”的 Scheduler 版本。

TIP: A full runnable example can be found in Appendix A at the end.

目标

目标是转以下协程代码:

def foo():
    mytid = yield GetTid()  # a "system call"
    for i in xrange(3):
        print "I'm foo", mytid
        yield  # a "trap"

...进入原生协程并仍然像以前一样使用:

async def foo():
    mytid = await GetTid()  # a "system call"
    for i in range(3):
        print("I'm foo", mytid)
        await ???  # a "trap" (will explain the missing bit later)

我们想在没有 asyncio 的情况下运行它,因为我们已经有了自己的事件循环来驱动整个过程 - 它是 Scheduler 类。

等待对象

原生协程无法立即运行,以下代码会导致错误:

async def foo():
    mytid = await GetTid()
    print("I'm foo", mytid)

sched = Scheduler()
sched.new(foo())
sched.mainloop()
Traceback (most recent call last):
    ...
    mytid = await GetTid()
TypeError: object GetTid can't be used in 'await' expression

PEP 492 explains what kind of objects can be awaited on. One of the options is "an object with an __await__ method returning an iterator".

Just like yield from, if you are familiar with it, await acts as a tunnel between the object awaited on and the outermost code that drives the coroutine (usually an event loop). This is best demonstrated with an example:

class Awaitable:
    def __await__(self):
        value = yield 1
        print("Awaitable received:", value)
        value = yield 2
        print("Awaitable received:", value)
        value = yield 3
        print("Awaitable received:", value)
        return 42


async def foo():
    print("foo start")
    result = await Awaitable()
    print("foo received result:", result)
    print("foo end")

以交互方式驱动 foo() 协程产生以下内容:

>>> f_coro = foo()  # calling foo() returns a coroutine object
>>> f_coro
<coroutine object foo at 0x7fa7f74046d0>
>>> f_coro.send(None)
foo start
1
>>> f_coro.send("one")
Awaitable received: one
2
>>> f_coro.send("two")
Awaitable received: two
3
>>> f_coro.send("three")
Awaitable received: three
foo received result: 42
foo end
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
StopIteration

发送到 f_coro 的任何内容都会向下传送到 Awaitable 实例。同样,无论 Awaitable.__await__() 产生什么,都会冒泡到发送值的最顶层代码。

整个过程对f_coro协程是透明的,不直接参与,看不到上下传递的值。然而,当 Awaitable 的迭代器耗尽时,它的 return 值变成了 await 表达式的结果(在我们的例子中是 42),这就是f_coro 终于恢复了。

请注意,协程中的 await 表达式也可以链接。一个协程可以等待另一个协程,另一个协程等待另一个协程...直到整个链以 yield 结束。

将值发送到协程本身

这些知识如何帮助我们?好吧,在类(class) Material 中,协程可以生成 SystemCall 实例。调度程序理解这些并让系统调用处理请求的操作。

为了让协程将 SystemCall 带到调度程序,SystemCall 实例可以简单地yield 本身,它将是如上一节所述,向上引导到调度程序。

因此,第一个需要的更改是将此逻辑添加到基础 SystemCall 类中:

class SystemCall:
    ...
    def __await__(self):
        yield self

随着 SystemCall 实例变为可等待的,下面的代码现在实际运行了:

async def foo():
    mytid = await GetTid()
    print("I'm foo", mytid)

>>> sched = Scheduler()
>>> sched.new(foo())
>>> sched.mainloop()

输出:

I'm foo None
Task 1 terminated

Great, it does not crash anymore!

However, the coroutine did not receive the task ID, and got None instead. This is because the value set by the system call's handle() method and sent by the Task.run() method:

# in Task.run()
self.target.send(self.sendval)

... 结束于 SystemCall.__await__() 方法。如果我们想把值带入协程,系统调用必须返回它,这样它就成为协程中await表达式的值。

class SystemCall:
    ...
    def __await__(self):
        return (yield self)

使用修改后的 SystemCall 运行相同的代码会产生所需的输出:

I'm foo 1
Task 1 terminated

Running the coroutines concurrently

We still need a way to suspend a coroutine, i.e. to have a system "trap" code. In the course material, this is done with a plain yield inside a coroutine, but an attempt to use a plain await is actually a syntax error:

async def foo():
    mytid = await GetTid()
    for i in range(3):
        print("I'm foo", mytid)
        await  # SyntaxError here

幸运的是,解决方法很简单。由于我们已经有工作系统调用,我们可以添加一个虚拟的无操作系统调用,它的唯一工作是暂停协程并立即重新安排它:

class YieldControl(SystemCall):
    def handle(self):
        self.task.sendval = None   # setting sendval is optional
        self.sched.schedule(self.task)

在任务上设置 sendval 是可选的,因为预计此系统调用不会产生任何有意义的值,但我们选择明确这一点。

我们现在拥有运行多任务操作系统的一切条件!

async def foo():
    mytid = await GetTid()
    for i in range(3):
        print("I'm foo", mytid)
        await YieldControl()


async def bar():
    mytid = await GetTid()
    for i in range(5):
        print("I'm bar", mytid)
        await YieldControl()


sched = Scheduler()
sched.new(foo())
sched.new(bar())
sched.mainloop()

输出:

I'm foo 1
I'm bar 2
I'm foo 1
I'm bar 2
I'm foo 1
I'm bar 2
Task 1 terminated
I'm bar 2
I'm bar 2
Task 2 terminated

Footnotes

The Scheduler code is completely unchanged.

It. Just. Works.

This shows the beauty of the original design where the scheduler and the tasks that run in it are not coupled to each other, and we were able to change the coroutine implementation without the Scheduler knowing about it. Even the Task class that wraps the coroutines did not have to change.

Trampolining is not needed.

In the pyos8.py version of the system, a concept of a trampoline is implemented. It allows the coroutines to delegate a part of their work to another coroutine with the help of the shceduler (the scheduler calls the sub-coroutine on behalf of the parent coroutine and sends the former's result into the parent).

This mechanism is not needed, since await (and its older companion, yield from) already make such chaining possible as explained at the beginning.

Appendix A - a full runnable example (requires Python 3.5+)

example_full.py
from queue import Queue


# ------------------------------------------------------------
#                       === Tasks ===
# ------------------------------------------------------------
class Task:
    taskid = 0
    def __init__(self,target):
        Task.taskid += 1
        self.tid = Task.taskid   # Task ID
        self.target = target        # Target coroutine
        self.sendval = None          # Value to send

    # Run a task until it hits the next yield statement
    def run(self):
        return self.target.send(self.sendval)


# ------------------------------------------------------------
#                      === Scheduler ===
# ------------------------------------------------------------
class Scheduler:
    def __init__(self):
        self.ready = Queue()   
        self.taskmap = {}        

    def new(self,target):
        newtask = Task(target)
        self.taskmap[newtask.tid] = newtask
        self.schedule(newtask)
        return newtask.tid

    def exit(self,task):
        print("Task %d terminated" % task.tid)
        del self.taskmap[task.tid]

    def schedule(self,task):
        self.ready.put(task)

    def mainloop(self):
         while self.taskmap:
            task = self.ready.get()
            try:
                result = task.run()
                if isinstance(result,SystemCall):
                    result.task  = task
                    result.sched = self
                    result.handle()
                    continue
            except StopIteration:
                self.exit(task)
                continue
            self.schedule(task)


# ------------------------------------------------------------
#                   === System Calls ===
# ------------------------------------------------------------
class SystemCall:
    def handle(self):
        pass

    def __await__(self):
        return (yield self)


# Return a task's ID number
class GetTid(SystemCall):
    def handle(self):
        self.task.sendval = self.task.tid
        self.sched.schedule(self.task)


class YieldControl(SystemCall):
    def handle(self):
        self.task.sendval = None   # setting sendval is optional
        self.sched.schedule(self.task)


# ------------------------------------------------------------
#                      === Example ===
# ------------------------------------------------------------
if __name__ == '__main__':
    async def foo():
        mytid = await GetTid()
        for i in range(3):
            print("I'm foo", mytid)
            await YieldControl()


    async def bar():
        mytid = await GetTid()
        for i in range(5):
            print("I'm bar", mytid)
            await YieldControl()

    sched = Scheduler()
    sched.new(foo())
    sched.new(bar())
    sched.mainloop()

关于Python 原生协程和 send(),我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/34469060/

有关Python 原生协程和 send()的更多相关文章

  1. ruby - 为什么我可以在 Ruby 中使用 Object#send 访问私有(private)/ protected 方法? - 2

    类classAprivatedeffooputs:fooendpublicdefbarputs:barendprivatedefzimputs:zimendprotecteddefdibputs:dibendendA的实例a=A.new测试a.foorescueputs:faila.barrescueputs:faila.zimrescueputs:faila.dibrescueputs:faila.gazrescueputs:fail测试输出failbarfailfailfail.发送测试[:foo,:bar,:zim,:dib,:gaz].each{|m|a.send(m)resc

  2. python - 如何使用 Ruby 或 Python 创建一系列高音调和低音调的蜂鸣声? - 2

    关闭。这个问题是opinion-based.它目前不接受答案。想要改进这个问题?更新问题,以便editingthispost可以用事实和引用来回答它.关闭4年前。Improvethisquestion我想在固定时间创建一系列低音和高音调的哔哔声。例如:在150毫秒时发出高音调的蜂鸣声在151毫秒时发出低音调的蜂鸣声200毫秒时发出低音调的蜂鸣声250毫秒的高音调蜂鸣声有没有办法在Ruby或Python中做到这一点?我真的不在乎输出编码是什么(.wav、.mp3、.ogg等等),但我确实想创建一个输出文件。

  3. ruby - 为什么人们使用 `Module.send(:prepend, …)` ? - 2

    我正在学习如何在我的Ruby代码中使用Module.prepend而不是alias_method_chain,我注意到有些人使用send调用它(example):ActionView::TemplateRenderer.send(:prepend,ActionViewTemplateRendererWithCurrentTemplate)而其他人直接调用它(example):ActionView::TemplateRenderer.prepend(ActionViewTemplateRendererWithCurrentTemplate)而且,虽然我还没有看到任何人使用这种风格,但我从

  4. Python 相当于 Perl/Ruby ||= - 2

    这个问题在这里已经有了答案:关闭10年前。PossibleDuplicate:Pythonconditionalassignmentoperator对于这样一个简单的问题表示歉意,但是谷歌搜索||=并不是很有帮助;)Python中是否有与Ruby和Perl中的||=语句等效的语句?例如:foo="hey"foo||="what"#assignfooifit'sundefined#fooisstill"hey"bar||="yeah"#baris"yeah"另外,类似这样的东西的通用术语是什么?条件分配是我的第一个猜测,但Wikipediapage跟我想的不太一样。

  5. java - 什么相当于 ruby​​ 的 rack 或 python 的 Java wsgi? - 2

    什么是ruby​​的rack或python的Java的wsgi?还有一个路由库。 最佳答案 来自Python标准PEP333:Bycontrast,althoughJavahasjustasmanywebapplicationframeworksavailable,Java's"servlet"APImakesitpossibleforapplicationswrittenwithanyJavawebapplicationframeworktoruninanywebserverthatsupportstheservletAPI.ht

  6. 华为OD机试用Python实现 -【明明的随机数】 2023Q1A - 2

    华为OD机试题本篇题目:明明的随机数题目输入描述输出描述:示例1输入输出说明代码编写思路最近更新的博客华为od2023|什么是华为od,od薪资待遇,od机试题清单华为OD机试真题大全,用Python解华为机试题|机试宝典【华为OD机试】全流程解析+经验分享,题型分享,防作弊指南华为o

  7. ruby - 使用 `+=` 和 `send` 方法 - 2

    如何将send与+=一起使用?a=20;a.send"+=",10undefinedmethod`+='for20:Fixnuma=20;a+=10=>30 最佳答案 恐怕你不能。+=不是方法,而是语法糖。参见http://www.ruby-doc.org/docs/ProgrammingRuby/html/tut_expressions.html它说Incommonwithmanyotherlanguages,Rubyhasasyntacticshortcut:a=a+2maybewrittenasa+=2.你能做的最好的事情是:

  8. python - 如何读取 MIDI 文件、更改其乐器并将其写回? - 2

    我想解析一个已经存在的.mid文件,改变它的乐器,例如从“acousticgrandpiano”到“violin”,然后将它保存回去或作为另一个.mid文件。根据我在文档中看到的内容,该乐器通过program_change或patch_change指令进行了更改,但我找不到任何在已经存在的MIDI文件中执行此操作的库.他们似乎都只支持从头开始创建的MIDI文件。 最佳答案 MIDIpackage会为您完成此操作,但具体方法取决于midi文件的原始内容。一个MIDI文件由一个或多个音轨组成,每个音轨是十六个channel中任何一个上的

  9. 「Python|Selenium|场景案例」如何定位iframe中的元素? - 2

    本文主要介绍在使用Selenium进行自动化测试或者任务时,对于使用了iframe的页面,如何定位iframe中的元素文章目录场景描述解决方案具体代码场景描述当我们在使用Selenium进行自动化测试的时候,可能会遇到一些界面或者窗体是使用HTML的iframe标签进行承载的。对于iframe中的标签,如果直接查找是无法找到的,会抛出没有找到元素的异常。比如近在咫尺的例子就是,CSDN的登录窗体就是使用的iframe,大家可以尝试通过F12开发者模式查看到的tag_name,class_name,id或者xpath来定位中的页面元素,会抛出NoSuchElementException异常。解决

  10. python ffmpeg 使用 pyav 转换 一组图像 到 视频 - 2

    2022/8/4更新支持加入水印水印必须包含透明图像,并且水印图像大小要等于原图像的大小pythonconvert_image_to_video.py-f30-mwatermark.pngim_dirout.mkv2022/6/21更新让命令行参数更加易用新的命令行使用方法pythonconvert_image_to_video.py-f30im_dirout.mkvFFMPEG命令行转换一组JPG图像到视频时,是将这组图像视为MJPG流。我需要转换一组PNG图像到视频,FFMPEG就不认了。pyav内置了ffmpeg库,不需要系统带有ffmpeg工具因此我使用ffmpeg的python包装p

随机推荐