第 27 章 计算限制的异步操作
第 27 章 计算限制的异步操作
NyxX第 27 章 计算限制的异步操作
本章内容:
- CLR 线程池基础
- 执行简单的计算限制操作
- 执行上下文
- 协作式取消和超时
- 任务
Parallel的静态For,ForEach和Invoke方法- 并行语言集成查询(PLINQ)
- 执行定时的计算限制操作
- 线程池如何管理线程
27.1 CLR 线程池基础
CLR 提供了线程池来管理可用的线程集合。每个CLR都有自己的线程池,但多个CLR之间共享。
基本流程如下:
- 线程池内部维护一个请求列队,用于缓存用户请求需要执行的代码任务,就是ThreadPool.QueueUserWorkItem提交的请求;
- 有新任务后,线程池使用空闲线程或新线程来执行队列请求;
- 任务执行完后线程不会销毁,留着重复使用;
线程池自己负责维护线程的创建和销毁,当线程池中有大量闲置的线程时,线程池会自动结束一部分多余的线程来释放资源;
线程池的不足:
- 线程池内的线程不支持线程的挂起、取消等操作,如想要取消线程里的任务,.NET 支持一种协作式方式取消;
- 线程内的任务没有返回值,也不知道何时执行完成;
- 不支持设置线程的优先级,还包括其他类似需要对线程有更多的控制的需求都不支持;
27.2 执行简单的计算限制操作
要将一个异步的计算限制操作放到线程池的队列中,通常可以调用 ThreadPool 类定义的以下方法之一:
1 | static bool QueueUserWorkItem(WaitCallback callBack); |
27.3 执行上下文
每个线程都关联了一个执行上下文数据结构。执行上下文(execution context)包括的东西有安全设置、宿主设置以及逻辑调用上下文数据。
每当一个线程(初始线程)使用另一个线程(辅助线程)执行任务时,前者的执行上下文应该流向(复制到)辅助线程。
默认情况下,CLR 自动造成初始线程的执行上下文“流向”任何辅助线程。这造成将上下文信息传给辅助线程,但这会对性能造成一定影响。
27.4 协作式取消和超时
取消操作首先要创建一个 System.Threading.CancellationTokenSource 对象,这个对象包含了和管理取消有关的所有状态。可从它的 Token 属性获得一个或多个CancellationToken(一个值类型)实例,并传给你的操作,使操作可以取消。
要执行一个不允许被取消的操作,可向该操作传递通过调用CancellationToken的静态None属性而返回的CancellationToken
1 | ThreadPool.QueueUserWorkItem(o => Count(CancellationToken.None, 10)); |
27.5 任务
Microsoft 引入了 任务的概念。我们通过 System.Threading.Tasks 命名空间中的类型来使用任务。任务可以让你知道操作在什么时候完成,也没有机制在操作完成时获得返回值。
1 | ThreadPool.QueueUserWorkItem(ComputeBoundOp, 5); // 调用 QueueUserWorkItem |
27.5.1 等待任务完成并获取结果
1 | // 创建一个 Task(现在还没有开始运行) |
除了等待单个任务,Task 类还提供了两个静态方法,其中,Task 的静态 WaitAny 方法会阻塞调用线程,直到数组中的任何 Task 对象完成。方法返回 Int32 数组索引值,指明完成的是哪个 Task 对象。
Task 类还有一个静态 WaitAll 方法,它阻塞调用线程,直到数组中的所有 Task 对象完成。如果所有 Task 对象都完成,WaitAll 方法返回true。发生超时则返回 false。如果 WaitAll 通过一个 CancellationToken取消,会抛出一个 OperationCanceledException。
如果计算限制的任务抛出未处理的异常,异常会被“吞噬”并存储到一个集合中,而线程池线程可以返回到线程池中。调用 Wait 方法或者 Result 属性时,这些成员会抛出一个 System.AggregateException 对象。AggregateException 类型封装了异常对象的一个集合
27.5.2 取消任务
可用一个 CancellationTokenSource 取消 Task。首先必须修订前面的 Sun 方法,让它接受一个 CancellationToken:
1 | private static Int32 Sum(CancellationToken ct, Int32 n) { |
现在像下面这样创建 CancellationTokenSource 和 Task 对象:
1 | CancellationTokenSource cts = new CancellationTokenSource(); |
27.5.3 任务完成时自动启动新任务
1 | // 创建并启动一个 Task,继续另一个任务 |
Task 对象内部包含了 ContinueWith 任务的一个集合。所以,实际可以用一个 Task 对象来多次调用 ContinueWith。任务完成时,所有ContinueWith 任务都会进入线程池的队列中。
此外,可在调用 ContinueWith 时传递对一组 TaskContinuationOptions 枚举值进行按位 OR 运算
27.5.4 任务可以启动子任务
最后,任务支持父/子关系,如以下代码所示:
1 | Task<Int32[]> parent = new Task<int[]>(() => { |
// 付任务及其子任务运行完成后,用一个延续任务显示结果
var cwt = parent.ContinueWith(parentTask => Array.ForEach(parentTask.Result, Console.WriteLine));
// 启动父任务,便于它启动它的子任务
parent.Start();TaskCreationOptions.AttachedToParent 标志将一个 Task 和创建它的 Task 关联,结果是除非所有子任务(以及子任务的子任务)结束运行,否则创建任务(父任务)不认为已经结束。
27.5.5 任务内部揭秘
每个 Task 对象都有一组字段,这些字段构成了任务的状态。其中包括:
- 一个
Int32 ID、代表Task执行状态的一个Int32 - 对父任务的引用
- 对
Task创建时指定的TaskScheduler的引用 - 对回调方法的引用
- 对要传给回调方法的对象的引用
- 对
ExecutionContext的引用 - 对
ManualResetEventSlim对象的引用。
另外,每个Task对象都有对根据需要创建的补充状态的引用。补充状态包含 - 一个
CancellationToken - 一个
ContinueWithTask对象集合 - 为抛出未处理异常的子任务而准备的一个
Task对象集合等。
任务 ID 从 1 开始,每分配一个 ID 都递增 1。
27.5.6 任务工厂
有时需要创建一组共享相同配置的 Task 对象。为避免机械地将相同的参数传给每个 Task 的构造器,可创建一个任务工厂来封装通用的配置。
27.5.7 任务调度器(TaskScheduler)
TaskScheduler 对象负责执行被调度的任务
27.6 Parallel 的静态 For,ForEach 和 Invoke方法
使用 Parallel 类的 For 方法,用多个线程池线程辅助完成工作:
图 27-1 展示了构成作为线程池一部分的工作者线程的各种数据结构。ThreadPool.QueueUserWorkItem 方法和 Timer 类总是将工作项放到全局队列中。工作者线程采用一个先入先出(first-in-first-out,FIFO)算法将工作项从这个队列中取出,并处理它们。由于多个工作者线程可能同时从全局队列中拿走工作项,所以所有工作者线程都竞争一个线程同步锁,以保证两个或多个线程不会获取同一个工作项。这个线程同步锁在某些应用程序中可能成为瓶颈,对伸缩性和性能造成某种程度的限制。
1 | // 线程池的线程并行处理工作 |
1 | // 线程池的线程并行处理工作 |
最后,如果要执行多个方法,那么既可像下面这样并行执行,如下所示:
1 | // 线程池的线程并行执行方法 |
调用 Parallel 的方法时有一个很重要的前提条件:工作项必须能并行执行!
Parallel 的 For, ForEach 和 Invoke 方法都提供了接受一个 ParallelOptions 对象的重载版本。这个对象的定义如下:
1 | public class ParallelOptions { |
27.9 线程池如何管理线程
27.9.2 如何管理工作者线程
QueueUserWorkItem 方法和 Timer 类总是将工作项放到全局队列中。工作者线程采用一个先入先出算法将工作项从这个队列中取出,并处理它们。由于多个工作者线程可能同时从全局队列中拿走工作项,所以所有工作者线程都竞争一个线程同步锁,以保证两个或多个线程不会获取同一个工作项。






