epoll 多线程 服务器

it2022-05-09  25

服务器   1  //     2  //  a simple agi server using epoll in linux   3  //     4  //  2010-12-20   5  //  by nsy   6  //   7  #include  < sys / socket.h >   8  #include  < sys / epoll.h >   9  #include  < netinet / in .h >  10  #include  < arpa / inet.h >  11  #include  < fcntl.h >  12  #include  < unistd.h >  13  #include  < stdio.h >  14  #include  < stdlib.h >  15  #include  < errno.h >  16  #include  < string .h >  17  #include  " CallSvr.h "  18  #include  < pthread.h >  19  #include  " epoll.h "  20   21  // test  22  #include  " msg.h "  23   24  //  set event  25  void  EventSet( struct  myevent_s  * ev,  int  fd, int  status)  26  {  27    ev -> fd  =  fd;  28    ev -> status  =  status;  29    ev -> last_active  =  time(NULL);  30    fprintf(stderr, " function=%s,line=%d,fd=%d\n " ,__func__,__LINE__,fd);  31  }  32  //  add/mod an event to epoll  33  void  EventAdd( int  epollFd,  int  events, struct  myevent_s  * ev)  34  {  35     struct  epoll_event epv  =  { 0 , { 0 }};  36     int  op;  37    epv.data.ptr  =  ev;  38    epv.events  =  events;  39     if (ev -> status  ==   1 ){  40      op  =  EPOLL_CTL_MOD;  41      fprintf(stderr, " mod:function=%s,line=%d,fd=%d,status=%d\n " ,__func__,__LINE__,ev -> fd,ev -> status);  42    }  43     else {  44      op  =  EPOLL_CTL_ADD;  45      ev -> status  =   1 ;  46      fprintf(stderr, " add:function=%s,line=%d,fd=%d,status=%d\n " ,__func__,__LINE__,ev -> fd,ev -> status);  47    }  48     if (epoll_ctl(epollFd, op, ev -> fd,  & epv)  <   0 )  49      {  50        fprintf(stderr, " failed:function=%s,line=%d,fd=%d:errno=%d\n " ,__func__,__LINE__,ev -> fd,errno);  51      }  52     else  53      {  54        fprintf(stderr, " success:function=%s,line=%d,fd=%d\n " ,__func__,__LINE__,ev -> fd);  55      }  56  }  57  //  delete an event from epoll  58  void  EventDel( int  epollFd, struct  myevent_s  * ev)  59  {  60     struct  epoll_event epv  =  { 0 , { 0 }};  61     if (ev -> status  !=   1 return ;  62    epv.data.ptr  =  ev;  63    ev -> status  =   0 ;  64    epoll_ctl(epollFd, EPOLL_CTL_DEL, ev -> fd,  & epv);  65    fprintf(stderr, " function=%s,line=%d,fd=%d\n " ,__func__,__LINE__,ev -> fd);  66  }  67   68  //  receive data  69  void  RecvData( struct  myevent_s  * ev)  70  {  71    msg_header header;  72     int  recvbytes;  73     if  ((recvbytes = recv(ev -> fd,  & header,  sizeof (msg_header),  0 ))  ==- 1  74      {  75        fprintf(stderr, " RecvHeaderErr:function=%s,line=%d,fd=%d\n " ,__func__,__LINE__,ev -> fd);  76         goto  errret;  77      }  78     if (recvbytes  ==   sizeof (msg_header))  79      {  80         switch (header.msg_type)  81      {  82       case  msg_lost:  83        {  84          rq_lost rq;  85           if  ((recvbytes = recv(ev -> fd, (( char * )( & rq)) + sizeof (msg_header),  sizeof (rq_lost) - sizeof (msg_header),  0 ))  ==- 1 )  86            {  87          fprintf(stderr, " RecvAfter:function=%s,line=%d,fd=%d\n " ,__func__,__LINE__,ev -> fd);  88           goto  errret;  89            }  90           else   if (recvbytes  >   0 )  91            {                  92          printf( " recv sucess%d,hope to recv %d\n " ,recvbytes, sizeof (rq_lost) - sizeof (msg_header));  93          printf( " cardno is '%s'\n " ,rq.cardno);  94          printf( " password is '%s'\n " ,rq.password);  95        96           // init ev recv  97          memcpy( & ev -> header, & header, sizeof (msg_header));  98          memcpy( & ev -> recv_buff, & rq, sizeof (rq_lost));  99          ev -> recv_len  =  recvbytes; 100          fprintf(stderr, " addtologicqueue:function=%s,line=%d,fd=%d\n " ,__func__,__LINE__,ev -> fd); 101           // add to logic queue 102          sem_wait( & bin_sem_logic_data_produce); 103           struct  QUEUE_LOGIC_DATA_ITEM  * item; 104          item  =  malloc( sizeof ( struct  QUEUE_LOGIC_DATA_ITEM)); 105          item -> ev  =  ev; 106          pthread_mutex_lock( & queue_logic_data_mutex); 107          TAILQ_INSERT_TAIL( & queue_logic_data_header,item,logic_data_entries); 108          pthread_mutex_unlock( & queue_logic_data_mutex); 109          sem_post( & bin_sem_logic_data_consume); 110          fprintf(stderr, " addtologicqueue--end:function=%s,line=%d,fd=%d\n " ,__func__,__LINE__,ev -> fd); 111            } else 112            { 113          fprintf(stderr, " RecvAfter:function=%s,line=%d,fd=%d:errno=%d\n " ,__func__,__LINE__,ev -> fd,errno); 114           goto  errret; 115            } 116           break ; 117        } 118      } 119         return ; // switch end function end 120      } else 121      { 122        fprintf(stderr, " function=%s,line=%d,fd=%d\n " ,__func__,__LINE__,ev -> fd); 123         goto  errret; 124      } 125   errret: 126    EventDel(g_epollFd, ev); 127    close(ev -> fd); 128  } 129  //  send data 130  void  SendData( struct  myevent_s  * ev) 131  { 132    fprintf(stderr, " JustIn:function=%s,line=%d,fd=%d\n " ,__func__,__LINE__,ev -> fd); 133     int  len; 134     //  send data 135    len  =  send(ev -> fd, ev -> send_buff, ev -> send_len,  0 ); 136    ev -> send_len  =   0 ; 137    fprintf(stderr, " sendlen=%d:function=%s,line=%d,fd=%d\n " ,len,__func__,__LINE__,ev -> fd); 138     if (len  <   0 )    139      { 140        close(ev -> fd); 141        fprintf(stderr, " err=%d:function=%s,line=%d,fd=%d\n " ,errno,__func__,__LINE__,ev -> fd); 142      } else 143      { 144         // let system known can recv 145        EventAdd(g_epollFd, EPOLLIN | EPOLLET,ev); 146      } 147  } 148  149  void   * accept_thread_work( void   * arg) 150  { 151     while ( 1 ) 152      { 153         int *  plistenFd  =  ( int * )arg; 154        fprintf(stderr, " function=%s,line=%d,listenfd=%d\n " ,__func__,__LINE__, * plistenFd); 155         struct  sockaddr_in sin; 156        socklen_t len  =   sizeof ( struct  sockaddr_in); 157         int  nfd, i; 158         //  accept 159         if ((nfd  =  accept( * plistenFd, ( struct  sockaddr * ) & sin,  & len))  ==   - 1 ) 160      { 161         if (errno  !=  EAGAIN  &&  errno  !=  EINTR) 162          { 163            fprintf(stderr, " %s: bad accept " , __func__); 164          } 165         continue ; 166      } 167         do 168      { 169         for (i  =   0 ; i  <  MAX_EVENTS; i ++ ) 170          { 171             if (g_Events[i].status  ==   0 ) 172          { 173            fprintf(stderr, " function=%s,line=%d,listenfd=%d,currentindex=%d\n " ,__func__,__LINE__, * plistenFd,i); 174             break ; 175          } 176          } 177         if (i  ==  MAX_EVENTS) 178          { 179            fprintf(stderr, " max events:function=%s,line=%d,listenFd=%d\n " ,__func__,__LINE__, * plistenFd); 180             break ; 181          } 182         //  set nonblocking 183        fprintf(stderr, " set nonblocking:function=%s,line=%d,listenfd=%d\n " ,__func__,__LINE__, * plistenFd); 184         if (fcntl(nfd, F_SETFL, O_NONBLOCK)  <   0 break ; 185         //  add a read event for receive data 186        EventSet( & g_Events[i], nfd, 0 ); 187        EventAdd(g_epollFd, EPOLLIN | EPOLLET,  & g_Events[i]); 188        fprintf(stderr, " new conn[%s:%d][time:%d]\n " , inet_ntoa(sin.sin_addr), ntohs(sin.sin_port),( int ) g_Events[i].last_active); 189      } while ( 0 ); 190      } 191     return  NULL; 192  } 193  194  void   * epoll_wait_thread_work( void   * arg) 195  { 196    fprintf(stderr, " justin:function=%s,line=%d\n " ,__func__,__LINE__); 197     //  event loop 198     struct  epoll_event events[MAX_EVENTS]; 199  200     int  checkPos  =   0 ; 201     while ( 1 ){ 202       //  a simple timeout check here, every time 100, better to use a mini-heap, and add timer event 203       long  now  =  time(NULL); 204       int  i; 205       for (i  =   0 ; i  <   100 ; i ++ , checkPos ++ //  doesn't check listen fd 206        { 207       if (checkPos  ==  MAX_EVENTS) checkPos  =   0 //  recycle 208       if (g_Events[checkPos].status  !=   1 continue ; 209       long  duration  =  now  -  g_Events[checkPos].last_active; 210       if (duration  >=   60 //  60s timeout 211        { 212          close(g_Events[checkPos].fd); 213          fprintf(stderr, " [fd=%d] timeout[%d--%d].\n " ,( int ) g_Events[checkPos].fd,( int ) g_Events[checkPos].last_active, ( int )now); 214          EventDel(g_epollFd,  & g_Events[checkPos]); 215        } 216        } 217       //  wait for events to happen 218       int  fds  =  epoll_wait(g_epollFd, events, MAX_EVENTS,  1000 ); 219       if (fds  <   0 ){ 220        fprintf(stderr, " epoll_wait error, exit\n " ); 221         break ; 222      } 223       for (i  =   0 ; i  <  fds; i ++ ){ 224         struct  myevent_s  * ev  =  ( struct  myevent_s * )events[i].data.ptr; 225         if (events[i].events & EPOLLIN)  //  read event 226      { 227        sem_wait( & bin_sem_recv_fd_produce); 228        fprintf(stderr, " readEvent:function=%s,line=%d:fd=%d\n " ,__func__,__LINE__,ev -> fd); 229         // ev->call_back(ev->fd, events[i].events, ev->arg); 230         struct  QUEUE_RECV_FD_ITEM  * item; 231        item  =  malloc( sizeof ( struct  QUEUE_RECV_FD_ITEM)); 232        item -> ev  =  ev; 233        pthread_mutex_lock( & queue_recv_fd_mutex); 234        TAILQ_INSERT_TAIL( & queue_recv_fd_header,item,recv_fd_entries); 235        pthread_mutex_unlock( & queue_recv_fd_mutex); 236        sem_post( & bin_sem_recv_fd_consume); 237      } else   if (events[i].events & EPOLLOUT)  //  write event 238      { 239        sem_post( & bin_sem_send_fd_consume); 240        fprintf(stderr, " post send fd consume:function=%s,line=%d:fd=%d\n " ,__func__,__LINE__,ev -> fd); 241      } 242      } 243    } 244     return  NULL; 245  } 246  247  void   * recv_data_thread_work( void   * arg) 248  { 249     while ( 1 ) 250      { 251        sem_wait( & bin_sem_recv_fd_consume); 252        fprintf(stderr, " justin:function=%s,line=%d\n " ,__func__,__LINE__); 253         int  index  =  ( int )arg; 254        fprintf(stderr, " recv thread id is %d\n " ,index); 255        pthread_mutex_lock( & queue_recv_fd_mutex); 256         struct  QUEUE_RECV_FD_ITEM  * item; 257        item  =  TAILQ_FIRST( & queue_recv_fd_header); 258        TAILQ_REMOVE( & queue_recv_fd_header,item,recv_fd_entries);  259        pthread_mutex_unlock( & queue_recv_fd_mutex); 260        RecvData(item -> ev); 261      } 262     return  NULL; 263  } 264  265  void   * send_data_thread_work( void   * arg) 266  { 267     while ( 1 ) 268      {   269        sem_wait( & bin_sem_send_fd_consume); 270        fprintf(stderr, " justin:function=%s,line=%d\n " ,__func__,__LINE__); 271        pthread_mutex_lock( & queue_send_fd_mutex); 272         struct  QUEUE_SEND_FD_ITEM  * item; 273        item  =  TAILQ_FIRST( & queue_send_fd_header); 274        TAILQ_REMOVE( & queue_send_fd_header,item,send_fd_entries);  275        pthread_mutex_unlock( & queue_send_fd_mutex); 276        SendData(item -> ev); 277      } 278     return  NULL; 279  } 280  281  void   * logic_data_thread_work( void   * arg) 282  { 283     while ( 1 ) 284      { 285         // remove logic queue 286        sem_wait( & bin_sem_logic_data_consume); 287         // for test 288         int  index  =  ( int )arg; 289        fprintf(stderr, " logic thread id is %d\n " ,index); 290  291        pthread_mutex_lock( & queue_logic_data_mutex); 292         struct  QUEUE_LOGIC_DATA_ITEM  * item; 293        item  =  TAILQ_FIRST( & queue_logic_data_header); 294        TAILQ_REMOVE( & queue_logic_data_header,item,logic_data_entries);  295        pthread_mutex_unlock( & queue_logic_data_mutex); 296         // logic header 297         switch (item -> ev -> header.msg_type) 298      { 299       case  msg_lost: 300        { 301          rq_lost *  rq  =  (rq_lost * )item -> ev -> recv_buff; 302           303          rs_lost rs; 304          rs.header.msg_type  =  msg_lost; 305          rs.header.size  =   sizeof (rs_lost); 306          rs.header.length  =   0 ; 307  308           if (strcmp(rq -> cardno, " 12345 " ) == 0 ) 309            { 310          rs.is_ok  =   1 ;                         311            } 312           else 313            { 314          rs.is_ok  =   0 ; 315            } 316          memcpy( & item -> ev -> header, & rs.header, sizeof (msg_header)); 317          item -> ev -> send_len  =   sizeof (rs); 318          memcpy(item -> ev -> send_buff, & rs, sizeof (rs)); 319           break ; 320        } 321      } 322      323         // add to send fd queue 324        sem_wait( & bin_sem_send_fd_produce); 325        fprintf(stderr, " after wait send fd produce\n " ); 326         struct  QUEUE_SEND_FD_ITEM  * sendItem; 327        sendItem  =  malloc( sizeof ( struct  QUEUE_SEND_FD_ITEM)); 328        sendItem -> ev  =  item -> ev; 329        pthread_mutex_lock( & queue_send_fd_mutex); 330        TAILQ_INSERT_TAIL( & queue_send_fd_header,sendItem,send_fd_entries); 331        pthread_mutex_unlock( & queue_send_fd_mutex); 332         // let system known can send 333        EventAdd(g_epollFd, EPOLLOUT | EPOLLET, item -> ev); 334      } 335     return  NULL; 336  } 337  338  int  main( int  argc,  char   ** argv) 339  { 340     int  res; 341     // recv fd queue 342    TAILQ_INIT( & queue_recv_fd_header); 343    res  =  sem_init( & bin_sem_recv_fd_consume, 0 , 0 ); 344     if (res) 345      { 346        fprintf(stderr, " sem init consume failed\n " ); 347        exit(EXIT_FAILURE); 348      } 349  350    res  =  sem_init( & bin_sem_recv_fd_produce, 0 ,MAX_EVENTS); 351     if (res) 352      { 353        fprintf(stderr, " sem init produce failed\n " ); 354        exit(EXIT_FAILURE); 355      } 356  357    res  =  pthread_mutex_init( & queue_recv_fd_mutex,NULL); 358     if (res != 0 ) 359      { 360        perror( " create mutex for queue recv failed\n " ); 361        exit(EXIT_FAILURE); 362      } 363     // logic data queue 364    TAILQ_INIT( & queue_logic_data_header); 365    res  =  sem_init( & bin_sem_logic_data_consume, 0 , 0 ); 366     if (res) 367      { 368        fprintf(stderr, " sem init logic data consume failed\n " ); 369        exit(EXIT_FAILURE); 370      } 371  372    res  =  sem_init( & bin_sem_logic_data_produce, 0 ,MAX_EVENTS); 373     if (res) 374      { 375        fprintf(stderr, " sem init logic data produce failed\n " ); 376        exit(EXIT_FAILURE); 377      } 378  379    res  =  pthread_mutex_init( & queue_logic_data_mutex,NULL); 380     if (res != 0 ) 381      { 382        perror( " create mutex for queue logic data failed\n " ); 383        exit(EXIT_FAILURE); 384      } 385  386     // send fd queue 387    TAILQ_INIT( & queue_send_fd_header); 388    res  =  sem_init( & bin_sem_send_fd_consume, 0 , 0 ); 389     if (res) 390      { 391        fprintf(stderr, " sem init send fd consume failed\n " ); 392        exit(EXIT_FAILURE); 393      } 394  395    res  =  sem_init( & bin_sem_send_fd_produce, 0 ,MAX_EVENTS); 396     if (res) 397      { 398        fprintf(stderr, " sem init send fd produce failed\n " ); 399        exit(EXIT_FAILURE); 400      } 401  402    res  =  pthread_mutex_init( & queue_send_fd_mutex,NULL); 403     if (res != 0 ) 404      { 405        perror( " create mutex for queue send fd failed\n " ); 406        exit(EXIT_FAILURE); 407      } 408  409     short  port  =   3342 //  default port 410     if (argc  ==   2 ){ 411      port  =  atoi(argv[ 1 ]); 412    } 413     //  create epoll 414    g_epollFd  =  epoll_create(MAX_EVENTS); 415     if (g_epollFd  <=   0 416      { 417        fprintf(stderr, " create epoll failed:fd=%d:function=%s,line=%d\n " , g_epollFd,__func__,__LINE__); 418        exit(EXIT_FAILURE); 419      } 420     //  create & bind listen socket 421     int  listenFd  =  socket(AF_INET, SOCK_STREAM,  0 ); 422     //  bind & listen 423     struct  sockaddr_in sin; 424    bzero( & sin,  sizeof (sin)); 425    sin.sin_family  =  AF_INET; 426    sin.sin_addr.s_addr  =  INADDR_ANY; 427    sin.sin_port  =  htons(port); 428    bind(listenFd, ( const   struct  sockaddr * ) & sin,  sizeof (sin)); 429    listen(listenFd,  5 ); 430    fprintf(stderr, " server running:port[%d]\n " , port); 431     // create accept thread 432  433     void   * thread_result; 434    pthread_t accept_t;   435    res  =  pthread_create( & accept_t,NULL,accept_thread_work,( void   * ) & listenFd); 436     if (res  !=   0 ) 437      { 438        perror( " accept create failed\n " ); 439        exit(EXIT_FAILURE); 440      } 441  442     // create epoll wait thread 443    pthread_t epoll_wait_t; 444    res  =  pthread_create( & epoll_wait_t,NULL,epoll_wait_thread_work,NULL); 445     if (res  !=   0 ) 446      { 447        perror( " create epoll wait thread failed\n " ); 448        exit(EXIT_FAILURE); 449      } 450     // create two recv data thread 451    pthread_t recv_data_t; 452    res  =  pthread_create( & recv_data_t,NULL,recv_data_thread_work,( void * ) 1 ); 453     if (res != 0 ) 454      { 455        perror( " create recv data thread failed\n " ); 456        exit(EXIT_FAILURE); 457      } 458  459    pthread_t recv_data_t_1; 460    res  =  pthread_create( & recv_data_t_1,NULL,recv_data_thread_work,( void * ) 2 ); 461     if (res != 0 ) 462      { 463        perror( " create recv data thread failed\n " ); 464        exit(EXIT_FAILURE); 465      } 466     // create two send data thread 467    pthread_t send_data_t; 468    res  =  pthread_create( & send_data_t,NULL,send_data_thread_work,( void * ) 1 ); 469     if (res != 0 ) 470      { 471        perror( " create send data thread failed\n " ); 472        exit(EXIT_FAILURE); 473      } 474  475    pthread_t send_data_t_1; 476    res  =  pthread_create( & send_data_t_1,NULL,send_data_thread_work,( void * ) 2 ); 477     if (res != 0 ) 478      { 479        perror( " create send data thread failed\n " ); 480        exit(EXIT_FAILURE); 481      } 482  483     // create two logic work thread 484    pthread_t logic_work_t; 485    res  =  pthread_create( & logic_work_t,NULL,logic_data_thread_work,( void * ) 1 ); 486     if (res != 0 ) 487      { 488        perror( " create logic work thread failed\n " ); 489        exit(EXIT_FAILURE); 490      } 491  492   pthread_t logic_work_t_1; 493    res  =  pthread_create( & logic_work_t_1,NULL,logic_data_thread_work,( void * ) 2 ); 494     if (res != 0 ) 495      { 496        perror( " create logic work thread failed\n " ); 497        exit(EXIT_FAILURE); 498      } 499  500     // wait child thread 501    res  =  pthread_join(accept_t, & thread_result); 502     if (res != 0 ) 503      { 504        perror( " accept thread join failed\n " ); 505        exit(EXIT_FAILURE); 506      } 507  508     // wait child thread 509    res  =  pthread_join(epoll_wait_t, & thread_result); 510     if (res != 0 ) 511      { 512        perror( " epoll wait thread join failed\n " ); 513        exit(EXIT_FAILURE); 514      } 515  516     // wait child thread 517    res  =  pthread_join(recv_data_t, & thread_result); 518     if (res != 0 ) 519      { 520        perror( " recv data thread join failed\n " ); 521        exit(EXIT_FAILURE);       522      } 523     // wait child thread 524    res  =  pthread_join(recv_data_t_1, & thread_result); 525     if (res != 0 ) 526      { 527        perror( " recv data thread join failed\n " ); 528        exit(EXIT_FAILURE);       529      } 530  531     // wait child thread 532    res  =  pthread_join(send_data_t, & thread_result); 533     if (res != 0 ) 534      { 535        perror( " send data thread join failed\n " ); 536        exit(EXIT_FAILURE);       537      } 538     // wait child thread 539    res  =  pthread_join(send_data_t_1, & thread_result); 540     if (res != 0 ) 541      { 542        perror( " send data thread join failed\n " ); 543        exit(EXIT_FAILURE);       544      } 545     // wait child thread 546    res  =  pthread_join(logic_work_t, & thread_result); 547     if (res != 0 ) 548      { 549        perror( " logic work thread join failed\n " ); 550        exit(EXIT_FAILURE);       551      } 552     // wait child thread 553    res  =  pthread_join(logic_work_t_1, & thread_result); 554     if (res != 0 ) 555      { 556        perror( " logic work thread join failed\n " ); 557        exit(EXIT_FAILURE);       558      } 559     //  free resource 560    close(g_epollFd); 561    sem_destroy( & bin_sem_recv_fd_consume); 562    sem_destroy( & bin_sem_recv_fd_produce); 563    pthread_mutex_destroy( & queue_recv_fd_mutex); 564  565    sem_destroy( & bin_sem_logic_data_consume); 566    sem_destroy( & bin_sem_logic_data_produce); 567    pthread_mutex_destroy( & queue_logic_data_mutex); 568  569    sem_destroy( & bin_sem_send_fd_consume); 570    sem_destroy( & bin_sem_send_fd_produce); 571    pthread_mutex_destroy( & queue_send_fd_mutex); 572     return   0 ; 573 服务器头  1  #ifndef _epoll_h_  2  #define  _epoll_h_  3   4  #include  " sys/queue.h "  5  #include  < semaphore.h >  6  #include  " msg.h "  7   8  #define  MAX_EVENTS 500  9  10  int  g_epollFd; 11  12  void   * accept_thread_work( void   * arg); 13  void   * epoll_wait_thread_work( void   * arg); 14  void   * recv_data_thread_work( void   * arg); 15  void   * send_data_thread_work( void   * arg); 16  void   * logic_data_thread_work( void   * arg); 17  18  struct  myevent_s 19  { 20     int  fd; 21     int  status;  //  1: in epoll wait list, 0 not in 22    msg_header header; 23     char  recv_buff[ 256 ];  //  recv data buffer 24     int  recv_len; 25     char  send_buff[ 256 ]; // send data buffer 26     int  send_len; 27     long  last_active;  //  last active time 28  }; 29  30  struct  myevent_s g_Events[MAX_EVENTS + 1 ];  //  g_Events[MAX_EVENTS] is used by listen fd 31  32  // recv fd queue 33  struct  QUEUE_RECV_FD_ITEM{ 34     struct  myevent_s *  ev; 35    TAILQ_ENTRY(QUEUE_RECV_FD_ITEM) recv_fd_entries; 36  }; 37  38  TAILQ_HEAD(,QUEUE_RECV_FD_ITEM) queue_recv_fd_header; 39  40  sem_t bin_sem_recv_fd_produce; 41  sem_t bin_sem_recv_fd_consume; 42  43  pthread_mutex_t queue_recv_fd_mutex; 44  45  // send fd queue 46  struct  QUEUE_SEND_FD_ITEM{ 47     struct  myevent_s *  ev; 48    TAILQ_ENTRY(QUEUE_SEND_FD_ITEM) send_fd_entries; 49  }; 50  51  TAILQ_HEAD(,QUEUE_SEND_FD_ITEM) queue_send_fd_header; 52  53  sem_t bin_sem_send_fd_produce; 54  sem_t bin_sem_send_fd_consume; 55  56  pthread_mutex_t queue_send_fd_mutex; 57  58  // logic data buff 59  struct  QUEUE_LOGIC_DATA_ITEM{ 60     struct  myevent_s *  ev; 61    TAILQ_ENTRY(QUEUE_LOGIC_DATA_ITEM) logic_data_entries; 62  }; 63  64  TAILQ_HEAD(,QUEUE_LOGIC_DATA_ITEM) queue_logic_data_header; 65  66  sem_t bin_sem_logic_data_produce; 67  sem_t bin_sem_logic_data_consume; 68  69  pthread_mutex_t queue_logic_data_mutex; 70  71  #endif 72 

 

  }

 

转载于:https://www.cnblogs.com/nanshouyong326/archive/2010/12/21/1912894.html

相关资源:epoll多线程高并发服务器代码

最新回复(0)