找回密码
 用户注册

QQ登录

只需一步,快速开始

查看: 3990|回复: 0

[转帖]采用C++的ACE库实现的一个通用的udp通信服务器程序

[复制链接]
发表于 2008-7-13 22:43:55 | 显示全部楼层 |阅读模式
转自:[url=http://blog.csdn.net/itclock/archive/2006/08/08/1036647.aspx]http://blog.csdn.net/itclock/archive/2006/08/08/1036647.aspx[/url]
作者:itclock

采用C++的ACE库实现的一个通用的udp通信服务器程序
全部源代码如下:
  1. ACE_Server.cpp
  2. #include "ace/SOCK_Acceptor.h"
  3. #include "ace/Acceptor.h"
  4. #include "ace/Thread_Manager.h"
  5. #include "ace/TP_Reactor.h"
  6. #include "ace/Reactor.h"
  7. #include "ace/INET_Addr.h"
  8. #include "ace/OS.h"
  9. #include "Request_Handler.h"
  10. #include "Server.h"
  11. #include "Constants.h"
  12. using namespace ACE_Server;
  13. int main(int argc, char *argv[])
  14. {     
  15. ACE_INET_Addr local_addr(SERVER_PORT_NUM);
  16. Request_Handler *endpoint;
  17. ACE_NEW_RETURN (endpoint,
  18.   Request_Handler (local_addr),
  19.   -1);
  20. // Read data from other side.
  21. if (ACE_Reactor::instance ()->register_handler
  22.   (endpoint,
  23.   ACE_Event_Handler::READ_MASK) == -1)
  24.   ACE_ERROR_RETURN ((LM_ERROR,
  25.   "ACE_Reactor::register_handler"),
  26.   -1);
  27. Server server_tp;
  28. server_tp.activate(THR_NEW_LWP | THR_JOINABLE, SERVER_THREAD_POOL_SIZE);
  29. ACE_Thread_Manager::instance()->wait();
  30. return 0;
  31. }
  32. Constants.h
  33. #ifndef __CONSTANTS_H_
  34. #define __CONSTANTS_H_
  35. namespace ACE_Server
  36. {
  37. static const size_t SERVER_THREAD_POOL_SIZE = 5;   //进行数据接收的线程池大小
  38. static const size_t TASK_THREAD_POOL_SIZE = 5; //进行数据处理的线程池大小
  39. static const size_t BUFFER_SIZE = 4096;   //数据缓冲区大小
  40. static const size_t SERVER_PORT_NUM = 10101;   //服务器的通信端口号
  41. }
  42. #endif
  43. Server.h
  44. #ifndef __SERVER_H_
  45. #define __SERVER_H_
  46. #include "ace/Task.h"
  47. namespace ACE_Server
  48. {
  49. class Server: public ACE_Task_Base
  50. {
  51. public:
  52.   virtual int svc(void);
  53. };
  54. }
  55. #endif
  56. Server.cpp
  57. #include "ace/Reactor.h"
  58. #include "Server.h"
  59. namespace ACE_Server
  60. {
  61. int Server::svc(void)
  62. {
  63.   int result = ACE_Reactor::instance()->run_reactor_event_loop();
  64.   if(result == -1)
  65.    return -1;
  66.   return 0;
  67. }
  68. }
  69. Request_Handler.h
  70. #ifndef __REQUEST_HANDLER_H_
  71. #define __REQUEST_HANDLER_H_
  72. #include "ace/Svc_Handler.h"
  73. #include "ace/SOCK_Stream.h"
  74. #include "ace/SOCK_Dgram.h"
  75. #include "ace/Synch.h"
  76. #include "ace/Thread_Manager.h"
  77. #include "Task_Manager.h"
  78. namespace ACE_Server
  79. {
  80. class Request_Handler: public ACE_Event_Handler
  81. {
  82. public:
  83.   Request_Handler(const ACE_INET_Addr &local_addr);
  84.   virtual ACE_HANDLE get_handle (void) const;
  85.   virtual int handle_close (ACE_HANDLE handle,
  86.    ACE_Reactor_Mask close_mask);
  87.   virtual int handle_timeout (const ACE_Time_Value & tv,
  88.    const void *arg = 0);
  89. protected:
  90.   virtual int handle_input(ACE_HANDLE fd = ACE_INVALID_HANDLE);
  91. private:
  92.   static Task_Manager task_mgr;
  93.   ACE_SOCK_Dgram endpoint_;
  94. };
  95. }
  96. #endif
  97. Request_Handler.cpp
  98. #include "ace/OS.h"
  99. #include "ace/Message_Block.h"
  100. #include "ace/Thread_Manager.h"
  101. #include "ace/Svc_Handler.h"
  102. #include "ace/SOCK_Stream.h"
  103. #include "ace/Synch.h"
  104. #include "ace/Reactor.h"
  105. #include "Request_Handler.h"
  106. #include "Task_Manager.h"
  107. #include "Constants.h"
  108. namespace ACE_Server
  109. {
  110. Task_Manager Request_Handler::task_mgr;
  111. Request_Handler::Request_Handler(const ACE_INET_Addr &local_addr)
  112.   : endpoint_ (local_addr)
  113. {
  114.   //this->reactor(ACE_Reactor::instance());
  115.   task_mgr.activate();
  116. }
  117. ACE_HANDLE Request_Handler::get_handle (void) const
  118. {
  119.   return this->endpoint_.get_handle ();
  120. }
  121. int Request_Handler::handle_close (ACE_HANDLE handle,
  122.   ACE_Reactor_Mask)
  123. {
  124.   ACE_UNUSED_ARG (handle);
  125.   ACE_DEBUG ((LM_DEBUG,
  126.    "(%P|%t) handle_close\n"));
  127.   this->endpoint_.close ();
  128.   delete this;
  129.   return 0;
  130. }
  131. int Request_Handler::handle_timeout (const ACE_Time_Value &,
  132.   const void *)
  133. {
  134.   ACE_DEBUG ((LM_DEBUG,
  135.    "(%P|%t) timed out for endpoint\n"));
  136.   return 0;
  137. }
  138. int Request_Handler::handle_input(ACE_HANDLE fd)
  139. {
  140.   char buf[BUFSIZ];
  141.   ACE_INET_Addr from_addr;
  142.   ACE_DEBUG ((LM_DEBUG,
  143.    "(%P|%t) activity occurred on handle %d!\n",
  144.    this->endpoint_.get_handle ()));
  145.   ssize_t n = this->endpoint_.recv (buf,
  146.    sizeof buf,
  147.    from_addr);
  148.   if (n == -1)
  149.    ACE_ERROR ((LM_ERROR,
  150.    "%p\n",
  151.    "handle_input"));
  152.   else
  153.   {
  154.    ACE_DEBUG ((LM_DEBUG,
  155.     "(%P|%t) buf of size %d = %*s\n",
  156.     n,
  157.     n,
  158.     buf));
  159.    ACE_Message_Block *mb = NULL;
  160.    ACE_NEW_RETURN(mb, ACE_Message_Block(n, ACE_Message_Block::MB_DATA, 0, buf), -1);
  161.    mb->wr_ptr(n);
  162.    task_mgr.putq(mb);
  163.   }
  164.   return 0;
  165. }
  166. }
  167. Task_Manager.h
  168. #ifndef __TASK_MANAGER_H_
  169. #define __TASK_MANAGER_H_
  170. #include "ace/Task.h"
  171. #include "ace/Synch.h"
  172. namespace ACE_Server
  173. {
  174. class Task_Manager: public ACE_Task<ACE_MT_SYNCH>
  175. {
  176. public:
  177.   virtual int svc(void);
  178. };
  179. }
  180. #endif
  181. Task_Manager.cpp
  182. #include "ace/Message_Block.h"
  183. #include "Task_Manager.h"
  184. #include "Task_Worker.h"
  185. #include "Constants.h"
  186. namespace ACE_Server
  187. {
  188. int Task_Manager::svc(void)
  189. {
  190.   Task_Worker task_tp;
  191.   task_tp.activate(THR_NEW_LWP | THR_JOINABLE, TASK_THREAD_POOL_SIZE);
  192.   while(1)
  193.   {
  194.    ACE_Message_Block *mb = NULL;
  195.    if(this->getq(mb) < 0)
  196.    {
  197.     task_tp.msg_queue()->deactivate();
  198.     task_tp.wait();
  199.    }
  200.    task_tp.putq(mb);
  201.   }
  202.   return 0;
  203. }
  204. }
  205. Task_Worker.h
  206. #ifndef __TASK_WORKER_H_
  207. #define __TASK_WORKER_H_
  208. #include "ace/Task.h"
  209. #include "ace/Synch.h"
  210. #include "ace/Message_Block.h"
  211. namespace ACE_Server
  212. {
  213. class Task_Worker: public ACE_Task<ACE_MT_SYNCH>
  214. {
  215. public:
  216.   virtual int svc(void);
  217. private:
  218.   void process_task(ACE_Message_Block *mb);
  219. };
  220. }
  221. #endif
  222. Task_Worker.cpp
  223. #include "ace/OS.h"
  224. #include "ace/Message_Block.h"
  225. #include "Task_Worker.h"
  226. namespace ACE_Server
  227. {
  228. int Task_Worker::svc(void)
  229. {
  230.   while(1)
  231.   {
  232.    ACE_Message_Block *mb = NULL;
  233.    if(this->getq(mb) == -1)
  234.    {
  235.     continue;
  236.    }
  237.    process_task(mb);
  238.   }
  239.   return 0;
  240. }
  241. void Task_Worker::process_task(ACE_Message_Block *mb)
  242. {
  243.   //进行数据处理,数据的起始地址为mb->rd_ptr(),长度为mb->length()
  244.   ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%t) Processing task: %s length %d\n"), mb->rd_ptr(), mb->length()));
  245.   ACE_OS::sleep(3);  //模拟数据处理过程
  246.   mb->release();
  247. }
  248. }
复制代码
注意:代码请先读懂再测试,不要贸然使用,因为可能隐藏bug.
您需要登录后才可以回帖 登录 | 用户注册

本版积分规则

Archiver|手机版|小黑屋|ACE Developer ( 京ICP备06055248号 )

GMT+8, 2024-11-22 01:43 , Processed in 0.013512 second(s), 7 queries , Redis On.

Powered by Discuz! X3.5

© 2001-2023 Discuz! Team.

快速回复 返回顶部 返回列表