找回密码
 用户注册

QQ登录

只需一步,快速开始

查看: 4855|回复: 0

ACE高效PROACTOR编程框架一ClientHandle

[复制链接]
发表于 2008-9-10 22:36:36 | 显示全部楼层 |阅读模式
原文地址:[url=http://blog.vckbase.com/bastet/archive/2005/08/14/10865.aspx]http://blog.vckbase.com/bastet/archive/2005/08/14/10865.aspx[/url]
1、WIN32下面用proactor可以达到几乎RAW IOCP的效率,由于封装关系,应该是差那么一点。


客户端处理类的常规写法:
  1. //处理客户端连接消息
  2. class ClientHandler : public ACE_Service_Handler
  3. {
  4. public:
  5. /**构造函数
  6.   *
  7.   *
  8.   */
  9. ClientHandler(unsigned int client_recv_buf_size=SERVER_CLIENT_RECEIVE_BUF_SIZE)
  10.   :_read_msg_block(client_recv_buf_size),_io_count(0)
  11. {
  12. }
  13. ~ClientHandler(){}
  14. /**
  15.   *初始化,因为可能要用到ClientHandler内存池,而这个池又不一定要用NEW
  16.   */
  17. void init();
  18. /**清理函数,因为可能要用到内存池
  19.   *
  20.   */
  21. void fini();
  22. //检查是否超时的函数
  23. void check_time_out(time_t cur_time);
  24. public:
  25. /**客户端连接服务器成功后调用
  26.   *
  27.   * \param handle 套接字句柄
  28.   * \param &message_block 第一次读到的数据(未用)
  29.   */
  30. //由Acceptor来调用!!!
  31. virtual void open (ACE_HANDLE handle,ACE_Message_Block &message_block);
  32. /**处理网络读操作结束消息
  33.   *
  34.   * \param &result 读操作结果
  35.   */
  36. virtual void handle_read_stream (const ACE_Asynch_Read_Stream::Result &result);
  37. /**处理网络写操作结束消息
  38.   *
  39.   * \param &result 写操作结果
  40.   */
  41. virtual void handle_write_stream (const ACE_Asynch_Write_Stream::Result &result);
  42. private:
  43. //**生成一个网络读请求
  44.   *
  45.   * \param void
  46.   * \return 0-成功,-1失败
  47.   */
  48. int  initiate_read_stream  (void);
  49. /**生成一个写请求
  50.   *
  51.   * \param mb 待发送的数据
  52.   * \param nBytes 待发送数据大小
  53.   * \return 0-成功,-1失败
  54.   */
  55. int  initiate_write_stream (ACE_Message_Block & mb, size_t nBytes );
  56. /**
  57.   *
  58.   * \return 检查是否可以删除,用的是一个引用计数。每一个外出IO的时候+1,每一个IO成功后-1
  59.   */
  60. int check_destroy();
  61. //异步读
  62. ACE_Asynch_Read_Stream _rs;
  63. //异步写
  64. ACE_Asynch_Write_Stream _ws;
  65. //接收缓冲区只要一个就够了,因为压根就没想过要多读,我直到现在也不是很清楚为什么要多读,多读的话要考虑很多问题
  66. ACE_Message_Block _read_msg_block;
  67. //套接字句柄,这个可以不要了,因为基类就有个HANDLER在里面的。
  68. //ACE_HANDLE _handle;
  69. //一个锁,客户端反正有东东要锁的,注意,要用ACE_Recursive_Thread_Mutex而不是用ACE_Thread_Mutex,这里面是可以重入的,而且在WIN32下是直接的EnterCriticalSection,可以达到很高的效率
  70. ACE_Recursive_Thread_Mutex _lock;
  71. //在外IO数量,其实就是引用计数啦,没啥的。为0的时候就把这个东东关掉啦。
  72. long _io_count;
  73. //检查超时用的,过段时间没东东就CLOSE他了。
  74. time_t _last_net_io;
  75. private:
  76. //本来想用另外一种模型的,只用1个或者2个外出读,后来想想,反正一般内存都是足够的,就不管了。
  77. //ACE_Message_Block _send_msg_blocks[2];
  78. //ACE_Message_Block &_sending_msg_block;
  79. //ACE_Message_Block &_idle_msg_block;
  80. private:
  81. public:
  82. //TODO:move to prriva and use friend class!!!
  83. //只是为了效率更高,不用STL的LIST是因为到现在我没有可用的Node_Allocator,所以效率上会有问题。
  84. ClientHandler *_next;
  85. ClientHandler *next(){return _next;}
  86. void next(ClientHandler *obj){_next=obj;}
  87. };
  88. //这是具体实现,有些地方比较乱,懒得管了,锁的有些地方不对。懒得改了,反正在出错或者有瓶颈的时候再做也不迟。
  89. void ClientHandler::handle_read_stream (const ACE_Asynch_Read_Stream::Result &result)
  90. {
  91. _last_net_io=ACE_OS::time(NULL);
  92. int byterecved=result.bytes_transferred ();
  93. if ( (result.success ()) && (byterecved != 0))
  94. {
  95.   //ACE_DEBUG ((LM_DEBUG,  "Receiver completed:%d\n",byterecved));
  96. //处理完数据
  97.   if(handle_received_data()==true)
  98.   {
  99.    //ACE_DEBUG ((LM_DEBUG,  "go on reading...\n"));
  100. //把东东推到头部,处理粘包
  101.    _read_msg_block.crunch();
  102.    initiate_read_stream();
  103.   }
  104. }
  105. //这个地方不想用ACE_Atom_op,因为反正要有一个锁,而且一般都会用锁,不管了。假如不在意的话,应该直接用ACE_Atom_Op以达到最好的效率
  106. {
  107.   ACE_Guard<ACE_Recursive_Thread_Mutex> locker (_lock);
  108.   _io_count--;
  109. }
  110. check_destroy ();
  111. }
  112. void ClientHandler::init()
  113. {
  114. //初始化数据,并不在构造函数里做。
  115. _last_net_io=ACE_OS::time(NULL);
  116. _read_msg_block.rd_ptr(_read_msg_block.base());
  117. _read_msg_block.wr_ptr(_read_msg_block.base());
  118. this->handle(ACE_INVALID_HANDLE);
  119. }
  120. bool ClientHandler::handle_received_data()
  121. {
  122. ...........自己处理
  123. return true;
  124. }
  125. //==================================================================
  126. void ClientHandler::handle_write_stream (const ACE_Asynch_Write_Stream::Result &result)
  127. {
  128. //发送成功,RELEASE掉
  129. //这个不可能有多个RELEASE,直接XX掉
  130. //result.message_block ().release ();
  131. MsgBlockManager::get_instance().release_msg_block(&result.message_block());
  132. {
  133.   ACE_Guard<ACE_Recursive_Thread_Mutex> locker (_lock);
  134.   _io_count--;
  135. }
  136. check_destroy ();
  137. }
  138. //bool ClientHandler::destroy ()
  139. //{
  140. // FUNC_ENTER;
  141. // ClientManager::get_instance().release_client_handle(this);
  142. // FUNC_LEAVE;
  143. // return false ;
  144. //}
  145. int  ClientHandler::initiate_read_stream  (void)
  146. {
  147. ACE_Guard<ACE_Recursive_Thread_Mutex> locker (_lock);
  148. //考虑到粘包的呀
  149. if (_rs.read (_read_msg_block, _read_msg_block.space()) == -1)
  150. {
  151.   ACE_ERROR_RETURN ((LM_ERROR,"%p\n","ACE_Asynch_Read_Stream::read"),-1);
  152. }
  153. _io_count++;
  154. return 0;
  155. }
  156. /**生成一个写请求
  157. *
  158. * \param mb 待发送的数据
  159. * \param nBytes 待发送数据大小
  160. * \return 0-成功,-1失败
  161. */
  162. int  ClientHandler::initiate_write_stream (ACE_Message_Block & mb, size_t nBytes )
  163. {
  164. ACE_Guard<ACE_Recursive_Thread_Mutex> locker (_lock);
  165. if (_ws.write (mb , nBytes ) == -1)
  166. {
  167.   mb.release ();
  168.   ACE_ERROR_RETURN((LM_ERROR,"%p\n","ACE_Asynch_Write_File::write"),-1);
  169. }
  170. _io_count++;
  171. return 0;
  172. }
  173. void ClientHandler::open (ACE_HANDLE handle,ACE_Message_Block &message_block)
  174. {
  175. //FUNC_ENTER;
  176. _last_net_io=ACE_OS::time(NULL);
  177. _io_count=0;
  178. if(_ws.open(*this,this->handle())==-1)
  179. {
  180.   ACE_ERROR ((LM_ERROR,"%p\n","ACE_Asynch_Write_Stream::open"));
  181. }
  182. else if (_rs.open (*this, this->handle()) == -1)
  183. {
  184.   ACE_ERROR ((LM_ERROR,"%p\n","ACE_Asynch_Read_Stream::open"));
  185. }
  186. else
  187. {
  188.   initiate_read_stream ();
  189. }
  190. check_destroy();
  191. //FUNC_LEAVE;
  192. }
  193. void ClientHandler::fini()
  194. {
  195. }
  196. void ClientHandler::check_time_out(time_t cur_time)
  197. {
  198. //ACE_Guard<ACE_Recursive_Thread_Mutex> locker (_lock);
  199. //ACE_DEBUG((LM_DEBUG,"cur_time is %u,last io is %u\n",cur_time,_last_net_io));
  200. //检测是否已经为0了
  201. if(this->handle()==ACE_INVALID_HANDLE)
  202.   return;
  203. if(cur_time-_last_net_io>CLIENT_TIME_OUT_SECONDS)
  204. {
  205.   ACE_OS::shutdown(this->handle(),SD_BOTH);
  206.   ACE_OS::closesocket(this->handle());
  207.   this->handle(ACE_INVALID_HANDLE);
  208. }
  209. }
  210. int ClientHandler::check_destroy()
  211. {
  212. {
  213.   ACE_Guard<ACE_Recursive_Thread_Mutex> locker (_lock);
  214.   if (_io_count> 0)
  215.    return 1;
  216. }
  217. ACE_OS::shutdown(this->handle(),SD_BOTH);
  218. ACE_OS::closesocket(this->handle());
  219. this->handle(ACE_INVALID_HANDLE);
  220. //这个地方给内存池吧。
  221. ClientManager::get_instance().release_client_handle(this);
  222. //delete this;
  223. return 0;
  224. }
复制代码
您需要登录后才可以回帖 登录 | 用户注册

本版积分规则

Archiver|手机版|小黑屋|ACE Developer ( 京ICP备06055248号 )

GMT+8, 2024-12-4 16:10 , Processed in 0.015015 second(s), 5 queries , Redis On.

Powered by Discuz! X3.5

© 2001-2023 Discuz! Team.

快速回复 返回顶部 返回列表