草庐IT

python - dask 分布式数据帧上的慢 len 函数

coder 2023-08-15 原文

我一直在测试如何使用 dask(具有 20 个内核的集群),我对调用 len 函数与通过 loc 切片的速度相比感到惊讶。

import dask.dataframe as dd
from dask.distributed import Client
client = Client('192.168.1.220:8786')

log = pd.read_csv('800000test', sep='\t')
logd = dd.from_pandas(log,npartitions=20)

#This is the code than runs slowly 
#(2.9 seconds whilst I would expect no more than a few hundred millisencods)

print(len(logd))

#Instead this code is actually running almost 20 times faster than pandas
logd.loc[:'Host'].count().compute()

知道为什么会发生这种情况吗? len 运行得快对我来说并不重要,但我觉得由于不理解这种行为,所以我对这个库有一些不了解的地方。

所有绿色框都对应于“from_pandas”,而在 Matthew Rocklin 的这篇文章中 http://matthewrocklin.com/blog/work/2017/01/12/dask-dataframes调用图看起来更好(调用 len_chunk 明显更快,并且调用似乎没有被锁定并等待一个工作人员完成他的任务,然后再开始另一个工作人员)

最佳答案

问得好,这涉及到数据何时向上移动到集群并返回到客户端(您的 Python session )的几点。让我们看看您计算的几个阶段

使用 Pandas 加载数据

这是您的 python session 中的 Pandas 数据框,因此它显然仍在您的本地进程中。

log = pd.read_csv('800000test', sep='\t')  # on client

转换为懒惰的 Dask.dataframe

这会将您的 Pandas 数据框分解为 20 个 Pandas 数据框,但这些数据框仍在客户端上。 Dask 数据帧不会急切地将数据发送到集群。

logd = dd.from_pandas(log,npartitions=20)  # still on client

计算长度

调用 len 实际上会在此处进行计算(通常您会使用 df.some_aggregation().compute()。所以现在 Dask 启动了。首先它将您的数据移出到集群(慢),然后它在所有 20 个分区上调用 len(快),它聚合这些(快),然后将结果向下移动到您的客户端,以便它可以打印。

print(len(logd))  # costly roundtrip client -> cluster -> client

分析

所以这里的问题是我们的 dask.dataframe 仍然在本地 python session 中拥有它的所有数据。

使用本地线程调度器比分布式调度器要快得多。这应该以毫秒计算

with dask.set_options(get=dask.threaded.get):  # no cluster, just local threads
    print(len(logd))  # stays on client

但您可能想知道如何扩展到更大的数据集,所以让我们以正确的方式进行。

在worker上加载你的数据

让 Dask 工作人员加载 csv 文件的位,而不是在您的客户端/本地 session 中加载 Pandas。这样就不需要客户与工作人员的沟通。

# log = pd.read_csv('800000test', sep='\t')  # on client
log = dd.read_csv('800000test', sep='\t')    # on cluster workers

但是,与 pd.read_csv 不同,dd.read_csv 是惰性的,因此它应该几乎立即返回。我们可以强制 Dask 使用 persist 方法实际进行计算

log = client.persist(log)  # triggers computation asynchronously

现在集群开始运行并直接在工作进程中加载​​数据。这个比较快。请注意,当工作在后台进行时,此方法会立即返回。如果您想等到它完成,请调用 wait

from dask.distributed import wait
wait(log)  # blocks until read is done

如果您正在使用小型数据集进行测试并希望获得更多分区,请尝试更改 block 大小。

log = dd.read_csv(..., blocksize=1000000)  # 1 MB blocks

无论如何,log 上的操作现在应该很快

len(log)  # fast

编辑

回答关于 this blogpost 的问题以下是我们对文件所在位置所做的假设。

通常,当您向 dd.read_csv 提供文件名时,它假定该文件对所有工作人员都是可见的。如果您使用的是网络文件系统,或者像 S3 或 HDFS 这样的全局存储,这是正确的。如果您使用的是网络文件系统,那么您将需要使用绝对路径(如 /path/to/myfile.*.csv),或者确保您的工作人员和客户端具有相同的工作目录.

如果不是这种情况,并且您的数据仅在您的客户端计算机上,那么您将不得不加载并分散它。

简单但次优

简单的方法就是做你原来做的,但坚持你的 dask.dataframe

log = pd.read_csv('800000test', sep='\t')  # on client
logd = dd.from_pandas(log,npartitions=20)  # still on client
logd = client.persist(logd)  # moves to workers

这很好,但会导致沟通不尽如人意。

复杂但最优

相反,您可以明确地将数据分散到集群中

[future] = client.scatter([log])

虽然这会涉及到更复杂的 API,所以我只会向您指出文档

http://distributed.readthedocs.io/en/latest/manage-computation.html http://distributed.readthedocs.io/en/latest/memory.html http://dask.pydata.org/en/latest/delayed-collections.html

关于python - dask 分布式数据帧上的慢 len 函数,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/41902069/

有关python - dask 分布式数据帧上的慢 len 函数的更多相关文章

  1. python - 如何使用 Ruby 或 Python 创建一系列高音调和低音调的蜂鸣声? - 2

    关闭。这个问题是opinion-based.它目前不接受答案。想要改进这个问题?更新问题,以便editingthispost可以用事实和引用来回答它.关闭4年前。Improvethisquestion我想在固定时间创建一系列低音和高音调的哔哔声。例如:在150毫秒时发出高音调的蜂鸣声在151毫秒时发出低音调的蜂鸣声200毫秒时发出低音调的蜂鸣声250毫秒的高音调蜂鸣声有没有办法在Ruby或Python中做到这一点?我真的不在乎输出编码是什么(.wav、.mp3、.ogg等等),但我确实想创建一个输出文件。

  2. ruby - 解析 RDFa、微数据等的最佳方式是什么,使用统一的模式/词汇(例如 schema.org)存储和显示信息 - 2

    我主要使用Ruby来执行此操作,但到目前为止我的攻击计划如下:使用gemsrdf、rdf-rdfa和rdf-microdata或mida来解析给定任何URI的数据。我认为最好映射到像schema.org这样的统一模式,例如使用这个yaml文件,它试图描述数据词汇表和opengraph到schema.org之间的转换:#SchemaXtoschema.orgconversion#data-vocabularyDV:name:namestreet-address:streetAddressregion:addressRegionlocality:addressLocalityphoto:i

  3. ruby - 在没有 sass 引擎的情况下使用 sass 颜色函数 - 2

    我想在一个没有Sass引擎的类中使用Sass颜色函数。我已经在项目中使用了sassgem,所以我认为搭载会像以下一样简单:classRectangleincludeSass::Script::FunctionsdefcolorSass::Script::Color.new([0x82,0x39,0x06])enddefrender#hamlengineexecutedwithcontextofself#sothatwithintemlateicouldcall#%stop{offset:'0%',stop:{color:lighten(color)}}endend更新:参见上面的#re

  4. ruby-on-rails - date_field_tag,如何设置默认日期? [ rails 上的 ruby ] - 2

    我想设置一个默认日期,例如实际日期,我该如何设置?还有如何在组合框中设置默认值顺便问一下,date_field_tag和date_field之间有什么区别? 最佳答案 试试这个:将默认日期作为第二个参数传递。youcorrectlysetthedefaultvalueofcomboboxasshowninyourquestion. 关于ruby-on-rails-date_field_tag,如何设置默认日期?[rails上的ruby],我们在StackOverflow上找到一个类似的问

  5. ruby-on-rails - openshift 上的 rails 控制台 - 2

    我将我的Rails应用程序部署到OpenShift,它运行良好,但我无法在生产服务器上运行“Rails控制台”。它给了我这个错误。我该如何解决这个问题?我尝试更新ruby​​gems,但它也给出了权限被拒绝的错误,我也无法做到。railsc错误:Warning:You'reusingRubygems1.8.24withSpring.UpgradetoatleastRubygems2.1.0andrun`gempristine--all`forbetterstartupperformance./opt/rh/ruby193/root/usr/share/rubygems/rubygems

  6. ruby-on-rails - 在 ruby​​ 中使用 gsub 函数替换单词 - 2

    我正在尝试用ruby​​中的gsub函数替换字符串中的某些单词,但有时效果很好,在某些情况下会出现此错误?这种格式有什么问题吗NoMethodError(undefinedmethod`gsub!'fornil:NilClass):模型.rbclassTest"replacethisID1",WAY=>"replacethisID2andID3",DELTA=>"replacethisID4"}end另一个模型.rbclassCheck 最佳答案 啊,我找到了!gsub!是一个非常奇怪的方法。首先,它替换了字符串,所以它实际上修改了

  7. ruby-on-rails - 相关表上的范围为 "WHERE ... LIKE" - 2

    我正在尝试从Postgresql表(table1)中获取数据,该表由另一个相关表(property)的字段(table2)过滤。在纯SQL中,我会这样编写查询:SELECT*FROMtable1JOINtable2USING(table2_id)WHEREtable2.propertyLIKE'query%'这工作正常:scope:my_scope,->(query){includes(:table2).where("table2.property":query)}但我真正需要的是使用LIKE运算符进行过滤,而不是严格相等。然而,这是行不通的:scope:my_scope,->(que

  8. ruby - 在 Ruby 中有条件地定义函数 - 2

    我有一些代码在几个不同的位置之一运行:作为具有调试输出的命令行工具,作为不接受任何输出的更大程序的一部分,以及在Rails环境中。有时我需要根据代码的位置对代码进行细微的更改,我意识到以下样式似乎可行:print"Testingnestedfunctionsdefined\n"CLI=trueifCLIdeftest_printprint"CommandLineVersion\n"endelsedeftest_printprint"ReleaseVersion\n"endendtest_print()这导致:TestingnestedfunctionsdefinedCommandLin

  9. ruby - Ruby 有 `Pair` 数据类型吗? - 2

    有时我需要处理键/值数据。我不喜欢使用数组,因为它们在大小上没有限制(很容易不小心添加超过2个项目,而且您最终需要稍后验证大小)。此外,0和1的索引变成了魔数(MagicNumber),并且在传达含义方面做得很差(“当我说0时,我的意思是head...”)。散列也不合适,因为可能会不小心添加额外的条目。我写了下面的类来解决这个问题:classPairattr_accessor:head,:taildefinitialize(h,t)@head,@tail=h,tendend它工作得很好并且解决了问题,但我很想知道:Ruby标准库是否已经带有这样一个类? 最佳

  10. ruby - 分布式事务和队列,ruby,erlang,scala - 2

    我有一个涉及多台机器、消息队列和事务的问题。因此,例如用户点击网页,点击将消息发送到另一台机器,该机器将付款添加到用户的帐户。每秒可能有数千次点击。事务的所有方面都应该是容错的。我以前从未遇到过这样的事情,但一些阅读表明这是一个众所周知的问题。所以我的问题。我假设安全的方法是使用两阶段提交,但协议(protocol)是阻塞的,所以我不会获得所需的性能,我是否正确?我通常写Ruby,但似乎Redis之类的数据库和Rescue、RabbitMQ等消息队列系统对我的帮助不大——即使我实现某种两阶段提交,如果Redis崩溃,数据也会丢失,因为它本质上只是内存。所有这些让我开始关注erlang和

随机推荐