草庐IT

python - 理解 Dask 分布式的内存行为

coder 2023-08-19 原文

类似于this question ,我遇到了分布式 Dask 的内存问题。然而,在我的例子中,解释并不是客户端试图收集大量数据。

这个问题可以基于一个非常简单的任务图来说明:delayed 操作列表生成一些固定大小为 ~500 MB 的随机数据帧(模拟从文件加载多个分区)。任务图中的下一个操作是获取每个 DataFrame 的大小。最后将所有大小缩减为一个总大小,即需要返回给客户端的数据很小。

出于测试目的,我正在运行本地调度程序/工作程序单线程,限制为 2GB 内存,即:

$ dask-scheduler
$ dask-worker localhost:8786 --nthreads 1 --memory-limit 2000000000

我对任务图的期望是工作人员永远不需要超过 500 MB 的 RAM,因为在“生成数据”“获取数据大小”> 应该立即使数据变小。但是,我观察到工作人员需要的内存远不止于此:

因子 2 表示必须在内部复制数据。因此,任何使分区大小接近节点物理内存的尝试都会导致 MemoryErrors 或大量交换。

非常感谢任何有助于阐明这一点的信息。特别是:

  • 我是否可以控制数据重复,这是可以避免的吗?或者一般的经验法则是将有效负载保持在远低于 50% 以解决数据重复问题?
  • worker memory-limit 如何影响这个行为?从我的测试来看,使用较低的阈值似乎更早触发 GC(和/或溢出到磁盘?),但另一方面还有其他内存峰值甚至超过使用较高阈值的峰值内存。

请注意,我知道我可以通过在第一个操作中采用 大小来解决这个特定问题,而且 Dask 的单机执行程序可能更适合这个问题,但我要问用于教育目的。


附件一:测试代码

from __future__ import division, print_function
import pandas as pd
import numpy as np
from dask import delayed
from dask.distributed import Client, Executor


def simulate_df_partition_load(part_id):
    """
    Creates a random DataFrame of ~500 MB
    """
    num_rows = 5000000
    num_cols = 13

    df = pd.DataFrame()
    for i in xrange(num_cols):
        data_col = np.random.uniform(0, 1, num_rows)
        df["col_{}".format(i)] = data_col
        del data_col    # for max GC-friendliness

    print("[Partition {}] #rows: {}, #cols: {}, memory: {} MB".format(
        part_id, df.shape[0], df.shape[1],
        df.memory_usage().sum() / (2 ** 20)
    ))
    return df


e = Executor('127.0.0.1:8786', set_as_default=True)

num_partitions = 2

lazy_dataframes = [
    delayed(simulate_df_partition_load)(part_id)
    for part_id in xrange(num_partitions)
]

length_partitions = [df.shape[0] for df in lazy_dataframes]
dag = delayed(sum)(length_partitions)

length_total = dag.compute()

附件二:DAG图示

最佳答案

这里有几个问题:

  1. 为什么我看到的内存使用量是单个数据元素的两倍?
  2. 推荐的行为是让分区大小远低于总内存吗?
  3. 当我超出 --memory-limit 值时会发生什么

为什么我看到内存使用量是原来的两倍?

在执行第一个计算大小的任务之前,worker 可能正在运行两个创建数据的任务。这是因为调度程序将所有当前可运行的任务分配给工作人员,可能超过他们一次可以运行的任务。工作人员完成第一个并向调度程序报告。当调度程序确定要发送给工作人员的新任务(计算大小任务)时,工作人员立即启动另一个创建数据任务。

是否建议将分区大小保持在总内存以下?

是的。

当我超出 --memory-limit 值时会发生什么?

worker 将开始将最近最少使用的数据元素写入磁盘。默认情况下,当您使用大约 60% 的内存时(根据 __sizeof__ 协议(protocol)测量),它会执行此操作。

注意:感谢您提出的问题

关于python - 理解 Dask 分布式的内存行为,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/44344171/

有关python - 理解 Dask 分布式的内存行为的更多相关文章

  1. ruby-on-rails - Ruby net/ldap 模块中的内存泄漏 - 2

    作为我的Rails应用程序的一部分,我编写了一个小导入程序,它从我们的LDAP系统中吸取数据并将其塞入一个用户表中。不幸的是,与LDAP相关的代码在遍历我们的32K用户时泄漏了大量内存,我一直无法弄清楚如何解决这个问题。这个问题似乎在某种程度上与LDAP库有关,因为当我删除对LDAP内容的调用时,内存使用情况会很好地稳定下来。此外,不断增加的对象是Net::BER::BerIdentifiedString和Net::BER::BerIdentifiedArray,它们都是LDAP库的一部分。当我运行导入时,内存使用量最终达到超过1GB的峰值。如果问题存在,我需要找到一些方法来更正我的代

  2. python - 如何使用 Ruby 或 Python 创建一系列高音调和低音调的蜂鸣声? - 2

    关闭。这个问题是opinion-based.它目前不接受答案。想要改进这个问题?更新问题,以便editingthispost可以用事实和引用来回答它.关闭4年前。Improvethisquestion我想在固定时间创建一系列低音和高音调的哔哔声。例如:在150毫秒时发出高音调的蜂鸣声在151毫秒时发出低音调的蜂鸣声200毫秒时发出低音调的蜂鸣声250毫秒的高音调蜂鸣声有没有办法在Ruby或Python中做到这一点?我真的不在乎输出编码是什么(.wav、.mp3、.ogg等等),但我确实想创建一个输出文件。

  3. ruby - 如何根据特征实现 FactoryGirl 的条件行为 - 2

    我有一个用户工厂。我希望默认情况下确认用户。但是鉴于unconfirmed特征,我不希望它们被确认。虽然我有一个基于实现细节而不是抽象的工作实现,但我想知道如何正确地做到这一点。factory:userdoafter(:create)do|user,evaluator|#unwantedimplementationdetailshereunlessFactoryGirl.factories[:user].defined_traits.map(&:name).include?(:unconfirmed)user.confirm!endendtrait:unconfirmeddoenden

  4. ruby-on-rails - Ruby 中的内存模型 - 2

    ruby如何管理内存。例如:如果我们在执行过程中采用C程序,则以下是内存模型。类似于这个ruby如何处理内存。C:__________________|||stack|||------------------||||------------------|||||Heap|||||__________________|||data|__________________|text|__________________Ruby:? 最佳答案 Ruby中没有“内存”这样的东西。Class#allocate分配一个对象并返回该对象。这就是程序

  5. ruby - 分布式事务和队列,ruby,erlang,scala - 2

    我有一个涉及多台机器、消息队列和事务的问题。因此,例如用户点击网页,点击将消息发送到另一台机器,该机器将付款添加到用户的帐户。每秒可能有数千次点击。事务的所有方面都应该是容错的。我以前从未遇到过这样的事情,但一些阅读表明这是一个众所周知的问题。所以我的问题。我假设安全的方法是使用两阶段提交,但协议(protocol)是阻塞的,所以我不会获得所需的性能,我是否正确?我通常写Ruby,但似乎Redis之类的数据库和Rescue、RabbitMQ等消息队列系统对我的帮助不大——即使我实现某种两阶段提交,如果Redis崩溃,数据也会丢失,因为它本质上只是内存。所有这些让我开始关注erlang和

  6. Python 相当于 Perl/Ruby ||= - 2

    这个问题在这里已经有了答案:关闭10年前。PossibleDuplicate:Pythonconditionalassignmentoperator对于这样一个简单的问题表示歉意,但是谷歌搜索||=并不是很有帮助;)Python中是否有与Ruby和Perl中的||=语句等效的语句?例如:foo="hey"foo||="what"#assignfooifit'sundefined#fooisstill"hey"bar||="yeah"#baris"yeah"另外,类似这样的东西的通用术语是什么?条件分配是我的第一个猜测,但Wikipediapage跟我想的不太一样。

  7. java - 什么相当于 ruby​​ 的 rack 或 python 的 Java wsgi? - 2

    什么是ruby​​的rack或python的Java的wsgi?还有一个路由库。 最佳答案 来自Python标准PEP333:Bycontrast,althoughJavahasjustasmanywebapplicationframeworksavailable,Java's"servlet"APImakesitpossibleforapplicationswrittenwithanyJavawebapplicationframeworktoruninanywebserverthatsupportstheservletAPI.ht

  8. 华为OD机试用Python实现 -【明明的随机数】 2023Q1A - 2

    华为OD机试题本篇题目:明明的随机数题目输入描述输出描述:示例1输入输出说明代码编写思路最近更新的博客华为od2023|什么是华为od,od薪资待遇,od机试题清单华为OD机试真题大全,用Python解华为机试题|机试宝典【华为OD机试】全流程解析+经验分享,题型分享,防作弊指南华为o

  9. python - 如何读取 MIDI 文件、更改其乐器并将其写回? - 2

    我想解析一个已经存在的.mid文件,改变它的乐器,例如从“acousticgrandpiano”到“violin”,然后将它保存回去或作为另一个.mid文件。根据我在文档中看到的内容,该乐器通过program_change或patch_change指令进行了更改,但我找不到任何在已经存在的MIDI文件中执行此操作的库.他们似乎都只支持从头开始创建的MIDI文件。 最佳答案 MIDIpackage会为您完成此操作,但具体方法取决于midi文件的原始内容。一个MIDI文件由一个或多个音轨组成,每个音轨是十六个channel中任何一个上的

  10. 「Python|Selenium|场景案例」如何定位iframe中的元素? - 2

    本文主要介绍在使用Selenium进行自动化测试或者任务时,对于使用了iframe的页面,如何定位iframe中的元素文章目录场景描述解决方案具体代码场景描述当我们在使用Selenium进行自动化测试的时候,可能会遇到一些界面或者窗体是使用HTML的iframe标签进行承载的。对于iframe中的标签,如果直接查找是无法找到的,会抛出没有找到元素的异常。比如近在咫尺的例子就是,CSDN的登录窗体就是使用的iframe,大家可以尝试通过F12开发者模式查看到的tag_name,class_name,id或者xpath来定位中的页面元素,会抛出NoSuchElementException异常。解决

随机推荐