服务器
//
// a simple agi server using epoll in linux
//
// 2010-12-20
// by nsy
//
7
#include
<
sys
/
socket.h
>
8
#include
<
sys
/
epoll.h
>
9
#include
<
netinet
/
in
.h
>
10
#include
<
arpa
/
inet.h
>
11
#include
<
fcntl.h
>
12
#include
<
unistd.h
>
13
#include
<
stdio.h
>
14
#include
<
stdlib.h
>
15
#include
<
errno.h
>
16
#include
<
string
.h
>
17
#include
"
CallSvr.h
"
18
#include
<
pthread.h
>
19
#include
"
epoll.h
"
20
21
//
test
22
#include
"
msg.h
"
23
24
//
set event
25
void
EventSet(
struct
myevent_s
*
ev,
int
fd,
int
status)
26
{
27
ev
->
fd
=
fd;
28
ev
->
status
=
status;
29
ev
->
last_active
=
time(NULL);
30
fprintf(stderr,
"
function=%s,line=%d,fd=%d\n
"
,__func__,__LINE__,fd);
31
}
32
//
add/mod an event to epoll
33
void
EventAdd(
int
epollFd,
int
events,
struct
myevent_s
*
ev)
34
{
35
struct
epoll_event epv
=
{
0
, {
0
}};
36
int
op;
37
epv.data.ptr
=
ev;
38
epv.events
=
events;
39
if
(ev
->
status
==
1
){
40
op
=
EPOLL_CTL_MOD;
41
fprintf(stderr,
"
mod:function=%s,line=%d,fd=%d,status=%d\n
"
,__func__,__LINE__,ev
->
fd,ev
->
status);
42
}
43
else
{
44
op
=
EPOLL_CTL_ADD;
45
ev
->
status
=
1
;
46
fprintf(stderr,
"
add:function=%s,line=%d,fd=%d,status=%d\n
"
,__func__,__LINE__,ev
->
fd,ev
->
status);
47
}
48
if
(epoll_ctl(epollFd, op, ev
->
fd,
&
epv)
<
0
)
49
{
50
fprintf(stderr,
"
failed:function=%s,line=%d,fd=%d:errno=%d\n
"
,__func__,__LINE__,ev
->
fd,errno);
51
}
52
else
53
{
54
fprintf(stderr,
"
success:function=%s,line=%d,fd=%d\n
"
,__func__,__LINE__,ev
->
fd);
55
}
56
}
57
//
delete an event from epoll
58
void
EventDel(
int
epollFd,
struct
myevent_s
*
ev)
59
{
60
struct
epoll_event epv
=
{
0
, {
0
}};
61
if
(ev
->
status
!=
1
)
return
;
62
epv.data.ptr
=
ev;
63
ev
->
status
=
0
;
64
epoll_ctl(epollFd, EPOLL_CTL_DEL, ev
->
fd,
&
epv);
65
fprintf(stderr,
"
function=%s,line=%d,fd=%d\n
"
,__func__,__LINE__,ev
->
fd);
66
}
67
68
//
receive data
69
void
RecvData(
struct
myevent_s
*
ev)
70
{
71
msg_header header;
72
int
recvbytes;
73
if
((recvbytes
=
recv(ev
->
fd,
&
header,
sizeof
(msg_header),
0
))
==-
1
)
74
{
75
fprintf(stderr,
"
RecvHeaderErr:function=%s,line=%d,fd=%d\n
"
,__func__,__LINE__,ev
->
fd);
76
goto
errret;
77
}
78
if
(recvbytes
==
sizeof
(msg_header))
79
{
80
switch
(header.msg_type)
81
{
82
case
msg_lost:
83
{
84
rq_lost rq;
85
if
((recvbytes
=
recv(ev
->
fd, ((
char
*
)(
&
rq))
+
sizeof
(msg_header),
sizeof
(rq_lost)
-
sizeof
(msg_header),
0
))
==-
1
)
86
{
87
fprintf(stderr,
"
RecvAfter:function=%s,line=%d,fd=%d\n
"
,__func__,__LINE__,ev
->
fd);
88
goto
errret;
89
}
90
else
if
(recvbytes
>
0
)
91
{
92
printf(
"
recv sucess%d,hope to recv %d\n
"
,recvbytes,
sizeof
(rq_lost)
-
sizeof
(msg_header));
93
printf(
"
cardno is '%s'\n
"
,rq.cardno);
94
printf(
"
password is '%s'\n
"
,rq.password);
95
96
//
init ev recv
97
memcpy(
&
ev
->
header,
&
header,
sizeof
(msg_header));
98
memcpy(
&
ev
->
recv_buff,
&
rq,
sizeof
(rq_lost));
99
ev
->
recv_len
=
recvbytes;
100
fprintf(stderr,
"
addtologicqueue:function=%s,line=%d,fd=%d\n
"
,__func__,__LINE__,ev
->
fd);
101
//
add to logic queue
102
sem_wait(
&
bin_sem_logic_data_produce);
103
struct
QUEUE_LOGIC_DATA_ITEM
*
item;
104
item
=
malloc(
sizeof
(
struct
QUEUE_LOGIC_DATA_ITEM));
105
item
->
ev
=
ev;
106
pthread_mutex_lock(
&
queue_logic_data_mutex);
107
TAILQ_INSERT_TAIL(
&
queue_logic_data_header,item,logic_data_entries);
108
pthread_mutex_unlock(
&
queue_logic_data_mutex);
109
sem_post(
&
bin_sem_logic_data_consume);
110
fprintf(stderr,
"
addtologicqueue--end:function=%s,line=%d,fd=%d\n
"
,__func__,__LINE__,ev
->
fd);
111
}
else
112
{
113
fprintf(stderr,
"
RecvAfter:function=%s,line=%d,fd=%d:errno=%d\n
"
,__func__,__LINE__,ev
->
fd,errno);
114
goto
errret;
115
}
116
break
;
117
}
118
}
119
return
;
//
switch end function end
120
}
else
121
{
122
fprintf(stderr,
"
function=%s,line=%d,fd=%d\n
"
,__func__,__LINE__,ev
->
fd);
123
goto
errret;
124
}
125
errret:
126
EventDel(g_epollFd, ev);
127
close(ev
->
fd);
128
}
129
//
send data
130
void
SendData(
struct
myevent_s
*
ev)
131
{
132
fprintf(stderr,
"
JustIn:function=%s,line=%d,fd=%d\n
"
,__func__,__LINE__,ev
->
fd);
133
int
len;
134
//
send data
135
len
=
send(ev
->
fd, ev
->
send_buff, ev
->
send_len,
0
);
136
ev
->
send_len
=
0
;
137
fprintf(stderr,
"
sendlen=%d:function=%s,line=%d,fd=%d\n
"
,len,__func__,__LINE__,ev
->
fd);
138
if
(len
<
0
)
139
{
140
close(ev
->
fd);
141
fprintf(stderr,
"
err=%d:function=%s,line=%d,fd=%d\n
"
,errno,__func__,__LINE__,ev
->
fd);
142
}
else
143
{
144
//
let system known can recv
145
EventAdd(g_epollFd, EPOLLIN
|
EPOLLET,ev);
146
}
147
}
148
149
void
*
accept_thread_work(
void
*
arg)
150
{
151
while
(
1
)
152
{
153
int
*
plistenFd
=
(
int
*
)arg;
154
fprintf(stderr,
"
function=%s,line=%d,listenfd=%d\n
"
,__func__,__LINE__,
*
plistenFd);
155
struct
sockaddr_in sin;
156
socklen_t len
=
sizeof
(
struct
sockaddr_in);
157
int
nfd, i;
158
//
accept
159
if
((nfd
=
accept(
*
plistenFd, (
struct
sockaddr
*
)
&
sin,
&
len))
==
-
1
)
160
{
161
if
(errno
!=
EAGAIN
&&
errno
!=
EINTR)
162
{
163
fprintf(stderr,
"
%s: bad accept
"
, __func__);
164
}
165
continue
;
166
}
167
do
168
{
169
for
(i
=
0
; i
<
MAX_EVENTS; i
++
)
170
{
171
if
(g_Events[i].status
==
0
)
172
{
173
fprintf(stderr,
"
function=%s,line=%d,listenfd=%d,currentindex=%d\n
"
,__func__,__LINE__,
*
plistenFd,i);
174
break
;
175
}
176
}
177
if
(i
==
MAX_EVENTS)
178
{
179
fprintf(stderr,
"
max events:function=%s,line=%d,listenFd=%d\n
"
,__func__,__LINE__,
*
plistenFd);
180
break
;
181
}
182
//
set nonblocking
183
fprintf(stderr,
"
set nonblocking:function=%s,line=%d,listenfd=%d\n
"
,__func__,__LINE__,
*
plistenFd);
184
if
(fcntl(nfd, F_SETFL, O_NONBLOCK)
<
0
)
break
;
185
//
add a read event for receive data
186
EventSet(
&
g_Events[i], nfd,
0
);
187
EventAdd(g_epollFd, EPOLLIN
|
EPOLLET,
&
g_Events[i]);
188
fprintf(stderr,
"
new conn[%s:%d][time:%d]\n
"
, inet_ntoa(sin.sin_addr), ntohs(sin.sin_port),(
int
) g_Events[i].last_active);
189
}
while
(
0
);
190
}
191
return
NULL;
192
}
193
194
void
*
epoll_wait_thread_work(
void
*
arg)
195
{
196
fprintf(stderr,
"
justin:function=%s,line=%d\n
"
,__func__,__LINE__);
197
//
event loop
198
struct
epoll_event events[MAX_EVENTS];
199
200
int
checkPos
=
0
;
201
while
(
1
){
202
//
a simple timeout check here, every time 100, better to use a mini-heap, and add timer event
203
long
now
=
time(NULL);
204
int
i;
205
for
(i
=
0
; i
<
100
; i
++
, checkPos
++
)
//
doesn't check listen fd
206
{
207
if
(checkPos
==
MAX_EVENTS) checkPos
=
0
;
//
recycle
208
if
(g_Events[checkPos].status
!=
1
)
continue
;
209
long
duration
=
now
-
g_Events[checkPos].last_active;
210
if
(duration
>=
60
)
//
60s timeout
211
{
212
close(g_Events[checkPos].fd);
213
fprintf(stderr,
"
[fd=%d] timeout[%d--%d].\n
"
,(
int
) g_Events[checkPos].fd,(
int
) g_Events[checkPos].last_active, (
int
)now);
214
EventDel(g_epollFd,
&
g_Events[checkPos]);
215
}
216
}
217
//
wait for events to happen
218
int
fds
=
epoll_wait(g_epollFd, events, MAX_EVENTS,
1000
);
219
if
(fds
<
0
){
220
fprintf(stderr,
"
epoll_wait error, exit\n
"
);
221
break
;
222
}
223
for
(i
=
0
; i
<
fds; i
++
){
224
struct
myevent_s
*
ev
=
(
struct
myevent_s
*
)events[i].data.ptr;
225
if
(events[i].events
&
EPOLLIN)
//
read event
226
{
227
sem_wait(
&
bin_sem_recv_fd_produce);
228
fprintf(stderr,
"
readEvent:function=%s,line=%d:fd=%d\n
"
,__func__,__LINE__,ev
->
fd);
229
//
ev->call_back(ev->fd, events[i].events, ev->arg);
230
struct
QUEUE_RECV_FD_ITEM
*
item;
231
item
=
malloc(
sizeof
(
struct
QUEUE_RECV_FD_ITEM));
232
item
->
ev
=
ev;
233
pthread_mutex_lock(
&
queue_recv_fd_mutex);
234
TAILQ_INSERT_TAIL(
&
queue_recv_fd_header,item,recv_fd_entries);
235
pthread_mutex_unlock(
&
queue_recv_fd_mutex);
236
sem_post(
&
bin_sem_recv_fd_consume);
237
}
else
if
(events[i].events
&
EPOLLOUT)
//
write event
238
{
239
sem_post(
&
bin_sem_send_fd_consume);
240
fprintf(stderr,
"
post send fd consume:function=%s,line=%d:fd=%d\n
"
,__func__,__LINE__,ev
->
fd);
241
}
242
}
243
}
244
return
NULL;
245
}
246
247
void
*
recv_data_thread_work(
void
*
arg)
248
{
249
while
(
1
)
250
{
251
sem_wait(
&
bin_sem_recv_fd_consume);
252
fprintf(stderr,
"
justin:function=%s,line=%d\n
"
,__func__,__LINE__);
253
int
index
=
(
int
)arg;
254
fprintf(stderr,
"
recv thread id is %d\n
"
,index);
255
pthread_mutex_lock(
&
queue_recv_fd_mutex);
256
struct
QUEUE_RECV_FD_ITEM
*
item;
257
item
=
TAILQ_FIRST(
&
queue_recv_fd_header);
258
TAILQ_REMOVE(
&
queue_recv_fd_header,item,recv_fd_entries);
259
pthread_mutex_unlock(
&
queue_recv_fd_mutex);
260
RecvData(item
->
ev);
261
}
262
return
NULL;
263
}
264
265
void
*
send_data_thread_work(
void
*
arg)
266
{
267
while
(
1
)
268
{
269
sem_wait(
&
bin_sem_send_fd_consume);
270
fprintf(stderr,
"
justin:function=%s,line=%d\n
"
,__func__,__LINE__);
271
pthread_mutex_lock(
&
queue_send_fd_mutex);
272
struct
QUEUE_SEND_FD_ITEM
*
item;
273
item
=
TAILQ_FIRST(
&
queue_send_fd_header);
274
TAILQ_REMOVE(
&
queue_send_fd_header,item,send_fd_entries);
275
pthread_mutex_unlock(
&
queue_send_fd_mutex);
276
SendData(item
->
ev);
277
}
278
return
NULL;
279
}
280
281
void
*
logic_data_thread_work(
void
*
arg)
282
{
283
while
(
1
)
284
{
285
//
remove logic queue
286
sem_wait(
&
bin_sem_logic_data_consume);
287
//
for test
288
int
index
=
(
int
)arg;
289
fprintf(stderr,
"
logic thread id is %d\n
"
,index);
290
291
pthread_mutex_lock(
&
queue_logic_data_mutex);
292
struct
QUEUE_LOGIC_DATA_ITEM
*
item;
293
item
=
TAILQ_FIRST(
&
queue_logic_data_header);
294
TAILQ_REMOVE(
&
queue_logic_data_header,item,logic_data_entries);
295
pthread_mutex_unlock(
&
queue_logic_data_mutex);
296
//
logic header
297
switch
(item
->
ev
->
header.msg_type)
298
{
299
case
msg_lost:
300
{
301
rq_lost
*
rq
=
(rq_lost
*
)item
->
ev
->
recv_buff;
302
303
rs_lost rs;
304
rs.header.msg_type
=
msg_lost;
305
rs.header.size
=
sizeof
(rs_lost);
306
rs.header.length
=
0
;
307
308
if
(strcmp(rq
->
cardno,
"
12345
"
)
==
0
)
309
{
310
rs.is_ok
=
1
;
311
}
312
else
313
{
314
rs.is_ok
=
0
;
315
}
316
memcpy(
&
item
->
ev
->
header,
&
rs.header,
sizeof
(msg_header));
317
item
->
ev
->
send_len
=
sizeof
(rs);
318
memcpy(item
->
ev
->
send_buff,
&
rs,
sizeof
(rs));
319
break
;
320
}
321
}
322
323
//
add to send fd queue
324
sem_wait(
&
bin_sem_send_fd_produce);
325
fprintf(stderr,
"
after wait send fd produce\n
"
);
326
struct
QUEUE_SEND_FD_ITEM
*
sendItem;
327
sendItem
=
malloc(
sizeof
(
struct
QUEUE_SEND_FD_ITEM));
328
sendItem
->
ev
=
item
->
ev;
329
pthread_mutex_lock(
&
queue_send_fd_mutex);
330
TAILQ_INSERT_TAIL(
&
queue_send_fd_header,sendItem,send_fd_entries);
331
pthread_mutex_unlock(
&
queue_send_fd_mutex);
332
//
let system known can send
333
EventAdd(g_epollFd, EPOLLOUT
|
EPOLLET, item
->
ev);
334
}
335
return
NULL;
336
}
337
338
int
main(
int
argc,
char
**
argv)
339
{
340
int
res;
341
//
recv fd queue
342
TAILQ_INIT(
&
queue_recv_fd_header);
343
res
=
sem_init(
&
bin_sem_recv_fd_consume,
0
,
0
);
344
if
(res)
345
{
346
fprintf(stderr,
"
sem init consume failed\n
"
);
347
exit(EXIT_FAILURE);
348
}
349
350
res
=
sem_init(
&
bin_sem_recv_fd_produce,
0
,MAX_EVENTS);
351
if
(res)
352
{
353
fprintf(stderr,
"
sem init produce failed\n
"
);
354
exit(EXIT_FAILURE);
355
}
356
357
res
=
pthread_mutex_init(
&
queue_recv_fd_mutex,NULL);
358
if
(res
!=
0
)
359
{
360
perror(
"
create mutex for queue recv failed\n
"
);
361
exit(EXIT_FAILURE);
362
}
363
//
logic data queue
364
TAILQ_INIT(
&
queue_logic_data_header);
365
res
=
sem_init(
&
bin_sem_logic_data_consume,
0
,
0
);
366
if
(res)
367
{
368
fprintf(stderr,
"
sem init logic data consume failed\n
"
);
369
exit(EXIT_FAILURE);
370
}
371
372
res
=
sem_init(
&
bin_sem_logic_data_produce,
0
,MAX_EVENTS);
373
if
(res)
374
{
375
fprintf(stderr,
"
sem init logic data produce failed\n
"
);
376
exit(EXIT_FAILURE);
377
}
378
379
res
=
pthread_mutex_init(
&
queue_logic_data_mutex,NULL);
380
if
(res
!=
0
)
381
{
382
perror(
"
create mutex for queue logic data failed\n
"
);
383
exit(EXIT_FAILURE);
384
}
385
386
//
send fd queue
387
TAILQ_INIT(
&
queue_send_fd_header);
388
res
=
sem_init(
&
bin_sem_send_fd_consume,
0
,
0
);
389
if
(res)
390
{
391
fprintf(stderr,
"
sem init send fd consume failed\n
"
);
392
exit(EXIT_FAILURE);
393
}
394
395
res
=
sem_init(
&
bin_sem_send_fd_produce,
0
,MAX_EVENTS);
396
if
(res)
397
{
398
fprintf(stderr,
"
sem init send fd produce failed\n
"
);
399
exit(EXIT_FAILURE);
400
}
401
402
res
=
pthread_mutex_init(
&
queue_send_fd_mutex,NULL);
403
if
(res
!=
0
)
404
{
405
perror(
"
create mutex for queue send fd failed\n
"
);
406
exit(EXIT_FAILURE);
407
}
408
409
short
port
=
3342
;
//
default port
410
if
(argc
==
2
){
411
port
=
atoi(argv[
1
]);
412
}
413
//
create epoll
414
g_epollFd
=
epoll_create(MAX_EVENTS);
415
if
(g_epollFd
<=
0
)
416
{
417
fprintf(stderr,
"
create epoll failed:fd=%d:function=%s,line=%d\n
"
, g_epollFd,__func__,__LINE__);
418
exit(EXIT_FAILURE);
419
}
420
//
create & bind listen socket
421
int
listenFd
=
socket(AF_INET, SOCK_STREAM,
0
);
422
//
bind & listen
423
struct
sockaddr_in sin;
424
bzero(
&
sin,
sizeof
(sin));
425
sin.sin_family
=
AF_INET;
426
sin.sin_addr.s_addr
=
INADDR_ANY;
427
sin.sin_port
=
htons(port);
428
bind(listenFd, (
const
struct
sockaddr
*
)
&
sin,
sizeof
(sin));
429
listen(listenFd,
5
);
430
fprintf(stderr,
"
server running:port[%d]\n
"
, port);
431
//
create accept thread
432
433
void
*
thread_result;
434
pthread_t accept_t;
435
res
=
pthread_create(
&
accept_t,NULL,accept_thread_work,(
void
*
)
&
listenFd);
436
if
(res
!=
0
)
437
{
438
perror(
"
accept create failed\n
"
);
439
exit(EXIT_FAILURE);
440
}
441
442
//
create epoll wait thread
443
pthread_t epoll_wait_t;
444
res
=
pthread_create(
&
epoll_wait_t,NULL,epoll_wait_thread_work,NULL);
445
if
(res
!=
0
)
446
{
447
perror(
"
create epoll wait thread failed\n
"
);
448
exit(EXIT_FAILURE);
449
}
450
//
create two recv data thread
451
pthread_t recv_data_t;
452
res
=
pthread_create(
&
recv_data_t,NULL,recv_data_thread_work,(
void
*
)
1
);
453
if
(res
!=
0
)
454
{
455
perror(
"
create recv data thread failed\n
"
);
456
exit(EXIT_FAILURE);
457
}
458
459
pthread_t recv_data_t_1;
460
res
=
pthread_create(
&
recv_data_t_1,NULL,recv_data_thread_work,(
void
*
)
2
);
461
if
(res
!=
0
)
462
{
463
perror(
"
create recv data thread failed\n
"
);
464
exit(EXIT_FAILURE);
465
}
466
//
create two send data thread
467
pthread_t send_data_t;
468
res
=
pthread_create(
&
send_data_t,NULL,send_data_thread_work,(
void
*
)
1
);
469
if
(res
!=
0
)
470
{
471
perror(
"
create send data thread failed\n
"
);
472
exit(EXIT_FAILURE);
473
}
474
475
pthread_t send_data_t_1;
476
res
=
pthread_create(
&
send_data_t_1,NULL,send_data_thread_work,(
void
*
)
2
);
477
if
(res
!=
0
)
478
{
479
perror(
"
create send data thread failed\n
"
);
480
exit(EXIT_FAILURE);
481
}
482
483
//
create two logic work thread
484
pthread_t logic_work_t;
485
res
=
pthread_create(
&
logic_work_t,NULL,logic_data_thread_work,(
void
*
)
1
);
486
if
(res
!=
0
)
487
{
488
perror(
"
create logic work thread failed\n
"
);
489
exit(EXIT_FAILURE);
490
}
491
492
pthread_t logic_work_t_1;
493
res
=
pthread_create(
&
logic_work_t_1,NULL,logic_data_thread_work,(
void
*
)
2
);
494
if
(res
!=
0
)
495
{
496
perror(
"
create logic work thread failed\n
"
);
497
exit(EXIT_FAILURE);
498
}
499
500
//
wait child thread
501
res
=
pthread_join(accept_t,
&
thread_result);
502
if
(res
!=
0
)
503
{
504
perror(
"
accept thread join failed\n
"
);
505
exit(EXIT_FAILURE);
506
}
507
508
//
wait child thread
509
res
=
pthread_join(epoll_wait_t,
&
thread_result);
510
if
(res
!=
0
)
511
{
512
perror(
"
epoll wait thread join failed\n
"
);
513
exit(EXIT_FAILURE);
514
}
515
516
//
wait child thread
517
res
=
pthread_join(recv_data_t,
&
thread_result);
518
if
(res
!=
0
)
519
{
520
perror(
"
recv data thread join failed\n
"
);
521
exit(EXIT_FAILURE);
522
}
523
//
wait child thread
524
res
=
pthread_join(recv_data_t_1,
&
thread_result);
525
if
(res
!=
0
)
526
{
527
perror(
"
recv data thread join failed\n
"
);
528
exit(EXIT_FAILURE);
529
}
530
531
//
wait child thread
532
res
=
pthread_join(send_data_t,
&
thread_result);
533
if
(res
!=
0
)
534
{
535
perror(
"
send data thread join failed\n
"
);
536
exit(EXIT_FAILURE);
537
}
538
//
wait child thread
539
res
=
pthread_join(send_data_t_1,
&
thread_result);
540
if
(res
!=
0
)
541
{
542
perror(
"
send data thread join failed\n
"
);
543
exit(EXIT_FAILURE);
544
}
545
//
wait child thread
546
res
=
pthread_join(logic_work_t,
&
thread_result);
547
if
(res
!=
0
)
548
{
549
perror(
"
logic work thread join failed\n
"
);
550
exit(EXIT_FAILURE);
551
}
552
//
wait child thread
553
res
=
pthread_join(logic_work_t_1,
&
thread_result);
554
if
(res
!=
0
)
555
{
556
perror(
"
logic work thread join failed\n
"
);
557
exit(EXIT_FAILURE);
558
}
559
//
free resource
560
close(g_epollFd);
561
sem_destroy(
&
bin_sem_recv_fd_consume);
562
sem_destroy(
&
bin_sem_recv_fd_produce);
563
pthread_mutex_destroy(
&
queue_recv_fd_mutex);
564
565
sem_destroy(
&
bin_sem_logic_data_consume);
566
sem_destroy(
&
bin_sem_logic_data_produce);
567
pthread_mutex_destroy(
&
queue_logic_data_mutex);
568
569
sem_destroy(
&
bin_sem_send_fd_consume);
570
sem_destroy(
&
bin_sem_send_fd_produce);
571
pthread_mutex_destroy(
&
queue_send_fd_mutex);
572
return
0
;
573
服务器头
1
#ifndef _epoll_h_
2
#define
_epoll_h_
3
4
#include
"
sys/queue.h
"
5
#include
<
semaphore.h
>
6
#include
"
msg.h
"
7
8
#define
MAX_EVENTS 500
9
10
int
g_epollFd;
11
12
void
*
accept_thread_work(
void
*
arg);
13
void
*
epoll_wait_thread_work(
void
*
arg);
14
void
*
recv_data_thread_work(
void
*
arg);
15
void
*
send_data_thread_work(
void
*
arg);
16
void
*
logic_data_thread_work(
void
*
arg);
17
18
struct
myevent_s
19
{
20
int
fd;
21
int
status;
//
1: in epoll wait list, 0 not in
22
msg_header header;
23
char
recv_buff[
256
];
//
recv data buffer
24
int
recv_len;
25
char
send_buff[
256
];
//
send data buffer
26
int
send_len;
27
long
last_active;
//
last active time
28
};
29
30
struct
myevent_s g_Events[MAX_EVENTS
+
1
];
//
g_Events[MAX_EVENTS] is used by listen fd
31
32
//
recv fd queue
33
struct
QUEUE_RECV_FD_ITEM{
34
struct
myevent_s
*
ev;
35
TAILQ_ENTRY(QUEUE_RECV_FD_ITEM) recv_fd_entries;
36
};
37
38
TAILQ_HEAD(,QUEUE_RECV_FD_ITEM) queue_recv_fd_header;
39
40
sem_t bin_sem_recv_fd_produce;
41
sem_t bin_sem_recv_fd_consume;
42
43
pthread_mutex_t queue_recv_fd_mutex;
44
45
//
send fd queue
46
struct
QUEUE_SEND_FD_ITEM{
47
struct
myevent_s
*
ev;
48
TAILQ_ENTRY(QUEUE_SEND_FD_ITEM) send_fd_entries;
49
};
50
51
TAILQ_HEAD(,QUEUE_SEND_FD_ITEM) queue_send_fd_header;
52
53
sem_t bin_sem_send_fd_produce;
54
sem_t bin_sem_send_fd_consume;
55
56
pthread_mutex_t queue_send_fd_mutex;
57
58
//
logic data buff
59
struct
QUEUE_LOGIC_DATA_ITEM{
60
struct
myevent_s
*
ev;
61
TAILQ_ENTRY(QUEUE_LOGIC_DATA_ITEM) logic_data_entries;
62
};
63
64
TAILQ_HEAD(,QUEUE_LOGIC_DATA_ITEM) queue_logic_data_header;
65
66
sem_t bin_sem_logic_data_produce;
67
sem_t bin_sem_logic_data_consume;
68
69
pthread_mutex_t queue_logic_data_mutex;
70
71
#endif
72
}
转载于:https://www.cnblogs.com/nanshouyong326/archive/2010/12/21/1912894.html
相关资源:epoll多线程高并发服务器代码
转载请注明原文地址: https://win8.8miu.com/read-1482301.html