| 
 | 
 
     最近在写一个跨平台的、基于ACE的服务器socket中间件,用了ACE_TP_Reactor,同时注册了READ_MASK和WRITE_MASK,但handler只能收到handle_output回调,收不到handle_input回调。
于是写了一个很简单的测试demo,发现还是同样的问题,代码如下:
 - #include "ace/Event_Handler.h"
 - #include "ace/INET_Addr.h"
 - #include "ace/SOCK_Acceptor.h"
 - #include "ace/SOCK_Stream.h"
 - #include "ace/Reactor.h"
 - #include "ace/TP_Reactor.h"
 - #include "ace/Thread_Manager.h"
 - #include "ace/OS.h"
 - #define USE_SELECT_REACTOR
 - #ifdef USE_SELECT_REACTOR
 - #include "ace/Select_Reactor.h"
 - #endif
 - #include <cassert>
 - #include <cerrno>
 - #include <iostream>
 - /// Event handler that owns the listening socket and is called back by the
 - /// reactor whenever a new connection is ready to be accepted.
 - class Acceptor : public ACE_Event_Handler
 - {
 - public:
 -         /// Opens the listening socket on @a addr and registers this handler
 -         /// with the reactor singleton. Returns -1 on failure.
 -         virtual int open(const ACE_INET_Addr & addr);
 -
 -         /// Reactor callback: accepts one pending connection.
 -         virtual int handle_input(ACE_HANDLE handle = ACE_INVALID_HANDLE);
 -         /// Reactor callback: closes the listening socket and deletes this object.
 -         virtual int handle_close(ACE_HANDLE handle, ACE_Reactor_Mask close_mask);
 -         /// Returns the handle of the listening socket.
 -         virtual ACE_HANDLE get_handle() const;
 -
 - private:
 -         /// Private so the object can only be destroyed from inside the class
 -         /// (handle_close() performs `delete this`).
 -         ~Acceptor() {}
 -
 -         ACE_SOCK_Acceptor m_acceptor;   ///< Passive-mode (listening) socket.
 - };
 - /// Event handler for one accepted connection; received text is kept in a
 - /// fixed-size buffer and echoed back to the peer.
 - class Client_Handler : public ACE_Event_Handler
 - {
 - public:
 -         /// Zero-fills the echo buffer.
 -         Client_Handler();
 -
 -         /// Sends the welcome message and registers this handler with the
 -         /// reactor singleton. Requires that peer() is already connected.
 -         virtual int open();
 -
 -         /// Reactor callback: reads available data into the echo buffer.
 -         virtual int handle_input(ACE_HANDLE handle = ACE_INVALID_HANDLE);
 -         /// Reactor callback: sends the contents of the echo buffer to the peer.
 -         virtual int handle_output(ACE_HANDLE handle = ACE_INVALID_HANDLE);
 -         /// Reactor callback: releases the connection.
 -         virtual int handle_close(ACE_HANDLE handle, ACE_Reactor_Mask close_mask);
 -         /// Returns the handle of the connected socket.
 -         virtual ACE_HANDLE get_handle() const;
 -
 -         /// Gives the acceptor access to the stream it has to connect.
 -         ACE_SOCK_Stream & peer ();
 -
 - private:
 -         /// Private so the object can only be destroyed from inside the class
 -         /// (handle_close() performs `delete this`).
 -         ~Client_Handler() {}
 -
 -         ACE_SOCK_Stream m_stream;       ///< Connected data socket.
 -         char m_buffer[80];              ///< NUL-terminated text waiting to be echoed.
 - };
 - /// Thread entry point: runs the event loop of the reactor passed in @a arg.
 - /// @param arg  Pointer to the ACE_Reactor to run (passed as void* by the thread manager).
 - /// @return Always 0.
 - static ACE_THR_FUNC_RETURN event_loop (void *arg) {
 -         ACE_Reactor * const loop_reactor = static_cast<ACE_Reactor *> (arg);
 -
 -         // Claim ownership for the calling thread before entering the loop.
 -         loop_reactor->owner (ACE_OS::thr_self ());
 -         loop_reactor->run_reactor_event_loop ();
 -
 -         return 0;
 - }
 - int main(int argc, char *argv[]) 
 - {
 -         ACE_INET_Addr addr("0.0.0.0:4567");
 - #ifdef USE_SELECT_REACTOR
 -         ACE_TP_Reactor select_reactor;
 -         //ACE_Select_Reactor  select_reactor;
 -         ACE_Reactor reactor(&select_reactor);
 -         ACE_Reactor::instance(&reactor, 0);
 - #endif
 -         Acceptor * acceptor = new Acceptor();
 -         int ret = acceptor->open(addr);
 -         if (ret == -1)
 -                 return -1;
 -         const size_t N_THREADS = 4;
 -         ACE_Thread_Manager::instance ()->spawn_n
 -                 (N_THREADS, event_loop, ACE_Reactor::instance ());
 -         return ACE_Thread_Manager::instance ()->wait ();
 - //        return ACE_Reactor::instance()->run_event_loop();
 - }
 - /// NOTE(review): never read or written anywhere in the visible code;
 - /// presumably meant to count live connections -- confirm or remove.
 - int g_totalConn = 0;
 - ///
 - /// Implementation of class Acceptor
 - /// 
 - /// Opens the listening socket on @a addr and registers this handler with the
 - /// reactor singleton for ACCEPT events.
 - /// @return -1 if the socket cannot be opened or the registration fails,
 - ///         otherwise the result of register_handler().
 - int Acceptor::open(const ACE_INET_Addr & addr)
 - {
 -         if (m_acceptor.open(addr) == -1)
 -                 return -1;
 -         const int ret = ACE_Reactor::instance()->register_handler(this, ACCEPT_MASK);
 -         if (ret == -1)
 -         {
 -                 // Nobody will ever dispatch on this socket; do not leave the
 -                 // port bound when the registration did not succeed.
 -                 m_acceptor.close();
 -         }
 -         return ret;
 - }
 - /// Reactor callback invoked when a connection is pending on the listening
 - /// socket. Accepts it into a freshly allocated Client_Handler and opens that
 - /// handler.
 - /// @return -1 if accept() itself fails (the reactor then removes the
 - ///         acceptor), 0 otherwise -- including when only the new client
 - ///         could not be set up.
 - int Acceptor::handle_input(ACE_HANDLE /* handle */)
 - {
 -         ACE_INET_Addr remotePeer;
 -         Client_Handler * client_handler = new Client_Handler();
 -         if (m_acceptor.accept(client_handler->peer(), &remotePeer) == -1)
 -         {
 -                 // handle_close() is the only way to free the handler (private dtor).
 -                 client_handler->handle_close(ACE_INVALID_HANDLE, NULL_MASK);
 -                 return -1;
 -         }
 -         std::cout << "Received connection from " << remotePeer.get_host_name() << ":" << remotePeer.get_port_number() << std::endl;
 -         if (client_handler->open() == -1)
 -         {
 -                 // Free the half-initialised client instead of leaking it, and
 -                 // keep listening: returning -1 here would make the reactor
 -                 // close the acceptor because of one bad connection.
 -                 client_handler->handle_close(ACE_INVALID_HANDLE, NULL_MASK);
 -         }
 -         return 0;
 - }
 - /// Reactor callback: shuts the listening socket and destroys this object.
 - /// Both arguments are ignored. The object must not be used after this call.
 - int Acceptor::handle_close(ACE_HANDLE /* handle */, ACE_Reactor_Mask /* close_mask */)
 - {
 -         this->m_acceptor.close();
 -
 -         // Heap-allocated with a private destructor, so it frees itself here.
 -         delete this;
 -
 -         return -1; // return value ignored by reactor.
 - }
 - /// Returns the handle of the listening socket.
 - ACE_HANDLE Acceptor::get_handle() const
 - {
 -         const ACE_HANDLE listen_handle = m_acceptor.get_handle();
 -         return listen_handle;
 - }
 - ///
 - /// Implementation of class Client_Handler
 - /// 
 - /// Creates a handler with an unconnected stream and an all-zero echo buffer.
 - Client_Handler::Client_Handler()
 -         : m_stream()
 -         , m_buffer()    // value-initialisation zero-fills the whole array
 - {
 - }
 - int Client_Handler::open()
 - {
 -         assert (m_stream.get_handle() != ACE_INVALID_HANDLE);
 -         ACE_Time_Value timeout(1);
 -         const char welcome [] = "Welcome to echo server";
 -         m_stream.enable(ACE_NONBLOCK);
 -         size_t size = m_stream.send(welcome, sizeof(welcome), &timeout);
 -         if (size == 0)
 -                 return -1;
 -         else if (size == -1)
 -         {
 -                 assert (errno == EWOULDBLOCK);
 -         }
 -         m_stream.disable(ACE_NONBLOCK);
 -         return ACE_Reactor::instance()->register_handler(this, READ_MASK | WRITE_MASK);
 - }
 - int Client_Handler::handle_input(ACE_HANDLE handle)
 - {
 -         ACE_OS::memset(m_buffer, 0, sizeof(m_buffer));
 -         m_stream.enable(ACE_NONBLOCK);
 -         size_t size = m_stream.recv(m_buffer, sizeof(m_buffer));
 -         if (size == 0) // connection loss
 -                 return -1;
 -         else if (size == -1) // block
 -         {
 -                 if (errno == EWOULDBLOCK)
 -                         return 0;
 -                 else 
 -                         return -1;
 -         }
 -         // give us chance to reply to client
 -         //ACE_Reactor::instance()->schedule_wakeup(this, WRITE_MASK);
 -         if (size == sizeof(m_buffer)) // there may be more data available
 -                 return 1;
 -         return 0;
 - }
 - int Client_Handler::handle_output(ACE_HANDLE handle)
 - {
 -         // just echo
 -         size_t size = sizeof(m_buffer[0]) * (ACE_OS::strlen(m_buffer) + 1);
 -         if (size > 1)
 -         {
 -                 size_t num_bytes_sent = m_stream.send(m_buffer, size);
 -                 if (num_bytes_sent == 0) // connection loss
 -                         return -1;
 -                 else if (num_bytes_sent == -1) // block
 -                 {
 -                         assert (errno == EWOULDBLOCK);
 -                 }
 -                 if (num_bytes_sent == size)
 -                 {
 -                         //ACE_Reactor::instance()->cancel_wakeup(this, WRITE_MASK);
 -                 }
 -                 else
 -                 {
 -                         // only part of the buffer are sent, :(
 -                         char * temp = new char[size - num_bytes_sent];
 -                         assert (temp != 0);
 -                         ACE_OS::memcpy(temp, m_buffer + num_bytes_sent, size - num_bytes_sent);
 -                         ACE_OS::memset(m_buffer, 0, sizeof(m_buffer));
 -                         ACE_OS::memcpy(m_buffer, temp, size - num_bytes_sent);
 -                         delete [] temp;
 -                 }
 -                 return 0;
 -         }
 -         else
 -         {
 -                 //ACE_Reactor::instance()->cancel_wakeup(this, WRITE_MASK);
 -                 return 0;
 -         }
 - }
 - int Client_Handler::handle_close(ACE_HANDLE handle, ACE_Reactor_Mask close_mask)
 - {
 -         if (close_mask == WRITE_MASK)
 -         {
 -                 return 0;
 -         }
 -         else
 -         {
 -                 ACE_INET_Addr remotePeer;
 -                 m_stream.get_remote_addr(remotePeer);
 -                 cout << "Disconnected: " << remotePeer.get_host_name() << ":" << remotePeer.get_port_number() << endl;
 -                 m_stream.close();
 -                 delete this;
 -                 return -1; // return value ignored by reactor
 -         }
 - }
 - ACE_HANDLE Client_Handler::get_handle() const
 - {
 -         return m_stream.get_handle();
 - }
 - ACE_SOCK_Stream & Client_Handler::peer ()
 - {
 -         return this->m_stream;
 - }
 
  复制代码 |   
 
 
 
 |