我在不同的任务上运行了一个非常典型的生产者/消费者模型。
任务 1:从二进制文件中读取成批的 byte[] 并为每个字节数组集合启动一个新任务。 (出于内存管理目的,该操作是批处理的)。
任务 2-n:这些是工作任务,每个任务都对传入的字节数组集合(来自 Tasks1)进行操作并反序列化字节数组,按特定标准对它们进行排序,然后存储结果对象的集合(每个字节数组在并发字典中反序列化为此类对象。
任务 (n+1) 我选择了一个并发字典,因为这个任务的工作是合并存储在并发字典中的那些集合,其顺序与它们来自 Task1 的顺序相同。我通过从 Task1 一直向下传递到此任务的 collectionID(它的类型为 int 并为 Task1 中的每个新集合递增)来实现这一点。此任务基本上检查下一个预期的 collectionID 是否已存储在并发字典中,如果是,则将其取出,将其添加到最终队列并检查并发字典中的下一个集合。
现在,根据我阅读的内容和观看的视频,在我看来,TPL Dataflow 可能是这种生产者/消费者模型的完美候选者。我似乎无法设计出一个设计并因此开始,因为我从未使用过 TPL Dataflow。在吞吐量和延迟方面,这个库是否能胜任这项任务?我目前在生成的集合中每秒处理 250 万个字节数组和对象。 TPL Dataflow 可以帮助简化吗?我对以下问题的答案特别感兴趣:TPL 数据流能否在产生工作任务时保留来自 Task1 的收集批处理的顺序,并在工作任务完成后重新合并它们?它优化了东西吗?在分析了整个结构之后,我觉得由于旋转和涉及的并发集合太多而浪费了很多时间。
有什么想法、想法吗?
最佳答案
编辑: 结果证明我错了。 TransformBlock 确实按照它们进入的相同顺序返回项目,即使它被配置为并行。因此,我原始答案中的代码完全无用且正常 TransformBlock可以代替使用。
原始答案:
据我所知,.Net 中只有一个并行结构支持按照它们进入的顺序返回已处理的项目:PLINQ with AsOrdered() .但是在我看来,PLINQ 并不能很好地满足您的需求。
另一方面,我认为 TPL 数据流非常适合,但它没有支持并行和同时按顺序返回项目的 block (TransformBlock 支持这两者,但不支持同一时间)。幸运的是,Dataflow block 在设计时考虑到了可组合性,因此我们可以构建自己的 block 来实现这一点。
但首先,我们必须弄清楚如何对结果进行排序。像您建议的那样,使用并发字典以及一些同步机制肯定会起作用。但我认为有一个更简单的解决方案:使用 Task 的队列秒。在输出任务中,您出列了一个 Task ,等待它完成(异步),当它完成时,您发送它的结果。当队列为空时,我们仍然需要一些同步,但如果我们巧妙地选择使用哪个队列,我们可以免费获得同步。
所以,总体思路是这样的:我们正在写的将是一个 IPropagatorBlock ,有一些输入和一些输出。创建自定义的最简单方法 IPropagatorBlock是创建一个处理输入的 block ,另一个产生结果的 block ,并使用 DataflowBlock.Encapsulate() 将它们视为一个 block .
输入 block 必须以正确的顺序处理传入的项目,因此那里没有并行化。它将创建一个新的 Task (实际上,一个 TaskCompletionSource ,以便我们稍后可以设置 Task 的结果),将其添加到队列中,然后发送项目进行处理,以及一些设置正确 Task 结果的方法。 .因为我们不需要将这个 block 链接到任何东西,所以我们可以使用 ActionBlock .
输出 block 必须采用Task s 从队列中异步等待它们,然后将它们一起发送。但是因为所有的 block 都有一个嵌入其中的队列,并且接受委托(delegate)的 block 有内置的异步等待,这将非常简单:new TransformBlock<Task<TOutput>, TOutput>(t => t) .该 block 将同时用作队列和输出 block 。因此,我们不必处理任何同步。
拼图的最后一 block 实际上是并行处理项目。为此,我们可以使用另一个 ActionBlock , 这次是 MaxDegreeOfParallelism放。它将接受输入,处理它,并设置正确的 Task 的结果。在队列中。
放在一起,它可能看起来像这样:
public static IPropagatorBlock<TInput, TOutput>
CreateConcurrentOrderedTransformBlock<TInput, TOutput>(
Func<TInput, TOutput> transform)
{
var queue = new TransformBlock<Task<TOutput>, TOutput>(t => t);
var processor = new ActionBlock<Tuple<TInput, Action<TOutput>>>(
tuple => tuple.Item2(transform(tuple.Item1)),
new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded
});
var enqueuer = new ActionBlock<TInput>(
async item =>
{
var tcs = new TaskCompletionSource<TOutput>();
await processor.SendAsync(
new Tuple<TInput, Action<TOutput>>(item, tcs.SetResult));
await queue.SendAsync(tcs.Task);
});
enqueuer.Completion.ContinueWith(
_ =>
{
queue.Complete();
processor.Complete();
});
return DataflowBlock.Encapsulate(enqueuer, queue);
}
说了这么多,我认为这是相当少的代码。
您似乎非常关心性能,因此您可能需要微调这段代码。例如,设置 MaxDegreeOfParallelism 可能有意义的 processor阻止类似 Environment.ProcessorCount 的内容, 以避免超额认购。此外,如果延迟对您来说比吞吐量更重要,那么设置 MaxMessagesPerTask 可能是有意义的同一 block 的 1(或另一个小数字),这样当一个项目的处理完成时,它会立即发送到输出。
此外,如果您想限制传入的项目,您可以设置 BoundedCapacity的 enqueuer .
关于c# - 这是 TPL Dataflow 的工作吗?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/11052972/
我在从html页面生成PDF时遇到问题。我正在使用PDFkit。在安装它的过程中,我注意到我需要wkhtmltopdf。所以我也安装了它。我做了PDFkit的文档所说的一切......现在我在尝试加载PDF时遇到了这个错误。这里是错误:commandfailed:"/usr/local/bin/wkhtmltopdf""--margin-right""0.75in""--page-size""Letter""--margin-top""0.75in""--margin-bottom""0.75in""--encoding""UTF-8""--margin-left""0.75in""-
我在我的项目目录中完成了compasscreate.和compassinitrails。几个问题:我已将我的.sass文件放在public/stylesheets中。这是放置它们的正确位置吗?当我运行compasswatch时,它不会自动编译这些.sass文件。我必须手动指定文件:compasswatchpublic/stylesheets/myfile.sass等。如何让它自动运行?文件ie.css、print.css和screen.css已放在stylesheets/compiled。如何在编译后不让它们重新出现的情况下删除它们?我自己编译的.sass文件编译成compiled/t
我花了三天的时间用头撞墙,试图弄清楚为什么简单的“rake”不能通过我的规范文件。如果您遇到这种情况:任何文件夹路径中都不要有空格!。严重地。事实上,从现在开始,您命名的任何内容都没有空格。这是我的控制台输出:(在/Users/*****/Desktop/LearningRuby/learn_ruby)$rake/Users/*******/Desktop/LearningRuby/learn_ruby/00_hello/hello_spec.rb:116:in`require':cannotloadsuchfile--hello(LoadError) 最佳
关闭。这个问题需要detailsorclarity.它目前不接受答案。想改进这个问题吗?通过editingthispost添加细节并澄清问题.关闭8年前。Improvethisquestion在首页我有:汽车:VolvoSaabMercedesAudistatic_pages_spec.rb中的测试代码:it"shouldhavetherightselect"dovisithome_pathit{shouldhave_select('cars',:options=>['volvo','saab','mercedes','audi'])}end响应是rspec./spec/request
在Rails4.0.2中,我使用s3_direct_upload和aws-sdkgems直接为s3存储桶上传文件。在开发环境中它工作正常,但在生产环境中它会抛出如下错误,ActionView::Template::Error(noimplicitconversionofnilintoString)在View中,create_cv_url,:id=>"s3_uploader",:key=>"cv_uploads/{unique_id}/${filename}",:key_starts_with=>"cv_uploads/",:callback_param=>"cv[direct_uplo
如何在ruby中调用C#dll? 最佳答案 我能想到几种可能性:为您的DLL编写(或找人编写)一个COM包装器,如果它还没有,则使用Ruby的WIN32OLE库来调用它;看看RubyCLR,其中一位作者是JohnLam,他继续在Microsoft从事IronRuby方面的工作。(估计不会再维护了,可能不支持.Net2.0以上的版本);正如其他地方已经提到的,看看使用IronRuby,如果这是您的技术选择。有一个主题是here.请注意,最后一篇文章实际上来自JohnLam(看起来像是2009年3月),他似乎很自在地断言RubyCL
这个问题在这里已经有了答案:Arraysmisbehaving(1个回答)关闭6年前。是否应该这样,即我误解了,还是错误?a=Array.new(3,Array.new(3))a[1].fill('g')=>[["g","g","g"],["g","g","g"],["g","g","g"]]它不应该导致:=>[[nil,nil,nil],["g","g","g"],[nil,nil,nil]]
我正在尝试在Ruby中复制Convert.ToBase64String()行为。这是我的C#代码:varsha1=newSHA1CryptoServiceProvider();varpasswordBytes=Encoding.UTF8.GetBytes("password");varpasswordHash=sha1.ComputeHash(passwordBytes);returnConvert.ToBase64String(passwordHash);//returns"W6ph5Mm5Pz8GgiULbPgzG37mj9g="当我在Ruby中尝试同样的事情时,我得到了相同sha
使用Ruby1.9.2运行IDE提示说需要gemruby-debug-base19x并提供安装它。但是,在尝试安装它时会显示消息Failedtoinstallgems.Followinggemswerenotinstalled:C:/ProgramFiles(x86)/JetBrains/RubyMine3.2.4/rb/gems/ruby-debug-base19x-0.11.30.pre2.gem:Errorinstallingruby-debug-base19x-0.11.30.pre2.gem:The'linecache19'nativegemrequiresinstall
C#实现简易绘图工具一.引言实验目的:通过制作窗体应用程序(C#画图软件),熟悉基本的窗体设计过程以及控件设计,事件处理等,熟悉使用C#的winform窗体进行绘图的基本步骤,对于面向对象编程有更加深刻的体会.Tutorial任务设计一个具有基本功能的画图软件**·包括简单的新建文件,保存,重新绘图等功能**·实现一些基本图形的绘制,包括铅笔和基本形状等,学习橡皮工具的创建**·设计一个合理舒适的UI界面**注明:你可能需要先了解一些关于winform窗体应用程序绘图的基本知识,以及关于GDI+类和结构的知识二.实验环境Windows系统下的visualstudio2017C#窗体应用程序三.