第 27 章 计算限制的异步操作

第 27 章 计算限制的异步操作

本章内容:

27.1 CLR 线程池基础

CLR 提供了线程池来管理可用的线程集合。每个CLR都有自己的线程池,但多个CLR之间共享。

基本流程如下:

  • 线程池内部维护一个请求列队,用于缓存用户请求需要执行的代码任务,就是ThreadPool.QueueUserWorkItem提交的请求;
  • 有新任务后,线程池使用空闲线程或新线程来执行队列请求;
  • 任务执行完后线程不会销毁,留着重复使用;
    线程池自己负责维护线程的创建和销毁,当线程池中有大量闲置的线程时,线程池会自动结束一部分多余的线程来释放资源;

线程池的不足:

  • 线程池内的线程不支持线程的挂起、取消等操作,如想要取消线程里的任务,.NET 支持一种协作式方式取消;
  • 线程内的任务没有返回值,也不知道何时执行完成;
  • 不支持设置线程的优先级,还包括其他类似需要对线程有更多的控制的需求都不支持;

27.2 执行简单的计算限制操作

要将一个异步的计算限制操作放到线程池的队列中,通常可以调用 ThreadPool 类定义的以下方法之一:

1
2
static bool QueueUserWorkItem(WaitCallback callBack);
static bool QueueUserWorkItem(WaitCallback callBack, Object state);

27.3 执行上下文

每个线程都关联了一个执行上下文数据结构。执行上下文(execution context)包括的东西有安全设置宿主设置以及逻辑调用上下文数据
每当一个线程(初始线程)使用另一个线程(辅助线程)执行任务时,前者的执行上下文应该流向(复制到)辅助线程。
默认情况下,CLR 自动造成初始线程的执行上下文“流向”任何辅助线程。这造成将上下文信息传给辅助线程,但这会对性能造成一定影响。

27.4 协作式取消和超时

取消操作首先要创建一个 System.Threading.CancellationTokenSource 对象,这个对象包含了和管理取消有关的所有状态。可从它的 Token 属性获得一个或多个CancellationToken(一个值类型)实例,并传给你的操作,使操作可以取消。

要执行一个不允许被取消的操作,可向该操作传递通过调用CancellationToken的静态None属性而返回的CancellationToken

1
2
3
4
5
6
7
8
ThreadPool.QueueUserWorkItem(o => Count(CancellationToken.None, 10));
````

可调用 CancellationTokenSource 的 Register 方法登记一个或多个在取消一个 CancellationTokenSource 时调用的方法。要向方法传递一个 Action<Object> 委托
```c#
public CancellationTokenRegistration Register(Action callback)

public CancellationTokenRegistration Register(Action callback, bool useSynchronizationContext)

27.5 任务

Microsoft 引入了 任务的概念。我们通过 System.Threading.Tasks 命名空间中的类型来使用任务。任务可以让你知道操作在什么时候完成,也没有机制在操作完成时获得返回值。

1
2
3
ThreadPool.QueueUserWorkItem(ComputeBoundOp, 5);    // 调用 QueueUserWorkItem
new Task(ComputeBoundOp, 5).Start(); // 用 Task 来做相同的事情
Task.Run(() => ComputeBoundOp(5)); // 另一个等价的写法

27.5.1 等待任务完成并获取结果

1
2
3
4
5
6
7
8
9
10
11
// 创建一个 Task(现在还没有开始运行)
Task<Int32> t = new Task<Int32>(n => Sum((Int32)n), 1000000000);

// 可以后再启动任务
t.Start();

// 可选择显式等待任务完成
t.Wait(); // 注意:还有一些重载的版本能接受 timeout/CancellationToken 值

// 可获得结果(Result 属性内部会调用 Wait)
Console.WriteLine("The Sum is: " + t.Result); // 一个 Int32 值

除了等待单个任务,Task 类还提供了两个静态方法,其中,Task 的静态 WaitAny 方法会阻塞调用线程,直到数组中的任何 Task 对象完成。方法返回 Int32 数组索引值,指明完成的是哪个 Task 对象。

Task 类还有一个静态 WaitAll 方法,它阻塞调用线程,直到数组中的所有 Task 对象完成。如果所有 Task 对象都完成,WaitAll 方法返回true。发生超时则返回 false。如果 WaitAll 通过一个 CancellationToken取消,会抛出一个 OperationCanceledException

如果计算限制的任务抛出未处理的异常,异常会被“吞噬”并存储到一个集合中,而线程池线程可以返回到线程池中。调用 Wait 方法或者 Result 属性时,这些成员会抛出一个 System.AggregateException 对象。
AggregateException 类型封装了异常对象的一个集合

27.5.2 取消任务

可用一个 CancellationTokenSource 取消 Task。首先必须修订前面的 Sun 方法,让它接受一个 CancellationToken

1
2
3
4
5
6
7
8
9
10
11
12
private static Int32 Sum(CancellationToken ct, Int32 n) {
Int32 sum = 0;
for (; n > 0; n--) {

// 在取消标志引用的 CancellationTokenSource 上调用 Cancel,
// 下面这行代码就会抛出 OperationCanceledException
ct.ThrowIfCancellationRequested();

checked { sum += n; } // 如果 n 太大,会抛出 System.OverflowException
}
return sum;
}

现在像下面这样创建 CancellationTokenSourceTask 对象:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
CancellationTokenSource cts = new CancellationTokenSource();
Task<Int32> t = Task.Run(() => Sum(cts.Token, 1000000000), cts.Token);

// 在之后的某个时间,取消 CancellationTokenSource 以取消 Task
cts.Cancel(); // 这是异步请求,Task 可能已经完成了

try {
// 如果任务已取消,Request 会抛出一个 AggregateException
Console.WriteLine("The sum is: " + t.Result); // 一个 Int32 值
}
catch (AggregateException x) {
// 将任何 OperationCanceledException 对象都视为已处理。
// 其他任何异常都造成抛出一个新的 AggregateException,
// 其中只包含未处理的异常
x.Handle(e => e is OperationCanceledException);
// 所有异常都处理好之后,执行下面这一行
Console.WriteLine("Sum was canceled");
}

27.5.3 任务完成时自动启动新任务

1
2
3
4
5
// 创建并启动一个 Task,继续另一个任务
Task<Int32> t = Task.Run(() => Sum(CancellationToken.None, 1000));

// ContinueWith 返回一个 Task,但一般都不需要再使用该对象(下例的 cwt)
Task cwt = t.ContinueWith(task => Concole.WriteLine("The sum is: " + task.Result));

Task 对象内部包含了 ContinueWith 任务的一个集合。所以,实际可以用一个 Task 对象来多次调用 ContinueWith。任务完成时,所有ContinueWith 任务都会进入线程池的队列中。

此外,可在调用 ContinueWith 时传递对一组 TaskContinuationOptions 枚举值进行按位 OR 运算

27.5.4 任务可以启动子任务

最后,任务支持父/子关系,如以下代码所示:

1
2
3
4
5
6
7
8
9
10
11
Task<Int32[]> parent = new Task<int[]>(() => {
var results = new Int32[3]; // 创建一个数组来存储结果

// 这个任务创建并启动 3 个子任务
new Task(() => results[0] = Sum(10000), TaskCreationOptions.AttachedToParent).Start();
new Task(() => results[1] = Sum(20000), TaskCreationOptions.AttachedToParent).Start();
new Task(() => results[2] = Sum(30000), TaskCreationOptions.AttachedToParent).Start();

// 返回对数组的引用(即使数组元素可能还没有初始化)
return results;
});

// 付任务及其子任务运行完成后,用一个延续任务显示结果
var cwt = parent.ContinueWith(parentTask => Array.ForEach(parentTask.Result, Console.WriteLine));

// 启动父任务,便于它启动它的子任务
parent.Start();
TaskCreationOptions.AttachedToParent 标志将一个 Task 和创建它的 Task 关联,结果是除非所有子任务(以及子任务的子任务)结束运行,否则创建任务(父任务)不认为已经结束。

27.5.5 任务内部揭秘

每个 Task 对象都有一组字段,这些字段构成了任务的状态。其中包括:

  • 一个 Int32 ID、代表Task 执行状态的一个Int32
  • 对父任务的引用
  • Task创建时指定的 TaskScheduler 的引用
  • 对回调方法的引用
  • 对要传给回调方法的对象的引用
  • ExecutionContext 的引用
  • ManualResetEventSlim 对象的引用。
    另外,每个 Task 对象都有对根据需要创建的补充状态的引用。补充状态包含
  • 一个 CancellationToken
  • 一个 ContinueWithTask 对象集合
  • 为抛出未处理异常的子任务而准备的一个 Task 对象集合等。

任务 ID 从 1 开始,每分配一个 ID 都递增 1。

27.5.6 任务工厂

有时需要创建一组共享相同配置的 Task 对象。为避免机械地将相同的参数传给每个 Task 的构造器,可创建一个任务工厂来封装通用的配置。

27.5.7 任务调度器(TaskScheduler)

TaskScheduler 对象负责执行被调度的任务

27.6 Parallel 的静态 ForForEachInvoke方法

使用 Parallel 类的 For 方法,用多个线程池线程辅助完成工作:
图 27-1 展示了构成作为线程池一部分的工作者线程的各种数据结构。ThreadPool.QueueUserWorkItem 方法和 Timer 类总是将工作项放到全局队列中。工作者线程采用一个先入先出(first-in-first-out,FIFO)算法将工作项从这个队列中取出,并处理它们。由于多个工作者线程可能同时从全局队列中拿走工作项,所以所有工作者线程都竞争一个线程同步锁,以保证两个或多个线程不会获取同一个工作项。这个线程同步锁在某些应用程序中可能成为瓶颈,对伸缩性和性能造成某种程度的限制。

1
2
// 线程池的线程并行处理工作
Parallel.For(0, 1000, i => DoWork(i));
1
2
// 线程池的线程并行处理工作
Parallel.ForEach(collection, item => DoWork(item));

最后,如果要执行多个方法,那么既可像下面这样并行执行,如下所示:

1
2
3
4
5
// 线程池的线程并行执行方法
Parallel.Invoke(
() => Method1(),
() => Method2(),
() => Method3());

调用 Parallel 的方法时有一个很重要的前提条件:工作项必须能并行执行!

ParallelForForEachInvoke 方法都提供了接受一个 ParallelOptions 对象的重载版本。这个对象的定义如下:

1
2
3
4
5
6
7
8
9
10
11
12
public class ParallelOptions {
public ParallelOptions();

// 允许取消操作
public CancellationToken CancellationToken { get; set; } // 默认为 CancellationToken.None

// 允许指定可以并发操作的最大工作项数目
public Int32 MaxDegreeOfParallelism { get; set; } // 默认为 -1 (可用 CPU 数)

// 允许指定要使用那个 TaskScheduler
public TaskSchedulerTaskScheduler { get; set; } // 默认为 TakScheduler.Default
}

27.9 线程池如何管理线程

27.9.2 如何管理工作者线程

QueueUserWorkItem 方法和 Timer 类总是将工作项放到全局队列中。工作者线程采用一个先入先出算法将工作项从这个队列中取出,并处理它们。由于多个工作者线程可能同时从全局队列中拿走工作项,所以所有工作者线程都竞争一个线程同步锁,以保证两个或多个线程不会获取同一个工作项。