草庐IT

python多处理: some functions do not return when they are complete (queue material too big)

coder 2023-05-24 原文

我正在使用多处理的进程和队列。 我并行启动了几个函数,并且大多数函数都表现良好:它们完成,它们的输出进入它们的队列,它们显示为 .is_alive() == False。但是由于某种原因,一些函数没有运行。它们总是显示 .is_alive() == True,即使在函数的最后一行(打印语句说“完成”)完成之后也是如此。无论我启动了哪些功能,都会发生这种情况,即使它只有一个。如果不并行运行,则函数运行良好并正常返回。什么种类可能是问题?

这是我用来管理作业的通用函数。我没有展示的只是我传递给它的函数。它们很长,经常使用 matplotlib,有时会启动一些 shell 命令,但我不知道失败的命令有什么共同点。

def  runFunctionsInParallel(listOf_FuncAndArgLists):
    """
    Take a list of lists like [function, arg1, arg2, ...]. Run those functions in parallel, wait for them all to finish, and return the list of their return values, in order.   
    """
    from multiprocessing import Process, Queue

    def storeOutputFFF(fff,theArgs,que): #add a argument to function for assigning a queue
        print 'MULTIPROCESSING: Launching %s in parallel '%fff.func_name
        que.put(fff(*theArgs)) #we're putting return value into queue
        print 'MULTIPROCESSING: Finished %s in parallel! '%fff.func_name
        # We get this far even for "bad" functions
        return

    queues=[Queue() for fff in listOf_FuncAndArgLists] #create a queue object for each function
    jobs = [Process(target=storeOutputFFF,args=[funcArgs[0],funcArgs[1:],queues[iii]]) for iii,funcArgs in enumerate(listOf_FuncAndArgLists)]
    for job in jobs: job.start() # Launch them all
    import time
    from math import sqrt
    n=1
    while any([jj.is_alive() for jj in jobs]): # debugging section shows progress updates
        n+=1
        time.sleep(5+sqrt(n)) # Wait a while before next update. Slow down updates for really long runs.
        print('\n---------------------------------------------------\n'+ '\t'.join(['alive?','Job','exitcode','Func',])+ '\n---------------------------------------------------')
        print('\n'.join(['%s:\t%s:\t%s:\t%s'%(job.is_alive()*'Yes',job.name,job.exitcode,listOf_FuncAndArgLists[ii][0].func_name) for ii,job in enumerate(jobs)]))
        print('---------------------------------------------------\n')
    # I never get to the following line when one of the "bad" functions is running.
    for job in jobs: job.join() # Wait for them all to finish... Hm, Is this needed to get at the Queues?
    # And now, collect all the outputs:
    return([queue.get() for queue in queues])

最佳答案

好吧,当函数的输出太大时,用于填充队列的管道似乎被堵塞(我粗略的理解?这是一个未解决/关闭的错误?http://bugs.python.org/issue8237)。我已经修改了我的问题中的代码,以便有一些缓冲(在进程运行时定期清空队列),这解决了我所有的问题。所以现在这需要一组任务(函数及其参数),启动它们,并收集输出。我希望它看起来更简单/更干净。

编辑(2014 年 9 月;2017 年 11 月更新:重写以提高可读性):我正在使用我此后所做的增强来更新代码。新代码(功能相同,但功能更好)在这里: https://gitlab.com/cpbl/cpblUtilities/blob/master/parallel.py

调用说明也在下方。

def runFunctionsInParallel(*args, **kwargs):
    """ This is the main/only interface to class cRunFunctionsInParallel. See its documentation for arguments.
    """
    return cRunFunctionsInParallel(*args, **kwargs).launch_jobs()

###########################################################################################
###
class cRunFunctionsInParallel():
    ###
    #######################################################################################
    """Run any list of functions, each with any arguments and keyword-arguments, in parallel.
The functions/jobs should return (if anything) pickleable results. In order to avoid processes getting stuck due to the output queues overflowing, the queues are regularly collected and emptied.
You can now pass os.system or etc to this as the function, in order to parallelize at the OS level, with no need for a wrapper: I made use of hasattr(builtinfunction,'func_name') to check for a name.
Parameters
----------
listOf_FuncAndArgLists : a list of lists 
    List of up-to-three-element-lists, like [function, args, kwargs],
    specifying the set of functions to be launched in parallel.  If an
    element is just a function, rather than a list, then it is assumed
    to have no arguments or keyword arguments. Thus, possible formats
    for elements of the outer list are:
      function
      [function, list]
      [function, list, dict]
kwargs: dict
    One can also supply the kwargs once, for all jobs (or for those
    without their own non-empty kwargs specified in the list)
names: an optional list of names to identify the processes.
    If omitted, the function name is used, so if all the functions are
    the same (ie merely with different arguments), then they would be
    named indistinguishably
offsetsSeconds: int or list of ints
    delay some functions' start times
expectNonzeroExit: True/False
    Normal behaviour is to not proceed if any function exits with a
    failed exit code. This can be used to override this behaviour.
parallel: True/False
    Whenever the list of functions is longer than one, functions will
    be run in parallel unless this parameter is passed as False
maxAtOnce: int
    If nonzero, this limits how many jobs will be allowed to run at
    once.  By default, this is set according to how many processors
    the hardware has available.
showFinished : int
    Specifies the maximum number of successfully finished jobs to show
    in the text interface (before the last report, which should always
    show them all).
Returns
-------
Returns a tuple of (return codes, return values), each a list in order of the jobs provided.
Issues
-------
Only tested on POSIX OSes.
Examples
--------
See the testParallel() method in this module
    """

关于python多处理: some functions do not return when they are complete (queue material too big),我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/11854519/

有关python多处理: some functions do not return when they are complete (queue material too big)的更多相关文章

  1. python - 如何使用 Ruby 或 Python 创建一系列高音调和低音调的蜂鸣声? - 2

    关闭。这个问题是opinion-based.它目前不接受答案。想要改进这个问题?更新问题,以便editingthispost可以用事实和引用来回答它.关闭4年前。Improvethisquestion我想在固定时间创建一系列低音和高音调的哔哔声。例如:在150毫秒时发出高音调的蜂鸣声在151毫秒时发出低音调的蜂鸣声200毫秒时发出低音调的蜂鸣声250毫秒的高音调蜂鸣声有没有办法在Ruby或Python中做到这一点?我真的不在乎输出编码是什么(.wav、.mp3、.ogg等等),但我确实想创建一个输出文件。

  2. ruby - 如何指定 Rack 处理程序 - 2

    Rackup通过Rack的默认处理程序成功运行任何Rack应用程序。例如:classRackAppdefcall(environment)['200',{'Content-Type'=>'text/html'},["Helloworld"]]endendrunRackApp.new但是当最后一行更改为使用Rack的内置CGI处理程序时,rackup给出“NoMethodErrorat/undefinedmethod`call'fornil:NilClass”:Rack::Handler::CGI.runRackApp.newRack的其他内置处理程序也提出了同样的反对意见。例如Rack

  3. Python 相当于 Perl/Ruby ||= - 2

    这个问题在这里已经有了答案:关闭10年前。PossibleDuplicate:Pythonconditionalassignmentoperator对于这样一个简单的问题表示歉意,但是谷歌搜索||=并不是很有帮助;)Python中是否有与Ruby和Perl中的||=语句等效的语句?例如:foo="hey"foo||="what"#assignfooifit'sundefined#fooisstill"hey"bar||="yeah"#baris"yeah"另外,类似这样的东西的通用术语是什么?条件分配是我的第一个猜测,但Wikipediapage跟我想的不太一样。

  4. java - 什么相当于 ruby​​ 的 rack 或 python 的 Java wsgi? - 2

    什么是ruby​​的rack或python的Java的wsgi?还有一个路由库。 最佳答案 来自Python标准PEP333:Bycontrast,althoughJavahasjustasmanywebapplicationframeworksavailable,Java's"servlet"APImakesitpossibleforapplicationswrittenwithanyJavawebapplicationframeworktoruninanywebserverthatsupportstheservletAPI.ht

  5. 华为OD机试用Python实现 -【明明的随机数】 2023Q1A - 2

    华为OD机试题本篇题目:明明的随机数题目输入描述输出描述:示例1输入输出说明代码编写思路最近更新的博客华为od2023|什么是华为od,od薪资待遇,od机试题清单华为OD机试真题大全,用Python解华为机试题|机试宝典【华为OD机试】全流程解析+经验分享,题型分享,防作弊指南华为o

  6. python - 如何读取 MIDI 文件、更改其乐器并将其写回? - 2

    我想解析一个已经存在的.mid文件,改变它的乐器,例如从“acousticgrandpiano”到“violin”,然后将它保存回去或作为另一个.mid文件。根据我在文档中看到的内容,该乐器通过program_change或patch_change指令进行了更改,但我找不到任何在已经存在的MIDI文件中执行此操作的库.他们似乎都只支持从头开始创建的MIDI文件。 最佳答案 MIDIpackage会为您完成此操作,但具体方法取决于midi文件的原始内容。一个MIDI文件由一个或多个音轨组成,每个音轨是十六个channel中任何一个上的

  7. 「Python|Selenium|场景案例」如何定位iframe中的元素? - 2

    本文主要介绍在使用Selenium进行自动化测试或者任务时,对于使用了iframe的页面,如何定位iframe中的元素文章目录场景描述解决方案具体代码场景描述当我们在使用Selenium进行自动化测试的时候,可能会遇到一些界面或者窗体是使用HTML的iframe标签进行承载的。对于iframe中的标签,如果直接查找是无法找到的,会抛出没有找到元素的异常。比如近在咫尺的例子就是,CSDN的登录窗体就是使用的iframe,大家可以尝试通过F12开发者模式查看到的tag_name,class_name,id或者xpath来定位中的页面元素,会抛出NoSuchElementException异常。解决

  8. python ffmpeg 使用 pyav 转换 一组图像 到 视频 - 2

    2022/8/4更新支持加入水印水印必须包含透明图像,并且水印图像大小要等于原图像的大小pythonconvert_image_to_video.py-f30-mwatermark.pngim_dirout.mkv2022/6/21更新让命令行参数更加易用新的命令行使用方法pythonconvert_image_to_video.py-f30im_dirout.mkvFFMPEG命令行转换一组JPG图像到视频时,是将这组图像视为MJPG流。我需要转换一组PNG图像到视频,FFMPEG就不认了。pyav内置了ffmpeg库,不需要系统带有ffmpeg工具因此我使用ffmpeg的python包装p

  9. Python 刷Leetcode题库,顺带学英语单词(31) - 2

    ValidPalindromeGivenastring,determineifitisapalindrome,consideringonlyalphanumericcharactersandignoringcases. [#125]Example:"Aman,aplan,acanal:Panama"isapalindrome."raceacar"isnotapalindrome.Haveyouconsiderthatthestringmightbeempty?Thisisagoodquestiontoaskduringaninterview.Forthepurposeofthisproblem

  10. python - 是否可以使用 Ruby 或 Python 禁用 anchor /引用来发出有效的 YAML? - 2

    是否可以在PyYAML或Ruby的Psych引擎中禁用创建anchor和引用(并有效地显式列出冗余数据)?也许我在网上搜索时遗漏了一些东西,但在Psych中似乎没有太多可用的选项,而且我也无法确定PyYAML是否允许这样做.基本原理是我必须序列化一些数据并将其以可读的形式传递给一个不是真正的技术同事进行手动验证。有些数据是多余的,但我需要以最明确的方式列出它们以提高可读性(anchor和引用是提高效率的好概念,但不是人类可读性)。Ruby和Python是我选择的工具,但如果有其他一些相当简单的方法来“展开”YAML文档,它可能就可以了。 最佳答案

随机推荐