我正在运行多个 cat | zgrep 在远程服务器上执行命令并分别收集它们的输出以供进一步处理:
class MainProcessor(mp.Process):
def __init__(self, peaks_array):
super(MainProcessor, self).__init__()
self.peaks_array = peaks_array
def run(self):
for peak_arr in self.peaks_array:
peak_processor = PeakProcessor(peak_arr)
peak_processor.start()
class PeakProcessor(mp.Process):
def __init__(self, peak_arr):
super(PeakProcessor, self).__init__()
self.peak_arr = peak_arr
def run(self):
command = 'ssh remote_host cat files_to_process | zgrep --mmap "regex" '
log_lines = (subprocess.check_output(command, shell=True)).split('\n')
process_data(log_lines)
但是,这会导致顺序执行 subprocess('ssh ... cat ...') 命令。第二个高峰等待第一个完成,依此类推。
我如何修改此代码以便子进程调用并行运行,同时仍然能够单独收集每个子进程的输出?
最佳答案
您不需要multiprocessing 或threading 来并行运行子进程。例如:
#!/usr/bin/env python
from subprocess import Popen
# run commands in parallel
processes = [Popen("echo {i:d}; sleep 2; echo {i:d}".format(i=i), shell=True)
for i in range(5)]
# collect statuses
exitcodes = [p.wait() for p in processes]
它同时运行 5 个 shell 命令。注意:这里既没有使用线程也没有使用 multiprocessing 模块。向 shell 命令添加符号 & 没有意义:Popen 不等待命令完成。您需要显式调用 .wait()。
虽然很方便但没必要使用线程来收集子进程的输出:
#!/usr/bin/env python
from multiprocessing.dummy import Pool # thread pool
from subprocess import Popen, PIPE, STDOUT
# run commands in parallel
processes = [Popen("echo {i:d}; sleep 2; echo {i:d}".format(i=i), shell=True,
stdin=PIPE, stdout=PIPE, stderr=STDOUT, close_fds=True)
for i in range(5)]
# collect output in parallel
def get_lines(process):
return process.communicate()[0].splitlines()
outputs = Pool(len(processes)).map(get_lines, processes)
相关:Python threading multiple bash subprocesses? .
这是在同一线程中同时从多个子进程获取输出的代码示例 (Python 3.8+):
#!/usr/bin/env python3
import asyncio
import sys
from subprocess import PIPE, STDOUT
async def get_lines(shell_command):
p = await asyncio.create_subprocess_shell(
shell_command, stdin=PIPE, stdout=PIPE, stderr=STDOUT
)
return (await p.communicate())[0].splitlines()
async def main():
# get commands output in parallel
coros = [
get_lines(
f'"{sys.executable}" -c "print({i:d}); import time; time.sleep({i:d})"'
)
for i in range(5)
]
print(await asyncio.gather(*coros))
if __name__ == "__main__":
asyncio.run(main())
旧(2014)答案(Python 3.4?):
#!/usr/bin/env python3
import asyncio
import sys
from asyncio.subprocess import PIPE, STDOUT
@asyncio.coroutine
def get_lines(shell_command):
p = yield from asyncio.create_subprocess_shell(shell_command,
stdin=PIPE, stdout=PIPE, stderr=STDOUT)
return (yield from p.communicate())[0].splitlines()
if sys.platform.startswith('win'):
loop = asyncio.ProactorEventLoop() # for subprocess' pipes on Windows
asyncio.set_event_loop(loop)
else:
loop = asyncio.get_event_loop()
# get commands output in parallel
coros = [get_lines('"{e}" -c "print({i:d}); import time; time.sleep({i:d})"'
.format(i=i, e=sys.executable)) for i in range(5)]
print(loop.run_until_complete(asyncio.gather(*coros)))
loop.close()
关于Python:并行执行cat子进程,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/23611396/
关闭。这个问题是opinion-based.它目前不接受答案。想要改进这个问题?更新问题,以便editingthispost可以用事实和引用来回答它.关闭4年前。Improvethisquestion我想在固定时间创建一系列低音和高音调的哔哔声。例如:在150毫秒时发出高音调的蜂鸣声在151毫秒时发出低音调的蜂鸣声200毫秒时发出低音调的蜂鸣声250毫秒的高音调蜂鸣声有没有办法在Ruby或Python中做到这一点?我真的不在乎输出编码是什么(.wav、.mp3、.ogg等等),但我确实想创建一个输出文件。
我在使用omniauth/openid时遇到了一些麻烦。在尝试进行身份验证时,我在日志中发现了这一点:OpenID::FetchingError:Errorfetchinghttps://www.google.com/accounts/o8/.well-known/host-meta?hd=profiles.google.com%2Fmy_username:undefinedmethod`io'fornil:NilClass重要的是undefinedmethodio'fornil:NilClass来自openid/fetchers.rb,在下面的代码片段中:moduleNetclass
在MRIRuby中我可以这样做:deftransferinternal_server=self.init_serverpid=forkdointernal_server.runend#Maketheserverprocessrunindependently.Process.detach(pid)internal_client=self.init_client#Dootherstuffwithconnectingtointernal_server...internal_client.post('somedata')ensure#KillserverProcess.kill('KILL',
我正在编写一个gem,我必须在其中fork两个启动两个webrick服务器的进程。我想通过基类的类方法启动这个服务器,因为应该只有这两个服务器在运行,而不是多个。在运行时,我想调用这两个服务器上的一些方法来更改变量。我的问题是,我无法通过基类的类方法访问fork的实例变量。此外,我不能在我的基类中使用线程,因为在幕后我正在使用另一个不是线程安全的库。所以我必须将每个服务器派生到它自己的进程。我用类变量试过了,比如@@server。但是当我试图通过基类访问这个变量时,它是nil。我读到在Ruby中不可能在分支之间共享类变量,对吗?那么,还有其他解决办法吗?我考虑过使用单例,但我不确定这是
我遵循了教程http://gettingstartedwithchef.com/,第1章。我的运行list是"run_list":["recipe[apt]","recipe[phpap]"]我的phpapRecipe默认Recipeinclude_recipe"apache2"include_recipe"build-essential"include_recipe"openssl"include_recipe"mysql::client"include_recipe"mysql::server"include_recipe"php"include_recipe"php::modul
我在用Ruby执行简单任务时遇到了一件奇怪的事情。我只想用每个方法迭代字母表,但迭代在执行中先进行:alfawit=("a".."z")puts"That'sanalphabet:\n\n#{alfawit.each{|litera|putslitera}}"这段代码的结果是:(缩写)abc⋮xyzThat'sanalphabet:a..z知道为什么它会这样工作或者我做错了什么吗?提前致谢。 最佳答案 因为您的each调用被插入到在固定字符串之前执行的字符串文字中。此外,each返回一个Enumerable,实际上您甚至打印它。试试
这个问题在这里已经有了答案:关闭10年前。PossibleDuplicate:Pythonconditionalassignmentoperator对于这样一个简单的问题表示歉意,但是谷歌搜索||=并不是很有帮助;)Python中是否有与Ruby和Perl中的||=语句等效的语句?例如:foo="hey"foo||="what"#assignfooifit'sundefined#fooisstill"hey"bar||="yeah"#baris"yeah"另外,类似这样的东西的通用术语是什么?条件分配是我的第一个猜测,但Wikipediapage跟我想的不太一样。
如何检查Ruby文件是否是通过“require”或“load”导入的,而不是简单地从命令行执行的?例如:foo.rb的内容:puts"Hello"bar.rb的内容require'foo'输出:$./foo.rbHello$./bar.rbHello基本上,我想调用bar.rb以不执行puts调用。 最佳答案 将foo.rb改为:if__FILE__==$0puts"Hello"end检查__FILE__-当前ruby文件的名称-与$0-正在运行的脚本的名称。 关于ruby-检查是否
什么是ruby的rack或python的Java的wsgi?还有一个路由库。 最佳答案 来自Python标准PEP333:Bycontrast,althoughJavahasjustasmanywebapplicationframeworksavailable,Java's"servlet"APImakesitpossibleforapplicationswrittenwithanyJavawebapplicationframeworktoruninanywebserverthatsupportstheservletAPI.ht
华为OD机试题本篇题目:明明的随机数题目输入描述输出描述:示例1输入输出说明代码编写思路最近更新的博客华为od2023|什么是华为od,od薪资待遇,od机试题清单华为OD机试真题大全,用Python解华为机试题|机试宝典【华为OD机试】全流程解析+经验分享,题型分享,防作弊指南华为o