Thread并发数控制

it2022-05-05  86

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace ConsoleApplication1
{
    /// <summary>
    /// A <see cref="TaskScheduler"/> that runs its tasks on the thread pool while
    /// never letting more than a fixed number of worker delegates drain the queue
    /// at the same time.
    /// </summary>
    public class LimitedConcurrencyLevelTaskScheduler : TaskScheduler
    {
        #region Member Declarations

        /// <summary>
        /// Whether the current thread is processing work items for a scheduler of this type.
        /// </summary>
        [ThreadStatic]
        private static bool m_bCurrentThreadIsProcessingItems;

        /// <summary>
        /// The list of tasks waiting to be executed. Protected by lock(m_lstTasks).
        /// </summary>
        private readonly LinkedList<Task> m_lstTasks = new LinkedList<Task>();

        /// <summary>
        /// The maximum concurrency level allowed by this scheduler.
        /// </summary>
        private readonly int m_iMaxDegreeOfParallelism;

        /// <summary>
        /// The number of worker delegates currently queued to, or running on, the
        /// thread pool. Protected by lock(m_lstTasks).
        /// </summary>
        private int m_iDelegatesQueueOrRunning = 0;

        #endregion

        #region Constructor

        /// <summary>
        /// Creates a scheduler with the given concurrency limit.
        /// </summary>
        /// <param name="maxDegreeOfParallelism">The maximum number of concurrent workers; must be at least 1.</param>
        /// <exception cref="ArgumentOutOfRangeException">
        /// <paramref name="maxDegreeOfParallelism"/> is less than 1.
        /// </exception>
        public LimitedConcurrencyLevelTaskScheduler(int maxDegreeOfParallelism)
        {
            if (maxDegreeOfParallelism < 1)
            {
                throw new ArgumentOutOfRangeException("maxDegreeOfParallelism");
            }

            m_iMaxDegreeOfParallelism = maxDegreeOfParallelism;
        }

        #endregion

        #region Methods

        /// <summary>
        /// Adds a task to the queue and, if the concurrency limit has not been
        /// reached yet, starts another worker delegate to process the queue.
        /// </summary>
        /// <param name="task">The task to be queued.</param>
        protected sealed override void QueueTask(Task task)
        {
            lock (m_lstTasks)
            {
                m_lstTasks.AddLast(task);

                if (m_iDelegatesQueueOrRunning < m_iMaxDegreeOfParallelism)
                {
                    // Count the worker before it is queued so that concurrent
                    // QueueTask calls cannot start more workers than allowed.
                    ++m_iDelegatesQueueOrRunning;
                    NotifyThreadPoolPendingWork();
                }
            }
        }

        /// <summary>
        /// Queues a worker delegate to the thread pool that executes queued tasks
        /// until the queue is empty.
        /// </summary>
        private void NotifyThreadPoolPendingWork()
        {
            ThreadPool.UnsafeQueueUserWorkItem(_ =>
            {
                // Note that the current thread is now processing work items.
                // This is necessary to enable inlining of tasks into this thread.
                m_bCurrentThreadIsProcessingItems = true;
                try
                {
                    while (true)
                    {
                        Task item;
                        lock (m_lstTasks)
                        {
                            // Nothing left to do: give the concurrency slot back and stop.
                            if (m_lstTasks.Count == 0)
                            {
                                --m_iDelegatesQueueOrRunning;
                                break;
                            }

                            // Get the next item from the queue.
                            item = m_lstTasks.First.Value;
                            m_lstTasks.RemoveFirst();
                        }

                        // Execute the task we pulled out of the queue (outside the lock).
                        base.TryExecuteTask(item);
                    }
                }
                finally
                {
                    // We're done processing items on the current thread.
                    m_bCurrentThreadIsProcessingItems = false;
                }
            }, null);
        }

        /// <summary>
        /// Attempts to execute the specified task on the current thread.
        /// </summary>
        /// <param name="task">The task to be executed.</param>
        /// <param name="taskWasPreviouslyQueued">Whether the task has already been queued to this scheduler.</param>
        /// <returns>Whether the task could be executed on the current thread.</returns>
        protected sealed override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
        {
            // Only threads that are already draining the queue may inline, so that
            // inlining never adds a concurrently running task beyond the limit.
            if (!m_bCurrentThreadIsProcessingItems)
            {
                return false;
            }

            // A previously queued task must be taken out of the queue first. If it
            // is no longer there, a worker has already picked it up, so decline
            // instead of attempting to run it a second time.
            if (taskWasPreviouslyQueued && !TryDequeue(task))
            {
                return false;
            }

            return base.TryExecuteTask(task);
        }

        /// <summary>
        /// Attempts to remove a previously queued task from the scheduler.
        /// </summary>
        /// <param name="task">The task to be removed.</param>
        /// <returns>Whether the task was found in the queue and removed.</returns>
        protected sealed override bool TryDequeue(Task task)
        {
            lock (m_lstTasks)
            {
                return m_lstTasks.Remove(task);
            }
        }

        /// <summary>
        /// Gets the maximum concurrency level supported by this scheduler.
        /// </summary>
        public sealed override int MaximumConcurrencyLevel
        {
            get { return m_iMaxDegreeOfParallelism; }
        }

        /// <summary>
        /// Returns a snapshot of the tasks currently waiting in the queue.
        /// </summary>
        /// <returns>An array copy of the queued tasks.</returns>
        /// <exception cref="NotSupportedException">
        /// The queue lock could not be acquired immediately.
        /// </exception>
        protected sealed override IEnumerable<Task> GetScheduledTasks()
        {
            bool lockToken = false;
            try
            {
                // Non-blocking acquire: fail fast instead of waiting for the lock.
                Monitor.TryEnter(m_lstTasks, ref lockToken);
                if (lockToken)
                {
                    return m_lstTasks.ToArray();
                }

                throw new NotSupportedException();
            }
            finally
            {
                if (lockToken)
                {
                    Monitor.Exit(m_lstTasks);
                }
            }
        }

        #endregion
    }
}

 

转载于:https://www.cnblogs.com/hongjiumu/archive/2013/01/01/2841546.html


最新回复(0)