ACE_TP_Reactor 同时注册READ WRITE只能收到output
最近在写一个跨平台的基于ACE的服务器socket中间件,用了ACE_TP_Reactor,同时注册了READ_MASK,WRIET_MASK,但handler只能收到hande_output回调,收不到handle_input回调,于是写了一个很简单的测试demo,发现还是同样的问题,代码如下:include "ace/Event_Handler.h"
#include "ace/SOCK_Acceptor.h"
#include "ace/SOCK_Stream.h"
#include "ace/Reactor.h"
#include "ace/OS.h"
#define USE_SELECT_REACTOR
#ifdef USE_SELECT_REACTOR
#include "ace/Select_Reactor.h"
#endif
#include <iostream>
class Acceptor : public ACE_Event_Handler
{
private:
~Acceptor() {}
public:
virtual int open(const ACE_INET_Addr & addr);
/// overrides from ACE_Event_Handler
virtual int handle_input(ACE_HANDLE handle = ACE_INVALID_HANDLE);
virtual int handle_close(ACE_HANDLE handle, ACE_Reactor_Mask close_mask);
virtual ACE_HANDLE get_handle() const;
private:
ACE_SOCK_Acceptor m_acceptor;
};
class Client_Handler : public ACE_Event_Handler
{
private:
~Client_Handler() {}
public:
Client_Handler();
virtual int open();
/// overrides from ACE_Event_Handler
virtual int handle_input(ACE_HANDLE handle = ACE_INVALID_HANDLE);
virtual int handle_output(ACE_HANDLE handle = ACE_INVALID_HANDLE);
virtual int handle_close(ACE_HANDLE handle, ACE_Reactor_Mask close_mask);
virtual ACE_HANDLE get_handle() const;
ACE_SOCK_Stream & peer ();
private:
ACE_SOCK_Stream m_stream;
char m_buffer;
};
static ACE_THR_FUNC_RETURN event_loop (void *arg) {
ACE_Reactor *reactor = static_cast<ACE_Reactor *> (arg);
reactor->owner (ACE_OS::thr_self ());
reactor->run_reactor_event_loop ();
return 0;
}
int main(int argc, char *argv[])
{
ACE_INET_Addr addr("0.0.0.0:4567");
#ifdef USE_SELECT_REACTOR
ACE_TP_Reactor select_reactor;
//ACE_Select_Reactorselect_reactor;
ACE_Reactor reactor(&select_reactor);
ACE_Reactor::instance(&reactor, 0);
#endif
Acceptor * acceptor = new Acceptor();
int ret = acceptor->open(addr);
if (ret == -1)
return -1;
const size_t N_THREADS = 4;
ACE_Thread_Manager::instance ()->spawn_n
(N_THREADS, event_loop, ACE_Reactor::instance ());
return ACE_Thread_Manager::instance ()->wait ();
// return ACE_Reactor::instance()->run_event_loop();
}
int g_totalConn = 0;
///
/// Implementation of class Acceptor
///
int Acceptor::open(const ACE_INET_Addr & addr)
{
int ret = m_acceptor.open(addr);
if (ret == -1)
return -1;
ret = ACE_Reactor::instance()->register_handler(this, ACCEPT_MASK);
return ret;
}
int Acceptor::handle_input(ACE_HANDLE handle)
{
ACE_INET_Addr remotePeer;
Client_Handler * client_handler = new Client_Handler();
int ret = m_acceptor.accept(client_handler->peer(), &remotePeer);
if (ret == -1)
{
client_handler->handle_close(ACE_INVALID_HANDLE, NULL_MASK);
return -1;
}
cout << "Received connection from " << remotePeer.get_host_name() << ":" << remotePeer.get_port_number() << endl;
return client_handler->open();
}
int Acceptor::handle_close(ACE_HANDLE handle, ACE_Reactor_Mask close_mask)
{
m_acceptor.close();
delete this;
return -1; // return value ignored by reactor.
}
ACE_HANDLE Acceptor::get_handle() const
{
return this->m_acceptor.get_handle();
}
///
/// Implementation of class Client_Handler
///
Client_Handler::Client_Handler()
{
ACE_OS::memset(m_buffer, 0, sizeof(m_buffer));
}
int Client_Handler::open()
{
assert (m_stream.get_handle() != ACE_INVALID_HANDLE);
ACE_Time_Value timeout(1);
const char welcome [] = "Welcome to echo server";
m_stream.enable(ACE_NONBLOCK);
size_t size = m_stream.send(welcome, sizeof(welcome), &timeout);
if (size == 0)
return -1;
else if (size == -1)
{
assert (errno == EWOULDBLOCK);
}
m_stream.disable(ACE_NONBLOCK);
return ACE_Reactor::instance()->register_handler(this, READ_MASK | WRITE_MASK);
}
int Client_Handler::handle_input(ACE_HANDLE handle)
{
ACE_OS::memset(m_buffer, 0, sizeof(m_buffer));
m_stream.enable(ACE_NONBLOCK);
size_t size = m_stream.recv(m_buffer, sizeof(m_buffer));
if (size == 0) // connection loss
return -1;
else if (size == -1) // block
{
if (errno == EWOULDBLOCK)
return 0;
else
return -1;
}
// give us chance to reply to client
//ACE_Reactor::instance()->schedule_wakeup(this, WRITE_MASK);
if (size == sizeof(m_buffer)) // there may be more data available
return 1;
return 0;
}
int Client_Handler::handle_output(ACE_HANDLE handle)
{
// just echo
size_t size = sizeof(m_buffer) * (ACE_OS::strlen(m_buffer) + 1);
if (size > 1)
{
size_t num_bytes_sent = m_stream.send(m_buffer, size);
if (num_bytes_sent == 0) // connection loss
return -1;
else if (num_bytes_sent == -1) // block
{
assert (errno == EWOULDBLOCK);
}
if (num_bytes_sent == size)
{
//ACE_Reactor::instance()->cancel_wakeup(this, WRITE_MASK);
}
else
{
// only part of the buffer are sent, :(
char * temp = new char;
assert (temp != 0);
ACE_OS::memcpy(temp, m_buffer + num_bytes_sent, size - num_bytes_sent);
ACE_OS::memset(m_buffer, 0, sizeof(m_buffer));
ACE_OS::memcpy(m_buffer, temp, size - num_bytes_sent);
delete [] temp;
}
return 0;
}
else
{
//ACE_Reactor::instance()->cancel_wakeup(this, WRITE_MASK);
return 0;
}
}
int Client_Handler::handle_close(ACE_HANDLE handle, ACE_Reactor_Mask close_mask)
{
if (close_mask == WRITE_MASK)
{
return 0;
}
else
{
ACE_INET_Addr remotePeer;
m_stream.get_remote_addr(remotePeer);
cout << "Disconnected: " << remotePeer.get_host_name() << ":" << remotePeer.get_port_number() << endl;
m_stream.close();
delete this;
return -1; // return value ignored by reactor
}
}
ACE_HANDLE Client_Handler::get_handle() const
{
return m_stream.get_handle();
}
ACE_SOCK_Stream & Client_Handler::peer ()
{
return this->m_stream;
}
学习一下,随便沙发 自己顶一下
虽然可以不注册WRITE_MASK,程序也能正常工作.但想知道下原因
望高人指点, 建议你看一下ACE下的example/C++NPv2/TP_Reactor_Logging_Server.cpp。这个有完整的TP_select实例。
另外如果要使用TP,不要在windows下测试,这个是基于linux下的。
页:
[1]