草庐IT

python - 多处理 - 读取大输入数据 - 程序挂起

coder 2023-08-24 原文

我想对从文件加载的一些输入数据运行并行计算。 (文件可能非常大,所以我为此使用了一个生成器。)

在一定数量的项目上,我的代码运行正常,但高于此阈值时程序挂起(一些工作进程未结束)。

有什么建议吗? (我用 python2.7、8 个 CPU 运行它;5,000 行仍然可以,7,500 行不起作用。)

首先,您需要一个输入文件。在 bash 中生成它:

for i in {0..10000}; do echo -e "$i"'\r' >> counter.txt; done

然后,运行这个:

python2.7 main.py 100 counter.txt > run_log.txt

ma​​in.py:

#!/usr/bin/python2.7
import os, sys, signal, time
import Queue
import multiprocessing as mp

def eat_queue(job_queue, result_queue):
    """Eats input queue, feeds output queue
    """
    proc_name = mp.current_process().name
    while True:
        try:
            job = job_queue.get(block=False)
            if job == None:
                print(proc_name + " DONE")
                return
            result_queue.put(execute(job))
        except Queue.Empty:
            pass    

def execute(x):
    """Does the computation on the input data
    """
    return x*x

def save_result(result):
    """Saves results in a list
    """
    result_list.append(result)

def load(ifilename):
    """Generator reading the input file and
        yielding it row by row
    """
    ifile = open(ifilename, "r")
    for line in ifile:
        line = line.strip()
        num = int(line)
        yield (num)
    ifile.close()
    print("file closed".upper())

def put_tasks(job_queue, ifilename):
    """Feeds the job queue
    """
    for item in load(ifilename):
        job_queue.put(item)
    for _ in range(get_max_workers()):
        job_queue.put(None)

def get_max_workers():
    """Returns optimal number of processes to run
    """
    max_workers = mp.cpu_count() - 2
    if max_workers < 1:
        return 1
    return max_workers

def run(workers_num, ifilename):
    job_queue = mp.Queue()
    result_queue = mp.Queue()

    # decide how many processes are to be created
    max_workers = get_max_workers()
    print "processes available: %d" % max_workers
    if workers_num < 1 or workers_num > max_workers:
        workers_num = max_workers

    workers_list = []
    # a process for feeding job queue with the input file
    task_gen = mp.Process(target=put_tasks, name="task_gen",
                          args=(job_queue, ifilename))
    workers_list.append(task_gen)

    for i in range(workers_num):
        tmp = mp.Process(target=eat_queue, name="w%d" % (i+1),
                                      args=(job_queue, result_queue))
        workers_list.append(tmp)

    for worker in workers_list:
        worker.start()

    for worker in workers_list:
        worker.join()
        print "worker %s finished!" % worker.name

if __name__ == '__main__':
    result_list = []
    args = sys.argv
    workers_num = int(args[1])
    ifilename = args[2]
    run(workers_num, ifilename)

最佳答案

这是因为您的代码中没有任何内容关闭 result_queue。然后行为取决于内部队列缓冲细节:如果“不是很多”数据在等待,一切看起来都很好,但如果“很多”数据在等待,一切都会卡住。不能再多说了,因为它涉及到内部魔法层;-)但是文档确实警告过它:

Warning

As mentioned above, if a child process has put items on a queue (and it has not used JoinableQueue.cancel_join_thread), then that process will not terminate until all buffered items have been flushed to the pipe.

This means that if you try joining that process you may get a deadlock unless you are sure that all items which have been put on the queue have been consumed. Similarly, if the child process is non-daemonic then the parent process may hang on exit when it tries to join all its non-daemonic children.

Note that a queue created using a manager does not have this issue. See Programming guidelines.

一个简单的修复方法:首先添加

            result_queue.put(None)

eat_queue() 返回之前。然后添加:

count = 0
while count < workers_num:
    if result_queue.get() is None:
        count += 1

在主程序.join()之前是 worker 。这会耗尽结果队列,然后一切都会干净地关闭。

顺便说一句,这段代码很奇怪:

while True:
    try:
        job = job_queue.get(block=False)
        if job == None:
            print(proc_name + " DONE")
            return
        result_queue.put(execute(job))
    except Queue.Empty:
        pass

为什么要进行非阻塞 get()?只要队列为空,这就会变成 CPU 占用“繁忙循环”。 .get() 的主要目的是提供一种高效 方式来等待工作出现。所以:

while True:
    job = job_queue.get()
    if job is None:
        print(proc_name + " DONE")
        break
    else:
        result_queue.put(execute(job))
result_queue.put(None)

做同样的事情,但效率更高。

队列大小警告

你没有问过这个,但让我们在它咬你之前覆盖它;-) 默认情况下,Queue 的大小没有限制。例如,如果您向 Queue 添加十亿个项目,它将需要足够的 RAM 来容纳十亿个项目。因此,如果您的生产者生成工作项的速度比您的消费者处理它们的速度快,那么内存使用会很快失控。

幸运的是,这很容易修复:指定最大队列大小。例如,

    job_queue = mp.Queue(maxsize=10*workers_num)
                         ^^^^^^^^^^^^^^^^^^^^^^^

然后 job_queue.put(some_work_item) 将阻塞,直到消费者将队列的大小减少到小于最大值。通过这种方式,您可以使用需要普通 RAM 的队列来处理大量问题。

关于python - 多处理 - 读取大输入数据 - 程序挂起,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/20574810/

有关python - 多处理 - 读取大输入数据 - 程序挂起的更多相关文章

  1. ruby - 在 Ruby 程序执行时阻止 Windows 7 PC 进入休眠状态 - 2

    我需要在客户计算机上运行Ruby应用程序。通常需要几天才能完成(复制大备份文件)。问题是如果启用sleep,它会中断应用程序。否则,计算机将持续运行数周,直到我下次访问为止。有什么方法可以防止执行期间休眠并让Windows在执行后休眠吗?欢迎任何疯狂的想法;-) 最佳答案 Here建议使用SetThreadExecutionStateWinAPI函数,使应用程序能够通知系统它正在使用中,从而防止系统在应用程序运行时进入休眠状态或关闭显示。像这样的东西:require'Win32API'ES_AWAYMODE_REQUIRED=0x0

  2. python - 如何使用 Ruby 或 Python 创建一系列高音调和低音调的蜂鸣声? - 2

    关闭。这个问题是opinion-based.它目前不接受答案。想要改进这个问题?更新问题,以便editingthispost可以用事实和引用来回答它.关闭4年前。Improvethisquestion我想在固定时间创建一系列低音和高音调的哔哔声。例如:在150毫秒时发出高音调的蜂鸣声在151毫秒时发出低音调的蜂鸣声200毫秒时发出低音调的蜂鸣声250毫秒的高音调蜂鸣声有没有办法在Ruby或Python中做到这一点?我真的不在乎输出编码是什么(.wav、.mp3、.ogg等等),但我确实想创建一个输出文件。

  3. ruby - 如何将脚本文件的末尾读取为数据文件(Perl 或任何其他语言) - 2

    我正在寻找执行以下操作的正确语法(在Perl、Shell或Ruby中):#variabletoaccessthedatalinesappendedasafileEND_OF_SCRIPT_MARKERrawdatastartshereanditcontinues. 最佳答案 Perl用__DATA__做这个:#!/usr/bin/perlusestrict;usewarnings;while(){print;}__DATA__Texttoprintgoeshere 关于ruby-如何将脚

  4. ruby - 解析 RDFa、微数据等的最佳方式是什么,使用统一的模式/词汇(例如 schema.org)存储和显示信息 - 2

    我主要使用Ruby来执行此操作,但到目前为止我的攻击计划如下:使用gemsrdf、rdf-rdfa和rdf-microdata或mida来解析给定任何URI的数据。我认为最好映射到像schema.org这样的统一模式,例如使用这个yaml文件,它试图描述数据词汇表和opengraph到schema.org之间的转换:#SchemaXtoschema.orgconversion#data-vocabularyDV:name:namestreet-address:streetAddressregion:addressRegionlocality:addressLocalityphoto:i

  5. ruby - 如何指定 Rack 处理程序 - 2

    Rackup通过Rack的默认处理程序成功运行任何Rack应用程序。例如:classRackAppdefcall(environment)['200',{'Content-Type'=>'text/html'},["Helloworld"]]endendrunRackApp.new但是当最后一行更改为使用Rack的内置CGI处理程序时,rackup给出“NoMethodErrorat/undefinedmethod`call'fornil:NilClass”:Rack::Handler::CGI.runRackApp.newRack的其他内置处理程序也提出了同样的反对意见。例如Rack

  6. ruby - 在 Ruby 中编写命令行实用程序 - 2

    我想用ruby​​编写一个小的命令行实用程序并将其作为gem分发。我知道安装后,Guard、Sass和Thor等某些gem可以从命令行自行运行。为了让gem像二进制文件一样可用,我需要在我的gemspec中指定什么。 最佳答案 Gem::Specification.newdo|s|...s.executable='name_of_executable'...endhttp://docs.rubygems.org/read/chapter/20 关于ruby-在Ruby中编写命令行实用程序

  7. ruby-on-rails - Rails 应用程序之间的通信 - 2

    我构建了两个需要相互通信和发送文件的Rails应用程序。例如,一个Rails应用程序会发送请求以查看其他应用程序数据库中的表。然后另一个应用程序将呈现该表的json并将其发回。我还希望一个应用程序将存储在其公共(public)目录中的文本文件发送到另一个应用程序的公共(public)目录。我从来没有做过这样的事情,所以我什至不知道从哪里开始。任何帮助,将不胜感激。谢谢! 最佳答案 无论Rails是什么,几乎所有Web应用程序都有您的要求,大多数现代Web应用程序都需要相互通信。但是有一个小小的理解需要你坚持下去,网站不应直接访问彼此

  8. ruby - 无法运行 Rails 2.x 应用程序 - 2

    我尝试运行2.x应用程序。我使用rvm并为此应用程序设置其他版本的ruby​​:$rvmuseree-1.8.7-head我尝试运行服务器,然后出现很多错误:$script/serverNOTE:Gem.source_indexisdeprecated,useSpecification.Itwillberemovedonorafter2011-11-01.Gem.source_indexcalledfrom/Users/serg/rails_projects_terminal/work_proj/spohelp/config/../vendor/rails/railties/lib/r

  9. ruby-on-rails - Rails 应用程序中的 Rails : How are you using application_controller. rb 是新手吗? - 2

    刚入门rails,开始慢慢理解。有人可以解释或给我一些关于在application_controller中编码的好处或时间和原因的想法吗?有哪些用例。您如何为Rails应用程序使用应用程序Controller?我不想在那里放太多代码,因为据我了解,每个请求都会调用此Controller。这是真的? 最佳答案 ApplicationController实际上是您应用程序中的每个其他Controller都将从中继承的类(尽管这不是强制性的)。我同意不要用太多代码弄乱它并保持干净整洁的态度,尽管在某些情况下ApplicationContr

  10. Ruby 写入和读取对象到文件 - 2

    好的,所以我的目标是轻松地将一些数据保存到磁盘以备后用。您如何简单地写入然后读取一个对象?所以如果我有一个简单的类classCattr_accessor:a,:bdefinitialize(a,b)@a,@b=a,bendend所以如果我从中非常快地制作一个objobj=C.new("foo","bar")#justgaveitsomerandomvalues然后我可以把它变成一个kindaidstring=obj.to_s#whichreturns""我终于可以将此字符串打印到文件或其他内容中。我的问题是,我该如何再次将这个id变回一个对象?我知道我可以自己挑选信息并制作一个接受该信

随机推荐