这是我从工程里抽出的测试代码，共两个文件：
test_server.cpp
#include "test_server.h"
#include "ace/High_Res_Timer.h"
/**
 * @brief Construct the handler for one client connection.
 *
 * Enables ACE reference counting for this handler and puts every data
 * member into a defined state. m_Mask in particular must start at 0:
 * handle_close() tests it, and it must hold a defined value even when
 * open() has never been called on the object.
 *
 * @param reactor   Reactor this handler will register itself with.
 * @param time_out  Idle period after which handle_timeout() asks for the
 *                  connection to be torn down. The 3600 second default is
 *                  supplied on this out-of-class definition, so it is only
 *                  visible inside this translation unit.
 */
COnline_Server_Handler::COnline_Server_Handler(ACE_Reactor *reactor,
        const ACE_Time_Value time_out = ACE_Time_Value(3600))
    : ACE_Event_Handler(reactor),
      m_Mask(0),
      m_Close(0),
      m_Peer(),
      m_LastTime(0),
      m_TimeOut(time_out),
      m_TimerId(0)
{
    this->reference_counting_policy().value(Reference_Counting_Policy::ENABLED);
}
/**
 * @brief Purge notifications still pending for this handler, log the
 *        result of the purge, and close the client socket.
 */
COnline_Server_Handler::~COnline_Server_Handler()
{
    // Ask the reactor to drop anything it still has queued for this object.
    const int purgeResult = this->reactor()->purge_pending_notifications(this);
    ACE_DEBUG((LM_DEBUG, ACE_TEXT("server handler delete nRet[%d]\n"), purgeResult));

    // The socket is only ever closed here.
    this->peer().close();
}
int COnline_Server_Handler::open (void)
{
int nRet = 0;
ACE_INET_Addr iRemoteAddr;
ACE_TCHAR cAddr[32];
//ACE_TCHAR cHostBuff[64];
//pthread_mutex_lock(&m_ThreadMutex);
peer().get_remote_addr(iRemoteAddr);
memset(cAddr, 0, sizeof(ACE_TCHAR)*32);
iRemoteAddr.addr_to_string(cAddr, 32, TRUE);
#if 0
memset(cHostBuff, 0, sizeof(ACE_TCHAR)*64);
iRemoteAddr.addr_to_string(cHostBuff, 64, FALSE);
#endif /* 0 */
ACE_DEBUG((LM_DEBUG, ACE_TEXT("server acceptor client Ip[%s]\n"), cAddr));
m_Mask = 0;
nRet = this->reactor()->register_handler(this, ACE_Event_Handler::READ_MASK);
if(0 == nRet)
{
ACE_Time_Value reschedule(m_TimeOut.sec()/4);
m_TimerId = this->reactor()->schedule_timer(this, NULL, m_TimeOut, reschedule);
ACE_SET_BITS(m_Mask, ACE_Event_Handler::READ_MASK);
//this->peer().enable(ACE_NONBLOCK);
}
else
{
m_TimerId = 0;
ACE_DEBUG((LM_ERROR, ACE_TEXT("The server is over flow!!\n")));
}
//pthread_mutex_unlock(&m_ThreadMutex);
return nRet;
}
/**
 * @brief Read whatever the client sent and log it.
 *
 * Refreshes m_LastTime (the idle-timeout reference point) from the
 * reactor's timer-queue clock before reading.
 *
 * @return 0 when data was received; -1 when recv() reports end of stream
 *         or an error.
 */
int COnline_Server_Handler::handle_input(ACE_HANDLE)
{
    char buf[8192];

    m_LastTime = this->reactor()->timer_queue()->gettimeofday();

    // Read at most sizeof(buf) - 1 bytes so there is always room for the
    // terminating NUL that the "%s" log below relies on.
    const int nRecv = static_cast<int>(peer().recv(buf, sizeof(buf) - 1));
    if (nRecv <= 0)
    {
        return -1;
    }

    buf[nRecv] = '\0';
    ACE_DEBUG((LM_DEBUG, ACE_TEXT("recv [%s]\n"), buf));
    return 0;
}
/**
 * @brief Output-ready callback; this handler never writes, so it does
 *        nothing.
 *
 * @return Always 0.
 */
int COnline_Server_Handler::handle_output(ACE_HANDLE /* handle */)
{
    return 0;
}
int COnline_Server_Handler::handle_timeout(const ACE_Time_Value &now, const void *)
{
int nRet = 0;
//pthread_mutex_lock(&m_ThreadMutex);
ACE_DEBUG((LM_DEBUG, ACE_TEXT("server handler timeout\n")));
if((now - m_LastTime) >= m_TimeOut)
{
m_Mask = 0;
nRet = -1;
//this->reactor()->remove_handler(this, nMask);
}
//pthread_mutex_unlock(&m_ThreadMutex);
return nRet;
}
/**
 * @brief Tear the handler down once no event mask remains active.
 *
 * A close for exactly READ_MASK or WRITE_MASK clears the matching bit of
 * m_Mask. When m_Mask is 0 and the socket handle is valid, the handler
 * cancels its timers, removes itself from the reactor and drops one
 * reference to itself.
 *
 * @param handle      Unused.
 * @param close_mask  Event mask being closed.
 * @return -1 after performing the teardown, otherwise 0.
 */
int COnline_Server_Handler::handle_close(ACE_HANDLE handle,
        ACE_Reactor_Mask close_mask)
{
    (void)handle;

    if (close_mask == ACE_Event_Handler::WRITE_MASK)
    {
        ACE_CLR_BITS(m_Mask, ACE_Event_Handler::WRITE_MASK);
    }
    if (close_mask == ACE_Event_Handler::READ_MASK)
    {
        ACE_CLR_BITS(m_Mask, ACE_Event_Handler::READ_MASK);
    }

    if (0 != m_Mask)
    {
        return 0;
    }

    ACE_DEBUG((LM_DEBUG, ACE_TEXT("server handler exit\n")));
    if (this->get_handle() == ACE_INVALID_HANDLE)
    {
        return 0;
    }

    // Cancel by handler rather than by stored timer id. A timer id of 0 can
    // be a perfectly valid id, so treating 0 as "no timer" could leave the
    // timer armed; and a stored id may be stale by the time we get here
    // (already cancelled, possibly handed out again). Cancelling by handler
    // is a harmless no-op when nothing is scheduled. The second argument
    // (1) keeps the cancellation from calling back into handle_close().
    this->reactor()->cancel_timer(this, 1);

    this->reactor()->remove_handler(this,
            ACE_Event_Handler::ALL_EVENTS_MASK | ACE_Event_Handler::DONT_CALL);

    // Drop one reference to this handler. The object may be destroyed by
    // this call, so no member may be touched afterwards.
    this->remove_reference();

    return -1; // return value ignored by reactor
}
/**
 * @brief Bind the acceptor to a reactor and enable ACE reference counting
 *        for it.
 *
 * @param r  Reactor to use. The default, ACE_Reactor::instance(), is
 *           supplied on this out-of-class definition.
 */
COnline_Acceptor::COnline_Acceptor (ACE_Reactor * r = ACE_Reactor::instance ())
    : ACE_Event_Handler(r)
{
    Reference_Counting_Policy &policy = this->reference_counting_policy();
    policy.value(Reference_Counting_Policy::ENABLED);
}
/**
 * @brief Log the destruction of the acceptor; the listening socket is
 *        closed in handle_close(), not here.
 */
COnline_Acceptor::~COnline_Acceptor ()
{
    ACE_DEBUG((LM_DEBUG,
               ACE_TEXT("COnline_Acceptor delete\n")));
}
/**
 * @brief Open the listening socket and register for ACCEPT events.
 *
 * @param local_addr  Address to listen on. The socket is opened with the
 *                    second argument of ACE_SOCK_Acceptor::open() set to 1
 *                    (address reuse).
 * @return 0 on success, -1 if the socket could not be opened or the
 *         reactor registration failed.
 */
int COnline_Acceptor::open (const ACE_INET_Addr & local_addr)
{
    if (m_Acceptor.open (local_addr, 1) == -1)
    {
        return -1;
    }

    const int nRet = this->reactor()->register_handler
                     (this, ACE_Event_Handler::ACCEPT_MASK);
    if (nRet == -1)
    {
        // Nothing will ever call handle_close() for an unregistered handler,
        // so release the listening socket here instead of leaking it.
        m_Acceptor.close ();
    }
    return nRet;
}
/**
 * @brief Accept one pending connection and hand it to a new
 *        COnline_Server_Handler.
 *
 * @return Always 0, so a failure on one connection does not make the
 *         reactor remove the acceptor.
 */
int COnline_Acceptor::handle_input (ACE_HANDLE)
{
    COnline_Server_Handler *peer_handler = NULL;

    // Same allocation idiom as the rest of the file: on failure return 0
    // and keep the acceptor registered.
    ACE_NEW_RETURN (peer_handler, COnline_Server_Handler(this->reactor()), 0);

    if (m_Acceptor.accept (peer_handler->peer ()) == -1)
    {
        // The handler was never opened or registered, and its socket handle
        // is still invalid, so handle_close() would skip its cleanup branch
        // and the object would leak. Drop the creator's reference directly;
        // it is the only reference, so this destroys the handler.
        peer_handler->remove_reference ();
    }
    else if (peer_handler->open () == -1)
    {
        // The socket is valid here, so handle_close() runs its normal
        // teardown and releases the handler.
        peer_handler->handle_close ();
    }
    return 0;
}
/**
 * @brief Close the listening socket and drop one reference to the
 *        acceptor.
 *
 * The object is released through remove_reference() rather than
 * "delete this". Reference counting is enabled in the constructor, so
 * other holders of a reference (presumably the reactor that invoked this
 * callback) may still touch the object after this function returns; an
 * explicit delete here is the likely cause of the segmentation fault the
 * original author observed.
 *
 * @return Always 0.
 */
int COnline_Acceptor::handle_close (ACE_HANDLE /* handle */, ACE_Reactor_Mask /* close_mask */)
{
    ACE_DEBUG((LM_ERROR,
               ACE_TEXT("server acceptor handler exit!\n")));

    m_Acceptor.close ();

    // May destroy the object; nothing below may use members.
    this->remove_reference();
    return 0;
}
/**
 * @brief Thread entry point: run the event loop of the reactor passed in
 *        @p arg.
 *
 * @param arg  Pointer to the ACE_Reactor to drive, passed as void*.
 * @return Always 0, after the event loop has ended.
 */
static ACE_THR_FUNC_RETURN Stb_Thread(void *arg)
{
    ACE_Reactor *const pReactor = static_cast<ACE_Reactor *>(arg);

    // Declare the calling thread as the reactor's owner, enable restart,
    // then block in the event loop until it ends.
    pReactor->owner(ACE_OS::thr_self());
    pReactor->restart(true);
    pReactor->run_reactor_event_loop();

    return 0;
}
int ACE_TMAIN (int argc, ACE_TCHAR * argv[])
{
ACE_TP_Reactor *tp_reactor = new ACE_TP_Reactor(3000);
ACE_Reactor *m_StbReactor = new ACE_Reactor(tp_reactor);
ACE_High_Res_Timer::global_scale_factor();
m_StbReactor->timer_queue()->gettimeofday(&ACE_High_Res_Timer::gettimeofday_hr);
COnline_Acceptor* m_StbAcceptor = NULL;
ACE_NEW_RETURN (m_StbAcceptor, COnline_Acceptor( m_StbReactor), -1);
ACE_INET_Addr iStbPort("localhost:4008");
if (m_StbAcceptor->open(iStbPort) == -1)
{
m_StbReactor->end_reactor_event_loop();
ACE_ERROR_RETURN((LM_ERROR, ACE_TEXT("%p\n"), ACE_TEXT("accept")), -1);
}
ACE_Thread_Manager::instance()->spawn_n(20, Stb_Thread, m_StbReactor);
ACE_Thread_Manager::instance()->wait();
(void)argc;
(void)argv;
return 0;
}
/* EOF-------------------------------------------------------------------*/
test_server.h
/**
*
* Copyright (C), 2010-2015, AV Frontier Tech. Co., Ltd.
*
* @file test_server.h
*
* @author D43ZWW
*
* @date 2010.01.29
*
* @brief communicate event server handler used by ACE reactor
*
*/
#ifndef __ONLINE_EVENT_HANDLER_H
#define __ONLINE_EVENT_HANDLER_H
#include "test_basic.h"
#include "test_type.h"
/**
 * @brief Reactor event handler for one accepted client connection.
 *
 * Owns the connection socket, reads and logs incoming data, and is torn
 * down either when the peer closes the connection or when the idle timer
 * finds no activity within the configured timeout.
 *
 * date   2010.01.29
 * author D43ZWW
 */
class COnline_Server_Handler : public ACE_Event_Handler
{
public:
    /// Event masks the handler still considers active; handle_close()
    /// performs the teardown once this is 0.
    ACE_Reactor_Mask m_Mask;

    /// Unused flag; no handler method reads it.
    int m_Close;

private:
    /// Socket of the accepted connection.
    ACE_SOCK_Stream m_Peer;

    /// Time of the most recent handle_input() call.
    ACE_Time_Value m_LastTime;

    /// Idle period after which the connection is dropped.
    const ACE_Time_Value m_TimeOut;

public:
    /// @param reactor   Reactor the handler registers with.
    /// @param time_out  Idle timeout; its default value is supplied on the
    ///                  constructor definition in test_server.cpp.
    COnline_Server_Handler(ACE_Reactor *reactor, const ACE_Time_Value time_out);

    virtual ~COnline_Server_Handler();

    /// Register with the reactor and start the idle timer.
    virtual int open();

    /// Read and log data from the peer.
    virtual int handle_input(ACE_HANDLE = ACE_INVALID_HANDLE);

    /// No-op; always returns 0.
    virtual int handle_output(ACE_HANDLE = ACE_INVALID_HANDLE);

    /// Idle check, run on timer expiry.
    virtual int handle_timeout(const ACE_Time_Value &now, const void *act);

    /// Tear the handler down once m_Mask is 0.
    virtual int handle_close(ACE_HANDLE = ACE_INVALID_HANDLE,
                             ACE_Reactor_Mask = 0);

    /// I/O handle of the connection socket.
    virtual ACE_HANDLE get_handle() const
    {
        return m_Peer.get_handle();
    }

    /// Access to the connection socket.
    ACE_SOCK_Stream &peer()
    {
        return m_Peer;
    }

private:
    /// Id returned by schedule_timer() in open(); 0 when open() failed.
    long m_TimerId;
};
/**
 * @brief Reactor event handler that accepts new client connections.
 *
 * Each accepted connection is handed to a newly created
 * COnline_Server_Handler.
 *
 * date   2010.01.29
 * author D43ZWW
 */
class COnline_Acceptor : public ACE_Event_Handler
{
protected:
    /// Factory that passively connects ACE_SOCK_Stream objects.
    ACE_SOCK_Acceptor m_Acceptor;

public:
    typedef ACE_INET_Addr PEER_ADDR;

    /// Simple constructor; @p r is the reactor to register with.
    COnline_Acceptor(ACE_Reactor *r);

    virtual ~COnline_Acceptor();

    /// Initialization: open the listening socket on @p local_addr and
    /// register with the reactor.
    virtual int open(const ACE_INET_Addr &local_addr);

    /// Called by the reactor when there is a new connection to accept.
    virtual int handle_input(ACE_HANDLE = ACE_INVALID_HANDLE);

    /// Called when the handler is being shut down, e.g. when it is removed
    /// from a reactor.
    virtual int handle_close(ACE_HANDLE = ACE_INVALID_HANDLE,
                             ACE_Reactor_Mask = 0);

    /// I/O handle of the passive-mode socket.
    virtual ACE_HANDLE get_handle() const
    {
        return m_Acceptor.get_handle();
    }

    /// Reference to the underlying m_Acceptor.
    ACE_SOCK_Acceptor &acceptor()
    {
        return m_Acceptor;
    }
};
#endif /* __ONLINE_EVENT_HANDLER_H */
/* EOF-------------------------------------------------------------------*/