找回密码
 用户注册

QQ登录

只需一步,快速开始

查看: 5151|回复: 3

ACE_TP_Reactor 同时注册READ WRITE只能收到output

[复制链接]
发表于 2012-11-8 20:35:57 | 显示全部楼层 |阅读模式
     最近在写一个跨平台的基于ACE的服务器socket中间件,用了ACE_TP_Reactor,同时注册了READ_MASK和WRITE_MASK,但handler只能收到handle_output回调,收不到handle_input回调,
于是写了一个很简单的测试demo,发现还是同样的问题,代码如下:
  #include "ace/Event_Handler.h"
  #include "ace/INET_Addr.h"
  #include "ace/SOCK_Acceptor.h"
  #include "ace/SOCK_Stream.h"
  #include "ace/Reactor.h"
  #include "ace/Thread_Manager.h"
  #include "ace/OS.h"
  #define USE_SELECT_REACTOR
  #ifdef USE_SELECT_REACTOR
  #include "ace/Select_Reactor.h"
  #include "ace/TP_Reactor.h"
  #endif
  #include <cassert>
  #include <cerrno>
  #include <iostream>
  11. class Acceptor : public ACE_Event_Handler
  12. {
  13. private:
  14.         ~Acceptor() {}
  15. public:
  16.         virtual int open(const ACE_INET_Addr & addr);
  17.         /// overrides from ACE_Event_Handler
  18.         virtual int handle_input(ACE_HANDLE handle = ACE_INVALID_HANDLE);
  19.         virtual int handle_close(ACE_HANDLE handle, ACE_Reactor_Mask close_mask);
  20.         virtual ACE_HANDLE get_handle() const;
  21. private:
  22.         ACE_SOCK_Acceptor m_acceptor;
  23. };
  24. class Client_Handler : public ACE_Event_Handler
  25. {
  26. private:
  27.         ~Client_Handler() {}
  28. public:
  29.         Client_Handler();
  30.         virtual int open();
  31.         /// overrides from ACE_Event_Handler
  32.         virtual int handle_input(ACE_HANDLE handle = ACE_INVALID_HANDLE);
  33.         virtual int handle_output(ACE_HANDLE handle = ACE_INVALID_HANDLE);
  34.         virtual int handle_close(ACE_HANDLE handle, ACE_Reactor_Mask close_mask);
  35.         virtual ACE_HANDLE get_handle() const;
  36.         ACE_SOCK_Stream & peer ();
  37. private:
  38.         ACE_SOCK_Stream m_stream;
  39.         char m_buffer[80];
  40. };
  41. static ACE_THR_FUNC_RETURN event_loop (void *arg) {
  42.         ACE_Reactor *reactor = static_cast<ACE_Reactor *> (arg);
  43.         reactor->owner (ACE_OS::thr_self ());
  44.         reactor->run_reactor_event_loop ();
  45.         return 0;
  46. }
  47. int main(int argc, char *argv[])
  48. {
  49.         ACE_INET_Addr addr("0.0.0.0:4567");
  50. #ifdef USE_SELECT_REACTOR
  51.         ACE_TP_Reactor select_reactor;
  52.         //ACE_Select_Reactor  select_reactor;
  53.         ACE_Reactor reactor(&select_reactor);
  54.         ACE_Reactor::instance(&reactor, 0);
  55. #endif
  56.         Acceptor * acceptor = new Acceptor();
  57.         int ret = acceptor->open(addr);
  58.         if (ret == -1)
  59.                 return -1;
  60.         const size_t N_THREADS = 4;
  61.         ACE_Thread_Manager::instance ()->spawn_n
  62.                 (N_THREADS, event_loop, ACE_Reactor::instance ());
  63.         return ACE_Thread_Manager::instance ()->wait ();
  64. //        return ACE_Reactor::instance()->run_event_loop();
  65. }
  // Global connection counter; nothing in the visible part of this listing
  // reads or updates it.
  int g_totalConn = 0;
  67. ///
  68. /// Implementation of class Acceptor
  69. ///
  70. int Acceptor::open(const ACE_INET_Addr & addr)
  71. {
  72.         int ret = m_acceptor.open(addr);
  73.         if (ret == -1)
  74.                 return -1;
  75.         ret = ACE_Reactor::instance()->register_handler(this, ACCEPT_MASK);
  76.         return ret;
  77. }
  78. int Acceptor::handle_input(ACE_HANDLE handle)
  79. {
  80.         ACE_INET_Addr remotePeer;
  81.         Client_Handler * client_handler = new Client_Handler();
  82.         int ret = m_acceptor.accept(client_handler->peer(), &remotePeer);
  83.         if (ret == -1)
  84.         {
  85.                 client_handler->handle_close(ACE_INVALID_HANDLE, NULL_MASK);
  86.                 return -1;
  87.         }
  88.         cout << "Received connection from " << remotePeer.get_host_name() << ":" << remotePeer.get_port_number() << endl;
  89.         return client_handler->open();
  90. }
  91. int Acceptor::handle_close(ACE_HANDLE handle, ACE_Reactor_Mask close_mask)
  92. {
  93.         m_acceptor.close();
  94.         delete this;
  95.         return -1; // return value ignored by reactor.
  96. }
  97. ACE_HANDLE Acceptor::get_handle() const
  98. {
  99.         return this->m_acceptor.get_handle();
  100. }
  101. ///
  102. /// Implementation of class Client_Handler
  103. ///
  104. Client_Handler::Client_Handler()
  105. {
  106.         ACE_OS::memset(m_buffer, 0, sizeof(m_buffer));
  107. }
  108. int Client_Handler::open()
  109. {
  110.         assert (m_stream.get_handle() != ACE_INVALID_HANDLE);
  111.         ACE_Time_Value timeout(1);
  112.         const char welcome [] = "Welcome to echo server";
  113.         m_stream.enable(ACE_NONBLOCK);
  114.         size_t size = m_stream.send(welcome, sizeof(welcome), &timeout);
  115.         if (size == 0)
  116.                 return -1;
  117.         else if (size == -1)
  118.         {
  119.                 assert (errno == EWOULDBLOCK);
  120.         }
  121.         m_stream.disable(ACE_NONBLOCK);
  122.         return ACE_Reactor::instance()->register_handler(this, READ_MASK | WRITE_MASK);
  123. }
  124. int Client_Handler::handle_input(ACE_HANDLE handle)
  125. {
  126.         ACE_OS::memset(m_buffer, 0, sizeof(m_buffer));
  127.         m_stream.enable(ACE_NONBLOCK);
  128.         size_t size = m_stream.recv(m_buffer, sizeof(m_buffer));
  129.         if (size == 0) // connection loss
  130.                 return -1;
  131.         else if (size == -1) // block
  132.         {
  133.                 if (errno == EWOULDBLOCK)
  134.                         return 0;
  135.                 else
  136.                         return -1;
  137.         }
  138.         // give us chance to reply to client
  139.         //ACE_Reactor::instance()->schedule_wakeup(this, WRITE_MASK);
  140.         if (size == sizeof(m_buffer)) // there may be more data available
  141.                 return 1;
  142.         return 0;
  143. }
  144. int Client_Handler::handle_output(ACE_HANDLE handle)
  145. {
  146.         // just echo
  147.         size_t size = sizeof(m_buffer[0]) * (ACE_OS::strlen(m_buffer) + 1);
  148.         if (size > 1)
  149.         {
  150.                 size_t num_bytes_sent = m_stream.send(m_buffer, size);
  151.                 if (num_bytes_sent == 0) // connection loss
  152.                         return -1;
  153.                 else if (num_bytes_sent == -1) // block
  154.                 {
  155.                         assert (errno == EWOULDBLOCK);
  156.                 }
  157.                 if (num_bytes_sent == size)
  158.                 {
  159.                         //ACE_Reactor::instance()->cancel_wakeup(this, WRITE_MASK);
  160.                 }
  161.                 else
  162.                 {
  163.                         // only part of the buffer are sent, :(
  164.                         char * temp = new char[size - num_bytes_sent];
  165.                         assert (temp != 0);
  166.                         ACE_OS::memcpy(temp, m_buffer + num_bytes_sent, size - num_bytes_sent);
  167.                         ACE_OS::memset(m_buffer, 0, sizeof(m_buffer));
  168.                         ACE_OS::memcpy(m_buffer, temp, size - num_bytes_sent);
  169.                         delete [] temp;
  170.                 }
  171.                 return 0;
  172.         }
  173.         else
  174.         {
  175.                 //ACE_Reactor::instance()->cancel_wakeup(this, WRITE_MASK);
  176.                 return 0;
  177.         }
  178. }
  179. int Client_Handler::handle_close(ACE_HANDLE handle, ACE_Reactor_Mask close_mask)
  180. {
  181.         if (close_mask == WRITE_MASK)
  182.         {
  183.                 return 0;
  184.         }
  185.         else
  186.         {
  187.                 ACE_INET_Addr remotePeer;
  188.                 m_stream.get_remote_addr(remotePeer);
  189.                 cout << "Disconnected: " << remotePeer.get_host_name() << ":" << remotePeer.get_port_number() << endl;
  190.                 m_stream.close();
  191.                 delete this;
  192.                 return -1; // return value ignored by reactor
  193.         }
  194. }
  195. ACE_HANDLE Client_Handler::get_handle() const
  196. {
  197.         return m_stream.get_handle();
  198. }
  199. ACE_SOCK_Stream & Client_Handler::peer ()
  200. {
  201.         return this->m_stream;
  202. }
复制代码
发表于 2012-11-8 23:47:33 | 显示全部楼层
学习一下,随便沙发
 楼主| 发表于 2012-11-9 00:34:32 | 显示全部楼层
自己顶一下
虽然可以不注册WRITE_MASK,程序也能正常工作.但想知道下原因
望高人指点,
发表于 2012-11-13 09:23:59 | 显示全部楼层
建议你看一下ACE下的examples/C++NPv2/TP_Reactor_Logging_Server.cpp。这个有完整的TP_select实例。
另外如果要使用TP,不要在windows下测试,这个是基于linux下的。
您需要登录后才可以回帖 登录 | 用户注册

本版积分规则

Archiver|手机版|小黑屋|ACE Developer ( 京ICP备06055248号 )

GMT+8, 2024-12-22 16:35 , Processed in 0.014172 second(s), 5 queries , Redis On.

Powered by Discuz! X3.5

© 2001-2023 Discuz! Team.

快速回复 返回顶部 返回列表