总结:我想在命令行上编写类似于 bash 脚本的 python 脚本,但我也想在 python 中轻松地将它们组合在一起。我遇到麻烦的地方是使后者发生的胶水。
想象一下,我写了两个脚本,script1.py 和 script2.py,我可以像这样将它们组合在一起:
echo input_string | ./script1.py -a -b | ./script2.py -c -d
如何从另一个 python 文件中获取此行为? 这是我知道的方式,但我不喜欢:
arg_string_1 = convert_to_args(param_1, param_2)
arg_string_2 = convert_to_args(param_3, param_4)
output_string = subprocess.check_output("echo " + input_string + " | ./script1.py " + arg_string_1 + " | ./script2.py " + arg_string_2)
如果我不想利用多线程,我可以这样做(?):
input1 = StringIO(input_string)
output1 = StringIO()
script1.main(param_1, param_2, input1, output1)
input2 = StringIO(output1.get_value())
output2 = StringIO()
script2.main(param_3, param_4, input2, output2)
这是我尝试过的方法,但我在编写胶水时遇到了困难。我很感激学习如何完成下面的方法,或者提出更好的设计/方法的建议!
我的方法:我编写的 script1.py 和 script2.py 看起来像:
#!/usr/bin/python3
... # import sys and define "parse_args"
def main(param_1, param_2, input, output):
for line in input:
...
print(stuff, file=output)
if __name__ == "__main__":
parameter_1, parameter_2 = parse_args(sys.argv)
main(parameter_1, parameter_2, sys.stdin, sys.stdout)
然后我想写这样的东西,但不知道如何完成:
pipe_out, pipe_in = ????
output = StringIO()
thread_1 = Thread(target=script1.main, args=(param_1, param_2, StreamIO(input_string), pipe_out))
thread_2 = Thread(target=script2.main, args=(param_3, param_4, pipe_in, output)
thread_1.start()
thread_2.start()
thread_1.join()
thread_2.join()
output_str = output.get_value()
最佳答案
对于“管道输入”,使用 sys.stdin 和 readlines() 方法。 (使用方法 read() 将一次读取一个字符。)
要将信息从一个线程传递到另一个线程,您可以使用Queue。您必须定义一种方式来表示数据结束。在我的示例中,由于在线程之间传递的所有数据都是 str,我只是使用 None 对象来表示数据结束(因为它不能出现在传输的数据中)。
也可以使用更多的线程,或者在线程中使用不同的函数。
为了简单起见,我没有在示例中包含 sys.argv。修改它以获取参数 (parameter1, ...) 应该很容易。
import sys
from threading import Thread
from Queue import Queue
import fileinput
def stdin_to_queue( output_queue ):
for inp_line in sys.stdin.readlines(): # input one line at at time
output_queue.put( inp_line, True, None ) # blocking, no timeout
output_queue.put( None, True, None ) # signal the end of data
def main1(input_queue, output_queue, arg1, arg2):
do_loop = True
while do_loop:
inp_data = input_queue.get(True)
if inp_data is None:
do_loop = False
output_queue.put( None, True, None ) # signal end of data
else:
out_data = arg1 + inp_data.strip('\r\n').upper() + arg2 # or whatever transformation...
output_queue.put( out_data, True, None )
def queue_to_stdout(input_queue):
do_loop = True
while do_loop:
inp_data = input_queue.get(True)
if inp_data is None:
do_loop = False
else:
sys.stdout.write( inp_data )
def main():
q12 = Queue()
q23 = Queue()
q34 = Queue()
t1 = Thread(target=stdin_to_queue, args=(q12,) )
t2 = Thread(target=main1, args=(q12,q23,'(',')') )
t3 = Thread(target=main1, args=(q23,q34,'[',']') )
t4 = Thread(target=queue_to_stdout, args=(q34,))
t1.start()
t2.start()
t3.start()
t4.start()
main()
最后,我用一个文本文件测试了这个程序(python2)。
head sometextfile.txt | python script.py
关于python - 如何使 python 脚本在 bash 和 python 中都可以通过管道传输,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/34459274/
我正在学习如何使用Nokogiri,根据这段代码我遇到了一些问题:require'rubygems'require'mechanize'post_agent=WWW::Mechanize.newpost_page=post_agent.get('http://www.vbulletin.org/forum/showthread.php?t=230708')puts"\nabsolutepathwithtbodygivesnil"putspost_page.parser.xpath('/html/body/div/div/div/div/div/table/tbody/tr/td/div
总的来说,我对ruby还比较陌生,我正在为我正在创建的对象编写一些rspec测试用例。许多测试用例都非常基础,我只是想确保正确填充和返回值。我想知道是否有办法使用循环结构来执行此操作。不必为我要测试的每个方法都设置一个assertEquals。例如:describeitem,"TestingtheItem"doit"willhaveanullvaluetostart"doitem=Item.new#HereIcoulddotheitem.name.shouldbe_nil#thenIcoulddoitem.category.shouldbe_nilendend但我想要一些方法来使用
类classAprivatedeffooputs:fooendpublicdefbarputs:barendprivatedefzimputs:zimendprotecteddefdibputs:dibendendA的实例a=A.new测试a.foorescueputs:faila.barrescueputs:faila.zimrescueputs:faila.dibrescueputs:faila.gazrescueputs:fail测试输出failbarfailfailfail.发送测试[:foo,:bar,:zim,:dib,:gaz].each{|m|a.send(m)resc
关闭。这个问题是opinion-based.它目前不接受答案。想要改进这个问题?更新问题,以便editingthispost可以用事实和引用来回答它.关闭4年前。Improvethisquestion我想在固定时间创建一系列低音和高音调的哔哔声。例如:在150毫秒时发出高音调的蜂鸣声在151毫秒时发出低音调的蜂鸣声200毫秒时发出低音调的蜂鸣声250毫秒的高音调蜂鸣声有没有办法在Ruby或Python中做到这一点?我真的不在乎输出编码是什么(.wav、.mp3、.ogg等等),但我确实想创建一个输出文件。
给定这段代码defcreate@upgrades=User.update_all(["role=?","upgraded"],:id=>params[:upgrade])redirect_toadmin_upgrades_path,:notice=>"Successfullyupgradeduser."end我如何在该操作中实际验证它们是否已保存或未重定向到适当的页面和消息? 最佳答案 在Rails3中,update_all不返回任何有意义的信息,除了已更新的记录数(这可能取决于您的DBMS是否返回该信息)。http://ar.ru
我在我的项目目录中完成了compasscreate.和compassinitrails。几个问题:我已将我的.sass文件放在public/stylesheets中。这是放置它们的正确位置吗?当我运行compasswatch时,它不会自动编译这些.sass文件。我必须手动指定文件:compasswatchpublic/stylesheets/myfile.sass等。如何让它自动运行?文件ie.css、print.css和screen.css已放在stylesheets/compiled。如何在编译后不让它们重新出现的情况下删除它们?我自己编译的.sass文件编译成compiled/t
我正在寻找执行以下操作的正确语法(在Perl、Shell或Ruby中):#variabletoaccessthedatalinesappendedasafileEND_OF_SCRIPT_MARKERrawdatastartshereanditcontinues. 最佳答案 Perl用__DATA__做这个:#!/usr/bin/perlusestrict;usewarnings;while(){print;}__DATA__Texttoprintgoeshere 关于ruby-如何将脚
Rackup通过Rack的默认处理程序成功运行任何Rack应用程序。例如:classRackAppdefcall(environment)['200',{'Content-Type'=>'text/html'},["Helloworld"]]endendrunRackApp.new但是当最后一行更改为使用Rack的内置CGI处理程序时,rackup给出“NoMethodErrorat/undefinedmethod`call'fornil:NilClass”:Rack::Handler::CGI.runRackApp.newRack的其他内置处理程序也提出了同样的反对意见。例如Rack
使用带有Rails插件的vim,您可以创建一个迁移文件,然后一次性打开该文件吗?textmate也可以这样吗? 最佳答案 你可以使用rails.vim然后做类似的事情::Rgeneratemigratonadd_foo_to_bar插件将打开迁移生成的文件,这正是您想要的。我不能代表textmate。 关于ruby-使用VimRails,您可以创建一个新的迁移文件并一次性打开它吗?,我们在StackOverflow上找到一个类似的问题: https://sta
在选择我想要运行操作的频率时,唯一的选项是“每天”、“每小时”和“每10分钟”。谢谢!我想为我的Rails3.1应用程序运行调度程序。 最佳答案 这不是一个优雅的解决方案,但您可以安排它每天运行,并在实际开始工作之前检查日期是否为当月的第一天。 关于ruby-如何每月在Heroku运行一次Scheduler插件?,我们在StackOverflow上找到一个类似的问题: https://stackoverflow.com/questions/8692687/