Please help with this ACE_TP_Reactor usage problem
I hit this while using ACE_TP_Reactor: multiple threads access the TP_Reactor, and once more than 2317 handles are registered I get a segmentation fault. The epoll and select reactors both work fine.
I'll post the code in the second reply.
ACE version: 5.6 or 5.7
I previously developed my app with ACE on Fedora 12 and it worked well, but this week I ran into problems when moving it to CentOS 5.4.
I use the TP_Reactor with the Leader/Followers pattern.
ACE_TP_Reactor *tp_reactor = new ACE_TP_Reactor(MAX_FD_SET_SIZE);
m_StbReactor = new ACE_Reactor(tp_reactor);
// Initialize the timer: use the high-resolution timer for the timer queue
ACE_High_Res_Timer::global_scale_factor();
m_StbReactor->timer_queue()->gettimeofday(&ACE_High_Res_Timer::gettimeofday_hr);
// Run the event loop in THREAD_NUMS (10) threads
ACE_Thread_Manager::instance()->spawn_n(THREAD_NUMS, Stb_Thread, m_StbReactor);
Problems:
1. If MAX_FD_SET_SIZE > 3000: once I accept more than 2317 client handlers and register them with READ_MASK on the TP_Reactor, I get a segmentation fault. Running gdb on the core dump gives:
Program received signal SIGSEGV, Segmentation fault.
0x004a13e0 in ACE_TP_Reactor::get_notify_handle (this=0x9c13598) at ../../ace/TP_Reactor.cpp:672
672 this->notify_handler_->notify_handle ();
(gdb) bt
#0  0x004a13e0 in ACE_TP_Reactor::get_notify_handle (this=0x9c13598) at ../../ace/TP_Reactor.cpp:672
#1  0x004a1bef in ACE_TP_Reactor::handle_notify_events (this=0x9c13598, guard=@0xb57712e4)
at ../../ace/TP_Reactor.cpp:351
#2  0x004a260a in ACE_TP_Reactor::dispatch_i (this=0x9c13598, max_wait_time=0x0, guard=@0xb57712e4)
at ../../ace/TP_Reactor.cpp:233
#3  0x004a26ff in ACE_TP_Reactor::handle_events (this=0x9c13598, max_wait_time=0x0)
at ../../ace/TP_Reactor.cpp:173
#4  0x00480677 in ACE_Reactor::run_reactor_event_loop (this=0x9c0fe40, eh=0) at ../../ace/Reactor.cpp:224
#5  0x0805190f in Stb_Thread (arg=0x9c0fe40) at login_server.cpp:63
#6  0x004a4498 in ACE_Thread_Adapter::invoke_i (this=0x9c19370) at ../../ace/Thread_Adapter.cpp:149
#7  0x004a4676 in ACE_Thread_Adapter::invoke (this=0x9c19370) at ../../ace/Thread_Adapter.cpp:98
#8  0x0040d901 in ace_thread_adapter (args=0x9c19370) at ../../ace/Base_Thread_Adapter.cpp:124
#9  0x00d5e73b in start_thread () from /lib/libpthread.so.0
#10 0x002f0cfe in clone () from /lib/libc.so.6
I do not use the reactor notify mechanism anywhere in my application.
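Since the backtrace dies in the notify handler and I never call notify(), one experiment I may try (just a guess on my part, not verified) is building with the notify pipe disabled by default; the define is already sitting in my config.h below, commented out:

// in config.h, before including ace/config-linux.h:
#define ACE_DISABLE_NOTIFY_PIPE_DEFAULT 1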
My config.h:
#ifndef _ACE_CONFIG_H_
#define _ACE_CONFIG_H_
//#define _UNICODE
//#ifdef _UNICODE
//#define ACE_HAS_WCHAR
//#define ACE_USES_WCHAR
//#endif//_UNICODE
#define ACE_HAS_STANDARD_CPP_LIBRARY 1
#define ACE_HAS_EVENT_POLL
#define FD_SETSIZE 10240
#define ACE_HAS_REACTOR_NOTIFICATION_QUEUE 1
//#define ACE_DISABLE_NOTIFY_PIPE_DEFAULT 1
#define ACE_AS_STATIC_LIBS 1
#include "ace/config-linux.h"
#endif //_ACE_CONFIG_H_

Here is the test code I extracted from the project. Two files:
test_server.cpp
#include "test_server.h"
#include "ace/High_Res_Timer.h"
COnline_Server_Handler::COnline_Server_Handler(ACE_Reactor *reactor,
const ACE_Time_Value time_out = ACE_Time_Value(3600))
: ACE_Event_Handler(reactor), m_Peer(), m_LastTime(0), m_TimeOut(time_out),
m_TimerId(0)
{
this->reference_counting_policy().value(Reference_Counting_Policy::ENABLED);
//pthread_mutex_init(&m_ThreadMutex, NULL);
}
COnline_Server_Handler::~COnline_Server_Handler()
{
int nRet = 0;
nRet = this->reactor()->purge_pending_notifications(this);
ACE_DEBUG((LM_DEBUG, ACE_TEXT("server handler delete nRet[%d]\n"),nRet));
this->peer().close();
return;
}
int COnline_Server_Handler::open (void)
{
int nRet = 0;
ACE_INET_Addr iRemoteAddr;
ACE_TCHAR cAddr[32]; // note: the array size was lost in the paste; 32 matches the uses below
//ACE_TCHAR cHostBuff[64];
//pthread_mutex_lock(&m_ThreadMutex);
peer().get_remote_addr(iRemoteAddr);
memset(cAddr, 0, sizeof(cAddr));
iRemoteAddr.addr_to_string(cAddr, sizeof(cAddr)/sizeof(ACE_TCHAR), 1);
#if 0
memset(cHostBuff, 0, sizeof(cHostBuff));
iRemoteAddr.addr_to_string(cHostBuff, sizeof(cHostBuff)/sizeof(ACE_TCHAR), 0);
#endif/* 0 */
ACE_DEBUG((LM_DEBUG, ACE_TEXT("server acceptor client Ip[%s]\n"), cAddr));
m_Mask = 0;
nRet = this->reactor()->register_handler(this, ACE_Event_Handler::READ_MASK);
if(0 == nRet)
{
ACE_Time_Value reschedule(m_TimeOut.sec()/4);
m_TimerId = this->reactor()->schedule_timer(this, NULL, m_TimeOut, reschedule);
ACE_SET_BITS(m_Mask, ACE_Event_Handler::READ_MASK);
//this->peer().enable(ACE_NONBLOCK);
}
else
{
m_TimerId = 0;
ACE_DEBUG((LM_ERROR, ACE_TEXT("The server is over flow!!\n")));
}
//pthread_mutex_unlock(&m_ThreadMutex);
return nRet;
}
int COnline_Server_Handler::handle_input(ACE_HANDLE)
{
int nRet = 0;
char buf[1024]; // note: the array size was lost in the paste; 1024 assumed
memset(buf, 0, sizeof(buf));
m_LastTime = this->reactor()->timer_queue()->gettimeofday();
nRet = peer().recv(buf, sizeof(buf) - 1); // leave room for '\0' so the %s print stays safe
if(nRet > 0)
{
ACE_DEBUG((LM_DEBUG, ACE_TEXT("recv [%s]\n"), buf));
nRet = 0;
}
else // nRet <= 0: connection closed or error
{
nRet = -1;
}
return nRet;
}
int COnline_Server_Handler::handle_output(ACE_HANDLE)
{
return 0;
}
int COnline_Server_Handler::handle_timeout(const ACE_Time_Value &now, const void *)
{
int nRet = 0;
//pthread_mutex_lock(&m_ThreadMutex);
ACE_DEBUG((LM_DEBUG, ACE_TEXT("server handler timeout\n")));
if((now - m_LastTime) >= m_TimeOut)
{
m_Mask = 0;
nRet = -1;
//this->reactor()->remove_handler(this, nMask);
}
//pthread_mutex_unlock(&m_ThreadMutex);
return nRet;
}
int COnline_Server_Handler::handle_close(ACE_HANDLE handle,
ACE_Reactor_Mask close_mask)
{
int nRet = 0;
//pthread_mutex_lock(&m_ThreadMutex);
if (close_mask == ACE_Event_Handler::WRITE_MASK)
{
ACE_CLR_BITS(m_Mask, ACE_Event_Handler::WRITE_MASK);
}
if(close_mask == ACE_Event_Handler::READ_MASK)
{
ACE_CLR_BITS(m_Mask, ACE_Event_Handler::READ_MASK);
}
if(0 == m_Mask)
{
ACE_DEBUG((LM_DEBUG, ACE_TEXT("server handler exit\n")));
if(this->get_handle() != ACE_INVALID_HANDLE)
{
#if 1
if(0 != m_TimerId)
{
this->reactor()->cancel_timer(m_TimerId, NULL, 1);
}
#endif /* 1 */
this->reactor()->remove_handler(this, ACE_Event_Handler::ALL_EVENTS_MASK|ACE_Event_Handler::DONT_CALL);
this->remove_reference();
//delete this;
nRet = -1; // return value ignored by reactor
}
}
(void)handle;
//pthread_mutex_unlock(&m_ThreadMutex);
return nRet;
}
COnline_Acceptor::COnline_Acceptor (ACE_Reactor * r = ACE_Reactor::instance ())
: ACE_Event_Handler(r)
{
this->reference_counting_policy().value(Reference_Counting_Policy::ENABLED);
}
COnline_Acceptor::~COnline_Acceptor ()
{
ACE_DEBUG((LM_DEBUG, ACE_TEXT("COnline_Acceptor delete\n")));
}
int COnline_Acceptor::open (const ACE_INET_Addr & local_addr)
{
if (m_Acceptor.open (local_addr, 1) == -1)
{
return -1;
}
return this->reactor()->register_handler
(this, ACE_Event_Handler::ACCEPT_MASK);
}
int COnline_Acceptor::handle_input (ACE_HANDLE)
{
COnline_Server_Handler *peer_handler = NULL;
peer_handler = new COnline_Server_Handler(this->reactor());
if (m_Acceptor.accept (peer_handler->peer ()) == -1)
{
peer_handler->handle_close ();
}
else if (peer_handler->open () == -1)
{
peer_handler->handle_close ();
}
return 0;
}
int COnline_Acceptor::handle_close (ACE_HANDLE, ACE_Reactor_Mask)
{
ACE_DEBUG((LM_ERROR, ACE_TEXT("server acceptor handler exit!\n")));
m_Acceptor.close ();
this->remove_reference();
//delete this; /// Segmentation fault!why?
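// (my understanding: with Reference_Counting_Policy::ENABLED, the
// remove_reference() call above already deletes this object once the
// count drops to zero, so an explicit `delete this` frees it twice)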
return 0;
}
static ACE_THR_FUNC_RETURN Stb_Thread(void *arg)
{
ACE_Reactor *reactor = static_cast<ACE_Reactor *>(arg);
reactor->owner(ACE_OS::thr_self());
// ACE_LOG_MSG->open ("route_server", ACE_Log_Msg::STDERR|ACE_Log_Msg::MSG_CALLBACK);
// ACE_LOG_MSG->msg_callback(CRoute_Server::Instance()->m_LogCallBack);
reactor->restart(true);
reactor->run_reactor_event_loop();
return 0;
}
int ACE_TMAIN (int argc, ACE_TCHAR * argv[])
{
ACE_TP_Reactor *tp_reactor = new ACE_TP_Reactor(3000);
ACE_Reactor *m_StbReactor = new ACE_Reactor(tp_reactor);
ACE_High_Res_Timer::global_scale_factor();
m_StbReactor->timer_queue()->gettimeofday(&ACE_High_Res_Timer::gettimeofday_hr);
COnline_Acceptor* m_StbAcceptor = NULL;
ACE_NEW_RETURN (m_StbAcceptor, COnline_Acceptor( m_StbReactor), -1);
ACE_INET_Addr iStbPort("localhost:4008");
if (m_StbAcceptor->open(iStbPort) == -1)
{
m_StbReactor->end_reactor_event_loop();
ACE_ERROR_RETURN((LM_ERROR, ACE_TEXT("%p\n"), ACE_TEXT("acceptor open")), -1);
}
ACE_Thread_Manager::instance()->spawn_n(20, Stb_Thread, m_StbReactor);
ACE_Thread_Manager::instance()->wait();
(void)argc;
(void)argv;
return 0;
}
/* EOF-------------------------------------------------------------------*/
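To reproduce the crash I drive the server with a small load client along these lines (a sketch, not one of the two project files; N = 3000 is an arbitrary choice to get past the 2317 mark):

// Hypothetical load client: opens N connections and holds them open.
#include "ace/OS_main.h"
#include "ace/SOCK_Connector.h"
#include "ace/SOCK_Stream.h"
#include "ace/INET_Addr.h"
#include "ace/Log_Msg.h"
#include "ace/OS_NS_unistd.h"
#include <vector>

int ACE_TMAIN (int, ACE_TCHAR *[])
{
  const size_t N = 3000; // enough connections to cross 2317
  ACE_INET_Addr addr("localhost:4008");
  ACE_SOCK_Connector connector;
  std::vector<ACE_SOCK_Stream> peers(N);
  for (size_t i = 0; i < N; ++i)
  {
    if (connector.connect(peers[i], addr) == -1)
      ACE_ERROR_RETURN((LM_ERROR, ACE_TEXT("connect %d failed: %p\n"),
                        (int)i, ACE_TEXT("connect")), -1);
  }
  ACE_DEBUG((LM_DEBUG, ACE_TEXT("%d connections open, holding them\n"), (int)N));
  ACE_OS::sleep(3600); // keep all connections open
  return 0;
}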
test_server.h
/**
*
* Copyright (C), 2010-2015, AV Frontier Tech. Co., Ltd.
*
* @file acceptor_event_handler.h
*
* @author D43ZWW
*
* @date 2010.01.29
*
* @brief communicate event server handler used by ACE reactor
*
*/
#ifndef __ONLINE_EVENT_HANDLER_H
#define __ONLINE_EVENT_HANDLER_H
#include "test_basic.h"
#include "test_type.h"
/**
* @brief
* handler the server event
*
* @date 2010.01.29
* @author D43ZWW
*/
class COnline_Server_Handler : public ACE_Event_Handler
{
public:
ACE_Reactor_Mask m_Mask;
int m_Close;
private:
ACE_SOCK_Stream m_Peer;
ACE_Time_Value m_LastTime;
const ACE_Time_Value m_TimeOut;
public:
COnline_Server_Handler(ACE_Reactor *reactor,
const ACE_Time_Value time_out);
virtual ~COnline_Server_Handler();
virtual int open (void);
virtual int handle_input(ACE_HANDLE = ACE_INVALID_HANDLE);
virtual int handle_output(ACE_HANDLE = ACE_INVALID_HANDLE);
virtual int handle_timeout(const ACE_Time_Value &now, const void *act);
virtual int handle_close(ACE_HANDLE = ACE_INVALID_HANDLE ,
ACE_Reactor_Mask = 0);
virtual ACE_HANDLE get_handle (void) const
{
return m_Peer.get_handle ();
};
ACE_SOCK_Stream & peer ()
{
return m_Peer;
};
private:
long m_TimerId;
};
/**
* @brief acceptor the new connect
*
*
* @date 2010.01.29
* @author D43ZWW
*/
class COnline_Acceptor : public ACE_Event_Handler
{
protected:
// Factory that connects <ACE_SOCK_Stream>s passively.
ACE_SOCK_Acceptor m_Acceptor;
public:
typedef ACE_INET_Addr PEER_ADDR;
// Simple constructor.
COnline_Acceptor (ACE_Reactor * r);
virtual ~ COnline_Acceptor ();
// Initialization method.
virtual int open (const ACE_INET_Addr & local_addr);
// Called by a reactor when there's a new connection to accept.
virtual int handle_input (ACE_HANDLE = ACE_INVALID_HANDLE);
// Called when this object is destroyed, e.g., when it's
// removed from a reactor.
virtual int handle_close (ACE_HANDLE = ACE_INVALID_HANDLE,
ACE_Reactor_Mask = 0);
// Return the passive-mode socket's I/O handle.
virtual ACE_HANDLE get_handle () const
{
return m_Acceptor.get_handle ();
};
// Returns a reference to the underlying <m_Acceptor>.
ACE_SOCK_Acceptor & acceptor ()
{
return m_Acceptor;
};
};
#endif/* __ONLINE_EVENT_HANDLER_H */
/* EOF-------------------------------------------------------------------*/

What does `ulimit -a` report for the open files limit on your system?
Also:
#define FD_SETSIZE 10240
is much too large; repeatedly scanning that many descriptors is very inefficient.
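You can check what the process actually ends up with using a quick standalone check (plain C++, nothing ACE-specific; compile it with the same flags and config as the server):

#include <sys/select.h>
#include <sys/resource.h>
#include <cstdio>

int main()
{
  struct rlimit rl;
  if (getrlimit(RLIMIT_NOFILE, &rl) != 0)
    return 1;
  // FD_SETSIZE and sizeof(fd_set) show what select() was compiled against;
  // RLIMIT_NOFILE is the "open files" limit that ulimit -a reports.
  std::printf("FD_SETSIZE=%d sizeof(fd_set)=%zu (%zu bits) nofile soft=%ld hard=%ld\n",
              FD_SETSIZE, sizeof(fd_set), sizeof(fd_set) * 8,
              (long) rl.rlim_cur, (long) rl.rlim_max);
  return 0;
}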
Reply to winston's post (#3)
`ulimit -a` shows the open files limit is also 10240. As I understand it, FD_SETSIZE is the hard upper bound, so I set it on the large side.
Then, when initializing the reactor in my program's configuration, I pass a value of about 5000 to cap the number of connections there. That is where the problem shows up; when I print ACE's maximum handle count (max_handlers) it is always 10240.
The dump doesn't look like an fd-size problem; it looks thread-related. Try running single-threaded: if that works correctly, the cause is confirmed and the fix should be easy.
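For the single-threaded check, in the test's main() that would mean replacing the spawn_n(20, ...) call with something like:

// run the event loop directly in the main thread instead of 20 workers
m_StbReactor->owner(ACE_OS::thr_self());
m_StbReactor->run_reactor_event_loop();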