并发编程相关概念(一)
1. 概念介绍
现在我们先说明几个概念:
并发
就是同时做多件事情,比如:
程序写入数据库的同时响应用户输入
服务器处理第一个请求的同时响应第二个请求。
多线程
是并发的一种形式,它采用多个线程来执行程序,
- 注意: 多线程是并发的一种形式,但并不是唯一的形式。
多线程是比较基础的技术,我们需要理解,知晓原理,但是真正使用时最好使用对多线程进行封装的类,这样能更好的节省资源,减少问题的产生。
并行处理
把正在执行的大量的任务分割成小块,分配给多个同时运行的线程。
这样会使处理器的利用效率最大化,使用时需要注意因为控制不好的化会在短时间内极大的降低服务器的处理性能
异步编程
并发的一种形式,它采用未来模式(future)或回调机制(callback),以避免产生不必要的线程。
- 在 .NET 中,新版中使用
Task
和Task<TResult>
类型实现未来模式,在老式异步编程 API 中,采用回调或事件(event)
- 在 .NET 中,新版中使用
也是关键字
async
和await
解决的问题
响应式编程
- 一种声明式的编程模式,程序在该模式中对事件做出响应。
2. 异步编程
2.1. async运行过程
async
方法在开始时以同步方式执行。在async
方法内部,运行到await
关键字会执行一个异步等待它首先检查操作是否已经完成,如果完成了,就继续运行 (同步方式)。
否则,它会暂停
async
方法,并返回,留下一个未完成的task
。一段时间后,
await
住的操作完成,async
方法就恢复运行(不一定是原来的线程,具体看同步上下文的配置)。
2.2. async运行中同步上下文简介
最常见的情况是,用
await
语句等待一个任务完成,这时会捕捉同步上下文。如果当前
SynchronizationContext
不为空,这个上下文就是当前SynchronizationContext
。如果当前
SynchronizationContext
为空,则这个上下文为当前 TaskScheduler。
该方法会在这个上下文中继续运行。
注意: 最好的做法是,在核心库代码中一直使用
ConfigureAwait
。在外围的用户界面代码中,只在需要时才恢复上下文。
2.3. 创建Task实例
有两种基本的方法可以创建 Task 实例。
有些任务表示 CPU 需要实际执行的指令,创建这种计算类的任务时,使用
Task.Run
- 如UI线程触发的事件,如读取目录信息,配合
await
关键字,将任务交给线程池完成,解决读取时窗体卡顿情况
- 如UI线程触发的事件,如读取目录信息,配合
如需要按照特定的计划运行,则用
TaskFactory.StartNew
其他的任务表示一个通知( notification 操作会在回调中完成再通知回来),创建这种基于事件的任务时,使用
TaskCompletionSource<T>
。- 大部分 I/O 型任务采用
TaskCompletionSource<T>
。
- 大部分 I/O 型任务采用
2.4. 捕获异步异常类型
- 捕获
await
抛出的异常,我们更想要
try
{
await Task.Run(() => throw new NotSupportedException());
}
catch (Exception ex)
{
//print: NotSupportedException
Console.WriteLine(ex.GetType().Name);
}
Wait()
方法,异常类型被包装
try
{
Task task = Task.Run(() => throw new NotSupportedException());
task.Wait();
}
catch (Exception ex)
{
//print: AggregateException
Console.WriteLine(ex.GetType().Name);
}
3. 并行编程
并行编程可临时提高 CPU 利用率,以提高吞吐量。
若客户端系统中的 CPU 经常处于空闲状态,这个方法就非常有用
通常并不适合服务器系统,将降低本身的并行处理能力,并且不会有实际的好处。
反面教材: 之前在工作中出现一起事故,实施好的项目,3个月后每天凌晨出现大量设备掉线的情况。
由于数据超时时间时3个月,而且发现出现问题的日志和数据清理发生时间有关联关系
排查代码发现文件清理器,清理数据使用的Parallel类,并行删除文件,而且没有对并发数限制
文件清理器运行时,导致服务器性能急剧下降,造成处理设备消息延迟,心跳超时导致掉线
重构了文件清理器代码,解决了这个问题
数据并行(data parallelism):
- 是指有大量的数据需要处理,并且每一块数据的处理过程基本上是彼此独立的。
任务并行(task parallelim):
- 是指需要执行大量任务,并且每个任务的执行过程基本上是彼此独立的。
3.1. Parallel
不保证顺序执行。
//ForEach
int[] arr = new int[] { 1, 2, 3, 4 };
Parallel.ForEach(arr, item => Console.Write(item));
System.Console.WriteLine();
//PLINQ
var sum = arr.AsParallel().Select(item => item * 2).Sum();
System.Console.WriteLine($"sum:{sum}.");
//Invoke
int num = 10;
Parallel.Invoke(
() => num += 2,
() => num -= 2,
() => num -= num,
() => num += 2
);
System.Console.WriteLine($"num:{num}.");
/*
print:
1243
sum:20.
num:0.
*/
3.2. 异常处理
系统会把这些异常封装在 AggregateException 类里,在程序中抛给代码。 这一特点对所有方法都是一样的,包括 Parallel.ForEach、Paralle.lInvoke、Task.Wait 等。 AggregateException 类型有几个实用的 Flatten 和 Handle 方法,用来简化错误处理的代码:
try
{
Parallel.Invoke(() => { throw new Exception(); },
() => { throw new Exception(); });
}
catch (AggregateException ex)
{
ex.Handle(exception =>
{
Console.WriteLine(exception);
return true; //“已经处理”
});
}
3.3. 注意事项
在编写任务并行程序时,要格外留意下闭包(closure)捕获的变量。 记住闭包捕获的是引用(不是值),因此可以在结束时以不明显地方式地分享这些变量。
任务不要特别短
- 没必要用,直接同步执行
也不要特别长
- 应采用更可控的方式,削峰设计
4. 响应式编程
如果事件中带有参数,那么最好 采用响应式编程,而不是常规的事件处理程序。
//System.Runtime.dll namespace:System 中定义了这些接口
interface IObserver<in T>
{
void OnNext(T item);
void OnCompleted();
void OnError(Exception error);
}
interface IObservable<out T>
{
IDisposable Subscribe(IObserver<T> observer);
}
Rx(Rx-Main)中定义了响应式编程的封装,后面会有介绍。