C# 简单实现线程池
基于 .NET 6 环境开发,实现了对线程数量、任务队列、非核心线程以及核心线程空闲存活时间的管理。
namespace CustomThreadPool;
/// <summary>
/// A small thread pool that runs <see cref="WorkTask"/> instances on dedicated threads.
/// Core workers stay alive for a configurable idle period; non-core workers are started
/// only when the backlog exceeds the queue threshold and exit as soon as the queue is empty.
/// </summary>
public class ThreadPoolExecutor
{
    /// <summary>
    /// Pending tasks. The queue object is also the monitor that guards the worker counters
    /// and is used to wake idle core workers.
    /// </summary>
    private readonly Queue<WorkTask> tasks = new Queue<WorkTask>();

    /// <summary>
    /// Maximum number of core threads.
    /// </summary>
    private readonly int coreThreadCount;

    /// <summary>
    /// Maximum number of non-core threads (total threads minus core threads).
    /// </summary>
    private readonly int noneCoreThreadCount;

    /// <summary>
    /// Number of core threads currently alive.
    /// </summary>
    private int runCoreThreadCount;

    /// <summary>
    /// Number of non-core threads currently alive.
    /// </summary>
    private int runNoneCoreThreadCount;

    /// <summary>
    /// Backlog threshold: once the queue holds more tasks than this, non-core threads may be started.
    /// </summary>
    private readonly int maxQueueCount;

    /// <summary>
    /// How long, in milliseconds, an idle core thread waits for work before it exits.
    /// </summary>
    private readonly int keepAliveTimeout;

    /// <summary>
    /// Whether worker threads are created as background threads.
    /// </summary>
    private readonly bool isBackground;

    private ThreadPoolExecutor() { }

    /// <summary>
    /// Creates a pool with the given limits.
    /// </summary>
    /// <param name="CoreThreadCount">Maximum number of core threads; must be at least 1.</param>
    /// <param name="TotalThreadCount">Maximum number of threads overall; must not be smaller than <paramref name="CoreThreadCount"/>.</param>
    /// <param name="IsBackground">Whether worker threads are background threads.</param>
    /// <param name="QueueCount">Backlog threshold above which non-core threads are started; must not be negative.</param>
    /// <param name="KeepAliveTimeout">Idle time in milliseconds after which a core thread exits; 0 makes it exit as soon as it finds the queue empty.</param>
    /// <exception cref="ArgumentOutOfRangeException">A count or the timeout is outside its valid range.</exception>
    /// <exception cref="ArgumentException"><paramref name="TotalThreadCount"/> is smaller than <paramref name="CoreThreadCount"/>.</exception>
    public ThreadPoolExecutor(int CoreThreadCount = 5, int TotalThreadCount = 10, bool IsBackground = true, int QueueCount = 200, int KeepAliveTimeout = 0)
    {
        if (CoreThreadCount < 1) throw new ArgumentOutOfRangeException(nameof(CoreThreadCount), CoreThreadCount, null);
        // Equality is allowed (a pool without non-core threads), so the message says "or equal".
        if (TotalThreadCount < CoreThreadCount) throw new ArgumentException($"{nameof(TotalThreadCount)}:{TotalThreadCount} must be greater than or equal to {nameof(CoreThreadCount)}:{CoreThreadCount}");
        if (QueueCount < 0) throw new ArgumentOutOfRangeException(nameof(QueueCount), QueueCount, null);
        if (KeepAliveTimeout < 0) throw new ArgumentOutOfRangeException(nameof(KeepAliveTimeout), KeepAliveTimeout, null);
        coreThreadCount = CoreThreadCount;
        noneCoreThreadCount = TotalThreadCount - CoreThreadCount;
        keepAliveTimeout = KeepAliveTimeout;
        maxQueueCount = QueueCount;
        isBackground = IsBackground;
    }

    /// <summary>
    /// Queues a task for execution. The task is always accepted: the queue threshold only
    /// decides when additional non-core threads are started, it does not reject work.
    /// </summary>
    /// <param name="task">The task to run.</param>
    /// <exception cref="ArgumentNullException"><paramref name="task"/> is null.</exception>
    public void QueueTask(WorkTask task)
    {
        if (task == null) throw new ArgumentNullException(nameof(task));
        lock (tasks)
        {
            tasks.Enqueue(task);
            if (runCoreThreadCount < coreThreadCount)
            {
                // Always bring up a missing core worker first. Doing this regardless of the
                // backlog size guarantees the task cannot be stranded when the threshold is
                // exceeded and no non-core thread is permitted (e.g. QueueCount = 0).
                ++runCoreThreadCount;
                Run(true);
            }
            else if (tasks.Count > maxQueueCount && runNoneCoreThreadCount < noneCoreThreadCount)
            {
                ++runNoneCoreThreadCount;
                Run(false);
            }
            // Wake one idle core worker that is blocked waiting for work.
            Monitor.Pulse(tasks);
        }
    }

    /// <summary>
    /// Starts a new worker thread. The caller has already counted the worker.
    /// </summary>
    /// <param name="isCore">True for a core worker, false for a non-core worker.</param>
    private void Run(bool isCore)
    {
        Thread thread = new(() => Excute(isCore))
        {
            Name = Guid.NewGuid().ToString("D"),
            IsBackground = isBackground
        };
        thread.Start();
    }

    /// <summary>
    /// Worker loop: takes tasks from the queue and runs them until the worker retires.
    /// A core worker retires after staying idle for the keep-alive period; a non-core
    /// worker retires the first time it finds the queue empty.
    /// </summary>
    /// <param name="isCore">True for a core worker, false for a non-core worker.</param>
    private void Excute(bool isCore)
    {
        while (true)
        {
            WorkTask item;
            lock (tasks)
            {
                if (isCore)
                {
                    // Block on the monitor instead of spinning; TickCount64 is monotonic,
                    // so wall-clock adjustments cannot shorten or extend the idle period.
                    long idleSince = Environment.TickCount64;
                    while (tasks.Count == 0)
                    {
                        long remaining = keepAliveTimeout - (Environment.TickCount64 - idleSince);
                        if (remaining <= 0)
                        {
                            --runCoreThreadCount;
                            return;
                        }
                        // The queue is re-checked after every wake-up, so a task enqueued
                        // right at the timeout is still picked up.
                        Monitor.Wait(tasks, (int)remaining);
                    }
                }
                else if (tasks.Count == 0)
                {
                    --runNoneCoreThreadCount;
                    return;
                }
                item = tasks.Dequeue();
            }
            // Run outside the lock so other workers and producers are not blocked.
            item.Runsynchronous();
        }
    }
}
namespace CustomThreadPool;
/// <summary>
/// A unit of work that wraps a delegate, records its outcome and signals completion.
/// </summary>
public class WorkTask
{
    /// <summary>
    /// Shared factory used by <see cref="Start()"/> and available for creating and starting tasks.
    /// </summary>
    public static WorkTaskFactory Factory { get; private set; } = new WorkTaskFactory();

    /// <summary>
    /// Raised on the executing thread after the delegate has finished (successfully or not).
    /// </summary>
    public event Action<WorkTask>? TaskCompleted;

    /// <summary>
    /// Counter from which task ids are drawn.
    /// </summary>
    private static int _id = 0;

    /// <summary>
    /// Delegate to run when the task takes no argument.
    /// </summary>
    private readonly Action? action;

    /// <summary>
    /// Delegate to run when the task takes a state argument.
    /// </summary>
    private readonly Action<object?>? actionWithParamter;

    /// <summary>
    /// Signaled once when the task has finished. It is an auto-reset event, so a single
    /// successful wait consumes the signal.
    /// </summary>
    public AutoResetEvent WaitHandle { get; protected set; } = new AutoResetEvent(false);

    /// <summary>
    /// Argument passed to the delegate.
    /// </summary>
    public object? State { get; protected set; }

    /// <summary>
    /// Wrapper around the exception thrown by the delegate, if any.
    /// </summary>
    public WorkTaskException? Exception { get; protected set; }

    /// <summary>
    /// True once the task has finished.
    /// </summary>
    public bool IsCompleted { get; protected set; } = false;

    /// <summary>
    /// True when the delegate threw an exception.
    /// </summary>
    public bool IsFaulted { get; protected set; } = false;

    /// <summary>
    /// Current life-cycle state of the task.
    /// </summary>
    public WorkTaskStatus Status { get; protected set; } = WorkTaskStatus.Created;

    /// <summary>
    /// Identifier assigned once when the task is constructed; reading it repeatedly
    /// returns the same value.
    /// </summary>
    public int Id { get; } = Interlocked.Increment(ref _id);

    protected WorkTask() { }

    /// <summary>
    /// Raises <see cref="TaskCompleted"/>.
    /// </summary>
    /// <param name="sender">The task that finished.</param>
    protected void OnTaskCompleted(WorkTask sender)
    {
        TaskCompleted?.Invoke(sender);
    }

    /// <summary>
    /// Creates a task that runs a parameterless delegate.
    /// </summary>
    /// <param name="action">The delegate to run.</param>
    /// <exception cref="ArgumentNullException"><paramref name="action"/> is null.</exception>
    public WorkTask(Action action)
    {
        this.action = action ?? throw new ArgumentNullException(nameof(action));
    }

    /// <summary>
    /// Creates a task that runs a delegate with a state argument.
    /// </summary>
    /// <param name="action">The delegate to run.</param>
    /// <param name="state">Value passed to the delegate; may be null.</param>
    /// <exception cref="ArgumentNullException"><paramref name="action"/> is null.</exception>
    public WorkTask(Action<object?> action, object? state)
    {
        actionWithParamter = action ?? throw new ArgumentNullException(nameof(action));
        this.State = state;
    }

    /// <summary>
    /// Runs the delegate on the calling thread. Does nothing unless the task is still
    /// in the <see cref="WorkTaskStatus.Created"/> state. An exception thrown by the
    /// delegate is captured in <see cref="Exception"/> instead of propagating.
    /// </summary>
    public virtual void Runsynchronous()
    {
        if (Status != WorkTaskStatus.Created) return;
        Status = WorkTaskStatus.Running;
        try
        {
            action?.Invoke();
            actionWithParamter?.Invoke(State);
        }
        catch (Exception ex)
        {
            Exception = new WorkTaskException(ex.Message, ex);
            IsFaulted = true;
        }
        finally
        {
            try
            {
                OnTaskCompleted(this);
            }
            finally
            {
                // Publish the completed state before signaling so a released waiter never
                // observes IsCompleted == false, and signal even if a handler threw so
                // waiters cannot hang forever.
                IsCompleted = true;
                Status = WorkTaskStatus.RanToCompleted;
                WaitHandle.Set();
            }
        }
    }

    /// <summary>
    /// Queues the task on the executor of the shared <see cref="Factory"/>.
    /// </summary>
    public void Start()
    {
        Factory.ThreadPoolExcutor?.QueueTask(this);
    }

    /// <summary>
    /// Queues the task on the given executor.
    /// </summary>
    /// <param name="executor">The thread pool that should run the task.</param>
    /// <exception cref="ArgumentNullException"><paramref name="executor"/> is null.</exception>
    public void Start(ThreadPoolExecutor executor)
    {
        ArgumentNullException.ThrowIfNull(executor);
        executor.QueueTask(this);
    }

    /// <summary>
    /// Blocks until every task in the array has finished.
    /// </summary>
    /// <param name="tasks">The tasks to wait for.</param>
    /// <returns>True when all tasks were observed as finished.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="tasks"/> is null.</exception>
    public static bool WaitAll(WorkTask[] tasks)
    {
        ArgumentNullException.ThrowIfNull(tasks);
        var result = true;
        foreach (var task in tasks)
        {
            // A finished task needs no wait. Its one-shot auto-reset signal may already
            // have been consumed by an earlier wait, in which case WaitOne would never return.
            if (task.IsCompleted) continue;
            result = task.WaitHandle.WaitOne() && result;
        }
        return result;
    }

    /// <summary>
    /// Blocks until at least one task in the array has finished.
    /// </summary>
    /// <param name="tasks">The tasks to wait for; must contain at least one element.</param>
    /// <returns>Index of a finished task.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="tasks"/> is null.</exception>
    /// <exception cref="ArgumentException"><paramref name="tasks"/> is empty.</exception>
    /// <remarks>
    /// When no task has finished yet this delegates to <c>System.Threading.WaitHandle.WaitAny</c>,
    /// which limits the number of handles it accepts (64 per the .NET documentation).
    /// </remarks>
    public static int WaitAny(WorkTask[] tasks)
    {
        ArgumentNullException.ThrowIfNull(tasks);
        if (tasks.Length == 0) throw new ArgumentException("At least one task is required.", nameof(tasks));

        // Fully qualified: inside this class the simple name WaitHandle is the instance property.
        var handles = new System.Threading.WaitHandle[tasks.Length];
        for (int i = 0; i < tasks.Length; i++)
        {
            // Prefer a task that is already done; its signal may have been consumed earlier.
            if (tasks[i].IsCompleted) return i;
            handles[i] = tasks[i].WaitHandle;
        }
        return System.Threading.WaitHandle.WaitAny(handles);
    }
}
/// <summary>
/// A task whose delegate produces a value.
/// </summary>
/// <typeparam name="TResult">Type of the value produced by the delegate.</typeparam>
public class WorkTask<TResult> : WorkTask
{
    /// <summary>
    /// Delegate to run when the task takes no argument.
    /// </summary>
    private readonly Func<TResult>? func;

    /// <summary>
    /// Delegate to run when the task takes a state argument.
    /// </summary>
    private readonly Func<object?, TResult>? funcWithParameter;

    /// <summary>
    /// Set once the delegate has finished. It is a manual-reset event, so any number of
    /// <see cref="Result"/> readers can pass it and the public auto-reset
    /// <c>WaitHandle</c> signal is left untouched for other waiters.
    /// </summary>
    private readonly ManualResetEventSlim resultReady = new ManualResetEventSlim(false);

    /// <summary>
    /// Value produced by the delegate; stays at the default when the delegate threw.
    /// </summary>
    protected TResult? _result = default(TResult);

    /// <summary>
    /// The value produced by the delegate. Blocks until the delegate has finished.
    /// When the delegate threw, the default value is returned and the failure is
    /// available through <c>Exception</c> and <c>IsFaulted</c>.
    /// </summary>
    public TResult? Result
    {
        get
        {
            resultReady.Wait();
            return _result;
        }
    }

    /// <summary>
    /// Creates a task that runs a parameterless function.
    /// </summary>
    /// <param name="func">The function to run.</param>
    /// <exception cref="ArgumentNullException"><paramref name="func"/> is null.</exception>
    public WorkTask(Func<TResult> func)
    {
        this.func = func ?? throw new ArgumentNullException(nameof(func));
    }

    /// <summary>
    /// Creates a task that runs a function with a state argument.
    /// </summary>
    /// <param name="func">The function to run.</param>
    /// <param name="state">Value passed to the function; may be null.</param>
    /// <exception cref="ArgumentNullException"><paramref name="func"/> is null.</exception>
    public WorkTask(Func<object?, TResult> func, object? state)
    {
        this.funcWithParameter = func ?? throw new ArgumentNullException(nameof(func));
        this.State = state;
    }

    /// <summary>
    /// Runs the function on the calling thread and stores its return value. Does nothing
    /// unless the task is still in the <see cref="WorkTaskStatus.Created"/> state. An
    /// exception thrown by the function is captured instead of propagating.
    /// </summary>
    public override void Runsynchronous()
    {
        if (Status != WorkTaskStatus.Created) return;
        Status = WorkTaskStatus.Running;
        try
        {
            if (func != null) _result = func();
            if (funcWithParameter != null) _result = funcWithParameter(State);
        }
        catch (Exception ex)
        {
            Exception = new WorkTaskException(ex.Message, ex);
            IsFaulted = true;
        }
        finally
        {
            // Release Result readers before raising the event: a TaskCompleted handler that
            // reads Result runs on this very thread and would otherwise block forever.
            resultReady.Set();
            try
            {
                OnTaskCompleted(this);
            }
            finally
            {
                // Publish the final state before signaling, and signal even if a handler
                // threw, so waiters on WaitHandle cannot hang.
                Status = WorkTaskStatus.RanToCompleted;
                IsCompleted = true;
                WaitHandle.Set();
            }
        }
    }
}
/// <summary>
/// Exception type for failures that occur while a work task is running.
/// </summary>
public class WorkTaskException : Exception
{
    /// <summary>
    /// Creates the exception with the framework's default message.
    /// </summary>
    public WorkTaskException() : base() { }

    /// <summary>
    /// Creates the exception with a custom message.
    /// </summary>
    /// <param name="Message">Text describing the failure.</param>
    public WorkTaskException(string Message) : base(Message) { }

    /// <summary>
    /// Creates the exception with a custom message and the exception that caused it.
    /// </summary>
    /// <param name="Message">Text describing the failure.</param>
    /// <param name="InnerException">The underlying exception.</param>
    public WorkTaskException(string Message, Exception InnerException) : base(Message, InnerException) { }
}
/// <summary>
/// Life-cycle states of a work task.
/// </summary>
public enum WorkTaskStatus
{
/// <summary>
/// The task has been created and has not started running.
/// </summary>
Created = 0,
/// <summary>
/// The task is currently running.
/// </summary>
Running = 1,
/// <summary>
/// The task has finished running.
/// </summary>
RanToCompleted = 2,
}
namespace CustomThreadPool;
/// <summary>
/// Creates tasks and queues them on a thread pool in one step.
/// </summary>
public class WorkTaskFactory
{
    /// <summary>
    /// The executor used when a call does not supply its own.
    /// </summary>
    public ThreadPoolExecutor? ThreadPoolExcutor { get; private set; }

    /// <summary>
    /// Creates a factory bound to the given executor.
    /// </summary>
    /// <param name="excutor">The default executor for tasks started by this factory.</param>
    /// <exception cref="ArgumentNullException"><paramref name="excutor"/> is null.</exception>
    public WorkTaskFactory(ThreadPoolExecutor excutor)
    {
        // Without an executor StartNew would silently drop every task, so reject null up front.
        ThreadPoolExcutor = excutor ?? throw new ArgumentNullException(nameof(excutor));
    }

    /// <summary>
    /// Creates a factory with its own executor of 5 core threads and 10 threads in total.
    /// </summary>
    public WorkTaskFactory()
        : this(new ThreadPoolExecutor(5, 10))
    {
    }

    /// <summary>
    /// Creates a task for <paramref name="action"/> and queues it immediately.
    /// </summary>
    /// <param name="action">The delegate to run.</param>
    /// <param name="executor">
    /// Executor to use for this call only; when null the factory's default executor is used.
    /// The factory's default is not changed.
    /// </param>
    /// <returns>The queued task.</returns>
    public WorkTask StartNew(Action action, ThreadPoolExecutor? executor = null)
    {
        WorkTask task = new WorkTask(action);
        // Resolve the target locally: assigning it to the property would redirect every
        // later caller of this (possibly shared) factory to a one-off executor.
        ThreadPoolExecutor? target = executor ?? ThreadPoolExcutor;
        target?.QueueTask(task);
        return task;
    }

    /// <summary>
    /// Creates a task for <paramref name="func"/> and queues it immediately.
    /// </summary>
    /// <typeparam name="TResult">Type of the value produced by the function.</typeparam>
    /// <param name="func">The function to run.</param>
    /// <param name="state">Value passed to the function; may be null.</param>
    /// <param name="executor">
    /// Executor to use for this call only; when null the factory's default executor is used.
    /// The factory's default is not changed.
    /// </param>
    /// <returns>The queued task.</returns>
    public WorkTask<TResult> StartNew<TResult>(Func<object?, TResult> func, object? state, ThreadPoolExecutor? executor = null)
    {
        WorkTask<TResult> task = new WorkTask<TResult>(func, state);
        ThreadPoolExecutor? target = executor ?? ThreadPoolExcutor;
        target?.QueueTask(task);
        return task;
    }
}
namespace CustomThreadPool;
using System.Threading;
using System.Text;
using System;
using System.Diagnostics;
using System.Reflection.Emit;
/// <summary>
/// Console demo: starts a batch of slow tasks plus one quick task on a custom pool,
/// waits for the batch and prints the results.
/// </summary>
class Program
{
    /// <summary>
    /// Entry point of the demo.
    /// </summary>
    /// <param name="args">Command-line arguments (not used).</param>
    static void Main(string[] args)
    {
        const int batchSize = 5;
        var pool = new ThreadPoolExecutor(CoreThreadCount: 5, TotalThreadCount: 6, QueueCount: 5, KeepAliveTimeout: 2000);

        // Each batch task receives its index as state.
        var batch = new WorkTask<int?>[batchSize];
        for (int index = 0; index < batchSize; index++)
        {
            batch[index] = WorkTask.Factory.StartNew(arg => Action(arg), state: index, executor: pool);
        }

        WorkTask<int> quickTask = WorkTask.Factory.StartNew(
            unused =>
            {
                Thread.Sleep(100);
                Console.WriteLine("start thread");
                return 100;
            },
            state: null,
            executor: pool);

        Console.WriteLine("start main");
        WorkTask.WaitAll(batch);
        Console.WriteLine(quickTask.Result);
        Console.WriteLine(batch.Sum(item => item.Result));
    }

    /// <summary>
    /// Simulated slow job: sleeps, prints the managed thread id and the argument,
    /// then returns the argument plus one (or null when no argument was given).
    /// </summary>
    /// <param name="t">A boxed int, or null.</param>
    /// <returns>The argument incremented by one, or null.</returns>
    private static int? Action(object? t)
    {
        Thread.Sleep(2000);
        Console.WriteLine($"Task Id:{Environment.CurrentManagedThreadId},Parameter:{t}");
        if (t == null)
        {
            return null;
        }
        return (int)t + 1;
    }
}
调用结果