/// <summary>
/// Signature of the handlers attached to <see cref="WorkQueue{T}.UserWork"/>.
/// </summary>
/// <typeparam name="T">Type of the items stored in the queue.</typeparam>
/// <param name="sender">The <see cref="WorkQueue{T}"/> instance raising the event.</param>
/// <param name="e">Event data carrying the dequeued item.</param>
public delegate void UserWorkEventHandler<T>(object sender, WorkQueue<T>.EnqueueEventArgs e);

/// <summary>
/// A FIFO queue that hands every enqueued item to the <see cref="UserWork"/>
/// event on thread-pool threads. A single dispatcher work item drains the
/// queue; it is scheduled on demand by <see cref="EnqueueItem"/>.
/// </summary>
/// <typeparam name="T">Type of the queued items.</typeparam>
public class WorkQueue<T>
{
    // Guards both the queue and the isWorking flag. Using ONE lock for both
    // closes the window in which the dispatcher has seen an empty queue but
    // has not yet cleared the flag, which could otherwise strand a freshly
    // enqueued item until the next EnqueueItem call.
    private readonly object lockObj = new object();

    // The underlying queue.
    private readonly Queue<T> queue;

    // True while a dispatcher work item is scheduled or running.
    private bool isWorking;

    // Backing field of WorkSequential.
    private bool isOneThread;

    /// <summary>
    /// Raised once for every dequeued non-null item; attach the operation
    /// that should be applied to the items here.
    /// </summary>
    public event UserWorkEventHandler<T> UserWork;

    /// <summary>
    /// Creates a queue with the given initial capacity.
    /// </summary>
    /// <param name="n">Initial capacity passed to the underlying <see cref="Queue{T}"/>.</param>
    public WorkQueue(int n)
    {
        queue = new Queue<T>(n);
    }

    /// <summary>
    /// Creates a queue with the default initial capacity.
    /// </summary>
    public WorkQueue()
    {
        queue = new Queue<T>();
    }

    /// <summary>
    /// Reports whether the queue holds no items. Use with care: the answer
    /// is only a snapshot and may be stale as soon as the method returns.
    /// Items that were already dequeued but are still being handled are not
    /// counted.
    /// </summary>
    /// <returns><c>true</c> if the queue was empty at the moment of the check.</returns>
    public bool IsEmpty()
    {
        lock (lockObj)
        {
            return queue.Count == 0;
        }
    }

    /// <summary>
    /// Whether items are handled one after another on the dispatcher thread.
    /// <c>true</c>: handlers run sequentially, in dequeue order.
    /// <c>false</c> (default): items are dequeued in order but each one is
    /// handled by its own thread-pool work item.
    /// Avoid toggling this frequently.
    /// </summary>
    public bool WorkSequential
    {
        get { return isOneThread; }
        set { isOneThread = value; }
    }

    /// <summary>
    /// Adds an item to the queue and makes sure a dispatcher is scheduled to
    /// process it.
    /// </summary>
    /// <param name="item">The item to enqueue.</param>
    public void EnqueueItem(T item)
    {
        lock (lockObj)
        {
            queue.Enqueue(item);

            if (!isWorking)
            {
                // Set the flag only after scheduling succeeded, so a failure
                // here cannot leave the queue permanently "busy".
                ThreadPool.QueueUserWorkItem(DoUserWork);
                isWorking = true;
            }
        }
    }

    /// <summary>
    /// Dispatcher loop: dequeues items until the queue is empty and raises
    /// <see cref="UserWork"/> for each of them.
    /// </summary>
    /// <param name="state">Unused thread-pool state object.</param>
    private void DoUserWork(object state)
    {
        try
        {
            while (true)
            {
                T item;

                lock (lockObj)
                {
                    if (queue.Count == 0)
                    {
                        // Clearing the flag under the same lock as the
                        // emptiness check guarantees that any later enqueue
                        // sees isWorking == false and schedules a dispatcher.
                        isWorking = false;
                        return;
                    }

                    item = queue.Dequeue();
                }

                // Only null items are skipped. Legitimate default values of
                // value types (for example 0 in a WorkQueue<int>) must be
                // delivered; for non-nullable value types this test is
                // always false.
                if (item == null)
                {
                    continue;
                }

                if (isOneThread)
                {
                    OnUserWork(item);
                }
                else
                {
                    // 'item' is declared inside the loop body, so each
                    // closure captures its own copy.
                    ThreadPool.QueueUserWorkItem(delegate { OnUserWork(item); });
                }
            }
        }
        catch
        {
            // Abnormal exit (for example a handler threw in sequential mode):
            // release the flag so a later enqueue can start a new dispatcher.
            lock (lockObj)
            {
                isWorking = false;
            }

            throw;
        }
    }

    /// <summary>
    /// Raises <see cref="UserWork"/> for a single item.
    /// </summary>
    /// <param name="item">The dequeued item.</param>
    private void OnUserWork(T item)
    {
        // Copy the delegate first: a handler may unsubscribe on another
        // thread between the null check and the invocation.
        UserWorkEventHandler<T> handler = UserWork;
        if (handler != null)
        {
            handler(this, new EnqueueEventArgs(item));
        }
    }

    /// <summary>
    /// Event data for <see cref="UserWork"/>; carries the dequeued item.
    /// </summary>
    public class EnqueueEventArgs : EventArgs
    {
        /// <summary>The item taken from the queue.</summary>
        public T Item { get; private set; }

        /// <summary>
        /// Wraps <paramref name="item"/>, converting it to <typeparamref name="T"/>.
        /// </summary>
        /// <param name="item">The item; must be convertible to <typeparamref name="T"/>.</param>
        /// <exception cref="InvalidCastException">
        /// <paramref name="item"/> cannot be cast to <typeparamref name="T"/>.
        /// </exception>
        public EnqueueEventArgs(object item)
        {
            try
            {
                Item = (T)item;
            }
            catch (Exception ex)
            {
                // Any failure of the cast is reported as InvalidCastException;
                // the original exception is kept as InnerException for
                // diagnosis.
                throw new InvalidCastException("object to T 转换失败", ex);
            }
        }
    }
}
使用:
// Output lines produced by the UserWork handler; written to the file by Main.
private static List<string> list = new List<string>(1000);

// Writer for "test.dat"; FileMode.Create makes a new file or overwrites an existing one.
private static StreamWriter sw = new StreamWriter(new FileStream("test.dat", FileMode.Create));
/// <summary>
/// Demo entry point: enqueues the numbers 0..999 from a thread-pool thread,
/// waits for the user to press Enter, then writes the collected lines to the
/// output file and prints whether the queue is empty.
/// </summary>
/// <param name="args">Command-line arguments (unused).</param>
static void Main(string[] args)
{
    WorkQueue<int> workQueue = new WorkQueue<int>(1000);
    workQueue.UserWork += workQueue_UserWork;
    // Uncomment to handle the items one by one on a single thread:
    // workQueue.WorkSequential = true;

    ThreadPool.QueueUserWorkItem(o =>
    {
        for (int i = 0; i < 1000; i++)
        {
            workQueue.EnqueueItem(i);
        }
    });

    // Nothing waits for the handlers to finish; pressing Enter early
    // captures only the lines collected so far.
    Console.ReadLine();

    try
    {
        // Handlers may still be running on pool threads. Hold the list's
        // monitor while enumerating; this excludes only writers that also
        // lock on 'list'.
        lock (list)
        {
            list.ForEach(str => sw.WriteLine(str));
        }

        Console.WriteLine(workQueue.IsEmpty());
    }
    finally
    {
        // Close (and thereby flush) the writer even if writing failed.
        sw.Close();
    }
}
22
/// <summary>
/// UserWork handler of the demo: records the item, a timestamp and the id of
/// the handling thread as one tab-separated line, then sleeps briefly to
/// simulate work.
/// </summary>
/// <param name="sender">The queue that raised the event.</param>
/// <param name="e">Event data carrying the dequeued item.</param>
static void workQueue_UserWork(object sender, WorkQueue<int>.EnqueueEventArgs e)
{
    StringBuilder sb = new StringBuilder();
    sb.Append(e.Item)
      .Append("\t\t")
      .Append(DateTime.Now.ToString("u") + "\t\t")
      .Append(Thread.CurrentThread.ManagedThreadId);
    string line = sb.ToString();

    // In the queue's default (non-sequential) mode this handler runs on
    // several thread-pool threads at once, and List<T>.Add is not
    // thread-safe, so appends are serialized.
    lock (list)
    {
        list.Add(line);
    }

    // Simulated workload.
    Thread.Sleep(15);
}
转载于:https://www.cnblogs.com/mschen/p/7771626.html