leowang 发表于 2012-11-8 20:35:57

ACE_TP_Reactor 同时注册READ WRITE只能收到output

   最近在写一个跨平台的基于ACE的服务器socket中间件,用了ACE_TP_Reactor,同时注册了READ_MASK和WRITE_MASK,但handler只能收到handle_output回调,收不到handle_input回调,
于是写了一个很简单的测试demo,发现还是同样的问题,代码如下:
#include "ace/Event_Handler.h"
#include "ace/SOCK_Acceptor.h"
#include "ace/SOCK_Stream.h"
#include "ace/INET_Addr.h"
#include "ace/Reactor.h"
#include "ace/Thread_Manager.h"
#include "ace/OS.h"

#define USE_SELECT_REACTOR

#ifdef USE_SELECT_REACTOR
#include "ace/Select_Reactor.h"
#include "ace/TP_Reactor.h"
#endif

#include <iostream>

class Acceptor : public ACE_Event_Handler
{
private:
        ~Acceptor() {}
public:
        virtual int open(const ACE_INET_Addr & addr);

        /// overrides from ACE_Event_Handler
        virtual int handle_input(ACE_HANDLE handle = ACE_INVALID_HANDLE);
        virtual int handle_close(ACE_HANDLE handle, ACE_Reactor_Mask close_mask);
        virtual ACE_HANDLE get_handle() const;

private:
        ACE_SOCK_Acceptor m_acceptor;
};

class Client_Handler : public ACE_Event_Handler
{
private:
        ~Client_Handler() {}
public:
        Client_Handler();
        virtual int open();

        /// overrides from ACE_Event_Handler
        virtual int handle_input(ACE_HANDLE handle = ACE_INVALID_HANDLE);
        virtual int handle_output(ACE_HANDLE handle = ACE_INVALID_HANDLE);
        virtual int handle_close(ACE_HANDLE handle, ACE_Reactor_Mask close_mask);
        virtual ACE_HANDLE get_handle() const;

        ACE_SOCK_Stream & peer ();

private:
        ACE_SOCK_Stream m_stream;
        char m_buffer;
};

/// Thread entry point: @a arg is the ACE_Reactor whose event loop this
/// thread should run. Returns 0 once the loop ends.
static ACE_THR_FUNC_RETURN event_loop (void *arg) {
        ACE_Reactor * const loop_reactor = static_cast<ACE_Reactor *> (arg);

        // Record the calling thread as the reactor's owner, then block in
        // the event loop until it is ended.
        loop_reactor->owner (ACE_OS::thr_self ());
        loop_reactor->run_reactor_event_loop ();

        return 0;
}

int main(int argc, char *argv[])
{
        ACE_INET_Addr addr("0.0.0.0:4567");

#ifdef USE_SELECT_REACTOR
        ACE_TP_Reactor select_reactor;
        //ACE_Select_Reactorselect_reactor;
        ACE_Reactor reactor(&select_reactor);
        ACE_Reactor::instance(&reactor, 0);
#endif

        Acceptor * acceptor = new Acceptor();
        int ret = acceptor->open(addr);
        if (ret == -1)
                return -1;

        const size_t N_THREADS = 4;
        ACE_Thread_Manager::instance ()->spawn_n
                (N_THREADS, event_loop, ACE_Reactor::instance ());

        return ACE_Thread_Manager::instance ()->wait ();
//        return ACE_Reactor::instance()->run_event_loop();
}

// Global connection counter, initialised to zero. Nothing in the code shown
// here reads or updates it.
int g_totalConn = 0;
///
/// Implementation of class Acceptor
///
int Acceptor::open(const ACE_INET_Addr & addr)
{
        int ret = m_acceptor.open(addr);
        if (ret == -1)
                return -1;

        ret = ACE_Reactor::instance()->register_handler(this, ACCEPT_MASK);
        return ret;
}

/// Called by the reactor when the listening socket is ready: accepts one
/// connection and hands it to a freshly allocated Client_Handler.
///
/// @param handle  Unused.
/// @return -1 if accept() itself fails (the reactor then removes this
///         acceptor); 0 otherwise, including when the new client could not
///         be initialised.
int Acceptor::handle_input(ACE_HANDLE handle)
{
        ACE_INET_Addr remotePeer;
        Client_Handler * client_handler = new Client_Handler();
        if (m_acceptor.accept(client_handler->peer(), &remotePeer) == -1)
        {
                // ~Client_Handler is private; handle_close() frees the object.
                client_handler->handle_close(ACE_INVALID_HANDLE, NULL_MASK);
                return -1;
        }

        std::cout << "Received connection from " << remotePeer.get_host_name() << ":" << remotePeer.get_port_number() << std::endl;

        if (client_handler->open() == -1)
        {
                // open() failed, so the handler is not registered with the
                // reactor and nobody else will ever release it: close the
                // socket and free the object here.
                client_handler->handle_close(ACE_INVALID_HANDLE, NULL_MASK);
        }

        // A failure confined to one client must not be reported as a failure
        // of the listening socket; returning -1 here would make the reactor
        // shut the acceptor down.
        return 0;
}

/// Shutdown hook: closes the listening socket and destroys this object.
/// Neither @a handle nor @a close_mask is examined.
int Acceptor::handle_close(ACE_HANDLE handle, ACE_Reactor_Mask close_mask)
{
        this->m_acceptor.close();

        // Self-destruction: no member may be touched after this statement.
        delete this;

        return -1; // return value ignored by reactor.
}

/// Returns the handle of the listening socket.
ACE_HANDLE Acceptor::get_handle() const
{
        const ACE_HANDLE listen_handle = m_acceptor.get_handle();
        return listen_handle;
}

///
/// Implementation of class Client_Handler
///
/// Constructs a handler whose echo buffer starts out zero-filled.
Client_Handler::Client_Handler()
{
        const size_t buffer_bytes = sizeof(this->m_buffer);
        ACE_OS::memset(this->m_buffer, 0, buffer_bytes);
}

int Client_Handler::open()
{
        assert (m_stream.get_handle() != ACE_INVALID_HANDLE);
        ACE_Time_Value timeout(1);
        const char welcome [] = "Welcome to echo server";
        m_stream.enable(ACE_NONBLOCK);
        size_t size = m_stream.send(welcome, sizeof(welcome), &timeout);
        if (size == 0)
                return -1;
        else if (size == -1)
        {
                assert (errno == EWOULDBLOCK);
        }

        m_stream.disable(ACE_NONBLOCK);

        return ACE_Reactor::instance()->register_handler(this, READ_MASK | WRITE_MASK);
}

int Client_Handler::handle_input(ACE_HANDLE handle)
{
        ACE_OS::memset(m_buffer, 0, sizeof(m_buffer));
        m_stream.enable(ACE_NONBLOCK);
        size_t size = m_stream.recv(m_buffer, sizeof(m_buffer));
        if (size == 0) // connection loss
                return -1;
        else if (size == -1) // block
        {
                if (errno == EWOULDBLOCK)
                        return 0;
                else
                        return -1;
        }

        // give us chance to reply to client
        //ACE_Reactor::instance()->schedule_wakeup(this, WRITE_MASK);

        if (size == sizeof(m_buffer)) // there may be more data available
                return 1;

        return 0;
}

int Client_Handler::handle_output(ACE_HANDLE handle)
{
        // just echo
        size_t size = sizeof(m_buffer) * (ACE_OS::strlen(m_buffer) + 1);
        if (size > 1)
        {
                size_t num_bytes_sent = m_stream.send(m_buffer, size);
                if (num_bytes_sent == 0) // connection loss
                        return -1;
                else if (num_bytes_sent == -1) // block
                {
                        assert (errno == EWOULDBLOCK);
                }

                if (num_bytes_sent == size)
                {
                        //ACE_Reactor::instance()->cancel_wakeup(this, WRITE_MASK);
                }
                else
                {
                        // only part of the buffer are sent, :(
                        char * temp = new char;
                        assert (temp != 0);
                        ACE_OS::memcpy(temp, m_buffer + num_bytes_sent, size - num_bytes_sent);
                        ACE_OS::memset(m_buffer, 0, sizeof(m_buffer));
                        ACE_OS::memcpy(m_buffer, temp, size - num_bytes_sent);
                        delete [] temp;
                }

                return 0;
        }
        else
        {
                //ACE_Reactor::instance()->cancel_wakeup(this, WRITE_MASK);
                return 0;
        }
}

int Client_Handler::handle_close(ACE_HANDLE handle, ACE_Reactor_Mask close_mask)
{
        if (close_mask == WRITE_MASK)
        {
                return 0;
        }
        else
        {
                ACE_INET_Addr remotePeer;
                m_stream.get_remote_addr(remotePeer);
                cout << "Disconnected: " << remotePeer.get_host_name() << ":" << remotePeer.get_port_number() << endl;
                m_stream.close();
                delete this;
                return -1; // return value ignored by reactor
        }
}

/// Returns the handle of the client connection's stream.
ACE_HANDLE Client_Handler::get_handle() const
{
        const ACE_HANDLE stream_handle = this->m_stream.get_handle();
        return stream_handle;
}

/// Exposes the underlying stream by reference (the acceptor accepts the new
/// connection into it).
ACE_SOCK_Stream & Client_Handler::peer ()
{
        ACE_SOCK_Stream & stream = m_stream;
        return stream;
}


tulipcaicai 发表于 2012-11-8 23:47:33

学习一下,顺便沙发

leowang 发表于 2012-11-9 00:34:32

自己顶一下
虽然可以不注册WRITE_MASK,程序也能正常工作.但想知道下原因
望高人指点,

freeeyes 发表于 2012-11-13 09:23:59

建议你看一下ACE下的example/C++NPv2/TP_Reactor_Logging_Server.cpp。这个有完整的TP_select实例。
另外如果要使用TP,不要在windows下测试,这个是基于linux下的。
页: [1]
查看完整版本: ACE_TP_Reactor 同时注册READ WRITE只能收到output