草庐IT

python 多进程队列批量处理数据

Chipeyown 2023-06-25 原文

问题

做生信分析时,会遇到很多样品需要用同一个脚本去分析的情况。这些脚本除了样本名字需要更改,其他的都一样。有时候就会遇到这样的尴尬情况:
(1)如果这些样品一次性提交,然后在后台一个一个的依次分析比较费时间。
(2)如果同时把这些样品全部分析,服务器又带不动。
(3)每次同时分析3个,结束了再提交3个,这个样子就会一直在忙着查看程序进程及提交新的程序,非常累。

用一个比较容易理解的例子来讲就是:服务器出了一系列bug。如果只招一个程序员,一个一个的去修改,太慢了。如果招上和bug数量相等的程序员,每人解决一个bug,经费又不够。如果招上几个程序员,把bug平均分配,每人负责几个,那么必定有人很快把分到的bug解决完了,但是有人就是很慢。服务器想运行就得等着最后那个程序员把他负责的bug解决完。

解决思路

借鉴集群提交任务的思路,借助python的队列和多进程模块,写了一个非常实用的自动化运行小脚本。
能够实现:
(1)把任务全部存到队列里.
(2)任何时间都只有自己规定数量的任务在并行运行,这样防止任务过多服务器带不动。
(3)某个进程运行的任务结束后,就从队列中获取新的任务继续运行。

简单点理解就是说。服务器出了一系列bug。现在招上几个程序员,他们共同处理这几个bug。一个程序员在处理完拿到的任务后,就到队列拿到新的任务去处理。这个可以很快把所有问题都解决完。

实现

from multiprocessing import Process,Queue
import os

def Worker(q):
	while True:
		task=q.get()#task就是你的sample名字
		if task is None:break
		os.system('sh do_task.sh {}.fa'.format(task))#以linux环境为例,这里输入你具体的程序运行命令行
q=Queue()
fa=open('task.txt')
for line in fa:
	sample=line.rstrip()
	q.put(sample)#把要处理的样品加入队列
fa.close()
workers=[]
for i in range(3):#任何时间三个任务并行
	q.put(None)#在队列queue最后加上None,使样品处理完后程序结束运行。
	worker=Process(target=Worker,args=(q,))
	worker.start()
	workers.append(worker)

for worker in workers:
	worker.join()#进程阻塞,保证任务全部完成后再处理后续命令
print('Finished successfully')

task.txt内容举例:
sample1
sample2
sample3
sample4
sample5
sample6

实际在使用时,只需要保证样品名称成功load到队列中及更改程序运行的命令那一行即可,使用起来非常方便。

有关python 多进程队列批量处理数据的更多相关文章

  1. python - 如何使用 Ruby 或 Python 创建一系列高音调和低音调的蜂鸣声? - 2

    关闭。这个问题是opinion-based.它目前不接受答案。想要改进这个问题?更新问题,以便editingthispost可以用事实和引用来回答它.关闭4年前。Improvethisquestion我想在固定时间创建一系列低音和高音调的哔哔声。例如:在150毫秒时发出高音调的蜂鸣声在151毫秒时发出低音调的蜂鸣声200毫秒时发出低音调的蜂鸣声250毫秒的高音调蜂鸣声有没有办法在Ruby或Python中做到这一点?我真的不在乎输出编码是什么(.wav、.mp3、.ogg等等),但我确实想创建一个输出文件。

  2. ruby - 解析 RDFa、微数据等的最佳方式是什么,使用统一的模式/词汇(例如 schema.org)存储和显示信息 - 2

    我主要使用Ruby来执行此操作,但到目前为止我的攻击计划如下:使用gemsrdf、rdf-rdfa和rdf-microdata或mida来解析给定任何URI的数据。我认为最好映射到像schema.org这样的统一模式,例如使用这个yaml文件,它试图描述数据词汇表和opengraph到schema.org之间的转换:#SchemaXtoschema.orgconversion#data-vocabularyDV:name:namestreet-address:streetAddressregion:addressRegionlocality:addressLocalityphoto:i

  3. ruby - 如何指定 Rack 处理程序 - 2

    Rackup通过Rack的默认处理程序成功运行任何Rack应用程序。例如:classRackAppdefcall(environment)['200',{'Content-Type'=>'text/html'},["Helloworld"]]endendrunRackApp.new但是当最后一行更改为使用Rack的内置CGI处理程序时,rackup给出“NoMethodErrorat/undefinedmethod`call'fornil:NilClass”:Rack::Handler::CGI.runRackApp.newRack的其他内置处理程序也提出了同样的反对意见。例如Rack

  4. ruby - 在 jRuby 中使用 'fork' 生成进程的替代方案? - 2

    在MRIRuby中我可以这样做:deftransferinternal_server=self.init_serverpid=forkdointernal_server.runend#Maketheserverprocessrunindependently.Process.detach(pid)internal_client=self.init_client#Dootherstuffwithconnectingtointernal_server...internal_client.post('somedata')ensure#KillserverProcess.kill('KILL',

  5. ruby - 通过 ruby​​ 进程共享变量 - 2

    我正在编写一个gem,我必须在其中fork两个启动两个webrick服务器的进程。我想通过基类的类方法启动这个服务器,因为应该只有这两个服务器在运行,而不是多个。在运行时,我想调用这两个服务器上的一些方法来更改变量。我的问题是,我无法通过基类的类方法访问fork的实例变量。此外,我不能在我的基类中使用线程,因为在幕后我正在使用另一个不是线程安全的库。所以我必须将每个服务器派生到它自己的进程。我用类变量试过了,比如@@server。但是当我试图通过基类访问这个变量时,它是nil。我读到在Ruby中不可能在分支之间共享类变量,对吗?那么,还有其他解决办法吗?我考虑过使用单例,但我不确定这是

  6. ruby - Ruby 有 `Pair` 数据类型吗? - 2

    有时我需要处理键/值数据。我不喜欢使用数组,因为它们在大小上没有限制(很容易不小心添加超过2个项目,而且您最终需要稍后验证大小)。此外,0和1的索引变成了魔数(MagicNumber),并且在传达含义方面做得很差(“当我说0时,我的意思是head...”)。散列也不合适,因为可能会不小心添加额外的条目。我写了下面的类来解决这个问题:classPairattr_accessor:head,:taildefinitialize(h,t)@head,@tail=h,tendend它工作得很好并且解决了问题,但我很想知道:Ruby标准库是否已经带有这样一个类? 最佳

  7. ruby - 分布式事务和队列,ruby,erlang,scala - 2

    我有一个涉及多台机器、消息队列和事务的问题。因此,例如用户点击网页,点击将消息发送到另一台机器,该机器将付款添加到用户的帐户。每秒可能有数千次点击。事务的所有方面都应该是容错的。我以前从未遇到过这样的事情,但一些阅读表明这是一个众所周知的问题。所以我的问题。我假设安全的方法是使用两阶段提交,但协议(protocol)是阻塞的,所以我不会获得所需的性能,我是否正确?我通常写Ruby,但似乎Redis之类的数据库和Rescue、RabbitMQ等消息队列系统对我的帮助不大——即使我实现某种两阶段提交,如果Redis崩溃,数据也会丢失,因为它本质上只是内存。所有这些让我开始关注erlang和

  8. Python 相当于 Perl/Ruby ||= - 2

    这个问题在这里已经有了答案:关闭10年前。PossibleDuplicate:Pythonconditionalassignmentoperator对于这样一个简单的问题表示歉意,但是谷歌搜索||=并不是很有帮助;)Python中是否有与Ruby和Perl中的||=语句等效的语句?例如:foo="hey"foo||="what"#assignfooifit'sundefined#fooisstill"hey"bar||="yeah"#baris"yeah"另外,类似这样的东西的通用术语是什么?条件分配是我的第一个猜测,但Wikipediapage跟我想的不太一样。

  9. java - 什么相当于 ruby​​ 的 rack 或 python 的 Java wsgi? - 2

    什么是ruby​​的rack或python的Java的wsgi?还有一个路由库。 最佳答案 来自Python标准PEP333:Bycontrast,althoughJavahasjustasmanywebapplicationframeworksavailable,Java's"servlet"APImakesitpossibleforapplicationswrittenwithanyJavawebapplicationframeworktoruninanywebserverthatsupportstheservletAPI.ht

  10. ruby - 我如何添加二进制数据来遏制 POST - 2

    我正在尝试使用Curbgem执行以下POST以解析云curl-XPOST\-H"X-Parse-Application-Id:PARSE_APP_ID"\-H"X-Parse-REST-API-Key:PARSE_API_KEY"\-H"Content-Type:image/jpeg"\--data-binary'@myPicture.jpg'\https://api.parse.com/1/files/pic.jpg用这个:curl=Curl::Easy.new("https://api.parse.com/1/files/lion.jpg")curl.multipart_form_

随机推荐