草庐IT

c# - 即使MSDN另有建议,为什么Parallel.ForEach比AsParallel()。ForAll()快得多?

coder 2023-07-10 原文

我一直在做一些调查,以了解如何创建运行在树上的多线程应用程序。

为了找到最佳方法,我创建了一个测试应用程序,该应用程序通过C:\磁盘运行并打开所有目录。

class Program
{
    static void Main(string[] args)
    {
        //var startDirectory = @"C:\The folder\RecursiveFolder";
        var startDirectory = @"C:\";

        var w = Stopwatch.StartNew();

        ThisIsARecursiveFunction(startDirectory);

        Console.WriteLine("Elapsed seconds: " + w.Elapsed.TotalSeconds);

        Console.ReadKey();
    }

    public static void ThisIsARecursiveFunction(String currentDirectory)
    {
        var lastBit = Path.GetFileName(currentDirectory);
        var depth = currentDirectory.Count(t => t == '\\');
        //Console.WriteLine(depth + ": " + currentDirectory);

        try
        {
            var children = Directory.GetDirectories(currentDirectory);

            //Edit this mode to switch what way of parallelization it should use
            int mode = 3;

            switch (mode)
            {
                case 1:
                    foreach (var child in children)
                    {
                        ThisIsARecursiveFunction(child);
                    }
                    break;
                case 2:
                    children.AsParallel().ForAll(t =>
                    {
                        ThisIsARecursiveFunction(t);
                    });
                    break;
                case 3:
                    Parallel.ForEach(children, t =>
                    {
                        ThisIsARecursiveFunction(t);
                    });
                    break;
                default:
                    break;
            }

        }
        catch (Exception eee)
        {
            //Exception might occur for directories that can't be accessed.
        }
    }
}

但是我遇到的是,当在模式3(Parallel.ForEach)中运行此代码时,代码将在2.5秒左右完成(是的,我有一个SSD;))。在不进行并行化的情况下运行代码大约需要8秒钟。在模式2(AsParalle.ForAll())中运行代码需要花费几乎无限的时间。

在 checkin 流程浏览器时,我还遇到了一些奇怪的事实:
Mode1 (No Parallelization):
Cpu:     ~25%
Threads: 3
Time to complete: ~8 seconds

Mode2 (AsParallel().ForAll()):
Cpu:     ~0%
Threads: Increasing by one per second (I find this strange since it seems to be waiting on the other threads to complete or a second timeout.)
Time to complete: 1 second per node so about 3 days???

Mode3 (Parallel.ForEach()):
Cpu:     100%
Threads: At most 29-30
Time to complete: ~2.5 seconds

我发现特别奇怪的是,Parallel.ForEach似乎忽略了仍在运行的任何父线程/任务,而AsParallel()。ForAll()似乎在等待上一个Task完成(自从所有父Tasks都不会很快完成)。仍在等待其子任务完成)。

我在MSDN上读到的也是:“尽可能使用ForAll而不是ForEach”

资料来源:http://msdn.microsoft.com/en-us/library/dd997403(v=vs.110).aspx

有人知道为什么会这样吗?

编辑1:

根据Matthew Watson的要求,我先将树加载到内存中,然后再遍历它。现在,树的加载是按顺序完成的。

但是结果是相同的。 Unparallelized和Parallel.ForEach现在可以在大约0.05秒内完成整个树,而AsParallel()。ForAll仍然仅以每秒1步的速度完成。

代码:
class Program
{
    private static DirWithSubDirs RootDir;

    static void Main(string[] args)
    {
        //var startDirectory = @"C:\The folder\RecursiveFolder";
        var startDirectory = @"C:\";

        Console.WriteLine("Loading file system into memory...");
        RootDir = new DirWithSubDirs(startDirectory);
        Console.WriteLine("Done");


        var w = Stopwatch.StartNew();

        ThisIsARecursiveFunctionInMemory(RootDir);

        Console.WriteLine("Elapsed seconds: " + w.Elapsed.TotalSeconds);

        Console.ReadKey();
    }        

    public static void ThisIsARecursiveFunctionInMemory(DirWithSubDirs currentDirectory)
    {
        var depth = currentDirectory.Path.Count(t => t == '\\');
        Console.WriteLine(depth + ": " + currentDirectory.Path);

        var children = currentDirectory.SubDirs;

        //Edit this mode to switch what way of parallelization it should use
        int mode = 2;

        switch (mode)
        {
            case 1:
                foreach (var child in children)
                {
                    ThisIsARecursiveFunctionInMemory(child);
                }
                break;
            case 2:
                children.AsParallel().ForAll(t =>
                {
                    ThisIsARecursiveFunctionInMemory(t);
                });
                break;
            case 3:
                Parallel.ForEach(children, t =>
                {
                    ThisIsARecursiveFunctionInMemory(t);
                });
                break;
            default:
                break;
        }
    }
}

class DirWithSubDirs
{
    public List<DirWithSubDirs> SubDirs = new List<DirWithSubDirs>();
    public String Path { get; private set; }

    public DirWithSubDirs(String path)
    {
        this.Path = path;
        try
        {
            SubDirs = Directory.GetDirectories(path).Select(t => new DirWithSubDirs(t)).ToList();
        }
        catch (Exception eee)
        {
            //Ignore directories that can't be accessed
        }
    }
}

编辑2:

阅读有关Matthew评论的更新后,我尝试将以下代码添加到程序中:
ThreadPool.SetMinThreads(4000, 16);
ThreadPool.SetMaxThreads(4000, 16);

但是,这不会改变AsParallel的执行方式。仍然会立即执行前8个步骤,然后再放慢到1个步骤/秒。

(特别注意,我目前忽略了无法通过Directory.GetDirectories()周围的Try Catch块访问目录时发生的异常)

编辑3:

我主要感兴趣的是Parallel.ForEach和AsParallel.ForAll之间的区别,因为对我来说,很奇怪,由于某种原因,第二个递归为每次递归创建一个线程,而第一个递归则处理大约30个线程中的所有线程最大限度。 (以及为什么MSDN建议使用AsParallel,即使它在〜1秒的超时时间内创建了这么多线程)

编辑4:

我发现了另一个奇怪的事情:
当我尝试将线程池上的MinThreads设置为1023以上时,似乎会忽略该值并将其缩放回8或16左右:
ThreadPool.SetMinThreads(1023,16);

仍然,当我使用1023时,它会非常快地处理前1023个元素,然后又回到我一直都在经历的缓慢步伐。

注意:现在实际上还创建了1000个以上的线程(相比之下,整个Parallel.ForEach每个线程只有30个线程)。

这是否意味着Parallel.ForEach在处理任务方面更聪明?

有关更多信息,当您将值设置为1023以上时,此代码将打印两次8-8 :(将值设置为1023或更低时,它将打印正确的值)
        int threadsMin;
        int completionMin;
        ThreadPool.GetMinThreads(out threadsMin, out completionMin);
        Console.WriteLine("Cur min threads: " + threadsMin + " and the other thing: " + completionMin);

        ThreadPool.SetMinThreads(1023, 16);
        ThreadPool.SetMaxThreads(1023, 16);

        ThreadPool.GetMinThreads(out threadsMin, out completionMin);
        Console.WriteLine("Now min threads: " + threadsMin + " and the other thing: " + completionMin);

编辑5:

根据Dean的要求,我创建了另一个案例来手动创建任务:
case 4:
    var taskList = new List<Task>();
    foreach (var todo in children)
    {
        var itemTodo = todo;
        taskList.Add(Task.Run(() => ThisIsARecursiveFunctionInMemory(itemTodo)));
    }
    Task.WaitAll(taskList.ToArray());
    break;

这也和Parallel.ForEach()循环一样快。因此,对于AsParallel()。ForAll()这么慢的原因,我们仍然没有答案。

最佳答案

这个问题是很容易调试的,当您遇到线程问题时,这是一种罕见的奢侈。您的基本工具是“调试”>“Windows”>“线程调试器”窗口。向您显示 Activity 线程,并让您了解它们的堆栈跟踪。您会很容易看到,一旦变慢,就会有数十个 Activity 的线程被卡住。它们的堆栈跟踪看起来都一样:

    mscorlib.dll!System.Threading.Monitor.Wait(object obj, int millisecondsTimeout, bool exitContext) + 0x16 bytes  
    mscorlib.dll!System.Threading.Monitor.Wait(object obj, int millisecondsTimeout) + 0x7 bytes 
    mscorlib.dll!System.Threading.ManualResetEventSlim.Wait(int millisecondsTimeout, System.Threading.CancellationToken cancellationToken) + 0x182 bytes    
    mscorlib.dll!System.Threading.Tasks.Task.SpinThenBlockingWait(int millisecondsTimeout, System.Threading.CancellationToken cancellationToken) + 0x93 bytes   
    mscorlib.dll!System.Threading.Tasks.Task.InternalRunSynchronously(System.Threading.Tasks.TaskScheduler scheduler, bool waitForCompletion) + 0xba bytes  
    mscorlib.dll!System.Threading.Tasks.Task.RunSynchronously(System.Threading.Tasks.TaskScheduler scheduler) + 0x13 bytes  
    System.Core.dll!System.Linq.Parallel.SpoolingTask.SpoolForAll<ConsoleApplication1.DirWithSubDirs,int>(System.Linq.Parallel.QueryTaskGroupState groupState, System.Linq.Parallel.PartitionedStream<ConsoleApplication1.DirWithSubDirs,int> partitions, System.Threading.Tasks.TaskScheduler taskScheduler) Line 172  C#
// etc..

每当您看到类似这样的内容时,就应该立即考虑消防软管问题。种族和死锁之后,可能是线程中第三大常见错误。

现在您可以找出原因了,代码的问题在于,每个完成的线程都会增加N个线程。其中N是目录中子目录的平均数量。实际上,线程数呈指数增长,这总是很糟糕的。如果N = 1,它将只处于控制状态,这在典型的磁盘上当然不会发生。

一定要当心,就像几乎所有线程问题一样,这种不当行为往往会重复出现。您计算机中的SSD倾向于将其隐藏。您的计算机中的RAM也是如此,该程序很可能会很快完成并且第二次运行它时不会出现故障。由于您现在将从文件系统缓存而不是磁盘中读取数据,因此速度非常快。修补ThreadPool.SetMinThreads()也将其隐藏,但无法修复。它永远不会解决任何问题,只会隐藏它们。因为无论发生什么情况,指数数始终会淹没所设置的最小线程数。您只能希望它在完成该驱动器之前完成对驱动器的迭代。对于拥有巨大驱动器的用户来说,希望寄予厚望。

现在也许也很容易解释ParallelEnumerable.ForAll()和Parallel.ForEach()之间的区别。您可以从堆栈跟踪中得知ForAll()做得很顽皮,RunSynchronously()方法将阻塞,直到所有线程完成。阻塞是线程池线程不应该做的事情,它阻塞了线程池,并且不允许它为另一个作业安排处理器。并具有您观察到的效果,线程池很快就被等待其他N个线程完成的线程所淹没。不会发生什么,因为它们已经处于 Activity 状态,所以它们正在池中等待并且没有得到调度。

这是一个死锁情况,一种很常见的情况,但是线程池管理器对此有一种解决方法。它监视 Activity 的线程池线程和未及时完成的​​线程。然后,它允许启动一个额外的线程,该线程比SetMinThreads()设置的最小线程多一个。但是不要多于SetMaxThreads()设置的最大值( Activity 的tp线程过多)有风险,并且很可能触发OOM。这确实解决了死锁,它获得了ForAll()调用之一来完成。但这发生的速度非常慢,线程池每秒仅执行两次此操作。您会在耐心 catch 之前耗尽耐心。

Parallel.ForEach()不会出现此问题,它不会阻塞,因此不会增加缓冲池。

似乎是解决方案,但请记住,您的程序仍在忙于寻找计算机的内存,从而向池中添加了更多的等待tp线程。这也可能使您的程序崩溃,只是不太可能,因为您有很多内存,并且线程池不使用很多内存来跟踪请求。但是有些程序员accomplish that as well

该解决方案是一种非常简单的解决方案,只是不要使用线程。这是有害,只有一个磁盘时没有并发。而且它不喜欢被多个线程占领。在主轴驱动器上尤其糟糕,寻头速度非常非常慢。固态硬盘的性能要好得多,但是仍然需要50微秒,这是您不想要或不需要的开销。访问磁盘的理想线程数始终是一个,否则您无法期望该高速缓存会被很好地缓存。

关于c# - 即使MSDN另有建议,为什么Parallel.ForEach比AsParallel()。ForAll()快得多?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/25907829/

有关c# - 即使MSDN另有建议,为什么Parallel.ForEach比AsParallel()。ForAll()快得多?的更多相关文章

  1. ruby - 为什么我可以在 Ruby 中使用 Object#send 访问私有(private)/ protected 方法? - 2

    类classAprivatedeffooputs:fooendpublicdefbarputs:barendprivatedefzimputs:zimendprotecteddefdibputs:dibendendA的实例a=A.new测试a.foorescueputs:faila.barrescueputs:faila.zimrescueputs:faila.dibrescueputs:faila.gazrescueputs:fail测试输出failbarfailfailfail.发送测试[:foo,:bar,:zim,:dib,:gaz].each{|m|a.send(m)resc

  2. ruby-on-rails - Rails - 子类化模型的设计模式是什么? - 2

    我有一个模型:classItem项目有一个属性“商店”基于存储的值,我希望Item对象对特定方法具有不同的行为。Rails中是否有针对此的通用设计模式?如果方法中没有大的if-else语句,这是如何干净利落地完成的? 最佳答案 通常通过Single-TableInheritance. 关于ruby-on-rails-Rails-子类化模型的设计模式是什么?,我们在StackOverflow上找到一个类似的问题: https://stackoverflow.co

  3. ruby - 什么是填充的 Base64 编码字符串以及如何在 ruby​​ 中生成它们? - 2

    我正在使用的第三方API的文档状态:"[O]urAPIonlyacceptspaddedBase64encodedstrings."什么是“填充的Base64编码字符串”以及如何在Ruby中生成它们。下面的代码是我第一次尝试创建转换为Base64的JSON格式数据。xa=Base64.encode64(a.to_json) 最佳答案 他们说的padding其实就是Base64本身的一部分。它是末尾的“=”和“==”。Base64将3个字节的数据包编码为4个编码字符。所以如果你的输入数据有长度n和n%3=1=>"=="末尾用于填充n%

  4. ruby - 解析 RDFa、微数据等的最佳方式是什么,使用统一的模式/词汇(例如 schema.org)存储和显示信息 - 2

    我主要使用Ruby来执行此操作,但到目前为止我的攻击计划如下:使用gemsrdf、rdf-rdfa和rdf-microdata或mida来解析给定任何URI的数据。我认为最好映射到像schema.org这样的统一模式,例如使用这个yaml文件,它试图描述数据词汇表和opengraph到schema.org之间的转换:#SchemaXtoschema.orgconversion#data-vocabularyDV:name:namestreet-address:streetAddressregion:addressRegionlocality:addressLocalityphoto:i

  5. ruby - 为什么 4.1%2 使用 Ruby 返回 0.0999999999999996?但是 4.2%2==0.2 - 2

    为什么4.1%2返回0.0999999999999996?但是4.2%2==0.2。 最佳答案 参见此处:WhatEveryProgrammerShouldKnowAboutFloating-PointArithmetic实数是无限的。计算机使用的位数有限(今天是32位、64位)。因此计算机进行的浮点运算不能代表所有的实数。0.1是这些数字之一。请注意,这不是与Ruby相关的问题,而是与所有编程语言相关的问题,因为它来自计算机表示实数的方式。 关于ruby-为什么4.1%2使用Ruby返

  6. ruby - ruby 中的 TOPLEVEL_BINDING 是什么? - 2

    它不等于主线程的binding,这个toplevel作用域是什么?此作用域与主线程中的binding有何不同?>ruby-e'putsTOPLEVEL_BINDING===binding'false 最佳答案 事实是,TOPLEVEL_BINDING始终引用Binding的预定义全局实例,而Kernel#binding创建的新实例>Binding每次封装当前执行上下文。在顶层,它们都包含相同的绑定(bind),但它们不是同一个对象,您无法使用==或===测试它们的绑定(bind)相等性。putsTOPLEVEL_BINDINGput

  7. ruby - Infinity 和 NaN 的类型是什么? - 2

    我可以得到Infinity和NaNn=9.0/0#=>Infinityn.class#=>Floatm=0/0.0#=>NaNm.class#=>Float但是当我想直接访问Infinity或NaN时:Infinity#=>uninitializedconstantInfinity(NameError)NaN#=>uninitializedconstantNaN(NameError)什么是Infinity和NaN?它们是对象、关键字还是其他东西? 最佳答案 您看到打印为Infinity和NaN的只是Float类的两个特殊实例的字符串

  8. ruby-on-rails - 如果 Object::try 被发送到一个 nil 对象,为什么它会起作用? - 2

    如果您尝试在Ruby中的nil对象上调用方法,则会出现NoMethodError异常并显示消息:"undefinedmethod‘...’fornil:NilClass"然而,有一个tryRails中的方法,如果它被发送到一个nil对象,它只返回nil:require'rubygems'require'active_support/all'nil.try(:nonexisting_method)#noNoMethodErrorexceptionanymore那么try如何在内部工作以防止该异常? 最佳答案 像Ruby中的所有其他对象

  9. ruby - 为什么 SecureRandom.uuid 创建一个唯一的字符串? - 2

    关闭。这个问题需要detailsorclarity.它目前不接受答案。想改进这个问题吗?通过editingthispost添加细节并澄清问题.关闭8年前。Improvethisquestion为什么SecureRandom.uuid创建一个唯一的字符串?SecureRandom.uuid#=>"35cb4e30-54e1-49f9-b5ce-4134799eb2c0"SecureRandom.uuid方法创建的字符串从不重复?

  10. ruby - 即使失败也继续进行多主机测试 - 2

    我已经构建了一些serverspec代码来在多个主机上运行一组测试。问题是当任何测试失败时,测试会在当前主机停止。即使测试失败,我也希望它继续在所有主机上运行。Rakefile:namespace:specdotask:all=>hosts.map{|h|'spec:'+h.split('.')[0]}hosts.eachdo|host|begindesc"Runserverspecto#{host}"RSpec::Core::RakeTask.new(host)do|t|ENV['TARGET_HOST']=hostt.pattern="spec/cfengine3/*_spec.r

随机推荐