任务调度[十一]

·

3 min read

1. 调度到线程池

Task task = Task.Run(() =>
{
    Thread.Sleep(TimeSpan.FromSeconds(2));
});

Task.Run 也能正常地返回结果,能使用异步 Lambda 表达式。下面代码中 Task.Run 返回的 task 会在 2 秒后完成,并返回结果 13:

Task<int> task = Task.Run(async () =>
{ 
    await Task.Delay(TimeSpan.FromSeconds(2));
    return 13;
});

Task.Run 返回一个 Task (或 Task<T>)对象,该对象可以被异步或响应式代码正常使用。

注意: 但不要在 ASP.NET 中使用 Task.Run,除非你有绝对的把握。在 ASP.NET 中, 处理请求的代码本来就是在 ASP.NET 线程池线程中运行的,强行把它放到另一个线程池线程通常会适得其反。
但UI程序,使用Task.Run可以执行耗时操作,有效的防止页面卡住问题。

在进行动态并行开发时, 一定要用 Task.Factory.StartNew 来代替 Task.Run

  • 因为根据默认配置, Task.Run 返回的 Task 对象适合被异步调用(即被异步代码或响应式代码使用)。

  • Task.Run 也不支持动态并行代码中普遍使用的高级概念,例如 父/子任务。

2. 任务调度器

需要让多个代码段按照指定的方式运行。例如

  • 让所有代码段在 UI 线程中运行

  • 只允许特定数量的代码段同时运行。

2.1. Default 调度器

TaskScheduler.Default,它的作用是让任务在线程池中排队, Task.Run、并行、数据流的代码用的都是 TaskScheduler.Default

2.2. 捕获当前同步上下文 调度器

可以捕获一个特定的上下文,用 TaskScheduler.FromCurrentSynchronizationContext 调度任务,让它回到该上下文:

TaskScheduler scheduler = TaskScheduler.FromCurrentSynchronizationContext();
这条语句创建了一个捕获当前 同步上下文TaskScheduler 对象,并将代码调度到这个上下文中

  • SynchronizationContext 类表示一个通用的调度上下文。

  • 大多数 UI 框架有一个表示 UI 线程的 同步上下文

  • ASP.NET 有一个表示 HTTP 请求的 同步上下文

建议:
在 UI 线程上执行代码时,永远不要使用针对特定平台的类型。\

  • WPF、IOS、Android 都有 Dispatcher

  • Windows 应用商店平台使用 CoreDispatcher

  • WinForms 有 ISynchronizeInvoke 接口(即 Control.Invoke

不要在新写的代码中使用这些类型,就当它们不存在吧。使用这些类型会使代码无谓地绑定在某个特定平台上。

同步上下文 是通用的、基于上述类型的抽象类。

2.3. ConcurrentExclusiveSchedulerPair 调度器

它实际上是两个互相关联的调度器。 只要 ExclusiveScheduler 上没有运行任务, ConcurrentScheduler 就可以让多个任务同时执行。只有当 ConcurrentScheduler 没有执行任务时, ExclusiveScheduler 才可以执行任务,并且每次只允许运行一个任务:

public static void ConcurrentExclusiveSchedulerPairRun()
{
    var schedulerPair = new ConcurrentExclusiveSchedulerPair(TaskScheduler.Default, maxConcurrencyLevel: 2);
    //由于并行被限流,所以ConcurrentScheduler 会两个两个输出,然后执行完这两个开启的8个串行任务
    TaskScheduler concurrent = schedulerPair.ConcurrentScheduler;
    TaskScheduler exclusive = schedulerPair.ExclusiveScheduler;

    //Default 由于没有限制,所以第一层会先输出,全部随机
    // TaskScheduler concurrent = TaskScheduler.Default;
    // TaskScheduler exclusive =TaskScheduler.Default;

    var list = new List<List<int>>();
    for (int i = 0; i < 4; i++)
    {
        var actionList = new List<int>();
        list.Add(actionList);
        for (int j = 0; j < 4; j++)
        {
            actionList.Add(i * 10 + j);
        }
    }

    var tasks = list.Select(u => Task.Factory.StartNew(state =>
    {
        System.Console.WriteLine($"ConcurrentScheduler");
        ((List<int>)state).Select(i => Task.Factory.StartNew(state2 => System.Console.WriteLine($"ExclusiveScheduler:{state2}"), i, CancellationToken.None, TaskCreationOptions.None, exclusive)).ToArray();
    }, u, CancellationToken.None, TaskCreationOptions.None, concurrent));


    Task.WaitAll(tasks.ToArray());
}

输出:

ConcurrentScheduler
ConcurrentScheduler
ExclusiveScheduler:0
ExclusiveScheduler:1
ExclusiveScheduler:2
ExclusiveScheduler:3
ExclusiveScheduler:10
ExclusiveScheduler:11
ExclusiveScheduler:12
ExclusiveScheduler:13
ConcurrentScheduler
ConcurrentScheduler
ExclusiveScheduler:20
ExclusiveScheduler:21
ExclusiveScheduler:22
ExclusiveScheduler:23
ExclusiveScheduler:30
ExclusiveScheduler:31
ExclusiveScheduler:32
ExclusiveScheduler:33

ConcurrentExclusiveSchedulerPair 的常见用法是

  • ExclusiveScheduler 来确保每次只运行一个任务。

  • ExclusiveScheduler 执行的代码会在线程池中运行,但是使用了同一个 ExclusiveScheduler 对象的其他代码不能同时运行。

ConcurrentExclusiveSchedulerPair 的另一个用法是作为限流调度器。

  • 创建的 ConcurrentExclusiveSchedulerPair 对象可以限制自身的并发数量。

  • 这时通常不使用 ExclusiveScheduler

var schedulerPair = new ConcurrentExclusiveSchedulerPair(TaskScheduler.Default,maxConcurrencyLevel: 8);
TaskScheduler scheduler = schedulerPair.ConcurrentScheduler;

3. 调度并行代码

public static void RotateMatricesRun()
{
    List<List<Action<float>>> actionLists = new List<List<Action<float>>>();
    for (int i = 0; i < 15; i++)
    {
        var actionList = new List<Action<float>>();
        actionLists.Add(actionList);
        for (int j = 0; j < 15; j++)
        {
            actionList.Add(new Action<float>(degrees =>
            {
                Thread.Sleep(200);
                System.Console.WriteLine("degrees:" + degrees + " " + DateTime.Now.ToString("HHmmss.fff"));
            }));
        }
    }
    RotateMatrices(actionLists, 10);
    //虽然两个并行嵌套但是由于调度器的设置,导致任务是8个8个执行的,结果是8个后200ms再8个
}

static void RotateMatrices(IEnumerable<IEnumerable<Action<float>>> collections, float degrees)
{
    var schedulerPair = new ConcurrentExclusiveSchedulerPair(TaskScheduler.Default, maxConcurrencyLevel: 8);
    TaskScheduler scheduler = schedulerPair.ConcurrentScheduler;
    ParallelOptions options = new ParallelOptions
    {
        TaskScheduler = scheduler
    };
    Parallel.ForEach(collections, options,
        matrices =>
        {
            Parallel.ForEach(matrices,
                options,
                matrix => matrix.Invoke(degrees)
            );
            System.Console.WriteLine($"============");
        });
}

输出:

degrees:10 190424.120
...  118个 ...
degrees:10 190426.963
============
============
============
============
============
============
============
============
degrees:10 190427.167
...  6个 ...
degrees:10 190427.167
... 5个 ...
degrees:10 190428.589
...  6个 ...
degrees:10 190428.589
degrees:10 190428.791
degrees:10 190428.791
degrees:10 190428.791
degrees:10 190428.791
degrees:10 190428.791
degrees:10 190428.791
============
degrees:10 190428.791
degrees:10 190428.791
degrees:10 190428.994
...  6个 ...
degrees:10 190428.994
============
degrees:10 190429.194
...  6个 ...
degrees:10 190429.194
============
degrees:10 190429.395
degrees:10 190429.395
degrees:10 190429.395
degrees:10 190429.395
degrees:10 190429.395
============
degrees:10 190429.395
degrees:10 190429.395
degrees:10 190429.395
degrees:10 190429.598
degrees:10 190429.598
degrees:10 190429.598
degrees:10 190429.598
============
degrees:10 190429.598
degrees:10 190429.598
degrees:10 190429.598
degrees:10 190429.598
============
degrees:10 190429.800
============

4. 用调度器实现数据流的同步

Stopwatch sw = Stopwatch.StartNew();
// 模拟 UI同步上下文
AsyncContext.Run(() =>
{
    var options = new ExecutionDataflowBlockOptions
    {
        //使用次调度器,则代码会放到创建线程的同步上下文上执行(若是当前同步上下文是UI Context 或 此例的AsyncContext)
        //运行和注释下行运行观察Creator和Executor线程Id的变化
        TaskScheduler = TaskScheduler.FromCurrentSynchronizationContext(),
    };
    var multiplyBlock = new TransformBlock<int, int>(item => item * 2);
    System.Console.WriteLine($"Creator ThreadId: {Thread.CurrentThread.ManagedThreadId}.");
    var displayBlock = new ActionBlock<int>(result =>
    {
        // ListBox.Items.Add(result)
        System.Console.WriteLine($"Executor ThreadId: {Thread.CurrentThread.ManagedThreadId} res:{result}.");
    }, options);
    multiplyBlock.LinkTo(displayBlock);

    for (int i = 0; i < 5; i++)
    {
        multiplyBlock.Post(i);
        System.Console.WriteLine($"Post {i}");
    }
    multiplyBlock.Completion.Wait(2000);
});
System.Console.WriteLine($"Cost {sw.ElapsedMilliseconds}ms.");

输出:

Creator ThreadId: 1.
Post 0
Post 1
Post 2
Post 3
Post 4
Executor ThreadId: 1 res:0.
Executor ThreadId: 1 res:2.
Executor ThreadId: 1 res:4.
Executor ThreadId: 1 res:6.
Executor ThreadId: 1 res:8.
Cost 2062ms.