找回密码
 用户注册

QQ登录

只需一步,快速开始

查看: 10446|回复: 4

帮忙看看这个ACE_TP_Reactor的使用问题

[复制链接]
发表于 2010-4-21 17:21:35 | 显示全部楼层 |阅读模式
我是在使用ACE_TP_Reactor时碰到的.
多线程访问TP_Reactor,当注册句柄超过2317时就段错误.其他epoll,select_reactor都好好的.
代码我在2楼贴

ACE_VERSION 5.7 or 5.6

    I previously developed my application with ACE on Fedora 12, where it worked well. This week, however, I ran into a problem when trying to use it on CentOS 5.4.

I use the TP_Reactor and the Leader/Follow pattern.
// Create the thread-pool reactor implementation, sized by MAX_FD_SET_SIZE,
// and wrap it in the ACE_Reactor interface used by the rest of the application.
ACE_TP_Reactor *tp_reactor = new ACE_TP_Reactor(MAX_FD_SET_SIZE);
m_StbReactor = new ACE_Reactor(tp_reactor);

// Initialize the timer: compute the high-resolution timer scale factor once,
// then hand ACE_High_Res_Timer::gettimeofday_hr to the reactor's timer queue.
ACE_High_Res_Timer::global_scale_factor();
m_StbReactor->timer_queue()->gettimeofday(&ACE_High_Res_Timer::gettimeofday_hr);


// Run: spawn THREAD_NUMS threads (10 in the author's setup), each executing
// Stb_Thread with the shared reactor as its argument.
ACE_Thread_Manager::instance()->spawn_n(THREAD_NUMS, Stb_Thread, m_StbReactor);

Problems:
1. If MAX_FD_SET_SIZE > 3000, then once I have accepted more than 2317 client handlers and registered each of them with READ_MASK on the TP_Reactor,
I get a segmentation fault. I examined the core dump with gdb; it looks like this:

Program received signal SIGSEGV, Segmentation fault.
[Switching to Thread 0xb5771b90 (LWP 8875)]
0x004a13e0 in ACE_TP_Reactor::get_notify_handle (this=0x9c13598) at ../../ace/TP_Reactor.cpp:672
672     this->notify_handler_->notify_handle ();
(gdb) bt
#0  0x004a13e0 in ACE_TP_Reactor::get_notify_handle (this=0x9c13598) at ../../ace/TP_Reactor.cpp:672
#1  0x004a1bef in ACE_TP_Reactor::handle_notify_events (this=0x9c13598,
    guard=@0xb57712e4)
    at ../../ace/TP_Reactor.cpp:351
#2  0x004a260a in ACE_TP_Reactor::dispatch_i (this=0x9c13598, max_wait_time=0x0,
    guard=@0xb57712e4)
    at ../../ace/TP_Reactor.cpp:233
#3  0x004a26ff in ACE_TP_Reactor::handle_events (this=0x9c13598, max_wait_time=0x0)
    at ../../ace/TP_Reactor.cpp:173
#4  0x00480677 in ACE_Reactor::run_reactor_event_loop (this=0x9c0fe40, eh=0) at ../../ace/Reactor.cpp:224
#5  0x0805190f in Stb_Thread (arg=0x9c0fe40) at login_server.cpp:63
#6  0x004a4498 in ACE_Thread_Adapter::invoke_i (this=0x9c19370) at ../../ace/Thread_Adapter.cpp:149
#7  0x004a4676 in ACE_Thread_Adapter::invoke (this=0x9c19370) at ../../ace/Thread_Adapter.cpp:98
#8  0x0040d901 in ace_thread_adapter (args=0x9c19370) at ../../ace/Base_Thread_Adapter.cpp:124
#9  0x00d5e73b in start_thread () from /lib/libpthread.so.0
#10 0x002f0cfe in clone () from /lib/libc.so.6



I use no notify on my application.



my config.h
// Local ACE build configuration: project-specific overrides followed by the
// stock Linux platform configuration.
#ifndef _ACE_CONFIG_H_
#define _ACE_CONFIG_H_


// Wide-character (Unicode) support is disabled; the block below is kept
// commented out for reference.
//#define _UNICODE
//#ifdef _UNICODE
//#define ACE_HAS_WCHAR
//#define ACE_USES_WCHAR
//#endif//_UNICODE


#define ACE_HAS_STANDARD_CPP_LIBRARY 1
// Presumably enables ACE's epoll-based event demultiplexing on Linux —
// confirm against the ACE configuration documentation.
#define ACE_HAS_EVENT_POLL


// NOTE(review): on Linux/glibc the real size of fd_set is fixed by the system
// headers (normally 1024 descriptors); redefining FD_SETSIZE here most likely
// does NOT enlarge fd_set. A select()-based reactor that trusts this value may
// then write past the end of its handle sets once descriptors exceed the real
// limit — a plausible cause of the reported crash. Verify before relying on it.
#define FD_SETSIZE 10240
#define ACE_HAS_REACTOR_NOTIFICATION_QUEUE 1
// Disabling the reactor's notify pipe by default is currently switched off.
//#define ACE_DISABLE_NOTIFY_PIPE_DEFAULT 1
#define ACE_AS_STATIC_LIBS 1
// Pull in the stock Linux platform configuration after the overrides above.
#include "ace/config-linux.h"
#endif//_ACE_CONFIG_H_
 楼主| 发表于 2010-4-21 17:25:22 | 显示全部楼层
这是我从工程里抽出的测试代码.两个文件,

test_server.cpp

#include "test_server.h"
#include "ace/High_Res_Timer.h"




/**
 * @brief Construct a handler for one client connection.
 *
 * Binds the handler to @a reactor, records the idle timeout and switches the
 * handler to ACE's reference-counted lifetime policy.
 *
 * @param reactor   Reactor this handler will register itself with.
 * @param time_out  Idle timeout; defaults to ACE_Time_Value(3600).
 */
COnline_Server_Handler::COnline_Server_Handler(ACE_Reactor *reactor,
        const ACE_Time_Value time_out = ACE_Time_Value(3600))
        : ACE_Event_Handler(reactor),
          // m_Mask and m_Close used to be left uninitialized, although
          // handle_close() tests m_Mask even when open() was never called
          // (e.g. after a failed accept). Listed in declaration order.
          m_Mask(0), m_Close(0),
          m_Peer(), m_LastTime(0), m_TimeOut(time_out),
          m_TimerId(0)
{
    this->reference_counting_policy().value(Reference_Counting_Policy::ENABLED);
}




/**
 * @brief Destroy the handler.
 *
 * Purges any notifications still pending for this handler on its reactor,
 * logs the result of the purge, and closes the peer socket.
 */
COnline_Server_Handler::~COnline_Server_Handler()
{
    const int purged = this->reactor()->purge_pending_notifications(this);
    ACE_DEBUG((LM_DEBUG, ACE_TEXT("server handler delete nRet[%d]\n"), purged));

    this->peer().close();
}

int COnline_Server_Handler::open (void)
{
    int nRet = 0;
    ACE_INET_Addr iRemoteAddr;
    ACE_TCHAR cAddr[32];
    //ACE_TCHAR cHostBuff[64];
    //pthread_mutex_lock(&m_ThreadMutex);

    peer().get_remote_addr(iRemoteAddr);
    memset(cAddr, 0, sizeof(ACE_TCHAR)*32);
    iRemoteAddr.addr_to_string(cAddr, 32, TRUE);
    #if 0
    memset(cHostBuff, 0, sizeof(ACE_TCHAR)*64);
    iRemoteAddr.addr_to_string(cHostBuff, 64, FALSE);
    #endif  /* 0 */

    ACE_DEBUG((LM_DEBUG, ACE_TEXT("server acceptor client Ip[%s]\n"), cAddr));
    m_Mask = 0;
    nRet = this->reactor()->register_handler(this, ACE_Event_Handler::READ_MASK);

    if(0 == nRet)
    {
        ACE_Time_Value reschedule(m_TimeOut.sec()/4);
        m_TimerId = this->reactor()->schedule_timer(this, NULL, m_TimeOut, reschedule);
        ACE_SET_BITS(m_Mask, ACE_Event_Handler::READ_MASK);
        //this->peer().enable(ACE_NONBLOCK);
    }
    else
    {
        m_TimerId = 0;
        ACE_DEBUG((LM_ERROR, ACE_TEXT("The server is over flow!!\n")));
    }
    //pthread_mutex_unlock(&m_ThreadMutex);
    return nRet;
}



/**
 * @brief Read pending data from the peer and log it.
 *
 * Records the time of this activity in m_LastTime (used by the idle check in
 * handle_timeout()), then performs a single recv().
 *
 * @return 0 when data was received; -1 when recv() returned 0 or an error.
 */
int COnline_Server_Handler::handle_input(ACE_HANDLE)
{
    char buf[8192];
    memset(buf, 0, sizeof(buf));

    m_LastTime = this->reactor()->timer_queue()->gettimeofday();

    // Receive at most sizeof(buf) - 1 bytes so the zero-filled buffer always
    // keeps a terminating NUL; the data is printed with "%s" below and a
    // completely filled buffer would otherwise be read past its end.
    const int nRet = peer().recv(buf, sizeof(buf) - 1);
    if (nRet <= 0)
    {
        return -1;
    }

    ACE_DEBUG((LM_DEBUG, ACE_TEXT("recv [%s]\n"), buf));
    return 0;
}



/**
 * @brief Output-ready callback; this handler has nothing to write.
 *
 * @return Always 0.
 */
int COnline_Server_Handler::handle_output(ACE_HANDLE /* handle */)
{
    return 0;
}

int COnline_Server_Handler::handle_timeout(const ACE_Time_Value &now, const void *)
{
    int nRet = 0;
    //pthread_mutex_lock(&m_ThreadMutex);
    ACE_DEBUG((LM_DEBUG, ACE_TEXT("server handler timeout\n")));
    if((now - m_LastTime) >= m_TimeOut)
    {
        m_Mask = 0;
        nRet = -1;
        //this->reactor()->remove_handler(this, nMask);
    }
    //pthread_mutex_unlock(&m_ThreadMutex);
    return nRet;
}


int COnline_Server_Handler::handle_close(ACE_HANDLE handle,
                        ACE_Reactor_Mask close_mask)
{
    int nRet = 0;
    //pthread_mutex_lock(&m_ThreadMutex);
    if (close_mask == ACE_Event_Handler::WRITE_MASK)
    {
        ACE_CLR_BITS(m_Mask, ACE_Event_Handler::WRITE_MASK);
    }

    if(close_mask == ACE_Event_Handler::READ_MASK)
    {
        ACE_CLR_BITS(m_Mask, ACE_Event_Handler::READ_MASK);
    }
    if(0 == m_Mask)
    {

        ACE_DEBUG((LM_DEBUG, ACE_TEXT("server handler exit\n")));
        if(this->get_handle() != ACE_INVALID_HANDLE)
        {
            #if 1
            if(0 != m_TimerId)
            {
                this->reactor()->cancel_timer(m_TimerId, NULL, 1);
            }
            #endif  /* 0 */


            this->reactor()->remove_handler(this, ACE_Event_Handler::ALL_EVENTS_MASK|ACE_Event_Handler::DONT_CALL);
            this->remove_reference();

            //delete this;
            nRet = -1; // return value ignored by reactor
        }
    }
    (void)handle;
    //pthread_mutex_unlock(&m_ThreadMutex);
    return nRet;
}



/**
 * @brief Construct the acceptor and bind it to reactor @a r.
 *
 * Switches the handler to the reference-counted lifetime policy.
 *
 * @param r  Reactor to register with; defaults to the singleton reactor.
 */
COnline_Acceptor::COnline_Acceptor (ACE_Reactor * r = ACE_Reactor::instance ())
    : ACE_Event_Handler(r)
{
    this->reference_counting_policy().value(
        ACE_Event_Handler::Reference_Counting_Policy::ENABLED);
}



/**
 * @brief Destroy the acceptor; only emits a debug trace.
 */
COnline_Acceptor::~COnline_Acceptor ()
{
    ACE_DEBUG((LM_DEBUG,
               ACE_TEXT("COnline_Acceptor delete\n")));
}

int COnline_Acceptor::open (const ACE_INET_Addr & local_addr)
{
    if (m_Acceptor.open (local_addr, 1) == -1)
    {
        return -1;
    }
    return this->reactor()->register_handler
      (this, ACE_Event_Handler::ACCEPT_MASK);
}



/**
 * @brief Accept one pending connection.
 *
 * Creates a new COnline_Server_Handler on the same reactor, accepts the
 * connection into its peer stream and opens it. If either step fails, the new
 * handler's handle_close() is invoked with its default arguments.
 *
 * @return Always 0, so the acceptor stays registered.
 */
int COnline_Acceptor::handle_input (ACE_HANDLE)
{
    COnline_Server_Handler *pClient = new COnline_Server_Handler(this->reactor());

    // Short-circuit evaluation: open() is attempted only after a successful accept().
    const bool bReady = (m_Acceptor.accept (pClient->peer ()) != -1)
                        && (pClient->open () != -1);
    if (!bReady)
    {
        pClient->handle_close ();
    }

    return 0;
}



/**
 * @brief Shut the acceptor down.
 *
 * Logs the exit, closes the listening socket and releases one reference on
 * this handler.
 *
 * @return Always 0.
 */
int COnline_Acceptor::handle_close (ACE_HANDLE /* handle */, ACE_Reactor_Mask /* mask */)
{
    ACE_DEBUG((LM_ERROR,
               ACE_TEXT("server acceptor handler exit!\n")));

    m_Acceptor.close ();

    // The handler uses the reference-counted lifetime policy (enabled in the
    // constructor), so it releases a reference here rather than using
    // "delete this", which the original author observed to crash.
    this->remove_reference();
    return 0;
}


/**
 * @brief Thread entry point: run the shared reactor's event loop.
 *
 * @param arg  The ACE_Reactor* passed through spawn_n().
 * @return 0 once the event loop has ended.
 */
static ACE_THR_FUNC_RETURN Stb_Thread(void *arg)
{
    // Standard static_cast replaces the deprecated ACE_static_cast macro.
    ACE_Reactor *reactor = static_cast<ACE_Reactor *>(arg);

    reactor->owner(ACE_OS::thr_self());

    // Ask the reactor to restart its event loop when it is interrupted.
    reactor->restart(true);
    reactor->run_reactor_event_loop();
    return 0;
}


int ACE_TMAIN (int argc, ACE_TCHAR * argv[])
{
    ACE_TP_Reactor *tp_reactor = new ACE_TP_Reactor(3000);
    ACE_Reactor *m_StbReactor = new ACE_Reactor(tp_reactor);

    ACE_High_Res_Timer::global_scale_factor();
    m_StbReactor->timer_queue()->gettimeofday(&ACE_High_Res_Timer::gettimeofday_hr);

    COnline_Acceptor* m_StbAcceptor = NULL;
    ACE_NEW_RETURN (m_StbAcceptor, COnline_Acceptor( m_StbReactor), -1);
   
    ACE_INET_Addr iStbPort("localhost:4008");

    if (m_StbAcceptor->open(iStbPort) == -1)
    {
        m_StbReactor->end_reactor_event_loop();
        ACE_ERROR_RETURN((LM_ERROR, ACE_TEXT("%p\n"), ACE_TEXT("accept")), -1);
    }


    ACE_Thread_Manager::instance()->spawn_n(20, Stb_Thread, m_StbReactor);

    ACE_Thread_Manager::instance()->wait();
    (void)argc;
    (void)argv;
    return 0;
}

/* EOF-------------------------------------------------------------------*/


test_server.h

/**
*
* Copyright (C), 2010-2015, AV Frontier Tech. Co., Ltd.
*
* @file acceptor_event_handler.h
*
* @author    D43ZWW
*
* @date      2010.01.29
*
* @brief    communicate event server handler used by ACE reactor
*
*/

#ifndef __ONLINE_EVENT_HANDLER_H
#define __ONLINE_EVENT_HANDLER_H





#include "test_basic.h"
#include "test_type.h"

/**
* @brief
*   handler the server event
*
* date  2010.01.29
* author  D43ZWW
*/
class COnline_Server_Handler : public ACE_Event_Handler
{
public:
    ACE_Reactor_Mask m_Mask;
    int         m_Close;
private:
    ACE_SOCK_Stream m_Peer;
    ACE_Time_Value m_LastTime;
    const ACE_Time_Value m_TimeOut;
public:
    COnline_Server_Handler(ACE_Reactor *reactor,
        const ACE_Time_Value time_out);

    virtual ~COnline_Server_Handler();

    virtual int open (void);

    virtual int handle_input(ACE_HANDLE = ACE_INVALID_HANDLE);

    virtual int handle_output(ACE_HANDLE = ACE_INVALID_HANDLE);

    virtual int handle_timeout(const ACE_Time_Value &now, const void *act);

    virtual int handle_close(ACE_HANDLE = ACE_INVALID_HANDLE ,
                            ACE_Reactor_Mask  = 0);

    virtual ACE_HANDLE get_handle (void) const
    {
      return m_Peer.get_handle ();
    };

    ACE_SOCK_Stream & peer ()
    {
      return m_Peer;
    };

private:

    long m_TimerId;
};




/**
* @brief acceptor the new connect
*
*
* date  2010.01.29
* author  D43ZWW
*/
class COnline_Acceptor : public ACE_Event_Handler
{
protected:
    // Factory that connects <ACE_SOCK_Stream>s passively.
    ACE_SOCK_Acceptor m_Acceptor;

public:

    typedef ACE_INET_Addr PEER_ADDR;

    // Simple constructor.
    COnline_Acceptor (ACE_Reactor * r);

    virtual ~ COnline_Acceptor ();

    // Initialization method.
    virtual int open (const ACE_INET_Addr & local_addr);

    // Called by a reactor when there's a new connection to accept.
    virtual int handle_input (ACE_HANDLE = ACE_INVALID_HANDLE);

    // Called when this object is destroyed, e.g., when it's
    // removed from a reactor.
    virtual int handle_close (ACE_HANDLE = ACE_INVALID_HANDLE,
                    ACE_Reactor_Mask = 0);

    // Return the passive-mode socket's I/O handle.
    virtual ACE_HANDLE get_handle () const
    {
      return m_Acceptor.get_handle ();
    };

    // Returns a reference to the underlying <m_Acceptor>.
    ACE_SOCK_Acceptor & acceptor ()
    {
      return m_Acceptor;
    };

};





#endif  /* __ONLINE_EVENT_HANDLER_H */
/* EOF-------------------------------------------------------------------*/
发表于 2010-4-21 21:11:54 | 显示全部楼层
系统中的ulimit -a 命令调用中,open file的限制数量是多少?
另外:
#define FD_SETSIZE 10240
这个太大了,反复扫描效率是很差的。
 楼主| 发表于 2010-4-22 12:09:55 | 显示全部楼层

回复 #3 winston 的帖子

ulimit -a open file 也是10240
FD_SETSIZE 我看是最大的限制数.所以设大一点
然后我在程序配置里反应堆的初始化时设一个值 大概5000,在这里来限制连接数.然后就这里出问题了,我打印ACE中(max_handlers)最大句柄数都是10240,

[ 本帖最后由 wendy 于 2010-4-22 12:11 编辑 ]
发表于 2010-4-23 23:33:02 | 显示全部楼层
dump不太像是rdsize的问题,应该是线程相关的,试试单线程,如果工作正常,就可确认,修复也就容易啦。
您需要登录后才可以回帖 登录 | 用户注册

本版积分规则

Archiver|手机版|小黑屋|ACE Developer ( 京ICP备06055248号 )

GMT+8, 2024-11-22 12:09 , Processed in 0.035759 second(s), 6 queries , Redis On.

Powered by Discuz! X3.5

© 2001-2023 Discuz! Team.

快速回复 返回顶部 返回列表