|
最近在写一个跨平台的基于ACE的服务器socket中间件,用了ACE_TP_Reactor,同时注册了READ_MASK,WRIET_MASK,但handler只能收到hande_output回调,收不到handle_input回调,
于是写了一个很简单的测试demo,发现还是同样的问题,代码如下:- include "ace/Event_Handler.h"
- #include "ace/SOCK_Acceptor.h"
- #include "ace/SOCK_Stream.h"
- #include "ace/Reactor.h"
- #include "ace/OS.h"
- #define USE_SELECT_REACTOR
- #ifdef USE_SELECT_REACTOR
- #include "ace/Select_Reactor.h"
- #endif
- #include <iostream>
- class Acceptor : public ACE_Event_Handler
- {
- private:
- ~Acceptor() {}
- public:
- virtual int open(const ACE_INET_Addr & addr);
- /// overrides from ACE_Event_Handler
- virtual int handle_input(ACE_HANDLE handle = ACE_INVALID_HANDLE);
- virtual int handle_close(ACE_HANDLE handle, ACE_Reactor_Mask close_mask);
- virtual ACE_HANDLE get_handle() const;
- private:
- ACE_SOCK_Acceptor m_acceptor;
- };
- class Client_Handler : public ACE_Event_Handler
- {
- private:
- ~Client_Handler() {}
- public:
- Client_Handler();
- virtual int open();
- /// overrides from ACE_Event_Handler
- virtual int handle_input(ACE_HANDLE handle = ACE_INVALID_HANDLE);
- virtual int handle_output(ACE_HANDLE handle = ACE_INVALID_HANDLE);
- virtual int handle_close(ACE_HANDLE handle, ACE_Reactor_Mask close_mask);
- virtual ACE_HANDLE get_handle() const;
- ACE_SOCK_Stream & peer ();
- private:
- ACE_SOCK_Stream m_stream;
- char m_buffer[80];
- };
- static ACE_THR_FUNC_RETURN event_loop (void *arg) {
- ACE_Reactor *reactor = static_cast<ACE_Reactor *> (arg);
- reactor->owner (ACE_OS::thr_self ());
- reactor->run_reactor_event_loop ();
- return 0;
- }
- int main(int argc, char *argv[])
- {
- ACE_INET_Addr addr("0.0.0.0:4567");
- #ifdef USE_SELECT_REACTOR
- ACE_TP_Reactor select_reactor;
- //ACE_Select_Reactor select_reactor;
- ACE_Reactor reactor(&select_reactor);
- ACE_Reactor::instance(&reactor, 0);
- #endif
- Acceptor * acceptor = new Acceptor();
- int ret = acceptor->open(addr);
- if (ret == -1)
- return -1;
- const size_t N_THREADS = 4;
- ACE_Thread_Manager::instance ()->spawn_n
- (N_THREADS, event_loop, ACE_Reactor::instance ());
- return ACE_Thread_Manager::instance ()->wait ();
- // return ACE_Reactor::instance()->run_event_loop();
- }
- int g_totalConn = 0;
- ///
- /// Implementation of class Acceptor
- ///
- int Acceptor::open(const ACE_INET_Addr & addr)
- {
- int ret = m_acceptor.open(addr);
- if (ret == -1)
- return -1;
- ret = ACE_Reactor::instance()->register_handler(this, ACCEPT_MASK);
- return ret;
- }
- int Acceptor::handle_input(ACE_HANDLE handle)
- {
- ACE_INET_Addr remotePeer;
- Client_Handler * client_handler = new Client_Handler();
- int ret = m_acceptor.accept(client_handler->peer(), &remotePeer);
- if (ret == -1)
- {
- client_handler->handle_close(ACE_INVALID_HANDLE, NULL_MASK);
- return -1;
- }
- cout << "Received connection from " << remotePeer.get_host_name() << ":" << remotePeer.get_port_number() << endl;
- return client_handler->open();
- }
- int Acceptor::handle_close(ACE_HANDLE handle, ACE_Reactor_Mask close_mask)
- {
- m_acceptor.close();
- delete this;
- return -1; // return value ignored by reactor.
- }
- ACE_HANDLE Acceptor::get_handle() const
- {
- return this->m_acceptor.get_handle();
- }
- ///
- /// Implementation of class Client_Handler
- ///
- Client_Handler::Client_Handler()
- {
- ACE_OS::memset(m_buffer, 0, sizeof(m_buffer));
- }
- int Client_Handler::open()
- {
- assert (m_stream.get_handle() != ACE_INVALID_HANDLE);
- ACE_Time_Value timeout(1);
- const char welcome [] = "Welcome to echo server";
- m_stream.enable(ACE_NONBLOCK);
- size_t size = m_stream.send(welcome, sizeof(welcome), &timeout);
- if (size == 0)
- return -1;
- else if (size == -1)
- {
- assert (errno == EWOULDBLOCK);
- }
- m_stream.disable(ACE_NONBLOCK);
- return ACE_Reactor::instance()->register_handler(this, READ_MASK | WRITE_MASK);
- }
- int Client_Handler::handle_input(ACE_HANDLE handle)
- {
- ACE_OS::memset(m_buffer, 0, sizeof(m_buffer));
- m_stream.enable(ACE_NONBLOCK);
- size_t size = m_stream.recv(m_buffer, sizeof(m_buffer));
- if (size == 0) // connection loss
- return -1;
- else if (size == -1) // block
- {
- if (errno == EWOULDBLOCK)
- return 0;
- else
- return -1;
- }
- // give us chance to reply to client
- //ACE_Reactor::instance()->schedule_wakeup(this, WRITE_MASK);
- if (size == sizeof(m_buffer)) // there may be more data available
- return 1;
- return 0;
- }
- int Client_Handler::handle_output(ACE_HANDLE handle)
- {
- // just echo
- size_t size = sizeof(m_buffer[0]) * (ACE_OS::strlen(m_buffer) + 1);
- if (size > 1)
- {
- size_t num_bytes_sent = m_stream.send(m_buffer, size);
- if (num_bytes_sent == 0) // connection loss
- return -1;
- else if (num_bytes_sent == -1) // block
- {
- assert (errno == EWOULDBLOCK);
- }
- if (num_bytes_sent == size)
- {
- //ACE_Reactor::instance()->cancel_wakeup(this, WRITE_MASK);
- }
- else
- {
- // only part of the buffer are sent, :(
- char * temp = new char[size - num_bytes_sent];
- assert (temp != 0);
- ACE_OS::memcpy(temp, m_buffer + num_bytes_sent, size - num_bytes_sent);
- ACE_OS::memset(m_buffer, 0, sizeof(m_buffer));
- ACE_OS::memcpy(m_buffer, temp, size - num_bytes_sent);
- delete [] temp;
- }
- return 0;
- }
- else
- {
- //ACE_Reactor::instance()->cancel_wakeup(this, WRITE_MASK);
- return 0;
- }
- }
- int Client_Handler::handle_close(ACE_HANDLE handle, ACE_Reactor_Mask close_mask)
- {
- if (close_mask == WRITE_MASK)
- {
- return 0;
- }
- else
- {
- ACE_INET_Addr remotePeer;
- m_stream.get_remote_addr(remotePeer);
- cout << "Disconnected: " << remotePeer.get_host_name() << ":" << remotePeer.get_port_number() << endl;
- m_stream.close();
- delete this;
- return -1; // return value ignored by reactor
- }
- }
- ACE_HANDLE Client_Handler::get_handle() const
- {
- return m_stream.get_handle();
- }
- ACE_SOCK_Stream & Client_Handler::peer ()
- {
- return this->m_stream;
- }
复制代码 |
|