using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
namespace ConsoleApplication1
{
/// <summary>
/// A <see cref="System.Threading.Tasks.TaskScheduler"/> that executes queued tasks on
/// thread-pool threads while keeping the number of simultaneously active worker
/// loops at or below a limit fixed at construction time.
/// </summary>
public class LimitedConcurrencyLevelTaskScheduler : System.Threading.Tasks.TaskScheduler
{
    #region Member Declarations
    /// <summary>
    /// Whether the current thread is inside a worker loop started by
    /// <see cref="NotifyThreadPoolPendingWork"/>. The field is static and thread-static,
    /// so it is tracked per thread, not per scheduler instance.
    /// </summary>
    [ThreadStatic]
    private static bool m_bCurrentThreadIsProcessingItems;

    /// <summary>
    /// The list of tasks waiting to be executed. This object is also used as the
    /// lock that guards both the list and <see cref="m_iDelegatesQueueOrRunning"/>.
    /// </summary>
    private readonly LinkedList<System.Threading.Tasks.Task> m_lstTasks =
        new LinkedList<System.Threading.Tasks.Task>();

    /// <summary>
    /// The maximum concurrency level allowed by this scheduler.
    /// </summary>
    private readonly int m_iMaxDegreeOfParallelism;

    /// <summary>
    /// The number of worker delegates that have been handed to the thread pool and
    /// have not yet exited their loop. Guarded by lock (m_lstTasks).
    /// </summary>
    private int m_iDelegatesQueueOrRunning = 0;
    #endregion

    #region Constructor
    /// <summary>
    /// Creates a scheduler that runs at most <paramref name="maxDegreeOfParallelism"/>
    /// worker loops at the same time.
    /// </summary>
    /// <param name="maxDegreeOfParallelism">The concurrency limit; must be at least 1.</param>
    /// <exception cref="ArgumentOutOfRangeException">
    /// <paramref name="maxDegreeOfParallelism"/> is less than 1.
    /// </exception>
    public LimitedConcurrencyLevelTaskScheduler(int maxDegreeOfParallelism)
    {
        if (maxDegreeOfParallelism < 1)
        {
            throw new ArgumentOutOfRangeException("maxDegreeOfParallelism");
        }

        m_iMaxDegreeOfParallelism = maxDegreeOfParallelism;
    }
    #endregion

    #region Methods
    /// <summary>
    /// Appends the task to the pending list and, if fewer worker loops than the
    /// configured limit are active, starts one more.
    /// </summary>
    /// <param name="task">The task to be queued.</param>
    protected sealed override void QueueTask(System.Threading.Tasks.Task task)
    {
        lock (m_lstTasks)
        {
            m_lstTasks.AddLast(task);

            if (m_iDelegatesQueueOrRunning < m_iMaxDegreeOfParallelism)
            {
                // Count the worker before it is queued so that concurrent callers
                // cannot start more workers than the limit allows.
                ++m_iDelegatesQueueOrRunning;
                NotifyThreadPoolPendingWork();
            }
        }
    }

    /// <summary>
    /// Queues a worker delegate to the thread pool. The worker repeatedly removes the
    /// first pending task and executes it, and exits once the list is empty.
    /// </summary>
    private void NotifyThreadPoolPendingWork()
    {
        System.Threading.ThreadPool.UnsafeQueueUserWorkItem(_ =>
        {
            // Note that the current thread is now processing work items.
            // This is necessary to enable inlining of tasks into this thread.
            m_bCurrentThreadIsProcessingItems = true;
            try
            {
                while (true)
                {
                    System.Threading.Tasks.Task item;
                    lock (m_lstTasks)
                    {
                        if (m_lstTasks.Count == 0)
                        {
                            // Nothing left to do: give the worker slot back
                            // (under the lock) and leave the loop.
                            --m_iDelegatesQueueOrRunning;
                            break;
                        }

                        // Get the next item from the queue.
                        item = m_lstTasks.First.Value;
                        m_lstTasks.RemoveFirst();
                    }

                    // Execute the task we pulled out of the queue, outside the lock.
                    base.TryExecuteTask(item);
                }
            }
            finally
            {
                // We're done processing items on the current thread.
                m_bCurrentThreadIsProcessingItems = false;
            }
        }, null);
    }

    /// <summary>
    /// Attempts to execute the specified task on the current thread.
    /// </summary>
    /// <param name="task">The task to be executed.</param>
    /// <param name="taskWasPreviouslyQueued">
    /// Whether the task has previously been queued to this scheduler.
    /// </param>
    /// <returns>Whether the task was executed on the current thread.</returns>
    protected sealed override bool TryExecuteTaskInline(System.Threading.Tasks.Task task,
        bool taskWasPreviouslyQueued)
    {
        // Inlining is only supported on a thread that is already running a worker loop.
        if (!m_bCurrentThreadIsProcessingItems)
        {
            return false;
        }

        // A previously queued task must be removed from the pending list before it is
        // run here. If it is no longer in the list, something else has already taken
        // it, so decline to inline rather than race with that path.
        if (taskWasPreviouslyQueued && !TryDequeue(task))
        {
            return false;
        }

        return base.TryExecuteTask(task);
    }

    /// <summary>
    /// Attempts to remove a pending task from the list.
    /// </summary>
    /// <param name="task">The task to be removed.</param>
    /// <returns>Whether the task was found in the list and removed.</returns>
    protected sealed override bool TryDequeue(System.Threading.Tasks.Task task)
    {
        lock (m_lstTasks)
        {
            return m_lstTasks.Remove(task);
        }
    }

    /// <summary>
    /// Gets the maximum concurrency level passed to the constructor.
    /// </summary>
    public sealed override int MaximumConcurrencyLevel
    {
        get { return m_iMaxDegreeOfParallelism; }
    }

    /// <summary>
    /// Returns a snapshot (array copy) of the tasks currently waiting in the list.
    /// </summary>
    /// <returns>A copy of the pending tasks at the time of the call.</returns>
    /// <exception cref="NotSupportedException">
    /// The list's lock could not be acquired immediately.
    /// </exception>
    protected sealed override IEnumerable<System.Threading.Tasks.Task> GetScheduledTasks()
    {
        bool lockToken = false;
        try
        {
            // Non-blocking acquire: never wait on the lock here.
            System.Threading.Monitor.TryEnter(m_lstTasks, ref lockToken);
            if (!lockToken)
            {
                throw new NotSupportedException();
            }

            return m_lstTasks.ToArray();
        }
        finally
        {
            if (lockToken)
            {
                System.Threading.Monitor.Exit(m_lstTasks);
            }
        }
    }
    #endregion
}
}
// Reposted from: https://www.cnblogs.com/hongjiumu/archive/2013/01/01/2841546.html