wendy 发表于 2010-4-21 17:21:35

帮忙看看这个ACE_TP_Reactor的使用问题

我是在使用ACE_TP_Reactor时碰到的.
多线程访问TP_Reactor,当注册句柄超过2317时就段错误.其他epoll,select_reactor都好好的.
代码我在2楼贴

ACE_VERSION 5.7 or 5.6
    I previously developed my application with ACE on Fedora 12, where it works well. This week, however, I ran into a problem when trying to run it on CentOS 5.4.

I use the TP_Reactor with the Leader/Followers pattern.
ACE_TP_Reactor *tp_reactor = new ACE_TP_Reactor(MAX_FD_SET_SIZE);/// reactor implementation sized by MAX_FD_SET_SIZE
m_StbReactor = new ACE_Reactor(tp_reactor);
/// Init the timer: the reactor's timer queue is given ACE_High_Res_Timer::gettimeofday_hr as its time source.
ACE_High_Res_Timer::global_scale_factor();
m_StbReactor->timer_queue()->gettimeofday(&ACE_High_Res_Timer::gettimeofday_hr);

/// Run: spawn THREAD_NUMS threads, each executing Stb_Thread with the shared reactor as its argument.
ACE_Thread_Manager::instance()->spawn_n(THREAD_NUMS, Stb_Thread, m_StbReactor);/// THREAD_NUMS is 10

Problems:
1. If MAX_FD_SET_SIZE > 3000, then once I have accepted more than 2317 client handles and registered each of them with READ_MASK on the TP_Reactor,
the program gets a segmentation fault. I examined the core dump with gdb; it looks like this:

Program received signal SIGSEGV, Segmentation fault.

0x004a13e0 in ACE_TP_Reactor::get_notify_handle (this=0x9c13598) at ../../ace/TP_Reactor.cpp:672
672   this->notify_handler_->notify_handle ();
(gdb) bt
#0  0x004a13e0 in ACE_TP_Reactor::get_notify_handle (this=0x9c13598) at ../../ace/TP_Reactor.cpp:672
#1  0x004a1bef in ACE_TP_Reactor::handle_notify_events (this=0x9c13598, guard=@0xb57712e4)
    at ../../ace/TP_Reactor.cpp:351
#2  0x004a260a in ACE_TP_Reactor::dispatch_i (this=0x9c13598, max_wait_time=0x0, guard=@0xb57712e4)
    at ../../ace/TP_Reactor.cpp:233
#3  0x004a26ff in ACE_TP_Reactor::handle_events (this=0x9c13598, max_wait_time=0x0)
    at ../../ace/TP_Reactor.cpp:173
#4  0x00480677 in ACE_Reactor::run_reactor_event_loop (this=0x9c0fe40, eh=0) at ../../ace/Reactor.cpp:224
#5  0x0805190f in Stb_Thread (arg=0x9c0fe40) at login_server.cpp:63
#6  0x004a4498 in ACE_Thread_Adapter::invoke_i (this=0x9c19370) at ../../ace/Thread_Adapter.cpp:149
#7  0x004a4676 in ACE_Thread_Adapter::invoke (this=0x9c19370) at ../../ace/Thread_Adapter.cpp:98
#8  0x0040d901 in ace_thread_adapter (args=0x9c19370) at ../../ace/Base_Thread_Adapter.cpp:124
#9  0x00d5e73b in start_thread () from /lib/libpthread.so.0
#10 0x002f0cfe in clone () from /lib/libc.so.6


I do not use notify() anywhere in my application.



my config.h
// Build-time configuration: sets a handful of feature macros and then pulls in
// the stock Linux configuration from "ace/config-linux.h".
#ifndef _ACE_CONFIG_H_
#define _ACE_CONFIG_H_

// Wide-character build switches; currently disabled.
//#define _UNICODE
//#ifdef _UNICODE
//#define ACE_HAS_WCHAR
//#define ACE_USES_WCHAR
//#endif//_UNICODE

#define ACE_HAS_STANDARD_CPP_LIBRARY 1
#define ACE_HAS_EVENT_POLL

// NOTE(review): on Linux/glibc the storage size of fd_set is presumably fixed
// by the C library headers (1024 descriptors), so redefining FD_SETSIZE here may
// raise the limit the library believes in without enlarging the bitmaps. A
// select()-based reactor given handles above the real limit could then write
// past its fd_set storage - verify against the select(2) documentation before
// relying on more than 1024 handles with ACE_TP_Reactor.
#define FD_SETSIZE 10240
#define ACE_HAS_REACTOR_NOTIFICATION_QUEUE 1
// Left disabled: the notify pipe default is not switched off.
//#define ACE_DISABLE_NOTIFY_PIPE_DEFAULT 1
#define ACE_AS_STATIC_LIBS 1
#include "ace/config-linux.h"
#endif//_ACE_CONFIG_H_

wendy 发表于 2010-4-21 17:25:22

这是我从工程里抽出的测试代码.两个文件,

test_server.cpp

#include "test_server.h"
#include "ace/High_Res_Timer.h"




/**
 * @brief Build a handler for one client connection and enable reference counting on it.
 *
 * Every data member is given a defined starting value. In particular m_Mask
 * starts at 0, because handle_close() reads it and may be invoked before open()
 * has had a chance to assign it (for example when accept() fails).
 *
 * @param reactor  Reactor handed to the ACE_Event_Handler base class.
 * @param time_out Idle period later compared in handle_timeout(); defaults to
 *                 3600 seconds when omitted.
 */
COnline_Server_Handler::COnline_Server_Handler(ACE_Reactor *reactor,
      const ACE_Time_Value time_out = ACE_Time_Value(3600))
      : ACE_Event_Handler(reactor), m_Mask(0), m_Close(0), m_Peer(), m_LastTime(0),
        m_TimeOut(time_out), m_TimerId(0)
{
    this->reference_counting_policy().value(Reference_Counting_Policy::ENABLED);
}




/**
 * @brief Tear down the handler.
 *
 * Asks the reactor to purge notifications still pending for this object, logs
 * the value that call returned, and closes the client socket.
 */
COnline_Server_Handler::~COnline_Server_Handler()
{
    const int purged = this->reactor()->purge_pending_notifications(this);
    ACE_DEBUG((LM_DEBUG, ACE_TEXT("server handler delete nRet[%d]\n"), purged));
    m_Peer.close();
}

int COnline_Server_Handler::open (void)
{
    int nRet = 0;
    ACE_INET_Addr iRemoteAddr;
    ACE_TCHAR cAddr;
    //ACE_TCHAR cHostBuff;
    //pthread_mutex_lock(&m_ThreadMutex);

    peer().get_remote_addr(iRemoteAddr);
    memset(cAddr, 0, sizeof(ACE_TCHAR)*32);
    iRemoteAddr.addr_to_string(cAddr, 32, TRUE);
    #if 0
    memset(cHostBuff, 0, sizeof(ACE_TCHAR)*64);
    iRemoteAddr.addr_to_string(cHostBuff, 64, FALSE);
    #endif/* 0 */

    ACE_DEBUG((LM_DEBUG, ACE_TEXT("server acceptor client Ip[%s]\n"), cAddr));
    m_Mask = 0;
    nRet = this->reactor()->register_handler(this, ACE_Event_Handler::READ_MASK);

    if(0 == nRet)
    {
      ACE_Time_Value reschedule(m_TimeOut.sec()/4);
      m_TimerId = this->reactor()->schedule_timer(this, NULL, m_TimeOut, reschedule);
      ACE_SET_BITS(m_Mask, ACE_Event_Handler::READ_MASK);
      //this->peer().enable(ACE_NONBLOCK);
    }
    else
    {
      m_TimerId = 0;
      ACE_DEBUG((LM_ERROR, ACE_TEXT("The server is over flow!!\n")));
    }
    //pthread_mutex_unlock(&m_ThreadMutex);
    return nRet;
}



/**
 * @brief Read whatever the client sent and log it.
 *
 * Records the current timer-queue time in m_LastTime (used by handle_timeout())
 * before reading from the socket.
 *
 * @return 0 when data was received, -1 when recv() reported end-of-stream or
 *         an error.
 */
int COnline_Server_Handler::handle_input(ACE_HANDLE)
{
    // The receive buffer must be a real array; the 1024-byte size is a choice
    // made here, since SOURCE does not show the intended extent.
    char buf[1024];
    memset(buf, 0, sizeof(buf));

    m_LastTime = this->reactor()->timer_queue()->gettimeofday();

    // Read at most sizeof(buf) - 1 bytes so the zero-filled buffer always keeps
    // a terminating NUL for the "%s" log below.
    const ssize_t nRecv = peer().recv(buf, sizeof(buf) - 1);
    if (nRecv <= 0)
    {
        return -1;
    }

    ACE_DEBUG((LM_DEBUG, ACE_TEXT("recv [%s]\n"), buf));
    return 0;
}



/**
 * @brief Output callback; performs no work.
 * @return Always 0.
 */
int COnline_Server_Handler::handle_output(ACE_HANDLE /* handle */)
{
    const int nothingToDo = 0;
    return nothingToDo;
}

int COnline_Server_Handler::handle_timeout(const ACE_Time_Value &now, const void *)
{
    int nRet = 0;
    //pthread_mutex_lock(&m_ThreadMutex);
    ACE_DEBUG((LM_DEBUG, ACE_TEXT("server handler timeout\n")));
    if((now - m_LastTime) >= m_TimeOut)
    {
      m_Mask = 0;
      nRet = -1;
      //this->reactor()->remove_handler(this, nMask);
    }
    //pthread_mutex_unlock(&m_ThreadMutex);
    return nRet;
}


int COnline_Server_Handler::handle_close(ACE_HANDLE handle,
                        ACE_Reactor_Mask close_mask)
{
    int nRet = 0;
    //pthread_mutex_lock(&m_ThreadMutex);
    if (close_mask == ACE_Event_Handler::WRITE_MASK)
    {
      ACE_CLR_BITS(m_Mask, ACE_Event_Handler::WRITE_MASK);
    }

    if(close_mask == ACE_Event_Handler::READ_MASK)
    {
      ACE_CLR_BITS(m_Mask, ACE_Event_Handler::READ_MASK);
    }
    if(0 == m_Mask)
    {

      ACE_DEBUG((LM_DEBUG, ACE_TEXT("server handler exit\n")));
      if(this->get_handle() != ACE_INVALID_HANDLE)
      {
            #if 1
            if(0 != m_TimerId)
            {
                this->reactor()->cancel_timer(m_TimerId, NULL, 1);
            }
            #endif/* 0 */


            this->reactor()->remove_handler(this, ACE_Event_Handler::ALL_EVENTS_MASK|ACE_Event_Handler::DONT_CALL);
            this->remove_reference();

            //delete this;
            nRet = -1; // return value ignored by reactor
      }
    }
    (void)handle;
    //pthread_mutex_unlock(&m_ThreadMutex);
    return nRet;
}



/**
 * @brief Create the acceptor and switch on reference counting for it.
 * @param r Reactor handed to the ACE_Event_Handler base class; defaults to
 *          ACE_Reactor::instance().
 */
COnline_Acceptor::COnline_Acceptor (ACE_Reactor * r = ACE_Reactor::instance ())
    : ACE_Event_Handler(r)
{
    Reference_Counting_Policy &policy = this->reference_counting_policy();
    policy.value(Reference_Counting_Policy::ENABLED);
}



/** @brief Destructor; only emits a debug log line. */
COnline_Acceptor::~COnline_Acceptor ()
{
    ACE_DEBUG((LM_DEBUG,
               ACE_TEXT("COnline_Acceptor delete\n")));
}

int COnline_Acceptor::open (const ACE_INET_Addr & local_addr)
{
    if (m_Acceptor.open (local_addr, 1) == -1)
    {
      return -1;
    }
    return this->reactor()->register_handler
      (this, ACE_Event_Handler::ACCEPT_MASK);
}



/**
 * @brief Accept one pending connection and start a handler for it.
 *
 * A new COnline_Server_Handler is created on the same reactor. If either the
 * accept() or the handler's open() fails, the handler's handle_close() is
 * invoked with its default arguments.
 *
 * @return Always 0.
 */
int COnline_Acceptor::handle_input (ACE_HANDLE)
{
    COnline_Server_Handler *client = new COnline_Server_Handler(this->reactor());

    // open() is attempted only when accept() succeeded (short-circuit).
    const bool started = (m_Acceptor.accept (client->peer ()) != -1)
                         && (client->open () != -1);
    if (!started)
    {
        client->handle_close ();
    }
    return 0;
}



/**
 * @brief Shut the acceptor down: log, close the listening socket, and drop
 *        one reference on this handler.
 * @return Always 0.
 */
int COnline_Acceptor::handle_close (ACE_HANDLE /* handle */, ACE_Reactor_Mask /* close_mask */)
{
    ACE_DEBUG((LM_ERROR,
               ACE_TEXT("server acceptor handler exit!\n")));

    m_Acceptor.close ();

    // Last use of `this`: dropping the reference may destroy the object.
    this->remove_reference();
    return 0;
}


/**
 * @brief Thread entry point: run the reactor's event loop.
 *
 * @param arg Pointer to the ACE_Reactor to run, passed as void*.
 * @return Always 0, once run_reactor_event_loop() returns.
 */
static ACE_THR_FUNC_RETURN Stb_Thread(void *arg)
{
    ACE_Reactor * const loop = static_cast<ACE_Reactor *>(arg);

    // Each thread sets itself as owner and enables restart before entering the loop.
    loop->owner(ACE_OS::thr_self());
    loop->restart(true);

    loop->run_reactor_event_loop();
    return 0;
}


int ACE_TMAIN (int argc, ACE_TCHAR * argv[])
{
    ACE_TP_Reactor *tp_reactor = new ACE_TP_Reactor(3000);
    ACE_Reactor *m_StbReactor = new ACE_Reactor(tp_reactor);

    ACE_High_Res_Timer::global_scale_factor();
    m_StbReactor->timer_queue()->gettimeofday(&ACE_High_Res_Timer::gettimeofday_hr);

    COnline_Acceptor* m_StbAcceptor = NULL;
    ACE_NEW_RETURN (m_StbAcceptor, COnline_Acceptor( m_StbReactor), -1);
   
    ACE_INET_Addr iStbPort("localhost:4008");

    if (m_StbAcceptor->open(iStbPort) == -1)
    {
      m_StbReactor->end_reactor_event_loop();
      ACE_ERROR_RETURN((LM_ERROR, ACE_TEXT("%p\n"), ACE_TEXT("accept")), -1);
    }


    ACE_Thread_Manager::instance()->spawn_n(20, Stb_Thread, m_StbReactor);

    ACE_Thread_Manager::instance()->wait();
    (void)argc;
    (void)argv;
    return 0;
}

/* EOF-------------------------------------------------------------------*/


test_server.h

/**
*
* Copyright (C), 2010-2015, AV Frontier Tech. Co., Ltd.
*
* @file acceptor_event_handler.h
*
* @author    D43ZWW
*
* @date      2010.01.29
*
* @brief    communicate event server handler used by ACE reactor
*
*/

#ifndef __ONLINE_EVENT_HANDLER_H
#define __ONLINE_EVENT_HANDLER_H





#include "test_basic.h"
#include "test_type.h"

/**
* @brief
*   handler the server event
*
* date2010.01.29
* authorD43ZWW
*/
class COnline_Server_Handler : public ACE_Event_Handler
{
public:
    ACE_Reactor_Mask m_Mask;
    int         m_Close;
private:
    ACE_SOCK_Stream m_Peer;
    ACE_Time_Value m_LastTime;
    const ACE_Time_Value m_TimeOut;
public:
    COnline_Server_Handler(ACE_Reactor *reactor,
      const ACE_Time_Value time_out);

    virtual ~COnline_Server_Handler();

    virtual int open (void);

    virtual int handle_input(ACE_HANDLE = ACE_INVALID_HANDLE);

    virtual int handle_output(ACE_HANDLE = ACE_INVALID_HANDLE);

    virtual int handle_timeout(const ACE_Time_Value &now, const void *act);

    virtual int handle_close(ACE_HANDLE = ACE_INVALID_HANDLE ,
                            ACE_Reactor_Mask= 0);

    virtual ACE_HANDLE get_handle (void) const
    {
      return m_Peer.get_handle ();
    };

    ACE_SOCK_Stream & peer ()
    {
      return m_Peer;
    };

private:

    long m_TimerId;
};




/**
* @brief acceptor the new connect
*
*
* date2010.01.29
* authorD43ZWW
*/
class COnline_Acceptor : public ACE_Event_Handler
{
protected:
    // Factory that connects <ACE_SOCK_Stream>s passively.
    ACE_SOCK_Acceptor m_Acceptor;

public:

    typedef ACE_INET_Addr PEER_ADDR;

    // Simple constructor.
    COnline_Acceptor (ACE_Reactor * r);

    virtual ~ COnline_Acceptor ();

    // Initialization method.
    virtual int open (const ACE_INET_Addr & local_addr);

    // Called by a reactor when there's a new connection to accept.
    virtual int handle_input (ACE_HANDLE = ACE_INVALID_HANDLE);

    // Called when this object is destroyed, e.g., when it's
    // removed from a reactor.
    virtual int handle_close (ACE_HANDLE = ACE_INVALID_HANDLE,
                  ACE_Reactor_Mask = 0);

    // Return the passive-mode socket's I/O handle.
    virtual ACE_HANDLE get_handle () const
    {
      return m_Acceptor.get_handle ();
    };

    // Returns a reference to the underlying <m_Acceptor>.
    ACE_SOCK_Acceptor & acceptor ()
    {
      return m_Acceptor;
    };

};





#endif/* __ONLINE_EVENT_HANDLER_H */
/* EOF-------------------------------------------------------------------*/

winston 发表于 2010-4-21 21:11:54

系统中的ulimit -a 命令调用中,open file的限制数量是多少?
另外:
#define FD_SETSIZE 10240
这个太大了,反复扫描效率是很差的。

wendy 发表于 2010-4-22 12:09:55

回复 #3 winston 的帖子

ulimit -a open file 也是10240
FD_SETSIZE 我看是最大的限制数.所以设大一点
然后我在程序配置里反应堆的初始化时设一个值 大概5000,在这里来限制连接数.然后就这里出问题了,我打印ACE中(max_handlers)最大句柄数都是10240,

[ 本帖最后由 wendy 于 2010-4-22 12:11 编辑 ]

steven99ca 发表于 2010-4-23 23:33:02

dump不太像是rdsize的问题,应该是线程相关的,试试单线程,如果工作正常,就可确认,修复也就容易啦。
页: [1]
查看完整版本: 帮忙看看这个ACE_TP_Reactor的使用问题