找回密码
 用户注册

QQ登录

只需一步,快速开始

查看: 5188|回复: 1

Reactor框架写的一个简单udp服务器框架的问题

[复制链接]
发表于 2008-9-10 22:40:18 | 显示全部楼层 |阅读模式
我用ACE的Reactor框架写的一个简单udp服务器框架如下:我对其进行压力测试,结果发现问题很大。
当每秒接受的数据包到11000个(每个包1400字节,总共42K字节)左右时,服务器耗尽了所有的cpu,但内存占用非常少不到5%。pc是赛阳2.6,512M内存,系统为redhead9.0,2.6内核。各位看看问题出在什么地方!
以下内容为程序代码:
  1. #include "ace/OS_main.h"
  2. #include "ace/OS_NS_string.h"
  3. #include "ace/OS_NS_unistd.h"
  4. #include "ace/Reactor.h"
  5. #include "ace/Process.h"
  6. #include "ace/SOCK_Dgram.h"
  7. #include "ace/INET_Addr.h"
  8. #include "ace/Log_Msg.h"
  9. #include "ace/Thread_Manager.h"
  10. #include "ace/Task_T.h"
  11. #include "ace/Shared_Memory_Pool.h"
  12. //#include "ace/Process_Mutex.h"
  13. #include "ace/Malloc.h"
  14. #include "ace/Malloc_T.h"
  15. #include "ace/Shared_Memory_MM.h"
  16. #include "client.h"
  17. #define SERVER_PORT 10101
  18. #define POOLNAME "mypoll"
  19. typedef ACE_Malloc<ACE_SHARED_MEMORY_POOL, ACE_SYNCH_MUTEX> myalloc;
  20. static const size_t TASK_THREAD_POOL_SIZE = 5;
  21. myalloc shm_malloc(POOLNAME);
  22. static int packages = 0;
  23. static int total_bytes = 0;
  24. class Task_Worker: public ACE_Task<ACE_MT_SYNCH>
  25. {
  26.     public:
  27.         virtual int svc(void)
  28.         {
  29.             while(1)
  30.             {
  31.                 ACE_Message_Block *mb = NULL;
  32.                 if(this->getq(mb) == -1)
  33.                 {
  34.                     continue;
  35.                 }
  36.                 process_task(mb);
  37.             }
  38.             return 0;
  39.         }
  40.     private:
  41.         void process_task(ACE_Message_Block *mb)
  42.         {
  43.             PKG_HEADER header;
  44.             memcpy(&header, mb->rd_ptr(), sizeof(PKG_HEADER));
  45.             header.magic_num = ntohl(header.magic_num);
  46.             header.index = ntohl(header.index);
  47.             header.check_sum = ntohl(header.check_sum);
  48.             //printf("magic[%d]index[%d]checksum[%d]\n", header.magic_num, header.index, header.check_sum);
  49.             mb->release();
  50.         }
  51. };
  52. class Task_Manager: public ACE_Task<ACE_MT_SYNCH>
  53. {
  54.     public:
  55.         virtual int svc(void)
  56.         {
  57.             Task_Worker task_tp;
  58.             task_tp.activate(THR_NEW_LWP | THR_JOINABLE, TASK_THREAD_POOL_SIZE);
  59.             while(1)
  60.             {
  61.                 ACE_Message_Block *mb = NULL;
  62.                 if(this->getq(mb) < 0)
  63.                 {
  64.                     task_tp.msg_queue()->deactivate();
  65.                     task_tp.wait();
  66.                 }
  67.                 task_tp.putq(mb);
  68.             }
  69.             return 0;
  70.         }
  71. };
  72. class Dgram_Endpoint : public ACE_Event_Handler
  73. {
  74. public:
  75.   Task_Manager task_mgr;
  76.   Dgram_Endpoint (const ACE_INET_Addr &local_addr);
  77.   virtual ACE_HANDLE get_handle (void) const;
  78.   virtual int handle_input (ACE_HANDLE handle);
  79.   virtual int handle_timeout (const ACE_Time_Value & tv,const void *arg = 0);
  80.   virtual int handle_close (ACE_HANDLE handle,ACE_Reactor_Mask close_mask);
  81.   virtual int handle_signal (int signum, siginfo_t*, ucontext_t*);
  82.   int send (const char *buf, size_t len, const ACE_INET_Addr &);
  83. private:
  84.   ACE_SOCK_Dgram endpoint_;
  85. };
  86. int Dgram_Endpoint::send (const char *buf,size_t len,const ACE_INET_Addr &addr)
  87. {
  88.   return this->endpoint_.send (buf, len, addr);
  89. }
  90. Dgram_Endpoint::Dgram_Endpoint (const ACE_INET_Addr &local_addr)
  91.   : endpoint_(local_addr)
  92. {
  93.     task_mgr.activate();
  94. }
  95. ACE_HANDLE Dgram_Endpoint::get_handle (void) const
  96. {
  97.   return this->endpoint_.get_handle();
  98. }
  99. int Dgram_Endpoint::handle_close (ACE_HANDLE handle,ACE_Reactor_Mask)
  100. {
  101.     ACE_DEBUG((LM_DEBUG, "************handle_close***********\n"));
  102.     ACE_UNUSED_ARG (handle);
  103.     this->endpoint_.close();
  104.     delete this;
  105.     return 0;
  106. }
  107. static int is_heart_package(char *buf, size_t size)
  108. {
  109.     unsigned long data;
  110.     if (size<4)
  111.         return -1;
  112.     memcpy(&data, buf, sizeof(data));
  113.     data = ntohl(data);
  114.     if (data == 0x12345678)
  115.     {
  116.         //printf("yes it's heart beat package\n");
  117.         return 0;
  118.     }
  119.     else
  120.         return -1;
  121. }
  122. int Dgram_Endpoint::handle_input (ACE_HANDLE)
  123. {
  124.   char buf[BUFSIZ];
  125.   ACE_INET_Addr from_addr;
  126.   char address[32];
  127.   ssize_t nbytes = this->endpoint_.recv (buf, sizeof(buf), from_addr);
  128.   packages++;
  129.   total_bytes += nbytes;
  130.   
  131. #if 0
  132.   if (nbytes == -1)
  133.     ACE_ERROR ((LM_ERROR,"%p","handle_input error\n"));
  134.   else
  135.     ACE_DEBUG ((LM_DEBUG, "[%d]bytes from[%s] received:%s\n", nbytes, address, buf));
  136. #endif
  137.   if (0 == is_heart_package(buf, nbytes))
  138.   {
  139.       return 0;
  140.   }
  141.   ACE_Message_Block *mb = NULL;
  142.   //mb = shm_malloc.malloc(sizeof(ACE_Message_Block));
  143.   ACE_NEW_RETURN(mb, ACE_Message_Block(nbytes, ACE_Message_Block::MB_DATA, 0, buf), -1);
  144.   mb->wr_ptr(nbytes);
  145.   this->task_mgr.putq(mb);
  146.   return 0;
  147. }
  148. int Dgram_Endpoint::handle_timeout (const ACE_Time_Value &,const void *)
  149. {
  150.   ACE_DEBUG ((LM_DEBUG,"[%d] packages [%d]K bytes are received per secondt\n", packages, total_bytes/1024 ));
  151.   packages = 0;
  152.   total_bytes = 0;
  153.   return 0;
  154. }
  155. int Dgram_Endpoint::handle_signal (int signum, siginfo_t* siginfo, ucontext_t* context)
  156. {
  157.     return ACE_Event_Handler::handle_signal (signum, siginfo, context);
  158. }
  159. int ACE_TMAIN (int argc, ACE_TCHAR *argv[])
  160. {
  161.     ACE_INET_Addr local_addr(SERVER_PORT);
  162.     Dgram_Endpoint *endpoint;
  163.     ACE_NEW_RETURN (endpoint,Dgram_Endpoint (local_addr),-1);
  164.     if (ACE_Reactor::instance ()->register_handler(endpoint,ACE_Event_Handler::READ_MASK) == -1)
  165.     {
  166.         ACE_ERROR_RETURN ((LM_ERROR,"ACE_Reactor::register_handler"),-1);
  167.     }
  168.     ACE_Time_Value time_out(1);
  169.     ACE_Reactor::instance()->schedule_timer(endpoint, (void *)"time out",  ACE_Time_Value::zero, time_out);
  170. #if 0
  171.     if (-1 == ACE_Reactor::instance()->register_handler(SIGINT, endpoint))
  172.     {
  173.         ACE_ERROR_RETURN((LM_ERROR, "fail to register SIGINT handler"), -1);
  174.     }
  175. #endif
  176.     ACE_Reactor::instance()->run_event_loop();
  177.   return 0;
  178. }
复制代码
 楼主| 发表于 2008-9-10 22:40:27 | 显示全部楼层
除非用ACE_Dev_Poll_Reactor,各个平台的服务器端,都不推荐用Reactor框架。效率不高,不是ACE效率不高,而是各个操作系统实现中,这种模式下(select)效率很差。除非不用,否则没办法。

如果你要用Reactor,推荐你用ACE_Dev_Poll_Reactor,很简单,初始化的时候,替换默认的select_reactor即可。你可以试试EPoll的效率。或者如果能用AIO,试试Proactor(兼容性不够好)。

至于如何编译使用ACE_Dev_Poll_Reactor,本站有说明,照着做就可以了。
您需要登录后才可以回帖 登录 | 用户注册

本版积分规则

Archiver|手机版|小黑屋|ACE Developer ( 京ICP备06055248号 )

GMT+8, 2024-4-30 12:56 , Processed in 0.012526 second(s), 7 queries , Redis On.

Powered by Discuz! X3.5

© 2001-2023 Discuz! Team.

快速回复 返回顶部 返回列表