.NET(C#) Internals: 以一个数组填充的例子初步了解.NET 4.0中的并行(二)

简介:

引言

随着CPU多核的普及,编程时充分利用这个特性越显重要。上篇首先用传统的嵌套循环进行数组填充,然后用.NET 4.0中的System.Threading.Tasks提供的Parallel Class来并行地进行填充,最后对比他们的性能。本文将深入分析Parallel Class并借机回答上篇9楼提出的问题,而System.Threading.Tasks分析,这个将推迟到.NET(C#) Internals: 以一个数组填充的例子初步了解.NET 4.0中的并行(三)中介绍。内容如下:

  1. 1、Parallel Class
    1. 1.1、For方法
    2. 1.2、ForEach方法
    3. 1.3、Invoke方法
  2. 2、并发控制疑问?
    1. 2.1、使用Lock锁
    2. 2.2、使用PLINQ——用AsParallel
    3. 2.3、使用PLINQ——用ParallelEnumerable
    4. 2.4、使用Interlocked操作
    5. 2.5、使用Parallel.For的有Thread-Local变量重载函数
  3. 性能比较

1、Parallel Class

Parallel——这个类提供对通常操作(诸如for、foreach、执行语句块)基于库的数据并行替换。它只是System.Threading.Tasks命名空间的一个类,该命名空间中还包括很多其他的类。下面举个例子来说明如何使用Parallel.For(来自MSDN):

 
  1. using System.Threading.Tasks;     
  2. class Test  
  3. {  
  4.     static int N = 1000;  
  5.  
  6.     static void TestMethod()  
  7.     {  
  8.         // Using a named method.  
  9.         Parallel.For(0, N, Method2);  
  10.  
  11.         // Using an anonymous method.  
  12.         Parallel.For(0, N, delegate(int i)  
  13.         {  
  14.             // Do Work.  
  15.         });  
  16.  
  17.         // Using a lambda expression.  
  18.         Parallel.For(0, N, i =>  
  19.         {  
  20.             // Do Work.  
  21.         });  
  22.     }  
  23.  
  24.     static void Method2(int i)  
  25.     {  
  26.         // Do work.  
  27.     }  

上面这个例子简单易懂,上篇我们就是用的Parallel.For,这里就不解释了。其实Parallel类的方法主要分为下面三类:

  1. For方法
  2. ForEach方法
  3. Invoke方法

1.1、For方法

在里面执行的for循环可能并行地运行,它有12个重载。这12个重载中Int32参数和Int64参数的方法各为6个,下面以Int32为例列出:

  1. For(Int32 fromInclusive, Int32 toExclusive, Action<Int32> body),该方法对区间(fromInclusive,toExclusive)之间的迭代调用body表示的委托。body委托有一个迭代数次的int32参数,如果fromInclusive>=toExclusive,则不会执行任何迭代。
  2. For(Int32 fromInclusive, Int32 toExclusive, Action<Int32, ParallelLoopState>),该方法对区间(fromInclusive, toExclusive)之间的迭代调用body表示的委托。body委托有两个参数——表示迭代数次的int32参数、一个可用于过早地跳出循环的ParallelLoopState实例。如果fromInclusive>=toExclusive,则不会执行任何迭代。 
    调用Break通知For操作当前迭代之后的迭代不需要执行。然而,在此之前的迭代如果没有完成仍然需要执行。因此,调用Break类似于调用break跳出传统的for循环,不是break的原因是它不保证当前迭代之后的迭代绝对不会执行。 
    如果在当前迭代之前的迭代不必要执行,应该调用Stop而不是Break。调用Stop通知For循环放弃剩下的迭代,不管它是否在当前迭代之前或之后,因为所以要求的工作已经完成。然而,Break并不能保证这个。
  3. For(Int32 fromInclusive, Int32 toExclusive, ParallelOptions parallelOptions, Action<Int32> body),跟第一个方法类似,但它的区间是[fromInclusive, toExclusive)。
  4. For(Int32 fromInclusive, Int32 toExclusive, ParallelOptions parallelOptions, Action<Int32, ParallelLoopState> body),跟第二个方法类似,单的区间是[fromInclusive, toExclusive)。
  5. For<TLocal>(Int32 fromInclusive, Int32 toExclusive, Func<TLocal> localInit, Func<Int32, ParallelLoopState, TLocal, TLocal> body, Action<TLocal> localFinally),它的迭代区间是[fromInclusive, toExclusive)。另外body有两个local状态变量用于同一线程的迭代之间共享。localInit委托将在每个线程参与循环执行时调用,并返回这些线程初始的local状态。这些初始状态被传递给body,当它在每个线程上第一次调用时。然后,接下来body调用返回一个可能的修改状态值且传递给下一次body调用。最终,最后一次在每个线程上的body调用返回的一个状态值传递给localFinally委托。每个线程执行在自己的loacl 状态上执行最后一个动作时,localFinally委托将被调用。这个委托可能在多个线程上并发执行,因此,你必须同步访问任何共享变量。
  6. For<TLocal>(Int32, Int32, ParallelOptions, Func<TLocal>, Func<Int32, ParallelLoopState, TLocal, TLocal>, Action<TLocal>),跟上面的方法类似。

下面代码演示了For(Int32 fromInclusive, Int32 toExclusive, ParallelOptions parallelOptions, Action<Int32> body)方法(来自MSDN):

 
  1. using System;  
  2. using System.Collections.Generic;  
  3. using System.Linq;  
  4. using System.Text;  
  5. using System.Threading;  
  6. using System.Threading.Tasks;  
  7.  
  8. namespace ConsoleApplication2  
  9. {  
  10.     class Program  
  11.     {  
  12.         // Demonstrated features:  
  13.         //        CancellationTokenSource  
  14.         //         Parallel.For()  
  15.         //        ParallelOptions  
  16.         //        ParallelLoopResult  
  17.         // Expected results:  
  18.         //         An iteration for each argument value (0, 1, 2, 3, 4, 5, 6, 7, 8, 9) is executed.  
  19.         //        The order of execution of the iterations is undefined.  
  20.         //        The iteration when i=2 cancels the loop.  
  21.         //        Some iterations may bail out or not start at all; because they are temporally executed in unpredictable order,   
  22.         //          it is impossible to say which will start/complete and which won't.  
  23.         //        At the end, an OperationCancelledException is surfaced.  
  24.         // Documentation:  
  25.         //        http://msdn.microsoft.com/en-us/library/system.threading.cancellationtokensource(VS.100).aspx  
  26.  
  27.         static void Main(string[] args)  
  28.         {  
  29.             CancellationTokenSource cancellationSource = new CancellationTokenSource();  
  30.             ParallelOptions options = new ParallelOptions();  
  31.             options.CancellationToken = cancellationSource.Token;  
  32.             try 
  33.             {  
  34.                 ParallelLoopResult loopResult = Parallel.For(  
  35.                     0,  
  36.                     10,  
  37.                     options,  
  38.                     (i, loopState) =>  
  39.                     {  
  40.                         Console.WriteLine("Start Thread={0}, i={1}", Thread.CurrentThread.ManagedThreadId, i);  
  41.  
  42.                         // Simulate a cancellation of the loop when i=2  
  43.                         if (i == 2)  
  44.                         {  
  45.                             cancellationSource.Cancel();  
  46.                         }  
  47.  
  48.                         // Simulates a long execution  
  49.                         for (int j = 0; j < 10; j++)  
  50.                         {  
  51.                             Thread.Sleep(1 * 200);  
  52.  
  53.                             // check to see whether or not to continue  
  54.                             if (loopState.ShouldExitCurrentIteration) return;  
  55.                         }  
  56.  
  57.                         Console.WriteLine("Finish Thread={0}, i={1}", Thread.CurrentThread.ManagedThreadId, i);  
  58.                     }  
  59.                 );  
  60.                 if (loopResult.IsCompleted)  
  61.                 {  
  62.                     Console.WriteLine("All iterations completed successfully. THIS WAS NOT EXPECTED.");  
  63.                 }  
  64.             }  
  65.                 // No exception is expected in this example, but if one is still thrown from a task,  
  66.                 // it will be wrapped in AggregateException and propagated to the main thread.  
  67.             catch (AggregateException e)  
  68.             {  
  69.                 Console.WriteLine("Parallel.For has thrown an AggregateException. THIS WAS NOT EXPECTED.\n{0}", e);  
  70.             }  
  71.                 // Catching the cancellation exception  
  72.             catch (OperationCanceledException e)  
  73.             {  
  74.                 Console.WriteLine("An iteration has triggered a cancellation. THIS WAS EXPECTED.\n{0}", e.ToString());  
  75.             }  
  76.         }  
  77.     }  

1.2、ForEach方法

在迭代中执行的foreach操作可能并行地执行,它有20个重载。这个方法太多,但用法大概跟For方法差不多,请自行参考MSDN

1.3、Invoke方法

提供的每个动作可能并行地执行,它有2个重载。

例如下面代码执行了三个操作(来自MSDN):

 
  1. using System;  
  2. using System.Collections.Generic;  
  3. using System.Linq;  
  4. using System.Text;  
  5. using System.Threading;  
  6. using System.Threading.Tasks;  
  7.  
  8. namespace ConsoleApplication2  
  9. {  
  10.     class Program  
  11.     {  
  12.         static void Main()  
  13.         {  
  14.             try 
  15.             {  
  16.                 Parallel.Invoke(  
  17.                     BasicAction,    // Param #0 - static method  
  18.                     () =>            // Param #1 - lambda expression  
  19.                     {  
  20.                         Console.WriteLine("Method=beta, Thread={0}", Thread.CurrentThread.ManagedThreadId);  
  21.                     },  
  22.                     delegate()        // Param #2 - in-line delegate  
  23.                     {  
  24.                         Console.WriteLine("Method=gamma, Thread={0}", Thread.CurrentThread.ManagedThreadId);  
  25.                     }  
  26.                 );  
  27.             }  
  28.             // No exception is expected in this example, but if one is still thrown from a task,  
  29.             // it will be wrapped in AggregateException and propagated to the main thread.  
  30.             catch (AggregateException e)  
  31.             {  
  32.                 Console.WriteLine("An action has thrown an exception. THIS WAS UNEXPECTED.\n{0}", e.InnerException.ToString());  
  33.             }  
  34.         }  
  35.  
  36.         static void BasicAction()  
  37.         {  
  38.             Console.WriteLine("Method=alpha, Thread={0}", Thread.CurrentThread.ManagedThreadId);  
  39.         }  
  40.     }  

2、并发控制疑问?

有人提出以下疑问:“如果For里面的东西,对于顺序敏感的话,会不会有问题。并行处理的话,说到底应该是多线程。如果需要Lock住什么东西的话,应该怎么做呢?例如这个例子不是对数组填充,是对文件操作呢?对某个资源操作呢?”

关于对顺序敏感的话,也就是说该如何加锁来控制?下面我举个例子来说明:对1~1000求和。如果我们想上篇那样简单地用Parallel.For,将会产生错误的结果,代码如下:

 
  1. using System;  
  2. using System.Collections.Generic;  
  3. using System.Linq;  
  4. using System.Text;  
  5. using System.Threading;  
  6. using System.Threading.Tasks;  
  7.  
  8. namespace ConsoleApplication2  
  9. {  
  10.     class Program  
  11.     {          
  12.         static void Main(string[] args)  
  13.         {  
  14.             int loops=0;  
  15.             while (loops <= 100)  
  16.             {  
  17.                 long sum = 0;                  
  18.                 Parallel.For(1, 1001, delegate(long i)  
  19.                 {  
  20.                     sum += i;  
  21.                 });  
  22.                 System.Console.WriteLine(sum);  
  23.                 loops++;  
  24.             }  
  25.         }  
  26.     }  

在上述代码中,为了校验正确性我进行了重复做了100次,得出如下结果:

以一个数组填充的例子初步了解.NET 4.0中的并行(二)

图1、100次的前面部分结果

我们知道500500才是正确的答案,这说明Parallel.For不能保证对sum正确的并发执行,对此我们应该加上适当的控制,并借机来回答上面提出的如何加锁的问题。下面有几种方案可以解决这个问题:

2.1、使用Lock锁

这个我就不多解释了,直接上代码:

 
  1. using System;  
  2. using System.Collections.Generic;  
  3. using System.Linq;  
  4. using System.Text;  
  5. using System.Threading;  
  6. using System.Threading.Tasks;  
  7.  
  8. namespace ConsoleApplication2  
  9. {  
  10.     class Program  
  11.     {  
  12.         static void Main(string[] args)  
  13.         {  
  14.             int loops = 0;  
  15.             object moniter = new object();  
  16.             while (loops <= 100)  
  17.             {  
  18.                 long sum = 0;  
  19.                 Parallel.For(1, 1001, delegate(long i)  
  20.                 {  
  21.                     lock (moniter) { sum += i; }  
  22.                 });  
  23.                 System.Console.WriteLine(sum);  
  24.                 loops++;  
  25.             }  
  26.         }  
  27.     }  

我们加上lock锁之后就会得出正确的结果。

2.2、使用PLINQ——用AsParallel

关于PLINQ,以后将会介绍到,这里不会详细介绍,感兴趣的自行查阅资料。代码如下:

 
  1. using System;  
  2. using System.Collections.Generic;  
  3. using System.Linq;  
  4. using System.Text;  
  5. using System.Threading;  
  6. using System.Threading.Tasks;  
  7.  
  8. namespace ConsoleApplication2  
  9. {  
  10.     class Program  
  11.     {  
  12.         static void Main(string[] args)  
  13.         {  
  14.             int loops = 0;  
  15.             while (loops <= 100)  
  16.             {  
  17.                 long sum = 0;       
  18.                 sum = Enumerable.Range(0, 1001).AsParallel().Sum();  
  19.                 System.Console.WriteLine(sum);  
  20.                 loops++;  
  21.             }  
  22.         }  
  23.     }  

运行可以得到正确的结果。

2.3、使用PLINQ——用ParallelEnumerable

这个也不多说,直接上代码,因为关于PLINQ将在以后详细介绍,感兴趣的自行查阅资料。

 
  1. using System;  
  2. using System.Collections.Generic;  
  3. using System.Linq;  
  4. using System.Text;  
  5. using System.Threading;  
  6. using System.Threading.Tasks;  
  7.  
  8. namespace ConsoleApplication2  
  9. {  
  10.     class Program  
  11.     {  
  12.         static void Main(string[] args)  
  13.         {  
  14.             int loops = 0;  
  15.             while (loops <= 100)  
  16.             {  
  17.                 long sum = 0;  
  18.                 sum = ParallelEnumerable.Range(0, 1001).Sum();   
  19.                 System.Console.WriteLine(sum);  
  20.                 loops++;  
  21.             }  
  22.         }  
  23.     }  

运行同样可以得到正确结果。

2.4、使用Interlocked操作

代码如下:

 
  1. using System;  
  2. using System.Collections.Generic;  
  3. using System.Linq;  
  4. using System.Text;  
  5. using System.Threading;  
  6. using System.Threading.Tasks;  
  7.  
  8. namespace ConsoleApplication2  
  9. {  
  10.     class Program  
  11.     {  
  12.         static void Main(string[] args)  
  13.         {  
  14.             int loops = 0;  
  15.             while (loops <= 100)  
  16.             {  
  17.                 long sum = 0;  
  18.                 Parallel.For(1, 1001, delegate(long i)  
  19.                 {  
  20.                     Interlocked.Add(ref sum, i);  
  21.                 });  
  22.                 System.Console.WriteLine(sum);  
  23.                 loops++;  
  24.             }  
  25.  
  26.         }  
  27.     }  

运行可以得到正确结果。

2.5、使用Parallel.For的有Thread-Local变量重载函数

这个方法已经在1.2中介绍,这里直接上代码,代码如下:

 
  1. using System;  
  2. using System.Collections.Generic;  
  3. using System.Linq;  
  4. using System.Text;  
  5. using System.Threading;  
  6. using System.Threading.Tasks;  
  7.  
  8. namespace ConsoleApplication2  
  9. {  
  10.     class Program  
  11.     {  
  12.         static void Main(string[] args)  
  13.         {  
  14.             int loops = 0;  
  15.             while (loops <= 100)  
  16.             {  
  17.                 int sum = 0;  
  18.                 Parallel.For(0, 1001, () => 0, (i, state,subtotal) =>  
  19.                 {  
  20.                     subtotal += i;  
  21.                     return subtotal;  
  22.                 },  
  23.                 partial => Interlocked.Add(ref sum, partial));  
  24.  
  25.                 System.Console.WriteLine(sum);  
  26.                 loops++;  
  27.             }  
  28.  
  29.         }  
  30.     }  

运行可得正确结果。

3、性能比较

上面的解决方案那个比较好呢?请大家各抒己见!关于这个我已经测试了一下。

PS:感觉写这篇的时候很累,思绪也很乱,不知大家对这篇还满意(⊙_⊙)?有什么地方需要改进,或者说不易理解,或者哪个地方错了!






     本文转自Saylor87 51CTO博客,原文链接:http://blog.51cto.com/skynet/365694,如需转载请自行联系原作者






相关文章
|
4月前
.net-去重的几种情况
.net-去重的几种情况
25 0
一起谈.NET技术,.NET 3.x新特性之自动属性及集合初始化
  今天公司弄了个VS2008 beta 2中文版,虽然很大一部分是为了JS的智能提示外,也应该好好的温习一下.NET的一些新特性,由于以前写过一些文章,但是都没有系统的学过,都只是尝一下新鲜感。不知道从那开始,所以今天就来看看自动话属性,以及对象初始化和集合初始化的一些新特性。
638 0
|
Web App开发 缓存 .NET
一起谈.NET技术,全面认识一下.NET 4的缓存功能
  很多关于.NET 4.0新特性的介绍,缓存功能的增强肯定是不会被忽略的一个重要亮点。在很多文档中都会介绍到在.NET 4.0中,缓存功能的增强主要是在扩展性方面做了改进,改变了原来只能利用内存进行缓存的局限,允许用户在不改变代码的情况下通过修改配置的方式,灵活的切换缓存介质。
1183 0
|
测试技术
一起谈.NET技术,VS2010&amp;.Net 4.0 之并行运算(Parallel)(For、Foreach)
  VS2010&.Net 4.0的Beta2相比Beta1在性能上有了很大的改进,已经基本可以使用了。.NET 4.0给我们带来许多新特性,如动态类型、云平台、并行运算等。本文讨论一下.NET 4.0的并行运算。
990 0
|
Web App开发 JavaScript 前端开发
一起谈.NET技术,VS 2010 和 .NET 4.0 系列之《起始项目模板》篇
本系列文章导航 VS 2010 和 .NET 4.0 系列之《ASP.NET 4 中的SEO改进 》篇 VS 2010 和 .NET 4.0 系列之《干净的Web.Config文件 》篇 VS 2010 和 .
1104 0
一起谈.NET技术,.Net 4.0 Parallel 编程(五)Task (中)
  在上篇文章中我们看过了如何创建Task,本篇文章就各种类型Task的使用进行说明。   Task Continuations   首先我们来看看延续的Task,所谓的延续的Task就是在第一个Task完成后自动启动下一个Task。
809 0
一起谈.NET技术,.Net4.0 Parallel编程(四)Task 上
  在之前的文章中,已经介绍过了Parallel Loop(上、中、下)的相关内容。本篇文章中会就Task基础部分进行些介绍。   初识Task   首先我们来构建一个简单的Task的Demo: static void Main(string[] args){ Task.Factory.StartNew(() => { Console.WriteLine("Hello word!"); }); Console.Read();}   在上面这段代码中我们构建出了一段非常简单的使用Task类的代码,通过其Factory属性的创建出一个Task。
762 0
|
测试技术 C++
一起谈.NET技术,.NET4.0 之 Dynamic VS Reflection 效率
  在我先前的文章中,不断的推广.NET4.0新特性。特别是.NET4.0 Dynamic 这个新特性。随之而来的问题也出现了—Dynamic 执行效率如何?   我们做开发的不光需要代码简洁,能够希望自己能够写出好的架构。
835 0
|
安全
.NET Core中延迟单例另一种写法【.NET Core和.NET Framework的beforefieldinit差异】
1.BeforeFieldInit是什么    前段时间在反编译代码时无意间看到在类中有一个BeforeFieldInit特性,处于好奇的心态查了查这个特性,发现这是一个关于字段初始化时间的特性【提前初始化字段】,下面先来看一下这个特性在.
1779 0
|
JSON 数据格式
.Net 4.X 提前用上 .Net Core 的配置模式以及热重载配置
1. 前言 在提倡微服务及 Serverless 越来越普及的当下,传统 .Net 应用的配置模式往往依赖于一个名为 web.config 的 XML 文件,在可扩展性和可读性与时代脱节了。当然,我不会怂恿一下子把所有应用迁移到 .Net Core 上,本文将在尽量不引入 .Net Core 开发模式的前提下,获得最大的利益。
1204 0