C#多线程并发控制
1 Parallel.ForEach
在 C# 里,Parallel.ForEach
是 System.Threading.Tasks
命名空间下的一个方法,它能并行处理集合中的元素。与传统的 foreach
循环不同,Parallel.ForEach
会利用多个线程同时处理集合中的元素,以此提升性能,特别是在处理大型集合或者每个元素的处理操作较为耗时的情况下。
1.1 基本语法
Parallel.ForEach<TSource>(IEnumerable<TSource> source, Action<TSource> body);
source
:这是要处理的集合,它需要实现IEnumerable<TSource>
接口。body
:这是一个委托,针对集合中的每个元素都会执行此委托。
1.2 示例代码
using System;
using System.Collections.Generic;
using System.Threading.Tasks;class Program
{static void Main(){// 创建一个包含一些整数的列表List<int> numbers = new List<int> { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 };// 使用 Parallel.ForEach 并行处理列表中的每个元素Parallel.ForEach(numbers, number =>{// 模拟一些耗时的操作System.Threading.Thread.Sleep(100);Console.WriteLine($"处理数字 {number},线程 ID: {System.Threading.Thread.CurrentThread.ManagedThreadId}");});Console.WriteLine("所有元素处理完毕。");}
}
1.3 代码解释
- 创建集合:创建了一个包含 10 个整数的列表
numbers
。 - 使用
Parallel.ForEach
:对numbers
列表里的每个元素并行执行指定的操作。 - 模拟耗时操作:借助
Thread.Sleep(100)
模拟每个元素处理时的耗时操作。 - 输出结果:输出正在处理的元素以及当前线程的 ID。
- 完成处理:所有元素处理完成后,输出 “所有元素处理完毕。”。
1.4 可选参数
Parallel.ForEach
还有一些可选参数,可用于对并行处理进行更细致的控制:
Parallel.ForEach<TSource>(IEnumerable<TSource> source, ParallelOptions parallelOptions, Action<TSource> body);
parallelOptions
:这是一个ParallelOptions
对象,能够用来设置最大并行度、取消标记等。
1.5 示例代码(使用 ParallelOptions
)
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;class Program
{static void Main(){ParallelOptions options = new ParallelOptions(){MaxDegreeOfParallelism = 3};await Parallel.ForEachAsync(Enumerable.Range(1, 10), options, async (i, c) =>{Console.WriteLine($"任务 {i} 开始执行...");await Task.Delay(2000, c);Console.WriteLine($"任务 {i} 执行完成!");});Console.WriteLine("所有任务执行完成!");}/*任务 2 开始执行...任务 1 开始执行...任务 3 开始执行...任务 1 执行完成!任务 3 执行完成!任务 2 执行完成!任务 4 开始执行...任务 5 开始执行...任务 6 开始执行...任务 6 执行完成!任务 5 执行完成!任务 4 执行完成!任务 7 开始执行...任务 8 开始执行...任务 9 开始执行...任务 9 执行完成!任务 10 开始执行...任务 8 执行完成!任务 7 执行完成!任务 10 执行完成!任务全部完成!*/
}
1.6 注意事项
- 线程安全:在
Parallel.ForEach
中访问共享资源时,要确保线程安全,可使用锁机制或者线程安全的集合。 - 性能考量:并非所有情况都适合使用
Parallel.ForEach
,在处理小型集合或者每个元素的处理操作非常快时,使用Parallel.ForEach
可能会因为线程创建和管理的开销而导致性能下降。
2 SemaphoreSlim
SemaphoreSlim
是 C# 中用于控制对有限资源访问的轻量级同步原语,它位于 System.Threading
命名空间下。下面将从基本概念、使用场景、常用方法和示例代码等方面详细介绍 SemaphoreSlim
。
2.1 基本概念
SemaphoreSlim
是信号量的一种轻量级实现,信号量本质上是一个计数器,用于限制同时访问某个资源或代码段的线程数量。当一个线程想要访问受信号量保护的资源时,它需要先请求信号量。如果信号量的计数器大于 0,计数器会减 1,线程可以继续访问资源;如果计数器为 0,线程会被阻塞,直到有其他线程释放信号量。
2.2 使用场景
- 限制并发访问:当系统中的某些资源是有限的,如数据库连接池、网络连接等,使用
SemaphoreSlim
可以限制同时访问这些资源的线程数量,避免资源耗尽。 - 任务调度:在并行编程中,可以使用信号量来控制同时执行的任务数量,确保系统不会因为并发任务过多而过载。
2.3 常用方法
-
构造函数
SemaphoreSlim(int initialCount)
:初始化SemaphoreSlim
实例,指定初始的信号量计数。SemaphoreSlim(int initialCount, int maxCount)
:初始化SemaphoreSlim
实例,指定初始的信号量计数和最大的信号量计数。
-
Wait
方法Wait()
:请求信号量,如果信号量的计数大于 0,则计数减 1 并继续执行;如果计数为 0,则线程会被阻塞,直到有其他线程释放信号量。Wait(int millisecondsTimeout)
:请求信号量,指定等待的最大时间(以毫秒为单位)。如果在指定时间内信号量可用,则计数减 1 并继续执行;否则返回false
。
-
WaitAsync
方法WaitAsync()
:异步请求信号量,返回一个Task
,该Task
在信号量可用时完成。WaitAsync(int millisecondsTimeout)
:异步请求信号量,指定等待的最大时间(以毫秒为单位)。返回一个Task<bool>
,如果在指定时间内信号量可用,则计数减 1 且Task
的结果为true
;否则为false
。
-
Release
方法Release()
:释放信号量,将信号量的计数加 1。如果有其他线程正在等待信号量,则其中一个线程会被唤醒。Release(int releaseCount)
:释放指定数量的信号量,将信号量的计数增加releaseCount
。
2.4 示例代码
以下是一个简单的示例,展示了如何使用 SemaphoreSlim
来限制同时访问某个资源的线程数量:
using System;
using System.Threading;
using System.Threading.Tasks;class Program
{// 创建一个 SemaphoreSlim 实例,初始计数为 2,最大计数为 2private static SemaphoreSlim semaphore = new SemaphoreSlim(2, 2);static async Task Main(){// 创建多个任务来模拟并发访问Task[] tasks = new Task[5];for (int i = 0; i < 5; i++){tasks[i] = Task.Run(() => AccessResource(i));}// 等待所有任务完成await Task.WhenAll(tasks);Console.WriteLine("所有任务完成。");}static async Task AccessResource(int taskId){Console.WriteLine($"任务 {taskId} 正在等待访问资源...");// 请求信号量await semaphore.WaitAsync();try{Console.WriteLine($"任务 {taskId} 已获得访问资源的权限。");// 模拟一些耗时操作await Task.Delay(1000);}finally{// 释放信号量semaphore.Release();Console.WriteLine($"任务 {taskId} 已释放资源。");}}
}
- 创建
SemaphoreSlim
实例:在Main
方法中,创建了一个SemaphoreSlim
实例,初始计数为 2,最大计数为 2,表示最多允许 2 个线程同时访问资源。 - 创建多个任务:使用
Task.Run
方法创建 5 个任务,模拟并发访问资源的情况。 - 请求信号量:在
AccessResource
方法中,使用WaitAsync
方法异步请求信号量。如果信号量可用,则继续执行;否则任务会被阻塞,直到有其他线程释放信号量。 - 释放信号量:在
try-finally
块中,使用Release
方法释放信号量,确保无论任务是否发生异常,信号量都会被释放。
2.5 注意事项
- 资源泄漏:确保在使用完信号量后及时释放,避免资源泄漏。通常使用
try-finally
块来保证信号量的释放。 - 异步操作:在异步代码中,优先使用
WaitAsync
方法,避免阻塞线程。 - 异常处理:在使用信号量时,要考虑异常情况,确保信号量的状态不会被破坏。
3 TPL Dataflow
TPL Dataflow
(Task Parallel Library Dataflow)是 .NET 框架中的一个库,它位于 System.Threading.Tasks.Dataflow
命名空间下,为构建基于消息传递的并行和异步数据流提供了高级抽象,让开发者能够轻松构建可扩展、高效且响应式的应用程序。下面从基本概念、核心组件、使用场景、示例代码等方面详细介绍 TPL Dataflow。
3.1 基本概念
TPL Dataflow
基于数据流编程模型,它将应用程序分解为一系列相互连接的处理块(Block),这些处理块通过消息传递的方式进行通信。每个处理块负责执行特定的任务,当一个处理块接收到消息时,它会对消息进行处理,并将处理结果传递给下一个处理块,以此类推,形成一个数据流。
3.2 核心组件
3.2.1 数据块(Dataflow Blocks)
- 转换块(TransformBlock):接收输入消息,对其进行转换,然后输出转换后的消息。例如,将一个整数转换为其平方值。
- 动作块(ActionBlock):接收输入消息,并执行指定的操作,但不产生输出。常用于执行一些副作用操作,如写入文件、记录日志等。
- 缓冲块(BufferBlock):简单地存储接收到的消息,并将其传递给连接的目标块。常用于解耦生产者和消费者。
- 批量块(BatchBlock):将接收到的消息收集成指定大小的批次,然后将整个批次作为一个消息传递给目标块。
- 合并块(JoinBlock):从多个源块接收消息,等待所有输入源都有消息到达后,将这些消息组合成一个元组传递给目标块。
3.2.2 数据块之间的连接
数据块之间可以通过 LinkTo
方法进行连接,形成数据流。连接可以设置不同的传播选项,如是否在源块完成时传播完成信号等。
3.2.3 消息传递
数据块之间通过发送和接收消息进行通信。消息可以是任何类型的对象,处理块接收到消息后会根据其功能进行相应的处理。
3.3 使用场景
- 并行处理:当需要对大量数据进行并行处理时,TPL Dataflow 可以将数据分割成多个消息,由多个处理块并行处理,提高处理效率。
- 异步 I/O 操作:在处理异步 I/O 操作时,TPL Dataflow 可以帮助管理数据流,确保数据在不同的 I/O 操作之间有序传递。
- 流水线处理:构建复杂的流水线处理系统,每个处理块负责一个特定的处理步骤,将数据依次传递给后续的处理块进行处理。
3.4 示例代码
以下是一个简单的示例,展示了如何使用 TPL Dataflow 构建一个简单的数据流:
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;class Program
{static async Task Main(){// 创建一个转换块,将输入的整数转换为其平方值var transformBlock = new TransformBlock<int, int>(num => num * num);// 创建一个动作块,用于打印转换后的结果var actionBlock = new ActionBlock<int>(result =>{Console.WriteLine($"结果: {result}");});// 将转换块连接到动作块transformBlock.LinkTo(actionBlock);// 向转换块发送一些数据for (int i = 1; i <= 5; i++){transformBlock.Post(i);}// 标记转换块完成transformBlock.Complete();// 等待动作块处理完所有消息await actionBlock.Completion;Console.WriteLine("所有消息处理完成。");}
}
3.5 代码解释
- 创建处理块:创建了一个
TransformBlock
用于将输入的整数转换为其平方值,以及一个ActionBlock
用于打印转换后的结果。 - 连接处理块:使用
LinkTo
方法将TransformBlock
连接到ActionBlock
,形成一个简单的数据流。 - 发送消息:使用
Post
方法向TransformBlock
发送一些整数数据。 - 标记完成:调用
Complete
方法标记TransformBlock
完成,表示不再有新的消息发送。 - 等待完成:使用
Completion
属性等待ActionBlock
处理完所有消息。
3.6 注意事项
- 资源管理:在使用完处理块后,要确保正确处理其完成状态,避免资源泄漏。
- 异常处理:处理块可能会抛出异常,需要使用
Completion
属性的Exception
来捕获和处理这些异常。 - 并发控制:某些处理块支持设置并发度,可根据实际需求进行调整,以避免过度并发导致性能下降。
4 TaskScheduler
TaskScheduler
是 .NET 中 System.Threading.Tasks
命名空间下的一个抽象类,它在任务并行库(TPL)里扮演着核心角色,主要负责管理和调度 Task
对象的执行。下面将从基本概念、内置任务调度器、使用场景、自定义任务调度器等方面详细介绍 TaskScheduler
。
4.1 基本概念
TaskScheduler
是任务调度的核心,它决定了 Task
何时、在哪个线程上执行。当你创建并启动一个 Task
时,Task
不会立刻执行,而是被提交给 TaskScheduler
进行调度。TaskScheduler
会根据自身的调度策略,将任务分配到合适的线程上执行。
4.2 内置任务调度器
.NET 提供了几个内置的任务调度器:
TaskScheduler.Default
- 这是默认的任务调度器,它基于线程池来调度任务。线程池中的线程是由系统管理的,会根据系统资源和负载情况动态调整线程数量。对于大多数情况,使用默认的任务调度器就能满足需求,因为它能高效地利用系统资源。
TaskScheduler.Current
- 表示当前正在执行的任务所使用的任务调度器。在嵌套任务或者异步方法中,使用
TaskScheduler.Current
可以确保子任务使用与父任务相同的调度器。
- 表示当前正在执行的任务所使用的任务调度器。在嵌套任务或者异步方法中,使用
TaskScheduler.FromCurrentSynchronizationContext()
- 这个调度器用于在当前的同步上下文中执行任务。通常在 UI 编程中使用,比如 Windows Forms 或 WPF 应用程序,它能确保任务在 UI 线程上执行,避免跨线程访问 UI 控件时出现异常。
4.3 使用场景
- 并行计算
- 在进行大规模的并行计算时,使用默认的任务调度器可以充分利用多核处理器的性能,将任务分配到不同的线程上并行执行,提高计算效率。
- UI 编程
- 在 Windows Forms、WPF 或 ASP.NET 等 UI 应用程序中,使用
TaskScheduler.FromCurrentSynchronizationContext()
可以确保任务在 UI 线程上执行,避免因跨线程访问 UI 控件而导致的异常。
- 在 Windows Forms、WPF 或 ASP.NET 等 UI 应用程序中,使用
- 自定义调度需求
- 当默认的任务调度器无法满足特定的调度需求时,比如需要对任务进行优先级排序、限制并发任务数量等,可以自定义任务调度器。
4.4 示例代码
4.4.1 使用默认任务调度器
using System;
using System.Threading.Tasks;class Program
{static void Main(){// 创建一个任务并使用默认任务调度器执行Task task = Task.Factory.StartNew(() =>{Console.WriteLine("任务在默认任务调度器上执行。");});// 等待任务完成task.Wait();Console.WriteLine("任务执行完成。");}
}
4.4.2 在 UI 线程上执行任务(以 Windows Forms 为例)
using System;
using System.Threading.Tasks;
using System.Windows.Forms;namespace WindowsFormsApp1
{public partial class Form1 : Form{public Form1(){InitializeComponent();}private async void button1_Click(object sender, EventArgs e){// 获取当前同步上下文的任务调度器TaskScheduler uiScheduler = TaskScheduler.FromCurrentSynchronizationContext();// 创建一个任务并在 UI 线程上执行await Task.Factory.StartNew(() =>{// 更新 UI 控件label1.Text = "任务在 UI 线程上执行。";}, CancellationToken.None, TaskCreationOptions.None, uiScheduler);}}
}
4.5 自定义任务调度器
如果内置的任务调度器无法满足需求,可以通过继承 TaskScheduler
类来创建自定义任务调度器。以下是一个简单的自定义任务调度器示例,它限制了并发任务的数量:
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;class LimitedConcurrencyLevelTaskScheduler : TaskScheduler
{// 最大并发任务数量private readonly int _maxDegreeOfParallelism;// 任务队列private readonly LinkedList<Task> _tasks = new LinkedList<Task>();private int _delegatesQueuedOrRunning = 0;public LimitedConcurrencyLevelTaskScheduler(int maxDegreeOfParallelism){if (maxDegreeOfParallelism < 1) throw new ArgumentOutOfRangeException(nameof(maxDegreeOfParallelism));_maxDegreeOfParallelism = maxDegreeOfParallelism;}protected override IEnumerable<Task> GetScheduledTasks(){bool lockTaken = false;try{Monitor.TryEnter(_tasks, ref lockTaken);if (lockTaken) return _tasks;else throw new NotSupportedException();}finally{if (lockTaken) Monitor.Exit(_tasks);}}protected override void QueueTask(Task task){lock (_tasks){_tasks.AddLast(task);if (_delegatesQueuedOrRunning < _maxDegreeOfParallelism){++_delegatesQueuedOrRunning;NotifyThreadPoolOfPendingWork();}}}private void NotifyThreadPoolOfPendingWork(){ThreadPool.UnsafeQueueUserWorkItem(_ =>{while (true){Task item;lock (_tasks){if (_tasks.Count == 0){--_delegatesQueuedOrRunning;break;}item = _tasks.First.Value;_tasks.RemoveFirst();}TryExecuteTask(item);}}, null);}protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued){if (taskWasPreviouslyQueued) return false;return TryExecuteTask(task);}public override int MaximumConcurrencyLevel => _maxDegreeOfParallelism;
}
4.6 代码解释
LimitedConcurrencyLevelTaskScheduler
类:继承自TaskScheduler
,通过_maxDegreeOfParallelism
字段限制并发任务的数量。QueueTask
方法:将任务添加到任务队列中,并根据当前并发任务数量决定是否将任务提交给线程池执行。NotifyThreadPoolOfPendingWork
方法:将任务从队列中取出并提交给线程池执行。TryExecuteTaskInline
方法:尝试在当前线程上直接执行任务。
4.7 注意事项
- 资源管理:在使用自定义任务调度器时,要确保正确管理资源,避免出现资源泄漏或死锁等问题。
- 异常处理:任务在执行过程中可能会抛出异常,需要在合适的地方捕获和处理这些异常,避免影响整个应用程序的稳定性。
- 性能考虑:不合理的任务调度策略可能会导致性能下降,在设计任务调度器时要充分考虑系统资源和任务的特点。