找回密码
 用户注册

QQ登录

只需一步,快速开始

查看: 4487|回复: 0

使用epoll实现客户端UDP并发

[复制链接]
发表于 2008-9-21 15:22:55 | 显示全部楼层 |阅读模式
网络程序为了支持并发,可以采用select,多线程等技术.
但是对于select,Red Hat Linux系统最多只支持1024个描述符.
因此要想同时并发超过1024,就无法使用select模式.
而使用多线程,并发数达到1000时将严重影响系统的性能.

而使用epoll可以避免以上的缺陷.
下面是一个使用epoll实现客户端UDP并发的例子,是我为写压力测试程序而写的.
发送使用一个独立的线程,接收使用epoll调用.

在程序开始要先设置系统能打开的最大描述符限制,即setrlimit调用.

在Red Hat Enterprise Linux 4环境下测试通过。其它环境我没测过。

g++ -o  udp_epoll_c udp_epoll_c.cpp -lpthread

  1. /***************************************************************************
  2.                file:   udp_epoll_c.cpp
  3.               -------------------
  4.     begin                : 2006/01/17
  5.     copyright            : (C) 2005 by 张荐林
  6.     email                : zhangjianlin_8 at 126.com
  7. ***************************************************************************/
  8. /***************************************************************************
  9. *                                                                         *
  10. *   This program is free software; you can redistribute it and/or modify  *
  11. *   it under the terms of the GNU General Public License as published by  *
  12. *   the Free Software Foundation; either version 2 of the License, or     *
  13. *   (at your option) any later version.                                   *
  14. *                                                                         *
  15. ***************************************************************************/
  #include <errno.h>
  #include <fcntl.h>
  #include <pthread.h>
  #include <stdio.h>
  #include <stdlib.h>
  #include <string.h>
  #include <unistd.h>

  #include <arpa/inet.h>
  #include <netinet/in.h>
  #include <sys/epoll.h>
  #include <sys/resource.h>
  #include <sys/socket.h>
  #include <sys/types.h>

  #include <iostream>
  #include <string>
  #include <vector>
  32. using namespace std;
  /*
   * Read up to `length` bytes from `fd` into `buffer`, calling read()
   * repeatedly until the request is satisfied or end-of-file is reached.
   *
   * A read() interrupted by a signal (EINTR) is retried; any other read()
   * failure aborts the whole operation.
   *
   * Returns the number of bytes actually stored in `buffer` (less than
   * `length` only when end-of-file was hit first), or -1 on error.
   */
  int Read(int fd, void *buffer, unsigned int length)
  {
      char *cursor = (char *)buffer;
      unsigned int remaining = length;

      while (remaining != 0) {
          int got = read(fd, cursor, remaining);

          if (got == 0) {
              break;              /* end of file: report what we have so far */
          }
          if (got < 0) {
              if (errno != EINTR) {
                  return -1;
              }
              got = 0;            /* interrupted before any data: try again */
          }

          remaining -= got;
          cursor += got;
      }

      return length - remaining;
  }
  /*
   * Write exactly `length` bytes from `buffer` to `fd`, calling write()
   * repeatedly until everything has been accepted.
   *
   * A write() interrupted by a signal (EINTR) is retried. Any other failure,
   * including EAGAIN on a non-blocking descriptor, aborts the operation.
   *
   * Returns `length` on success and -1 on failure. A write() that reports
   * zero bytes for a non-empty request is treated as a failure: errno is only
   * meaningful after a negative return, so it must not be consulted in that
   * case (doing so could loop forever on a stale EINTR).
   */
  int Write(int fd, const void *buffer, unsigned int length)
  {
      const char *ptr = (const char *)buffer;
      unsigned int nleft = length;

      while (nleft > 0) {
          int nwritten = write(fd, ptr, nleft);

          if (nwritten < 0) {
              if (errno == EINTR) {
                  continue;       /* interrupted before writing: retry */
              }
              return -1;
          }
          if (nwritten == 0) {
              return -1;          /* no progress and no errno to inspect */
          }

          nleft -= nwritten;
          ptr += nwritten;
      }

      return length;
  }
  /*
   * Start a new thread running `start_routine(arg)`.
   *
   * thread - optional; receives the id of the new thread when non-NULL.
   * pAttr  - optional thread attributes. When NULL, the thread is created
   *          detached with a 1 MiB stack. When non-NULL, the attributes are
   *          used as given and remain owned by the caller: this function
   *          does not destroy them.
   *
   * Returns 0 on success, otherwise the error number reported by
   * pthread_attr_init() or pthread_create(). errno is not set, so do not use
   * perror() on failure.
   */
  int CreateThread(void *(*start_routine)(void *), void *arg = NULL, pthread_t *thread = NULL, pthread_attr_t *pAttr = NULL)
  {
      pthread_attr_t defaultAttr;
      const bool ownsAttr = (pAttr == NULL);

      if (ownsAttr) {
          int rc = pthread_attr_init(&defaultAttr);
          if (rc != 0) {
              return rc;
          }
          pthread_attr_setstacksize(&defaultAttr, 1024 * 1024);  // 1 MiB stack
          pthread_attr_setdetachstate(&defaultAttr, PTHREAD_CREATE_DETACHED);
          pAttr = &defaultAttr;
      }

      pthread_t tid;
      if (thread == NULL) {
          thread = &tid;
      }

      int r = pthread_create(thread, pAttr, start_routine, arg);

      // Only tear down the attribute object created here; a caller-supplied
      // one may still be needed by the caller (e.g. to start more threads).
      if (ownsAttr) {
          pthread_attr_destroy(&defaultAttr);
      }
      return r;
  }
  /*
   * Ask the kernel to set both the soft and the hard limit on open file
   * descriptors (RLIMIT_NOFILE) to 20480.
   *
   * The outcome is only reported on the console ("setrlimit ok" on stdout,
   * or a perror() message on stderr); the function always returns 0, also
   * when the limit could not be changed.
   */
  static int SetRLimit()
  {
      const rlim_t wanted = 20480;

      struct rlimit limits;
      limits.rlim_max = wanted;
      limits.rlim_cur = wanted;

      if (setrlimit(RLIMIT_NOFILE, &limits) == 0) {
          printf("setrlimit ok\n");
      } else {
          perror("setrlimit");
      }

      return 0;
  }
  /*
   * Switch the descriptor `sock` to non-blocking mode by adding O_NONBLOCK
   * to its current file status flags.
   *
   * Returns 0 on success, -1 if the flags could not be read or updated.
   */
  int setnonblocking(int sock)
  {
      const int flags = fcntl(sock, F_GETFL);
      if (flags < 0) {
          return -1;
      }

      return (fcntl(sock, F_SETFL, flags | O_NONBLOCK) < 0) ? -1 : 0;
  }
  /*
   * Create a UDP socket and connect() it to host:port so that plain
   * read()/write() can be used on it afterwards.
   *
   * host - IPv4 address in a textual form accepted by inet_addr(); host
   *        names are not resolved. NULL or unparsable input is rejected.
   *        Because inet_addr() signals failure with INADDR_NONE, the address
   *        255.255.255.255 cannot be distinguished from an error and is
   *        rejected as well.
   * port - destination port in host byte order.
   *
   * Returns the connected socket descriptor (the caller must close() it),
   * or -1 on failure after printing a diagnostic to stderr.
   */
  int ConnectToUdperver(const char *host, unsigned short port)
  {
      if (host == NULL) {
          fprintf(stderr, "ConnectToUdperver: host is NULL\n");
          return -1;
      }

      struct sockaddr_in addr;
      memset(&addr, 0, sizeof(addr));
      addr.sin_family = AF_INET;
      addr.sin_port = htons(port);
      addr.sin_addr.s_addr = inet_addr(host);
      if (addr.sin_addr.s_addr == INADDR_NONE) {
          // Validate before creating the socket so nothing needs cleaning up.
          fprintf(stderr, "ConnectToUdperver: invalid IPv4 address '%s'\n", host);
          return -1;
      }

      int sock = socket(AF_INET, SOCK_DGRAM, 0);
      if (sock < 0) {
          perror("socket");
          return -1;
      }

      if (connect(sock, (struct sockaddr *)&addr, sizeof(addr)) < 0) {
          perror("connect");
          close(sock);
          return -1;
      }
      return sock;
  }
  149. void *SendThread(void *arg)
  150. {
  151. vector<int> sockets;
  152. sockets = *((vector<int> *)arg);
  153. int n = 0;
  154. char data[1024];
  155. int i = 0;
  156. while(1)
  157. {
  158.   for(vector<int>::iterator itr = sockets.begin(), last = sockets.end(); itr != last; ++itr)
  159.   {
  160.    sprintf(data, "test data %d\n", i++);
  161.    n = Write(*itr, "hello", 5);
  162.    printf("socket %d write to server[ret = %d]:%s",*itr, n, data);  
  163.   }
  164.   sleep(1);
  165. }
  166. }
  167. int main(int argc, char **argv)
  168. {
  169. SetRLimit();
  170. printf("FD_SETSIZE= %d\n", FD_SETSIZE);
  171. if (argc != 3)
  172. {
  173.   printf("usage: %s <IPaddress> <PORT>\n", argv[0]);
  174.   return 1;
  175. }
  176. int epfd = epoll_create(20480);
  177. if(epfd < 0)
  178. {
  179.   perror("epoll_create");
  180.   return 1;
  181. }
  182. struct epoll_event event;
  183. struct epoll_event ev[20480];
  184. vector<int> sockets;
  185. for(int i = 0; i < 3000; i++)
  186. {
  187.   int sockfd = ConnectToUdperver(argv[1], (unsigned short)(atoi(argv[2])));
  188.   if(sockfd < 0)
  189.   {
  190.    printf("Cannot connect udp server %s %s\n", argv[1], argv[2]);
  191.    return 1;
  192.   }
  193.   
  194.   sockets.push_back(sockfd);
  195.   setnonblocking(sockfd);
  196.   event.data.fd = sockfd;
  197.      event.events = EPOLLIN|EPOLLET;
  198.      if(0 != epoll_ctl(epfd,EPOLL_CTL_ADD,sockfd,&event))
  199.   {
  200.    perror("epoll_ctl");
  201.   }
  202. }
  203. if(0 != CreateThread(SendThread, (void *)&sockets))
  204. {
  205.   perror("CreateThread");
  206.   return 2;
  207. }
  208. int nfds = 0;  
  209. while(1)
  210. {
  211.         nfds=epoll_wait(epfd,ev,20480,500);
  212.   if(nfds < 0)
  213.   {
  214.    perror("epoll_wait");
  215.    break;
  216.   }
  217.   else if(nfds == 0)
  218.   {
  219.    printf("epoll_wait timeout!\n");
  220.    continue;
  221.   }
  222.   for(int i = 0; i < nfds; i++)
  223.   {
  224.    if(ev[i].events & EPOLLIN)
  225.    {
  226.     printf("can read for %d now\n", ev[i].data.fd);
  227.     char data[1024] = {0};
  228.     int n = read(ev[i].data.fd, data, sizeof(data));
  229.     printf("Received %d bytes from server!\n", n);
  230.    }
  231.   }
  232. }
  233. for(vector<int>::iterator itr = sockets.begin(), last = sockets.end(); itr != last; itr++)
  234. {
  235.   close(*itr);
  236. }
  237. close(epfd);
  238. return 0;
  239. }
复制代码
您需要登录后才可以回帖 登录 | 用户注册

本版积分规则

Archiver|手机版|小黑屋|ACE Developer ( 京ICP备06055248号 )

GMT+8, 2024-11-22 08:29 , Processed in 0.016925 second(s), 7 queries , Redis On.

Powered by Discuz! X3.5

© 2001-2023 Discuz! Team.

快速回复 返回顶部 返回列表