我一直在测试如何使用 dask(具有 20 个内核的集群),我对调用 len 函数与通过 loc 切片的速度相比感到惊讶。
import dask.dataframe as dd
from dask.distributed import Client
client = Client('192.168.1.220:8786')
log = pd.read_csv('800000test', sep='\t')
logd = dd.from_pandas(log,npartitions=20)
#This is the code than runs slowly
#(2.9 seconds whilst I would expect no more than a few hundred millisencods)
print(len(logd))
#Instead this code is actually running almost 20 times faster than pandas
logd.loc[:'Host'].count().compute()
知道为什么会发生这种情况吗? len 运行得快对我来说并不重要,但我觉得由于不理解这种行为,所以我对这个库有一些不了解的地方。
所有绿色框都对应于“from_pandas”,而在 Matthew Rocklin 的这篇文章中 http://matthewrocklin.com/blog/work/2017/01/12/dask-dataframes调用图看起来更好(调用 len_chunk 明显更快,并且调用似乎没有被锁定并等待一个工作人员完成他的任务,然后再开始另一个工作人员)
最佳答案
问得好,这涉及到数据何时向上移动到集群并返回到客户端(您的 Python session )的几点。让我们看看您计算的几个阶段
这是您的 python session 中的 Pandas 数据框,因此它显然仍在您的本地进程中。
log = pd.read_csv('800000test', sep='\t') # on client
这会将您的 Pandas 数据框分解为 20 个 Pandas 数据框,但这些数据框仍在客户端上。 Dask 数据帧不会急切地将数据发送到集群。
logd = dd.from_pandas(log,npartitions=20) # still on client
调用 len 实际上会在此处进行计算(通常您会使用 df.some_aggregation().compute()。所以现在 Dask 启动了。首先它将您的数据移出到集群(慢),然后它在所有 20 个分区上调用 len(快),它聚合这些(快),然后将结果向下移动到您的客户端,以便它可以打印。
print(len(logd)) # costly roundtrip client -> cluster -> client
所以这里的问题是我们的 dask.dataframe 仍然在本地 python session 中拥有它的所有数据。
使用本地线程调度器比分布式调度器要快得多。这应该以毫秒计算
with dask.set_options(get=dask.threaded.get): # no cluster, just local threads
print(len(logd)) # stays on client
但您可能想知道如何扩展到更大的数据集,所以让我们以正确的方式进行。
让 Dask 工作人员加载 csv 文件的位,而不是在您的客户端/本地 session 中加载 Pandas。这样就不需要客户与工作人员的沟通。
# log = pd.read_csv('800000test', sep='\t') # on client
log = dd.read_csv('800000test', sep='\t') # on cluster workers
但是,与 pd.read_csv 不同,dd.read_csv 是惰性的,因此它应该几乎立即返回。我们可以强制 Dask 使用 persist 方法实际进行计算
log = client.persist(log) # triggers computation asynchronously
现在集群开始运行并直接在工作进程中加载数据。这个比较快。请注意,当工作在后台进行时,此方法会立即返回。如果您想等到它完成,请调用 wait。
from dask.distributed import wait
wait(log) # blocks until read is done
如果您正在使用小型数据集进行测试并希望获得更多分区,请尝试更改 block 大小。
log = dd.read_csv(..., blocksize=1000000) # 1 MB blocks
无论如何,log 上的操作现在应该很快
len(log) # fast
回答关于 this blogpost 的问题以下是我们对文件所在位置所做的假设。
通常,当您向 dd.read_csv 提供文件名时,它假定该文件对所有工作人员都是可见的。如果您使用的是网络文件系统,或者像 S3 或 HDFS 这样的全局存储,这是正确的。如果您使用的是网络文件系统,那么您将需要使用绝对路径(如 /path/to/myfile.*.csv),或者确保您的工作人员和客户端具有相同的工作目录.
如果不是这种情况,并且您的数据仅在您的客户端计算机上,那么您将不得不加载并分散它。
简单的方法就是做你原来做的,但坚持你的 dask.dataframe
log = pd.read_csv('800000test', sep='\t') # on client
logd = dd.from_pandas(log,npartitions=20) # still on client
logd = client.persist(logd) # moves to workers
这很好,但会导致沟通不尽如人意。
相反,您可以明确地将数据分散到集群中
[future] = client.scatter([log])
虽然这会涉及到更复杂的 API,所以我只会向您指出文档
http://distributed.readthedocs.io/en/latest/manage-computation.html http://distributed.readthedocs.io/en/latest/memory.html http://dask.pydata.org/en/latest/delayed-collections.html
关于python - dask 分布式数据帧上的慢 len 函数,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/41902069/
关闭。这个问题是opinion-based.它目前不接受答案。想要改进这个问题?更新问题,以便editingthispost可以用事实和引用来回答它.关闭4年前。Improvethisquestion我想在固定时间创建一系列低音和高音调的哔哔声。例如:在150毫秒时发出高音调的蜂鸣声在151毫秒时发出低音调的蜂鸣声200毫秒时发出低音调的蜂鸣声250毫秒的高音调蜂鸣声有没有办法在Ruby或Python中做到这一点?我真的不在乎输出编码是什么(.wav、.mp3、.ogg等等),但我确实想创建一个输出文件。
我主要使用Ruby来执行此操作,但到目前为止我的攻击计划如下:使用gemsrdf、rdf-rdfa和rdf-microdata或mida来解析给定任何URI的数据。我认为最好映射到像schema.org这样的统一模式,例如使用这个yaml文件,它试图描述数据词汇表和opengraph到schema.org之间的转换:#SchemaXtoschema.orgconversion#data-vocabularyDV:name:namestreet-address:streetAddressregion:addressRegionlocality:addressLocalityphoto:i
我想在一个没有Sass引擎的类中使用Sass颜色函数。我已经在项目中使用了sassgem,所以我认为搭载会像以下一样简单:classRectangleincludeSass::Script::FunctionsdefcolorSass::Script::Color.new([0x82,0x39,0x06])enddefrender#hamlengineexecutedwithcontextofself#sothatwithintemlateicouldcall#%stop{offset:'0%',stop:{color:lighten(color)}}endend更新:参见上面的#re
我想设置一个默认日期,例如实际日期,我该如何设置?还有如何在组合框中设置默认值顺便问一下,date_field_tag和date_field之间有什么区别? 最佳答案 试试这个:将默认日期作为第二个参数传递。youcorrectlysetthedefaultvalueofcomboboxasshowninyourquestion. 关于ruby-on-rails-date_field_tag,如何设置默认日期?[rails上的ruby],我们在StackOverflow上找到一个类似的问
我将我的Rails应用程序部署到OpenShift,它运行良好,但我无法在生产服务器上运行“Rails控制台”。它给了我这个错误。我该如何解决这个问题?我尝试更新rubygems,但它也给出了权限被拒绝的错误,我也无法做到。railsc错误:Warning:You'reusingRubygems1.8.24withSpring.UpgradetoatleastRubygems2.1.0andrun`gempristine--all`forbetterstartupperformance./opt/rh/ruby193/root/usr/share/rubygems/rubygems
我正在尝试用ruby中的gsub函数替换字符串中的某些单词,但有时效果很好,在某些情况下会出现此错误?这种格式有什么问题吗NoMethodError(undefinedmethod`gsub!'fornil:NilClass):模型.rbclassTest"replacethisID1",WAY=>"replacethisID2andID3",DELTA=>"replacethisID4"}end另一个模型.rbclassCheck 最佳答案 啊,我找到了!gsub!是一个非常奇怪的方法。首先,它替换了字符串,所以它实际上修改了
我正在尝试从Postgresql表(table1)中获取数据,该表由另一个相关表(property)的字段(table2)过滤。在纯SQL中,我会这样编写查询:SELECT*FROMtable1JOINtable2USING(table2_id)WHEREtable2.propertyLIKE'query%'这工作正常:scope:my_scope,->(query){includes(:table2).where("table2.property":query)}但我真正需要的是使用LIKE运算符进行过滤,而不是严格相等。然而,这是行不通的:scope:my_scope,->(que
我有一些代码在几个不同的位置之一运行:作为具有调试输出的命令行工具,作为不接受任何输出的更大程序的一部分,以及在Rails环境中。有时我需要根据代码的位置对代码进行细微的更改,我意识到以下样式似乎可行:print"Testingnestedfunctionsdefined\n"CLI=trueifCLIdeftest_printprint"CommandLineVersion\n"endelsedeftest_printprint"ReleaseVersion\n"endendtest_print()这导致:TestingnestedfunctionsdefinedCommandLin
有时我需要处理键/值数据。我不喜欢使用数组,因为它们在大小上没有限制(很容易不小心添加超过2个项目,而且您最终需要稍后验证大小)。此外,0和1的索引变成了魔数(MagicNumber),并且在传达含义方面做得很差(“当我说0时,我的意思是head...”)。散列也不合适,因为可能会不小心添加额外的条目。我写了下面的类来解决这个问题:classPairattr_accessor:head,:taildefinitialize(h,t)@head,@tail=h,tendend它工作得很好并且解决了问题,但我很想知道:Ruby标准库是否已经带有这样一个类? 最佳
我有一个涉及多台机器、消息队列和事务的问题。因此,例如用户点击网页,点击将消息发送到另一台机器,该机器将付款添加到用户的帐户。每秒可能有数千次点击。事务的所有方面都应该是容错的。我以前从未遇到过这样的事情,但一些阅读表明这是一个众所周知的问题。所以我的问题。我假设安全的方法是使用两阶段提交,但协议(protocol)是阻塞的,所以我不会获得所需的性能,我是否正确?我通常写Ruby,但似乎Redis之类的数据库和Rescue、RabbitMQ等消息队列系统对我的帮助不大——即使我实现某种两阶段提交,如果Redis崩溃,数据也会丢失,因为它本质上只是内存。所有这些让我开始关注erlang和