我正在对 40GB 的数据进行计算。每个文件都是一个包含 json 行的压缩 gzip 文件。每个文件最多有 500,000 行,或大约 500MB。我有一个运行 128 个 CPU 和 1952 GB 内存的亚马逊实例。我要做的是尽快处理每个文件。
我正在使用这样的多处理池:
def initializeLock(l):
global lock
lock = l
if __name__ == '__main__':
directory = '/home/ubuntu/[directory_containing_files]/*.gz'
file_names = glob.glob(directory)
lock = Lock()
pool = Pool(initializer=initializeLock, initargs=(lock,))
pool.map(do_analysis, file_names)
pool.close()
pool.join()
我希望发生的情况是创建大量进程,并且每个进程处理一个文件。实际发生的是最初创建了 100 多个进程。此时我使用了大约 85% 的内存,这太棒了!然后每一个都完成。最终运行的进程数量下降到大约 10 个。此时我只使用了 5% 的内存。定期启动其他进程,但它永远不会恢复到运行 100 个左右。所以我有这个大 CPU 和所有这些空闲内存,但大多数时候我最多运行 10 个进程。
知道如何让它继续运行 100 个进程直到所有文件都完成吗?
编辑:
我向应用程序添加了一些日志记录。最初它加载 127 个进程,我认为这是因为我有 128 个 CPU,并且在加载进程时有一个正在使用。一些过程成功完成,结果被保存。然后在某个时候,除了少数正在运行的进程外,所有进程都结束了。当我查看有多少文件已完成时,127 个中只有 22 个是完整的。然后它只使用 5-10 个进程运行,所有这些都成功完成。我在想它可能会耗尽内存并崩溃。但为什么?我有那么多内存和那么多 CPU。
编辑 2:
所以我找到了问题所在。问题是我在 do_analysis 方法中设置了一个锁,所有进程大约在同一时间完成并等待释放锁。进程没有停止,它们正在休眠。所以这让我想到了另一个问题:我的主要目标是获取每个包含许多 json 行的文件,从 json 行获取 ID 属性,然后将其附加到包含具有相同 id 的其他行的文件。如果该文件不存在,我会创建它。我所做的是在访问文件时设置一个锁,以避免它被另一个进程访问。这是我的代码。
for key, value in dataframe.iteritems():
if os.path.isfile(file_name):
lock.acquire()
value.to_csv(filename), mode='a', header=False, encoding='utf-8')
lock.release()
else:
value.to_csv(filename), header=True, encoding='utf-8')
所以现在我正在尝试想出一种创造性的方法来附加到文件,但不阻止所有其他进程。我正在处理大量数据,需要同时访问两个文件的可能性很小,但它仍然会发生。所以我需要确保在附加文件时,另一个进程不会尝试打开该文件。
最佳答案
感谢大家的意见。这是我目前对这个问题的解决方案,我计划在接下来的一周内提高效率。我接受了 Martin 的建议,一旦文件全部完成,我就将它们粘合在一起,但是,我想努力实现 daphtdazz 解决方案,让一个进程在我生成更多文件的同时使用队列进行粘合。
def do_analyis(file):
# To keep the file names unique, I append the process id to the end
process_id = multiprocessing.current_process().pid
# doing analysis work...
for key, value in dataframe.iteritems():
if os.path.isfile(filename):
value.to_csv(filename), mode='a', header=False, encoding='utf-8')
else:
value.to_csv(filename), header=True, encoding='utf-8')
def merge_files(base_file_name):
write_directory = 'write_directory'
all_files = glob.glob('{0}*'.format(base_file_name))
is_file_created = False
for file in all_files:
if is_file_created:
print 'File already exists, appending'
dataframe = pandas.read_csv(file, index_col=0)
dataframe.to_csv('{0}{1}.csv'.format(write_directory, os.path.basename(base_file_name)), mode='a', header=False, encoding='utf-8')
else:
print 'File does not exist, creating.'
dataframe = pandas.read_csv(file, index_col=0)
dataframe.to_csv('{0}{1}.csv'.format(write_directory, os.path.basename(base_file_name)), header=True, encoding='utf-8')
is_file_created = True
if __name__ == '__main__':
# Run the code to do analysis and group files by the id in the json lines
directory = 'directory'
file_names = glob.glob(directory)
pool = Pool()
pool.imap_unordered(do_analysis, file_names, 1)
pool.close()
pool.join()
# Merge all of the files together
base_list = get_unique_base_file_names('file_directory')
pool = Pool()
pool.imap_unordered(merge_files, base_list, 100)
pool.close()
pool.join()
这会保存每个文件,并在文件末尾附加一个唯一的进程 ID,然后返回并通过 json 文件中的 ID 获取所有文件并将它们合并在一起。创建文件时,cpu 使用率在 60-70% 之间。那是体面的。合并文件时,cpu 使用率约为 8%。这是因为文件合并得如此之快,以至于我不需要我拥有的所有 CPU 处理能力。该解决方案有效。但它可能更有效率。我将努力同时完成这两项工作。欢迎任何建议。
关于Python 多处理池没有创建足够的进程,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/40725622/
出于纯粹的兴趣,我很好奇如何按顺序创建PI,而不是在过程结果之后生成数字,而是让数字在过程本身生成时显示。如果是这种情况,那么数字可以自行产生,我可以对以前看到的数字实现垃圾收集,从而创建一个无限系列。结果只是在Pi系列之后每秒生成一个数字。这是我通过互联网筛选的结果:这是流行的计算机友好算法,类机器算法:defarccot(x,unity)xpow=unity/xn=1sign=1sum=0loopdoterm=xpow/nbreakifterm==0sum+=sign*(xpow/n)xpow/=x*xn+=2sign=-signendsumenddefcalc_pi(digits
关闭。这个问题是opinion-based.它目前不接受答案。想要改进这个问题?更新问题,以便editingthispost可以用事实和引用来回答它.关闭4年前。Improvethisquestion我想在固定时间创建一系列低音和高音调的哔哔声。例如:在150毫秒时发出高音调的蜂鸣声在151毫秒时发出低音调的蜂鸣声200毫秒时发出低音调的蜂鸣声250毫秒的高音调蜂鸣声有没有办法在Ruby或Python中做到这一点?我真的不在乎输出编码是什么(.wav、.mp3、.ogg等等),但我确实想创建一个输出文件。
Rackup通过Rack的默认处理程序成功运行任何Rack应用程序。例如:classRackAppdefcall(environment)['200',{'Content-Type'=>'text/html'},["Helloworld"]]endendrunRackApp.new但是当最后一行更改为使用Rack的内置CGI处理程序时,rackup给出“NoMethodErrorat/undefinedmethod`call'fornil:NilClass”:Rack::Handler::CGI.runRackApp.newRack的其他内置处理程序也提出了同样的反对意见。例如Rack
我好像记得Lua有类似Ruby的method_missing的东西。还是我记错了? 最佳答案 表的metatable的__index和__newindex可以用于与Ruby的method_missing相同的效果。 关于ruby-难道Lua没有和Ruby的method_missing相媲美的东西吗?,我们在StackOverflow上找到一个类似的问题: https://stackoverflow.com/questions/7732154/
使用带有Rails插件的vim,您可以创建一个迁移文件,然后一次性打开该文件吗?textmate也可以这样吗? 最佳答案 你可以使用rails.vim然后做类似的事情::Rgeneratemigratonadd_foo_to_bar插件将打开迁移生成的文件,这正是您想要的。我不能代表textmate。 关于ruby-使用VimRails,您可以创建一个新的迁移文件并一次性打开它吗?,我们在StackOverflow上找到一个类似的问题: https://sta
我对最新版本的Rails有疑问。我创建了一个新应用程序(railsnewMyProject),但我没有脚本/生成,只有脚本/rails,当我输入ruby./script/railsgeneratepluginmy_plugin"Couldnotfindgeneratorplugin.".你知道如何生成插件模板吗?没有这个命令可以创建插件吗?PS:我正在使用Rails3.2.1和ruby1.8.7[universal-darwin11.0] 最佳答案 随着Rails3.2.0的发布,插件生成器已经被移除。查看变更日志here.现在
我有一个奇怪的问题:我在rvm上安装了rubyonrails。一切正常,我可以创建项目。但是在我输入“railsnew”时重新启动后,我有“程序'rails'当前未安装。”。SystemUbuntu12.04ruby-v"1.9.3p194"gemlistactionmailer(3.2.5)actionpack(3.2.5)activemodel(3.2.5)activerecord(3.2.5)activeresource(3.2.5)activesupport(3.2.5)arel(3.0.2)builder(3.0.0)bundler(1.1.4)coffee-rails(
在MRIRuby中我可以这样做:deftransferinternal_server=self.init_serverpid=forkdointernal_server.runend#Maketheserverprocessrunindependently.Process.detach(pid)internal_client=self.init_client#Dootherstuffwithconnectingtointernal_server...internal_client.post('somedata')ensure#KillserverProcess.kill('KILL',
如何使用RSpec::Core::RakeTask初始化RSpecRake任务?require'rspec/core/rake_task'RSpec::Core::RakeTask.newdo|t|#whatdoIputinhere?endInitialize函数记录在http://rubydoc.info/github/rspec/rspec-core/RSpec/Core/RakeTask#initialize-instance_method没有很好的记录;它只是说:-(RakeTask)initialize(*args,&task_block)AnewinstanceofRake
我正在编写一个gem,我必须在其中fork两个启动两个webrick服务器的进程。我想通过基类的类方法启动这个服务器,因为应该只有这两个服务器在运行,而不是多个。在运行时,我想调用这两个服务器上的一些方法来更改变量。我的问题是,我无法通过基类的类方法访问fork的实例变量。此外,我不能在我的基类中使用线程,因为在幕后我正在使用另一个不是线程安全的库。所以我必须将每个服务器派生到它自己的进程。我用类变量试过了,比如@@server。但是当我试图通过基类访问这个变量时,它是nil。我读到在Ruby中不可能在分支之间共享类变量,对吗?那么,还有其他解决办法吗?我考虑过使用单例,但我不确定这是