草庐IT

Python:使用额外的输入和输出文件将数据流输入和解析外部程序

coder 2023-08-13 原文

问题: 我有一个设计不佳的 Fortran 程序(我无法更改它,我坚持使用它)它从标准输入和其他输入文件获取文本输入,并将文本输出结果写入标准输出和其他输出文件。 input 和 out 的大小相当大,我想避免写入硬盘(运行缓慢)。我编写了一个函数来遍历多个输入文件的行,而且我还有用于多个输出的解析器。我真的不知道程序是先读取所有输入然后开始输出,还是在读取输入时开始输出。

目标: 拥有一个功能,可以向外部程序提供所需的内容,并在输出来自程序时解析输出,而无需将数据写入硬盘驱动器上的文本文件。

研究: 使用文件的简单方法是:

from subprocess import PIPE, Popen

def execute_simple(cmd, stdin_iter, stdout_parser, input_files, output_files):

    for filename, file_iter in input_files.iteritems():
        with open(filename ,'w') as f:
            for line in file_iter:
                f.write(line + '\n')


    p_sub = Popen(
        shlex.split(cmd),
        stdin = PIPE,
        stdout = open('stdout.txt', 'w'),
        stderr = open('stderr.txt', 'w'),
        bufsize=1
    )
    for line in stdin_iter:
        p_sub.stdin.write(line + '\n')

    p_sub.stdin.close()
    p_sub.wait()

    data = {}
    for filename, parse_func in output_files.iteritems():
        # The stdout.txt and stderr.txt is included here
        with open(filename,'r') as f:
            data[filename] = parse_func(
                    iter(f.readline, b'')
            )
    return data

我已经尝试过 subprocess模块一起执行外部程序。额外的输入/输出文件由命名管道和 multiprocessing 处理。 .我想用迭代器(它返回输入行)来提供标准输入,将标准错误保存在列表中,并解析来自外部程序的标准输出。输入和输出可能非常大,因此使用 communicate 是不可行的。

我有一个格式解析器:

def parser(iterator):
    for line in iterator:
        # Do something
        if condition:
            break
    some_other_function(iterator)
    return data

我看了这个solution使用 select 选择适当的流,但是我不知道如何让它与我的 stdout 解析器一起工作以及如何提供 stdin。

我也看asyncio模块,但正如我所见,我在解析 stout 时也会遇到同样的问题。

最佳答案

您应该对 Fortran 程序的所有输入和输出使用命名管道,以避免写入磁盘。然后,在您的消费者中,您可以使用线程从程序的每个输出源读取信息并将信息添加到队列以进行有序处理。

为了对此建模,我创建了一个 python 应用程序 daemon.py,它从标准输入读取并返回平方根直到 EOF。它将所有输入记录到指定为命令行参数的日志文件中,并将平方根打印到 stdout,将所有错误打印到 stderr。我认为它模拟了你的程序(当然输出文件的数量只有一个,但它可以缩放)。您可以查看此测试应用程序的源代码 here .请注意对 stdout.flush() 的显式调用。默认情况下,标准输出是打印缓冲的,这意味着它在最后输出,消息不会按顺序到达。我希望您的 Fortran 应用程序不会缓冲其输出。我相信我的示例应用程序可能不会在 Windows 上运行,因为 select 仅在 Unix 上使用,这对您来说无关紧要。

我的消费者应用程序将守护程序应用程序作为子进程启动,标准输入、标准输出和标准错误重定向到 subprocess.PIPE。这些管道中的每一个都分配给不同的线程,一个用于提供输入,三个用于分别处理日志文件、错误和标准输出。他们都将他们的消息添加到一个共享的 Queue 中,您的主线程从中读取并发送到您的解析器。

这是我的消费者代码:

import os, random, time
import subprocess
import threading
import Queue
import atexit

def setup():
    # make a named pipe for every file the program should write
    logfilepipe='logpipe'
    os.mkfifo(logfilepipe)

def cleanup():
    # put your named pipes here to get cleaned up
    logfilepipe='logpipe'
    os.remove(logfilepipe)

# run our cleanup code no matter what - avoid leaving pipes laying around
# even if we terminate early with Ctrl-C
atexit.register(cleanup)

# My example iterator that supplies input for the program. You already have an iterator 
# so don't worry about this. It just returns a random input from the sample_data list
# until the maximum number of iterations is reached.
class MyIter():
    sample_data=[0,1,2,4,9,-100,16,25,100,-8,'seven',10000,144,8,47,91,2.4,'^',56,18,77,94]
    def __init__(self, numiterations=1000):
        self.numiterations=numiterations
        self.current = 0

    def __iter__(self):
        return self

    def next(self):
        self.current += 1
        if self.current > self.numiterations:
            raise StopIteration
        else:
            return random.choice(self.__class__.sample_data)

# Your parse_func function - I just print it out with a [tag] showing its source.
def parse_func(source,line):
    print "[%s] %s" % (source,line)

# Generic function for sending standard input to the problem.
# p - a process handle returned by subprocess
def input_func(p, queue):
    # run the command with output redirected
    for line in MyIter(30): # Limit for testing purposes
        time.sleep(0.1) # sleep a tiny bit
        p.stdin.write(str(line)+'\n')
        queue.put(('INPUT', line))
    p.stdin.close()
    p.wait()

    # Once our process has ended, tell the main thread to quit
    queue.put(('QUIT', True))

# Generic function for reading output from the program. source can either be a
# named pipe identified by a string, or subprocess.PIPE for stdout and stderr.
def read_output(source, queue, tag=None):
    print "Starting to read output for %r" % source
    if isinstance(source,str):
        # Is a file or named pipe, so open it
        source=open(source, 'r') # open file with string name
    line = source.readline()
    # enqueue and read lines until EOF
    while line != '':
        queue.put((tag, line.rstrip()))
        line = source.readline()

if __name__=='__main__':
    cmd='daemon.py'

    # set up our FIFOs instead of using files - put file names into setup() and cleanup()
    setup()

    logfilepipe='logpipe'

    # Message queue for handling all output, whether it's stdout, stderr, or a file output by our command
    lq = Queue.Queue()

    # open the subprocess for command
    print "Running command."
    p = subprocess.Popen(['/path/to/'+cmd,logfilepipe],
                                    stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)

    # Start threads to handle the input and output
    threading.Thread(target=input_func, args=(p, lq)).start()
    threading.Thread(target=read_output, args=(p.stdout, lq, 'OUTPUT')).start()
    threading.Thread(target=read_output, args=(p.stderr, lq, 'ERRORS')).start()

    # open a thread to read any other output files (e.g. log file) as named pipes
    threading.Thread(target=read_output, args=(logfilepipe, lq, 'LOG')).start()

    # Now combine the results from our threads to do what you want
    run=True
    while(run):
        (tag, line) = lq.get()
        if tag == 'QUIT':
            run=False
        else:
            parse_func(tag, line)

我的迭代器返回一个随机输入值(其中一些是导致错误的垃圾)。你的应该是一个直接的替代品。该程序将一直运行到其输入结束,然后等待子进程完成,然后将 QUIT 消息排队到您的主线程。我的 parse_func 显然非常简单,只需打印出消息及其来源,但您应该能够处理一些事情。从输出源读取的函数设计用于 PIPE 和字符串 - 不要在主线程上打开管道,因为它们会阻塞直到输入可用。所以对于文件读取器(例如读取日志文件),最好让子线程打开文件并阻塞。但是,我们在主线程上生成子进程,这样我们就可以将 stdin、stdout 和 stderr 的句柄传递给它们各自的子线程。

部分基于 this Python implementation of multitail .

关于Python:使用额外的输入和输出文件将数据流输入和解析外部程序,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/31654556/

有关Python:使用额外的输入和输出文件将数据流输入和解析外部程序的更多相关文章

  1. ruby - 如何使用 Nokogiri 的 xpath 和 at_xpath 方法 - 2

    我正在学习如何使用Nokogiri,根据这段代码我遇到了一些问题:require'rubygems'require'mechanize'post_agent=WWW::Mechanize.newpost_page=post_agent.get('http://www.vbulletin.org/forum/showthread.php?t=230708')puts"\nabsolutepathwithtbodygivesnil"putspost_page.parser.xpath('/html/body/div/div/div/div/div/table/tbody/tr/td/div

  2. ruby - 使用 RubyZip 生成 ZIP 文件时设置压缩级别 - 2

    我有一个Ruby程序,它使用rubyzip压缩XML文件的目录树。gem。我的问题是文件开始变得很重,我想提高压缩级别,因为压缩时间不是问题。我在rubyzipdocumentation中找不到一种为创建的ZIP文件指定压缩级别的方法。有人知道如何更改此设置吗?是否有另一个允许指定压缩级别的Ruby库? 最佳答案 这是我通过查看ruby​​zip内部创建的代码。level=Zlib::BEST_COMPRESSIONZip::ZipOutputStream.open(zip_file)do|zip|Dir.glob("**/*")d

  3. ruby - 为什么我可以在 Ruby 中使用 Object#send 访问私有(private)/ protected 方法? - 2

    类classAprivatedeffooputs:fooendpublicdefbarputs:barendprivatedefzimputs:zimendprotecteddefdibputs:dibendendA的实例a=A.new测试a.foorescueputs:faila.barrescueputs:faila.zimrescueputs:faila.dibrescueputs:faila.gazrescueputs:fail测试输出failbarfailfailfail.发送测试[:foo,:bar,:zim,:dib,:gaz].each{|m|a.send(m)resc

  4. ruby-on-rails - 使用 Ruby on Rails 进行自动化测试 - 最佳实践 - 2

    很好奇,就使用ruby​​onrails自动化单元测试而言,你们正在做什么?您是否创建了一个脚本来在cron中运行rake作业并将结果邮寄给您?git中的预提交Hook?只是手动调用?我完全理解测试,但想知道在错误发生之前捕获错误的最佳实践是什么。让我们理所当然地认为测试本身是完美无缺的,并且可以正常工作。下一步是什么以确保他们在正确的时间将可能有害的结果传达给您? 最佳答案 不确定您到底想听什么,但是有几个级别的自动代码库控制:在处理某项功能时,您可以使用类似autotest的内容获得关于哪些有效,哪些无效的即时反馈。要确保您的提

  5. ruby - 在 Ruby 中使用匿名模块 - 2

    假设我做了一个模块如下:m=Module.newdoclassCendend三个问题:除了对m的引用之外,还有什么方法可以访问C和m中的其他内容?我可以在创建匿名模块后为其命名吗(就像我输入“module...”一样)?如何在使用完匿名模块后将其删除,使其定义的常量不再存在? 最佳答案 三个答案:是的,使用ObjectSpace.此代码使c引用你的类(class)C不引用m:c=nilObjectSpace.each_object{|obj|c=objif(Class===objandobj.name=~/::C$/)}当然这取决于

  6. ruby - 其他文件中的 Rake 任务 - 2

    我试图在一个项目中使用rake,如果我把所有东西都放到Rakefile中,它会很大并且很难读取/找到东西,所以我试着将每个命名空间放在lib/rake中它自己的文件中,我添加了这个到我的rake文件的顶部:Dir['#{File.dirname(__FILE__)}/lib/rake/*.rake'].map{|f|requiref}它加载文件没问题,但没有任务。我现在只有一个.rake文件作为测试,名为“servers.rake”,它看起来像这样:namespace:serverdotask:testdoputs"test"endend所以当我运行rakeserver:testid时

  7. ruby-on-rails - 在 Rails 中将文件大小字符串转换为等效千字节 - 2

    我的目标是转换表单输入,例如“100兆字节”或“1GB”,并将其转换为我可以存储在数据库中的文件大小(以千字节为单位)。目前,我有这个:defquota_convert@regex=/([0-9]+)(.*)s/@sizes=%w{kilobytemegabytegigabyte}m=self.quota.match(@regex)if@sizes.include?m[2]eval("self.quota=#{m[1]}.#{m[2]}")endend这有效,但前提是输入是倍数(“gigabytes”,而不是“gigabyte”)并且由于使用了eval看起来疯狂不安全。所以,功能正常,

  8. ruby - 使用 ruby​​ 和 savon 的 SOAP 服务 - 2

    我正在尝试使用ruby​​和Savon来使用网络服务。测试服务为http://www.webservicex.net/WS/WSDetails.aspx?WSID=9&CATID=2require'rubygems'require'savon'client=Savon::Client.new"http://www.webservicex.net/stockquote.asmx?WSDL"client.get_quotedo|soap|soap.body={:symbol=>"AAPL"}end返回SOAP异常。检查soap信封,在我看来soap请求没有正确的命名空间。任何人都可以建议我

  9. python - 如何使用 Ruby 或 Python 创建一系列高音调和低音调的蜂鸣声? - 2

    关闭。这个问题是opinion-based.它目前不接受答案。想要改进这个问题?更新问题,以便editingthispost可以用事实和引用来回答它.关闭4年前。Improvethisquestion我想在固定时间创建一系列低音和高音调的哔哔声。例如:在150毫秒时发出高音调的蜂鸣声在151毫秒时发出低音调的蜂鸣声200毫秒时发出低音调的蜂鸣声250毫秒的高音调蜂鸣声有没有办法在Ruby或Python中做到这一点?我真的不在乎输出编码是什么(.wav、.mp3、.ogg等等),但我确实想创建一个输出文件。

  10. ruby-on-rails - Rails 3 中的多个路由文件 - 2

    Rails2.3可以选择随时使用RouteSet#add_configuration_file添加更多路由。是否可以在Rails3项目中做同样的事情? 最佳答案 在config/application.rb中:config.paths.config.routes在Rails3.2(也可能是Rails3.1)中,使用:config.paths["config/routes"] 关于ruby-on-rails-Rails3中的多个路由文件,我们在StackOverflow上找到一个类似的问题

随机推荐