草庐IT

python - 你如何判断 sys.stdin.readline() 是否会阻塞?

coder 2023-08-15 原文

如何确定对 sys.stdin.readline() (或更一般地说,任何基于文件描述符的文件对象上的 readline() )的调用是否会阻塞?

当我在 python 中编写基于行的文本过滤程序时会出现这种情况;
也就是说,程序反复从输入中读取一行文本,可能对其进行转换,然后将其写入输出。

我想实现一个合理的输出缓冲策略。
我的标准是:

  • 处理数百万时应该是高效的
    批量行——主要是缓冲输出,偶尔刷新。
  • 在保持缓冲输出时,它不应该阻塞输入。

  • 因此,无缓冲输出是不好的,因为它违反了 (1)(对操作系统的写入过多)。
    并且行缓冲输出不好,因为它仍然违反(1)
    (将一百万行中的每一行的输出都刷新到操作系统是没有意义的)。
    默认缓冲输出也不好,因为它违反了 (2)(如果输出到文件或管道,它将不适本地保留输出)。

    我认为,在大多数情况下,一个好的解决方案是:
    “每当(其缓冲区已满或)sys.stdin.readline() 即将阻塞时,刷新 sys.stdout”。
    可以实现吗?

    (请注意,我并不声称此策略适用于所有情况。例如,
    在程序受 CPU 限制的情况下,这可能并不理想;在这种情况下,这可能是明智的
    更频繁地刷新,以避免在进行长时间计算时保留输出。)

    为了确定性,假设我正在 python 中实现 unix 的“cat -n”程序。

    (实际上,“cat -n”比一次一行更聪明;也就是说,它知道如何
    在读取整行之前读取和写入行的一部分;
    但是,对于此示例,无论如何我将一次一行地实现它。)

    行缓冲实现

    (表现良好,但违反了标准(1),即由于冲洗过多而慢得不合理):
    #!/usr/bin/python
    # cat-n.linebuffered.py
    import sys
    num_lines_read = 0
    while True:
      line = sys.stdin.readline()
      if line == '': break
      num_lines_read += 1
      print("%d: %s" % (num_lines_read, line))
      sys.stdout.flush()
    

    默认缓冲实现

    (快速但违反标准(2),即不友好的输出扣留)
    #!/usr/bin/python
    # cat-n.defaultbuffered.py
    import sys
    num_lines_read = 0
    while True:
      line = sys.stdin.readline()
      if line == '': break
      num_lines_read += 1
      print("%d: %s" % (num_lines_read, line))
    

    期望的实现:
    #!/usr/bin/python
    num_lines_read = 0
    while True:
      if sys_stdin_readline_is_about_to_block():  # <--- How do I implement this??
        sys.stdout.flush()
      line = sys.stdin.readline()
      if line == '': break
      num_lines_read += 1
      print("%d: %s" % (num_lines_read, line))
    

    所以问题是:是否可以实现 sys_stdin_readline_is_about_to_block()

    我想要一个在 python2 和 python3 中都有效的答案。
    我已经研究了以下每一种技术,但到目前为止还没有任何结果。
  • 使用 select([sys.stdin],[],[],0) 查看从 sys.stdin 读取是否会阻塞。 (当 sys.stdin 是缓冲文件对象时,这不起作用,至少有一个可能有两个原因:(1)如果部分行准备好从底层输入管道读取,它会错误地说“不会阻塞”, (2) 如果 sys.stdin 的缓冲区包含一个完整的输入行,但底层管道还没有准备好进行额外的读取,它会错误地说“将阻塞”......我认为)。
  • 非阻塞 io,使用 os.fdopen(sys.stdin.fileno(), 'r')fcntlO_NONBLOCK(我无法让它在任何 python 版本中与 readline() 一起使用:
    在python2.7中,只要有部分行进来,它就会丢失输入;
    在python3中,似乎无法区分“会阻塞”
    和输入结束。 ??)
  • asyncio(我不清楚这在 python2 中有什么可用;我认为它不适用于 sys.stdin;但是,我仍然对仅在从从subprocess.Popen())。
  • 创建一个线程来做 readline() 循环并将每一行传递给主
    通过 queue.Queue 编程;然后主程序可以轮询队列之前
    从中读取每一行,每当它看到它即将阻塞时,首先刷新标准输出。
    (我试过这个,实际上让它工作了,见下文,但它非常慢,比行缓冲慢得多。)

  • 线程实现:

    请注意,这并没有严格回答“如何判断 sys.stdin.readline() 是否会阻塞”的问题,但它无论如何都设法实现了所需的缓冲策略。不过太慢了。
    #!/usr/bin/python
    # cat-n.threaded.py
    import queue
    import sys
    import threading
    def iter_with_abouttoblock_cb(callable, sentinel, abouttoblock_cb, qsize=100):
      # child will send each item through q to parent.
      q = queue.Queue(qsize)
      def child_fun():
        for item in iter(callable, sentinel):
          q.put(item)
        q.put(sentinel)
      child = threading.Thread(target=child_fun)
      # The child thread normally runs until it sees the sentinel,
      # but we mark it daemon so that it won't prevent the parent
      # from exiting prematurely if it wants.
      child.daemon = True
      child.start()
      while True:
        try:
          item = q.get(block=False)
        except queue.Empty:
          # q is empty; call abouttoblock_cb before blocking
          abouttoblock_cb()
          item = q.get(block=True)
        if item == sentinel:
          break  # do *not* yield sentinel
        yield item
      child.join()
    
    num_lines_read = 0
    for line in iter_with_abouttoblock_cb(sys.stdin.readline,
                                          sentinel='',
                                          abouttoblock_cb=sys.stdout.flush):
      num_lines_read += 1
      sys.stdout.write("%d: %s" % (num_lines_read, line))
    

    验证缓冲行为:

    以下命令(在 linux 上的 bash 中)显示了预期的缓冲行为:“defaultbuffered”缓冲过于激进,而“linebuffered”和“threaded”缓冲恰到好处。

    (请注意,管道末尾的 | cat 是默认情况下制作 python 块缓冲区而不是行缓冲区。)
    for which in defaultbuffered linebuffered threaded; do
      for python in python2.7 python3.5; do
        echo "$python cat-n.$which.py:"
          (echo z; echo -n a; sleep 1; echo b; sleep 1; echo -n c; sleep 1; echo d; echo x; echo y; echo z; sleep 1; echo -n e; sleep 1; echo f) | $python cat-n.$which.py | cat
      done
    done
    

    输出:
    python2.7 cat-n.defaultbuffered.py:
    [... pauses 5 seconds here. Bad! ...]
    1: z
    2: ab
    3: cd
    4: x
    5: y
    6: z
    7: ef
    python3.5 cat-n.defaultbuffered.py:
    [same]
    python2.7 cat-n.linebuffered.py:
    1: z
    [... pauses 1 second here, as expected ...]
    2: ab
    [... pauses 2 seconds here, as expected ...]
    3: cd
    4: x
    5: y
    6: z
    [... pauses 2 seconds here, as expected ...]
    6: ef
    python3.5 cat-n.linebuffered.py:
    [same]
    python2.7 cat-n.threaded.py:
    [same]
    python3.5 cat-n.threaded.py:
    [same]
    

    时间:

    (在 Linux 上的 bash 中):
    for which in defaultbuffered linebuffered threaded; do
      for python in python2.7 python3.5; do
        echo -n "$python cat-n.$which.py:  "
          timings=$(time (yes 01234567890123456789012345678901234567890123456789012345678901234567890123456789 | head -1000000 | $python cat-n.$which.py >| /tmp/REMOVE_ME) 2>&1)
          echo $timings
      done
    done
    /bin/rm /tmp/REMOVE_ME
    

    输出:
    python2.7 cat-n.defaultbuffered.py:  real 0m1.490s user 0m1.191s sys 0m0.386s
    python3.5 cat-n.defaultbuffered.py:  real 0m1.633s user 0m1.007s sys 0m0.311s
    python2.7 cat-n.linebuffered.py:  real 0m5.248s user 0m2.198s sys 0m2.704s
    python3.5 cat-n.linebuffered.py:  real 0m6.462s user 0m3.038s sys 0m3.224s
    python2.7 cat-n.threaded.py:  real 0m25.097s user 0m18.392s sys 0m16.483s
    python3.5 cat-n.threaded.py:  real 0m12.655s user 0m11.722s sys 0m1.540s
    

    重申一下,我想要一个在保持缓冲输出时从不阻塞的解决方案
    (“linebuffered”和“threaded”在这方面都很好),
    这也很快:也就是说,速度与“默认缓冲”相当。

    最佳答案

    你当然可以用 select : 这就是它的用途,它的性能对于少量的文件描述符来说是好的。您必须自己实现行缓冲/中断,以便您可以检测在缓冲(结果是)部分行后是否有更多可用输入。

    你可以自己做所有的缓冲(这是合理的,因为 select 在文件描述符级别操作),或者你可以设置 stdin非阻塞并使用 file.read()BufferedReader.read() (取决于您的 Python 版本)使用任何可用的东西。如果您的输入可能是 Internet 套接字,则无论缓冲如何,您都必须使用非阻塞输入,因为 select 的常见实现可以虚假地指示来自套接字的可读数据。 (在这种情况下,Python 2 版本使用 IOError 引发 EAGAIN ;Python 3 版本返回 None 。)

    ( os.fdopen 在这里没有帮助,因为它不会为 fcntl 使用创建新的文件描述符。在某些系统上,您可以使用 /dev/stdin 打开 O_NONBLOCK 。)

    基于默认(缓冲)的 Python 2 实现 file.read() :

    import sys,os,select,fcntl,errno
    
    fcntl.fcntl(sys.stdin.fileno(),fcntl.F_SETFL,os.O_NONBLOCK)
    
    rfs=[sys.stdin.fileno()]
    xfs=rfs+[sys.stdout.fileno()]
    
    buf=""
    lnum=0
    timeout=None
    rd=True
    while rd:
      rl,_,xl=select.select(rfs,(),xfs,timeout)
      if xl: raise IOError          # "exception" occurred (TCP OOB data?)
      if rl:
        try: rd=sys.stdin.read()    # read whatever we have
        except IOError as e:        # spurious readiness?
          if e.errno!=errno.EAGAIN: raise # die on other errors
        else: buf+=rd
        nl0=0                       # previous newline
        while True:
          nl=buf.find('\n',nl0)
          if nl<0:
            buf=buf[nl0:]           # hold partial line for "processing"
            break
          lnum+=1
          print "%d: %s"%(lnum,buf[nl0:nl])
          timeout=0
          nl0=nl+1
      else:                         # no input yet
        sys.stdout.flush()
        timeout=None
    
    if buf: sys.stdout.write("%d: %s"%(lnum+1,buf)) # write any partial last line
    

    仅用于 cat -n ,我们可以在获得它们后立即写出部分行,但这会保留它们以表示一次处理整行。

    在我的(不起眼的)机器上,你的 yes测试需要“真正的 0m2.454s 用户 0m2.144s 系统 0m0.504s”。

    关于python - 你如何判断 sys.stdin.readline() 是否会阻塞?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/52893192/

    有关python - 你如何判断 sys.stdin.readline() 是否会阻塞?的更多相关文章

    1. ruby - 如何使用 Nokogiri 的 xpath 和 at_xpath 方法 - 2

      我正在学习如何使用Nokogiri,根据这段代码我遇到了一些问题:require'rubygems'require'mechanize'post_agent=WWW::Mechanize.newpost_page=post_agent.get('http://www.vbulletin.org/forum/showthread.php?t=230708')puts"\nabsolutepathwithtbodygivesnil"putspost_page.parser.xpath('/html/body/div/div/div/div/div/table/tbody/tr/td/div

    2. ruby - 如何从 ruby​​ 中的字符串运行任意对象方法? - 2

      总的来说,我对ruby​​还比较陌生,我正在为我正在创建的对象编写一些rspec测试用例。许多测试用例都非常基础,我只是想确保正确填充和返回值。我想知道是否有办法使用循环结构来执行此操作。不必为我要测试的每个方法都设置一个assertEquals。例如:describeitem,"TestingtheItem"doit"willhaveanullvaluetostart"doitem=Item.new#HereIcoulddotheitem.name.shouldbe_nil#thenIcoulddoitem.category.shouldbe_nilendend但我想要一些方法来使用

    3. python - 如何使用 Ruby 或 Python 创建一系列高音调和低音调的蜂鸣声? - 2

      关闭。这个问题是opinion-based.它目前不接受答案。想要改进这个问题?更新问题,以便editingthispost可以用事实和引用来回答它.关闭4年前。Improvethisquestion我想在固定时间创建一系列低音和高音调的哔哔声。例如:在150毫秒时发出高音调的蜂鸣声在151毫秒时发出低音调的蜂鸣声200毫秒时发出低音调的蜂鸣声250毫秒的高音调蜂鸣声有没有办法在Ruby或Python中做到这一点?我真的不在乎输出编码是什么(.wav、.mp3、.ogg等等),但我确实想创建一个输出文件。

    4. ruby-on-rails - 如何验证 update_all 是否实际在 Rails 中更新 - 2

      给定这段代码defcreate@upgrades=User.update_all(["role=?","upgraded"],:id=>params[:upgrade])redirect_toadmin_upgrades_path,:notice=>"Successfullyupgradeduser."end我如何在该操作中实际验证它们是否已保存或未重定向到适当的页面和消息? 最佳答案 在Rails3中,update_all不返回任何有意义的信息,除了已更新的记录数(这可能取决于您的DBMS是否返回该信息)。http://ar.ru

    5. ruby-on-rails - 'compass watch' 是如何工作的/它是如何与 rails 一起使用的 - 2

      我在我的项目目录中完成了compasscreate.和compassinitrails。几个问题:我已将我的.sass文件放在public/stylesheets中。这是放置它们的正确位置吗?当我运行compasswatch时,它不会自动编译这些.sass文件。我必须手动指定文件:compasswatchpublic/stylesheets/myfile.sass等。如何让它自动运行?文件ie.css、print.css和screen.css已放在stylesheets/compiled。如何在编译后不让它们重新出现的情况下删除它们?我自己编译的.sass文件编译成compiled/t

    6. ruby - 如何将脚本文件的末尾读取为数据文件(Perl 或任何其他语言) - 2

      我正在寻找执行以下操作的正确语法(在Perl、Shell或Ruby中):#variabletoaccessthedatalinesappendedasafileEND_OF_SCRIPT_MARKERrawdatastartshereanditcontinues. 最佳答案 Perl用__DATA__做这个:#!/usr/bin/perlusestrict;usewarnings;while(){print;}__DATA__Texttoprintgoeshere 关于ruby-如何将脚

    7. ruby - 如何指定 Rack 处理程序 - 2

      Rackup通过Rack的默认处理程序成功运行任何Rack应用程序。例如:classRackAppdefcall(environment)['200',{'Content-Type'=>'text/html'},["Helloworld"]]endendrunRackApp.new但是当最后一行更改为使用Rack的内置CGI处理程序时,rackup给出“NoMethodErrorat/undefinedmethod`call'fornil:NilClass”:Rack::Handler::CGI.runRackApp.newRack的其他内置处理程序也提出了同样的反对意见。例如Rack

    8. ruby - 如何每月在 Heroku 运行一次 Scheduler 插件? - 2

      在选择我想要运行操作的频率时,唯一的选项是“每天”、“每小时”和“每10分钟”。谢谢!我想为我的Rails3.1应用程序运行调度程序。 最佳答案 这不是一个优雅的解决方案,但您可以安排它每天运行,并在实际开始工作之前检查日期是否为当月的第一天。 关于ruby-如何每月在Heroku运行一次Scheduler插件?,我们在StackOverflow上找到一个类似的问题: https://stackoverflow.com/questions/8692687/

    9. ruby-on-rails - 如何从 format.xml 中删除 <hash></hash> - 2

      我有一个对象has_many应呈现为xml的子对象。这不是问题。我的问题是我创建了一个Hash包含此数据,就像解析器需要它一样。但是rails自动将整个文件包含在.........我需要摆脱type="array"和我该如何处理?我没有在文档中找到任何内容。 最佳答案 我遇到了同样的问题;这是我的XML:我在用这个:entries.to_xml将散列数据转换为XML,但这会将条目的数据包装到中所以我修改了:entries.to_xml(root:"Contacts")但这仍然将转换后的XML包装在“联系人”中,将我的XML代码修改为

    10. ruby - 如何使用文字标量样式在 YAML 中转储字符串? - 2

      我有一大串格式化数据(例如JSON),我想使用Psychinruby​​同时保留格式转储到YAML。基本上,我希望JSON使用literalstyle出现在YAML中:---json:|{"page":1,"results":["item","another"],"total_pages":0}但是,当我使用YAML.dump时,它不使用文字样式。我得到这样的东西:---json:!"{\n\"page\":1,\n\"results\":[\n\"item\",\"another\"\n],\n\"total_pages\":0\n}\n"我如何告诉Psych以想要的样式转储标量?解

    随机推荐