草庐IT

python - 为什么这个 python 多处理脚本会在一段时间后变慢?

coder 2023-08-24 原文

建立在 script from this answer 上,我有以下场景:一个包含 2500 个大文本文件(每个约 55Mb)的文件夹,所有文件均以制表符分隔。基本上是 Web 日志。

我需要对每个文件的每一行中的第二个“列”进行 md5 散列,将修改后的文件保存在别处。源文件在机械磁盘上​​,目标文件在 SSD 上。

脚本处理前 25 个(左右)文件的速度非常快。然后它会慢下来。根据前 25 个文件,它应该在 2 分钟左右的时间内完成所有这些文件。但是,根据之后的表现,全部完成需要 15 分钟左右。

它在具有 32 Gb RAM 的服务器上运行,任务管理器很少显示超过 6 Gb 的使用情况。我将其设置为启动 6 个进程,但内核的 CPU 使用率很低,很少超过 15%。

为什么会变慢?磁盘的读/写问题?垃圾收集器?糟糕的代码?关于如何加快速度的任何想法?

这是脚本

import os

import multiprocessing
from multiprocessing import Process
import threading
import hashlib

class ThreadRunner(threading.Thread):
    """ This class represents a single instance of a running thread"""
    def __init__(self, fileset, filedirectory):
        threading.Thread.__init__(self)
        self.files_to_process = fileset
        self.filedir          = filedirectory

    def run(self):
        for current_file in self.files_to_process:

            # Open the current file as read only
            active_file_name = self.filedir + "/" + current_file
            output_file_name = "D:/hashed_data/" + "hashed_" + current_file

            active_file = open(active_file_name, "r")
            output_file = open(output_file_name, "ab+")

            for line in active_file:
                # Load the line, hash the username, save the line
                lineList = line.split("\t")

                if not lineList[1] == "-":
                    lineList[1] = hashlib.md5(lineList[1]).hexdigest()

                lineOut = '\t'.join(lineList)
                output_file.write(lineOut)

            # Always close files after you open them
            active_file.close()
            output_file.close()

            print "\nCompleted " + current_file

class ProcessRunner:
    """ This class represents a single instance of a running process """
    def runp(self, pid, numThreads, fileset, filedirectory):
        mythreads = []
        for tid in range(numThreads):
            th = ThreadRunner(fileset, filedirectory)
            mythreads.append(th) 
        for i in mythreads:
            i.start()
        for i in mythreads:
            i.join()

class ParallelExtractor:    
    def runInParallel(self, numProcesses, numThreads, filedirectory):
        myprocs = []
        prunner = ProcessRunner()

        # Store the file names from that directory in a list that we can iterate
        file_names = os.listdir(filedirectory)

        file_sets = []
        for i in range(numProcesses):
            file_sets.append([])

        for index, name in enumerate(file_names):
            num = index % numProcesses
            file_sets[num].append(name)


        for pid in range(numProcesses):
            pr = Process(target=prunner.runp, args=(pid, numThreads, file_sets[pid], filedirectory)) 
            myprocs.append(pr) 
        for i in myprocs:
            i.start()

        for i in myprocs:
            i.join()

if __name__ == '__main__':    

    file_directory = "E:/original_data"

    processes = 6
    threads   = 1

    extractor = ParallelExtractor()
    extractor.runInParallel(numProcesses=processes, numThreads=threads, filedirectory=file_directory)

最佳答案

散列是一项相对简单的任务,与旋转磁盘的速度相比,现代 CPU 的速度非常快。 i7 上的快速基准测试表明它可以使用 MD5 散列大约 450 MB/s,或者使用 SHA-1 散列 290 MB/s。相比之下,旋转磁盘的典型(顺序原始读取)速度约为 70-150 MB/s。这意味着,即使忽略文件系统的开销和最终的磁盘寻道,CPU 散列文件的速度也比磁盘读取文件的速度快大约 3 倍。

您在处理第一个文件时获得的性能提升可能是因为第一个文件被操作系统缓存在内存中,所以没有磁盘 I/O 发生。这可以通过以下任一方式确认:

  • 重启服务器,从而刷新缓存
  • 通过从磁盘读取足够大的文件,用其他东西填充缓存
  • 在处理第一个文件时仔细聆听是否存在磁盘寻道

现在,由于哈希文件的性能瓶颈是磁盘,因此在多个进程或线程中执行哈希是没有用的,因为它们都将使用同一个磁盘。正如@Max Noel 所提到的,它实际上可以降低 性能,因为您将并行读取多个文件,因此您的磁盘将不得不在文件之间进行查找。正如他所提到的,性能也会因您使用的操作系统的 I/O 调度程序而异。

现在,如果您仍在生成数据,您有一些可能的解决方案:

  • 按照@Max Noel 的建议,使用更快的磁盘或 SSD。
  • 从多个磁盘中读取 - 在不同的文件系统中或在 RAID 上的单个文件系统中
  • 将任务拆分到多台机器上(每台机器有一个或多个磁盘)

但是,如果您只想对这 2500 个文件进行哈希处理,并且您已经将它们放在一个磁盘上,那么这些解决方案就毫无用处。将它们从磁盘读取到其他磁盘然后执行散列较慢,因为您将读取文件两次,并且您可以尽可能快地散列他们。

最后,根据@yaccz 的想法,我想如果您安装了 findxargs< 的="" cygwin="">md5sum

关于python - 为什么这个 python 多处理脚本会在一段时间后变慢?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/20383114/

有关python - 为什么这个 python 多处理脚本会在一段时间后变慢?的更多相关文章

  1. ruby - 为什么我可以在 Ruby 中使用 Object#send 访问私有(private)/ protected 方法? - 2

    类classAprivatedeffooputs:fooendpublicdefbarputs:barendprivatedefzimputs:zimendprotecteddefdibputs:dibendendA的实例a=A.new测试a.foorescueputs:faila.barrescueputs:faila.zimrescueputs:faila.dibrescueputs:faila.gazrescueputs:fail测试输出failbarfailfailfail.发送测试[:foo,:bar,:zim,:dib,:gaz].each{|m|a.send(m)resc

  2. python - 如何使用 Ruby 或 Python 创建一系列高音调和低音调的蜂鸣声? - 2

    关闭。这个问题是opinion-based.它目前不接受答案。想要改进这个问题?更新问题,以便editingthispost可以用事实和引用来回答它.关闭4年前。Improvethisquestion我想在固定时间创建一系列低音和高音调的哔哔声。例如:在150毫秒时发出高音调的蜂鸣声在151毫秒时发出低音调的蜂鸣声200毫秒时发出低音调的蜂鸣声250毫秒的高音调蜂鸣声有没有办法在Ruby或Python中做到这一点?我真的不在乎输出编码是什么(.wav、.mp3、.ogg等等),但我确实想创建一个输出文件。

  3. ruby-on-rails - Rails - 子类化模型的设计模式是什么? - 2

    我有一个模型:classItem项目有一个属性“商店”基于存储的值,我希望Item对象对特定方法具有不同的行为。Rails中是否有针对此的通用设计模式?如果方法中没有大的if-else语句,这是如何干净利落地完成的? 最佳答案 通常通过Single-TableInheritance. 关于ruby-on-rails-Rails-子类化模型的设计模式是什么?,我们在StackOverflow上找到一个类似的问题: https://stackoverflow.co

  4. ruby - 什么是填充的 Base64 编码字符串以及如何在 ruby​​ 中生成它们? - 2

    我正在使用的第三方API的文档状态:"[O]urAPIonlyacceptspaddedBase64encodedstrings."什么是“填充的Base64编码字符串”以及如何在Ruby中生成它们。下面的代码是我第一次尝试创建转换为Base64的JSON格式数据。xa=Base64.encode64(a.to_json) 最佳答案 他们说的padding其实就是Base64本身的一部分。它是末尾的“=”和“==”。Base64将3个字节的数据包编码为4个编码字符。所以如果你的输入数据有长度n和n%3=1=>"=="末尾用于填充n%

  5. ruby - 解析 RDFa、微数据等的最佳方式是什么,使用统一的模式/词汇(例如 schema.org)存储和显示信息 - 2

    我主要使用Ruby来执行此操作,但到目前为止我的攻击计划如下:使用gemsrdf、rdf-rdfa和rdf-microdata或mida来解析给定任何URI的数据。我认为最好映射到像schema.org这样的统一模式,例如使用这个yaml文件,它试图描述数据词汇表和opengraph到schema.org之间的转换:#SchemaXtoschema.orgconversion#data-vocabularyDV:name:namestreet-address:streetAddressregion:addressRegionlocality:addressLocalityphoto:i

  6. ruby - 如何指定 Rack 处理程序 - 2

    Rackup通过Rack的默认处理程序成功运行任何Rack应用程序。例如:classRackAppdefcall(environment)['200',{'Content-Type'=>'text/html'},["Helloworld"]]endendrunRackApp.new但是当最后一行更改为使用Rack的内置CGI处理程序时,rackup给出“NoMethodErrorat/undefinedmethod`call'fornil:NilClass”:Rack::Handler::CGI.runRackApp.newRack的其他内置处理程序也提出了同样的反对意见。例如Rack

  7. ruby - 为什么 4.1%2 使用 Ruby 返回 0.0999999999999996?但是 4.2%2==0.2 - 2

    为什么4.1%2返回0.0999999999999996?但是4.2%2==0.2。 最佳答案 参见此处:WhatEveryProgrammerShouldKnowAboutFloating-PointArithmetic实数是无限的。计算机使用的位数有限(今天是32位、64位)。因此计算机进行的浮点运算不能代表所有的实数。0.1是这些数字之一。请注意,这不是与Ruby相关的问题,而是与所有编程语言相关的问题,因为它来自计算机表示实数的方式。 关于ruby-为什么4.1%2使用Ruby返

  8. ruby - ruby 中的 TOPLEVEL_BINDING 是什么? - 2

    它不等于主线程的binding,这个toplevel作用域是什么?此作用域与主线程中的binding有何不同?>ruby-e'putsTOPLEVEL_BINDING===binding'false 最佳答案 事实是,TOPLEVEL_BINDING始终引用Binding的预定义全局实例,而Kernel#binding创建的新实例>Binding每次封装当前执行上下文。在顶层,它们都包含相同的绑定(bind),但它们不是同一个对象,您无法使用==或===测试它们的绑定(bind)相等性。putsTOPLEVEL_BINDINGput

  9. ruby - Infinity 和 NaN 的类型是什么? - 2

    我可以得到Infinity和NaNn=9.0/0#=>Infinityn.class#=>Floatm=0/0.0#=>NaNm.class#=>Float但是当我想直接访问Infinity或NaN时:Infinity#=>uninitializedconstantInfinity(NameError)NaN#=>uninitializedconstantNaN(NameError)什么是Infinity和NaN?它们是对象、关键字还是其他东西? 最佳答案 您看到打印为Infinity和NaN的只是Float类的两个特殊实例的字符串

  10. ruby-on-rails - 如果 Object::try 被发送到一个 nil 对象,为什么它会起作用? - 2

    如果您尝试在Ruby中的nil对象上调用方法,则会出现NoMethodError异常并显示消息:"undefinedmethod‘...’fornil:NilClass"然而,有一个tryRails中的方法,如果它被发送到一个nil对象,它只返回nil:require'rubygems'require'active_support/all'nil.try(:nonexisting_method)#noNoMethodErrorexceptionanymore那么try如何在内部工作以防止该异常? 最佳答案 像Ruby中的所有其他对象

随机推荐