Dataflow Block Basics [6]

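1. Introduction

The TPL Dataflow library is powerful. You can use it to build meshes and pipelines and to send data through them asynchronously.

Main namespace: System.Threading.Tasks.Dataflow. The snippets below are static methods that assume the usual using directives: System, System.Collections.Generic, System.Threading, System.Threading.Tasks and System.Threading.Tasks.Dataflow.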

2. Linking dataflow blocks

To build a mesh, you link dataflow blocks to one another.

public static void LinkBlockRun()
{
    System.Console.WriteLine("Building Block link.");
    TransformBlock<int, int> multiplyBlock = new TransformBlock<int, int>(item =>
    {
        System.Console.WriteLine("first block.");
        Thread.Sleep(500);
        return item * 2;
    });
    var subtractBlock = new TransformBlock<int, int>(item =>
    {
        System.Console.WriteLine("last block.");
        Thread.Sleep(500);
        return item - 2;
    });
    var options = new DataflowLinkOptions
    {
        PropagateCompletion = true
    };
    multiplyBlock.LinkTo(subtractBlock, options);

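    System.Console.WriteLine("Built Block link.");

    var task = Task.Run(async () =>
    {
        System.Console.WriteLine("Posting");

        for (int i = 0; i < 3; i++)
        {
            multiplyBlock.Post(i);
        }

        System.Console.WriteLine("Posted");

        // With PropagateCompletion, completing the first block automatically completes the second.
        // Once Complete has been called, further Post calls are declined (Post returns false).
        multiplyBlock.Complete();

        // This only waits for the first block. subtractBlock may still be working, and its
        // results stay in its output buffer because nothing ever receives them.
        await multiplyBlock.Completion;
        System.Console.WriteLine("Block link Ended.");
    });

    task.Wait();
}

Output:

Building Block link.
Built Block link.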
Posting
Posted
first block.
first block.
last block.
first block.
last block.
last block.
Block link Ended.
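The example above only awaits `multiplyBlock.Completion`. That means "Block link Ended." can be printed while the second block is still sleeping, and nothing ever reads `subtractBlock`'s results. Below is a minimal sketch of one common alternative, assuming we only want to collect the results. It ends the pipeline with an `ActionBlock` and awaits the completion of that last block. The class and method names are illustrative.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class LinkedPipelineDemo // illustrative name
{
    public static async Task<List<int>> RunAsync()
    {
        var results = new List<int>();

        var multiplyBlock = new TransformBlock<int, int>(item => item * 2);
        var subtractBlock = new TransformBlock<int, int>(item => item - 2);
        // ActionBlock has no output buffer, so its Completion finishes once every
        // item it received has been processed. With the default
        // MaxDegreeOfParallelism of 1, List.Add is never called concurrently.
        var collectBlock = new ActionBlock<int>(item => results.Add(item));

        var options = new DataflowLinkOptions { PropagateCompletion = true };
        multiplyBlock.LinkTo(subtractBlock, options);
        subtractBlock.LinkTo(collectBlock, options);

        for (int i = 0; i < 3; i++)
        {
            multiplyBlock.Post(i);
        }

        // Completing the head flows down through both links.
        multiplyBlock.Complete();
        // Awaiting the tail means every block in the pipeline has finished.
        await collectBlock.Completion;
        return results;
    }
}
```

Awaiting the last block, rather than the first, is what makes the "ended" point meaningful for the whole pipeline.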

3. Propagating errors

public static void BlockErrorRun()
{
    Task.Run(async () =>
    {
        try
        {
            // Exception type when a single block faults
            var block = new TransformBlock<int, int>(item =>
              {
                  if (item == 1)
                      throw new InvalidOperationException("Blech.");
                  return item * 2;
              });
            block.Post(1);
            await block.Completion;

        }
        catch (InvalidOperationException ex)
        {
            System.Console.WriteLine(ex.GetType().Name);
        }

        try
        {
            // Exception type seen through a linked block
            var multiplyBlock = new TransformBlock<int, int>(item =>
             {
                 if (item == 1)
                     throw new InvalidOperationException("Blech.");
                 return item * 2;
             });
            var subtractBlock = new TransformBlock<int, int>(item => item - 2);
            multiplyBlock.LinkTo(subtractBlock, new DataflowLinkOptions { PropagateCompletion = true });
            multiplyBlock.Post(1);
            await subtractBlock.Completion;
        }
        catch (AggregateException ex)
        {
            System.Console.WriteLine(ex.GetType().Name);
        }

    }).Wait();
}

Output:

InvalidOperationException
AggregateException
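
The faulted block itself rethrows the original InvalidOperationException from its Completion. When the fault travels through a link with PropagateCompletion, awaiting the downstream block's Completion throws an AggregateException that wraps the original error.

  • In the simplest case, it is best to let errors propagate and handle them once, at the end.

  • For more complex meshes, you may need to inspect each block after the dataflow has completed.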
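A fault that passes through links can end up wrapped in one or more layers of AggregateException. `AggregateException.Flatten()` removes that nesting, which makes the "handle it once at the end" approach easier. The minimal sketch below reuses the two linked blocks from the example above. The class and method names are illustrative.

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class FlattenDemo // illustrative name
{
    // Returns the first originally thrown exception after flattening.
    public static async Task<Exception> GetRootErrorAsync()
    {
        var multiplyBlock = new TransformBlock<int, int>(item =>
        {
            if (item == 1)
                throw new InvalidOperationException("Blech.");
            return item * 2;
        });
        var subtractBlock = new TransformBlock<int, int>(item => item - 2);
        multiplyBlock.LinkTo(subtractBlock,
            new DataflowLinkOptions { PropagateCompletion = true });

        multiplyBlock.Post(1);
        try
        {
            await subtractBlock.Completion;
        }
        catch (AggregateException ex)
        {
            // Flatten strips nested AggregateException layers, leaving
            // the exceptions that the block delegates actually threw.
            return ex.Flatten().InnerExceptions[0];
        }
        throw new Exception("Expected the mesh to fault.");
    }
}
```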

4. Unlinking blocks

public static void BlockDisposeRun()
{
    var multiplyBlock = new TransformBlock<int, int>(item =>
    {
        System.Console.WriteLine("first block.");
        Thread.Sleep(500);
        return item * 2;
    });
    var subtractBlock = new TransformBlock<int, int>(item =>
    {
        System.Console.WriteLine("last block.");
        Thread.Sleep(500);
        return item - 2;
    });

    IDisposable link = multiplyBlock.LinkTo(subtractBlock);
    multiplyBlock.Post(1);
    multiplyBlock.Post(2);
    // Unlink the blocks.
    // Items posted above may or may not have already passed through the link.
    // In real code, consider a using block instead of calling Dispose directly.
    link.Dispose();
    Thread.Sleep(1200);
}

Output:

first block.
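first block.

In this run only the first block executes. Each item spends 500 ms in multiplyBlock, and by the time its results are ready the link has already been disposed. The results therefore stay in multiplyBlock's output buffer, and subtractBlock never sees them.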
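The comment in the code above recommends a `using` block over calling `Dispose` directly. Below is a minimal sketch of that pattern with two `BufferBlock`s; the names are illustrative. While the link is alive, data flows to the target. Once the `using` scope ends, new items stay in the source.

```csharp
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class UsingLinkDemo // illustrative name
{
    public static async Task<(int viaLink, bool targetGotSecond, int leftInSource)> RunAsync()
    {
        var source = new BufferBlock<int>();
        var target = new BufferBlock<int>();
        int viaLink;

        using (source.LinkTo(target))
        {
            source.Post(1);
            // Wait until the item has actually crossed the link.
            viaLink = await target.ReceiveAsync();
        } // The link is disposed (unlinked) here.

        source.Post(2);
        // No link remains, so the second item stays buffered in the source.
        bool targetGotSecond = target.TryReceive(out _);
        int leftInSource = await source.ReceiveAsync();
        return (viaLink, targetGotSecond, leftInSource);
    }
}
```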

5. Throttling

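Use a block's BoundedCapacity option to limit how much data a target block accepts (throttling). The default value of BoundedCapacity is DataflowBlockOptions.Unbounded.

Problem it solves:

  • Preventing data from arriving so fast that the first target block has to buffer all of it before it gets a chance to process it. With unbounded targets, a source linked to several targets also hands every item to the first one, so the other targets receive nothing.
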
public static void BlockBoundedCapacityRun()
{
    var sourceBlock = new BufferBlock<int>();
    var options = new DataflowBlockOptions
    {
        BoundedCapacity = 10
        //BoundedCapacity = DataflowBlockOptions.Unbounded
    };
    var targetBlockA = new BufferBlock<int>(options);
    var targetBlockB = new BufferBlock<int>(options);
    sourceBlock.LinkTo(targetBlockA);
    sourceBlock.LinkTo(targetBlockB);

    for (int i = 0; i < 31; i++)
    {
        System.Console.WriteLine($"{DateTime.Now.ToString("mm:ss.fff")} Post:{i % 10}");
        sourceBlock.Post(i % 10);
    }
    // Inject 31 drops into the pipe.
    // Because each branch is bounded, targetBlockA and targetBlockB first take 10 drops each;
    // the remaining 11 stay buffered in sourceBlock until a target has room again.
    var task = Task.Run(() =>
    {
        int i = 0;

        System.Console.WriteLine("Draining targetBlockA first: this loop empties it, but cannot reach the drops held in targetBlockB");
        do
        {
            IList<int> res;
            if (targetBlockA.TryReceiveAll(out res))
            {
                i += res.Count;
                System.Console.WriteLine($"{DateTime.Now.ToString("mm:ss.fff")} RevcA:{string.Join(",", res)} {i}");
            }
            else
            {
                break;
            }
            // Give sourceBlock time to refill targetBlockA.
            Thread.Sleep(100);
        } while (true);

        i = 0;

        System.Console.WriteLine("Draining targetBlockB: only its buffered drops are left");
        do
        {
            IList<int> res;
            if (targetBlockB.TryReceiveAll(out res))
            {
                i += res.Count;
                System.Console.WriteLine($"{DateTime.Now.ToString("mm:ss.fff")} RevcB:{string.Join(",", res)} {i}");
            }
            else
            {
                break;
            }
            Thread.Sleep(100);
        } while (true);
    });

    task.Wait();
}

Output:

40:28.026 Post:0
40:28.038 Post:1
40:28.038 Post:2
40:28.038 Post:3
40:28.038 Post:4
40:28.038 Post:5
40:28.038 Post:6
40:28.038 Post:7
40:28.038 Post:8
40:28.038 Post:9
40:28.038 Post:0
40:28.038 Post:1
40:28.038 Post:2
40:28.038 Post:3
40:28.038 Post:4
40:28.038 Post:5
40:28.038 Post:6
40:28.038 Post:7
40:28.038 Post:8
40:28.038 Post:9
40:28.038 Post:0
40:28.038 Post:1
40:28.038 Post:2
40:28.038 Post:3
40:28.038 Post:4
40:28.038 Post:5
40:28.038 Post:6
40:28.038 Post:7
40:28.038 Post:8
40:28.038 Post:9
40:28.038 Post:0
Draining targetBlockA first: this loop empties it, but cannot reach the drops held in targetBlockB
40:28.043 RevcA:0,1,2,3,4,5,6,7,8,9 10
40:28.149 RevcA:0,1,2,3,4,5,6,7,8,9 20
40:28.249 RevcA:0 21
Draining targetBlockB: only its buffered drops are left
40:28.350 RevcB:0,1,2,3,4,5,6,7,8,9 10

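targetBlockA ends up with 21 drops. Each time the loop empties it, sourceBlock refills it from the drops it is still holding. targetBlockB only received items while targetBlockA was full, so it keeps just its first 10.

Throttling example: when a dataflow mesh is filled with data from I/O operations, set BoundedCapacity on its blocks. The mesh then stops reading I/O data it cannot process yet, instead of buffering everything.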
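In the example above only the targets are bounded, and the unbounded sourceBlock absorbs the surplus. If the block you feed is itself bounded, `Post` returns `false` once the block is full. `SendAsync` instead returns a task that waits for room. Below is a minimal sketch with a capacity of 2; the names and the capacity are illustrative.

```csharp
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class BoundedSendDemo // illustrative name
{
    public static async Task<(bool first, bool second, bool third, bool sendWasPending, bool sendAccepted)> RunAsync()
    {
        var block = new BufferBlock<int>(new DataflowBlockOptions { BoundedCapacity = 2 });

        bool first = block.Post(1);   // accepted
        bool second = block.Post(2);  // accepted, the block is now full
        bool third = block.Post(3);   // declined: Post never waits

        // SendAsync waits asynchronously until the block has room.
        Task<bool> send = block.SendAsync(3);
        bool sendWasPending = !send.IsCompleted;

        // Receiving one item frees a slot, so the pending send can finish.
        await block.ReceiveAsync();
        bool sendAccepted = await send;
        return (first, second, third, sendWasPending, sendAccepted);
    }
}
```

When the producer reads from I/O, awaiting `SendAsync` is what lets the bounded capacity slow the reader down.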

6. Parallel processing in dataflow blocks

public static void BlockParallelRun()
{
    var multiplyBlock = new TransformBlock<int, int>(
    item =>
    {
        System.Console.WriteLine($"{DateTime.Now.ToString("mm:ss.fff")} first block.");
        Thread.Sleep(100);
        return item * 2;
    },
    new ExecutionDataflowBlockOptions
    {
        MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded
    }
    );
    var subtractBlock = new TransformBlock<int, int>(item =>
    {
        System.Console.WriteLine($"{DateTime.Now.ToString("mm:ss.fff")} last block.");
        Thread.Sleep(100);
        return item - 2;
    });
    multiplyBlock.LinkTo(subtractBlock, new DataflowLinkOptions { PropagateCompletion = true });

    var task = Task.Run(async () =>
    {
        for (int i = 0; i < 7; i++)
        {
            multiplyBlock.Post(i);
        }

        multiplyBlock.Complete();
        await multiplyBlock.Completion;

        var tk = Task.Run(() =>
        {
            IList<int> recvResList;
            // Delay so that TryReceiveAll sees every result instead of leaving some unreceived in subtractBlock
            Thread.Sleep(1500);
            if (subtractBlock.TryReceiveAll(out recvResList))
            {
                System.Console.WriteLine($"{DateTime.Now.ToString("mm:ss.fff")} Revc {string.Join(",", recvResList)}.");
            }
            else
            {
                System.Console.WriteLine($"{DateTime.Now.ToString("mm:ss.fff")} Revc null.");
            }
        });
        await tk;
        // multiplyBlock is already complete; subtractBlock completes only because the link sets PropagateCompletion and its output was drained above
        await subtractBlock.Completion;
    });
    task.Wait();
}

Output:

44:16.023 first block.
44:16.023 first block.
44:16.023 first block.
44:16.023 first block.
44:16.023 first block.
44:16.023 first block.
44:16.023 first block.
44:16.146 last block.
44:16.250 last block.
44:16.351 last block.
44:16.452 last block.
44:16.552 last block.
44:16.652 last block.
44:16.753 last block.
44:17.656 Revc -2,0,2,4,6,8,10.

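multiplyBlock allows unbounded parallelism, so all seven items start at the same moment. subtractBlock keeps the default MaxDegreeOfParallelism of 1 and handles the items one at a time, about 100 ms apart. The results still come out in input order, because TransformBlock preserves ordering by default (EnsureOrdered = true).

The real difficulty is working out which blocks actually need parallel processing.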
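The example above relies on a fixed `Thread.Sleep(1500)` before `TryReceiveAll`. If the second block were slower, some results would be missed and `await subtractBlock.Completion` would never return. Below is a sketch of a timing-independent consumer for the same two blocks. It loops on `OutputAvailableAsync`, which returns `false` only once the block has completed and its output is empty. The degree of parallelism of 4 and the names are illustrative choices.

```csharp
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class ParallelDrainDemo // illustrative name
{
    public static async Task<List<int>> RunAsync()
    {
        var multiplyBlock = new TransformBlock<int, int>(item =>
        {
            Thread.Sleep(100); // simulated work
            return item * 2;
        }, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4 });
        var subtractBlock = new TransformBlock<int, int>(item => item - 2);
        multiplyBlock.LinkTo(subtractBlock, new DataflowLinkOptions { PropagateCompletion = true });

        for (int i = 0; i < 7; i++)
        {
            multiplyBlock.Post(i);
        }
        multiplyBlock.Complete();

        var results = new List<int>();
        // Returns false only when subtractBlock is completed and empty.
        while (await subtractBlock.OutputAvailableAsync())
        {
            // Single consumer, so TryReceive drains whatever is currently buffered.
            while (subtractBlock.TryReceive(out int item))
            {
                results.Add(item);
            }
        }
        await subtractBlock.Completion; // rethrows if the pipeline faulted
        return results;
    }
}
```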

7. Creating a custom dataflow block

public static void BlockCustomRun()
{
    var block = CreateMyCustomBlock();
    for (int i = 0; i < 7; i++)
    {
        block.Post(i);//target
    }
    var task = Task.Run(async () =>
    {
        var tk = Task.Run(() =>
        {
            List<int> recvResList = new List<int>();
            // Receive blocks until an item is available and throws InvalidOperationException once the block has completed with nothing left

            while (true)
            {
                try
                {
                    var recvRes = block.Receive();//source
                    recvResList.Add(recvRes);
                }
                catch (System.InvalidOperationException)
                {
                    break;
                }
            }
            Console.WriteLine($"{DateTime.Now.ToString("mm:ss.fff")} Revc {string.Join(",", recvResList)}.");
        });
        block.Complete();//target
        await block.Completion;//source
        await tk;
    });
    task.Wait();
}

static IPropagatorBlock<int, int> CreateMyCustomBlock()
{
    var multiplyBlock = new TransformBlock<int, int>(item =>
    {
        int res = item * 2;
        System.Console.WriteLine($"{DateTime.Now.ToString("mm:ss.fff")} first block {res}.");
        Thread.Sleep(100);
        return res;
    });
    var addBlock = new TransformBlock<int, int>(item =>
    {
        int res = item + 2;
        System.Console.WriteLine($"{DateTime.Now.ToString("mm:ss.fff")} next block {res}.");
        Thread.Sleep(100);
        return res;
    });
    var divideBlock = new TransformBlock<int, int>(item =>
    {
        int res = item / 2;
        System.Console.WriteLine($"{DateTime.Now.ToString("mm:ss.fff")} last block {res}.");
        Thread.Sleep(100);
        return res;
    });
    var flowCompletion = new DataflowLinkOptions { PropagateCompletion = true };
    multiplyBlock.LinkTo(addBlock, flowCompletion);
    addBlock.LinkTo(divideBlock, flowCompletion);
    return DataflowBlock.Encapsulate(multiplyBlock, divideBlock);
}

Output:

45:00.528 first block 0.
45:00.639 first block 2.
45:00.641 next block 2.
45:00.739 first block 4.
45:00.746 next block 4.
45:00.747 last block 1.
45:00.844 first block 6.
45:00.847 next block 6.
45:00.848 last block 2.
45:00.947 first block 8.
45:00.951 next block 8.
45:00.951 last block 3.
45:01.049 first block 10.
45:01.055 next block 10.
45:01.056 last block 4.
45:01.152 first block 12.
45:01.159 next block 12.
45:01.160 last block 5.
45:01.264 next block 14.
45:01.265 last block 6.
45:01.365 last block 7.
45:01.472 Revc 1,2,3,4,5,6,7.

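Each input i goes through ×2, +2 and ÷2, so the custom block outputs i + 1. The encapsulated block forwards Complete to its first block (multiplyBlock) and exposes the Completion of its last block (divideBlock). This is why the internal links use PropagateCompletion.

DataflowBlock.Encapsulate can only wrap a mesh that has exactly one input block and one output block. If a reusable mesh has multiple inputs or outputs, wrap it in a custom object instead.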
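Below is a minimal sketch of such a custom object, using a hypothetical `TwoInputMesh` class in which two input buffers feed one shared transform. It exposes each input as `ITargetBlock<int>` and the output as `ISourceBlock<int>`. Both links feed the same block, so PropagateCompletion cannot be used: the first input to finish would complete the shared block too early. Instead the class completes the shared block itself after both inputs have finished. Fault propagation is deliberately left out of this sketch.

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical example type, not part of TPL Dataflow.
public sealed class TwoInputMesh
{
    private readonly BufferBlock<int> _inputA = new BufferBlock<int>();
    private readonly BufferBlock<int> _inputB = new BufferBlock<int>();
    private readonly TransformBlock<int, int> _doubler =
        new TransformBlock<int, int>(item => item * 2);

    public TwoInputMesh()
    {
        // No PropagateCompletion: the shared block must wait for BOTH inputs.
        _inputA.LinkTo(_doubler);
        _inputB.LinkTo(_doubler);
        Task.WhenAll(_inputA.Completion, _inputB.Completion)
            .ContinueWith(_ => _doubler.Complete(), TaskScheduler.Default);
    }

    public ITargetBlock<int> InputA => _inputA;
    public ITargetBlock<int> InputB => _inputB;
    public ISourceBlock<int> Output => _doubler;
    public Task Completion => _doubler.Completion;

    // Completes both inputs; the shared block completes after both drain.
    public void Complete()
    {
        _inputA.Complete();
        _inputB.Complete();
    }
}
```

Usage: post to InputA and InputB, call Complete(), then drain Output as in the earlier examples before awaiting Completion.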