找回密码
 用户注册

QQ登录

只需一步,快速开始

查看: 6630|回复: 7

ACE_Proactor在ACE_Task<ACE_MT_SYNCH>中正常停止的问题

[复制链接]
发表于 2012-6-25 15:02:54 | 显示全部楼层 |阅读模式
PigProactorManager.cpp PigProactorManager.h
我在主线程中使用
     for (int i = 0; i < 400; i++)
     {
         App_PigIPManager::instance()->Init();
         App_PigAceProactorManager::instance()->Init();
         App_PigAceProactorManager::instance()->StartProactor();
         App_PigAceProactorManager::instance()->StopProactor();
         App_PigAceProactorManager::instance()->Fini();
         App_PigIPManager::instance()->Fini();
     }
出现了Unhandled exception at 0x56dd19f8 (ACEd.dll) in CenterServer.exe: 0xC0000005: Access violation reading location 0xfeeeff56.
断点:  // Call the Task's svc() hook method.
  int const svc_status = t->svc ();
出现错误.不知道是哪里出了问题.


<PigProactorManager.h>
      #ifndef _PIG_PROACTOR_MANAGER_H_
#define _PIG_PROACTOR_MANAGER_H_
// [6/21/2012 xiaopig]
// 描述:  各类ACE事件反应器及反应器管理器
//          CPigAceProactor:反应器消息循环线程,用来循环触发ACE_Proactor的消息.
//          CPigAceProactorManager:用来管理CPigAceProactor线程队列.
#include "PigDefine.h"
#include "ace/Synch_Traits.h"
#include "ace/Task.h"
#include "ace/Singleton.h"
#include "ace/Proactor.h"
#include "ace/WIN32_Proactor.h"
#include <map>
using namespace std;
class CPigAceProactor : public ACE_Task<ACE_MT_SYNCH>
{
public:
    CPigAceProactor();
    ~CPigAceProactor();
    bool Init(int nProactorID, int nThreadCount);           //初始化处理器类型,处理器使用的线程数
    bool Start();                                           //根据配置好的参数启动处理器
    void Stop();                                            //停止处理器
    void Fini();                                            //释放资源,初始化参数
    CString GetError();                                     //返回最近一次错误信息
    int GetThreadCount();                                   //内部处理器使用的线程数
    ACE_Proactor* GetProactor();                            //返回内部处理器
    bool IsRun();                                           //当前线程是否正在运行.
    void   SetProactorID(uint32 u4ProactorID);              //设置内部处理器的ID
    uint32 GetProactorID();                                 //返回内部处理器的ID
protected:
    virtual int open(void *args = 0);
    virtual int svc(void);

private:
    ACE_Proactor* m_pProactor;                      //内部消息处理器
    int           m_nThreadCount;                   //处理器使用的线程数,即开了m_nThreadCount来反应m_pProactor消息处理器.
    CString       m_strError;                       //最近一次产生的错误消息
    bool          m_blRun;                          //反应器是否在运行标记
    uint32        m_u4ProactorID;                   //反应器的编号:PIG_PROACTOR_CLIENTDEFINE,PIG_PROACTOR_POSTDEFINE,PIG_PROACTOR_UDPDEFINE
    int           m_nProactorIO;                    //Poractor被当前线程池调用次数,当为0时对m_pProactor进行清理.
};
// [6/21/2012 xiaopig]
// 描述:  处理器线程管理器
//          用来管理处理器线程资源.
//          此实例只在主线程中使用,所以不需要使用锁.
//          初始化三个Proactor:
class CPigAceProactorManager
{
public:
    CPigAceProactorManager(void);
    ~CPigAceProactorManager(void);
    bool Init();
    void Fini();

    bool StartProactor();
    bool StopProactor();
    CPigAceProactor* GetPigAceProactor(int nProactorID);
   
    CString GetError();
private:
    typedef map<int, CPigAceProactor*> mapPigAceProactor;
    mapPigAceProactor m_mapPigAceProactor;
    CString m_strError;
};
// ACE_Recursive_Thread_Mutex:多于一个线程调用App_ProactorManager实例
// ACE_Null_Mutex:单线程调用App_ProactorManager实例
typedef ACE_Singleton<CPigAceProactorManager, ACE_Null_Mutex> App_PigAceProactorManager;
#endif





<PigProactorManager.cpp>
#include "StdAfx.h"
#include "PigProactorManager.h"
//构造函数
CPigAceProactor::CPigAceProactor()
{
    m_pProactor = NULL;                             //内部消息处理器
    m_nThreadCount = 1;                             //处理器使用的线程数,即开了m_nThreadCount来反应m_pProactor消息处理器.
    m_strError = "";                                //最近一次产生的错误消息
    m_blRun = false;                                //反应器是否在运行标记
    m_u4ProactorID = PIG_PROACTOR_CLIENTDEFINE;     //反应器的编号:PIG_PROACTOR_CLIENTDEFINE,PIG_PROACTOR_POSTDEFINE,PIG_PROACTOR_UDPDEFINE
    m_nProactorIO = 0;
}
//析构函数
CPigAceProactor::~CPigAceProactor()
{
    Fini();
}
/************************************************************************/
/*
开发日期:  2012-6-21 xiaopig
版本:      1.0
参数:      nProactorID: 反应器ID = PIG_PROACTOR_CLIENTDEFINE,PIG_PROACTOR_POSTDEFINE,PIG_PROACTOR_UDPDEFINE
            int nThreadCount:启动当前反应器的线程数
返回值:    略
功能描述:  初始化处理器类型,处理器使用的线程数
注意:      略
*/
/************************************************************************/
bool CPigAceProactor::Init(int nProactorID, int nThreadCount)           //
{
    PIG_SAFE_DELETE(m_pProactor);
    m_u4ProactorID = nProactorID;
    m_nThreadCount = nThreadCount;
    if (nProactorID == PIG_PROACTOR_CLIENTDEFINE)   //默认反应器,采用CPU * 2个数线程执行
    {
        //得到CPU的个数,监听线程根据CPU数量来创建多个事件回调。
        m_nThreadCount = ACE_OS::num_processors_online() * 2;
    }
    //请在stdafx.h的最开始的地方定义一下;#define FD_SETSIZE 1024
    //注意:要与ACE_wrappers\ace\config-win32-common.h中定义的或者你自己定义的保持一致
    //否则下面调用将出现错误.
    ACE_WIN32_Proactor* pWin32Proactor = new ACE_WIN32_Proactor(m_nThreadCount);
    if(NULL == pWin32Proactor)
    {
        m_u4ProactorID = PIG_PROACTOR_CLIENTDEFINE;
        m_nThreadCount = 1;
        m_strError = "CPigAceProactor::Init()调用new ACE_WIN32_Proactor(m_nThreadCount)失败";
        return false;
    }
    //第二个参数为TRUE,表示m_pProactor->close()调用时,pWin32Proactor会自动删除.
    m_pProactor = new ACE_Proactor(pWin32Proactor, true);
    if(NULL == m_pProactor)
    {
        PIG_SAFE_DELETE(pWin32Proactor);
        m_u4ProactorID = PIG_PROACTOR_CLIENTDEFINE;
        m_nThreadCount = 1;
        m_strError = "CPigAceProactor::Init()调用new ACE_Proactor(pWin32Proactor, true)失败";
        return false;
    }
    return true;
}
/************************************************************************/
/*
开发日期:  2012-6-21 xiaopig
版本:      1.0
参数:      略
返回值:    略
功能描述:  根据配置好的参数启动处理器,即启动线程.
注意:      略
*/
/************************************************************************/
bool CPigAceProactor::Start()                                          
{
    if(m_nThreadCount > 0)
    {
        if(0 == open())
        {
            return true;
        }
        else
        {
            return false;
        }
    }
    else
    {
        if(0 == svc())
        {
            return true;
        }
        else
        {
            return false;
        }
    }
}
/************************************************************************/
/*
开发日期:  2012-6-21 xiaopig
版本:      1.0
参数:      略
返回值:    略
功能描述:  停止反应器,等待停止.
注意:      略
*/
/************************************************************************/
void CPigAceProactor::Stop()                                          
{
    if(NULL == m_pProactor)
    {
        m_strError = "CPigAceProactor::Stop() m_pProactor为空.";
        return ;
    }
    while (m_nProactorIO > 0)
    {
        m_pProactor->proactor_end_event_loop();
        m_blRun = false;
        wait();
    }
    return ;
}
/************************************************************************/
/*
开发日期:  2012-6-21 xiaopig
版本:      1.0
参数:      略
返回值:    略
功能描述:  停止反应器,清理资源,初始化参数
注意:      略
*/
/************************************************************************/
void CPigAceProactor::Fini()                                         
{
    Stop();
    PIG_SAFE_DELETE(m_pProactor);
    m_strError = "";
    m_nThreadCount = 1;
    m_blRun = false;
    m_u4ProactorID = PIG_PROACTOR_CLIENTDEFINE;
    m_nProactorIO = 0;
}
/************************************************************************/
/*
开发日期:  2012-6-21 xiaopig
版本:      1.0
参数:      略
返回值:    略
功能描述:  返回最近一次错误信息
注意:      略
*/
/************************************************************************/
CString CPigAceProactor::GetError()
{
    return m_strError;
}
/************************************************************************/
/*
开发日期:  2012-6-21 xiaopig
版本:      1.0
参数:      略
返回值:    略
功能描述:  内部处理器使用的线程数
注意:      略
*/
/************************************************************************/
int CPigAceProactor::GetThreadCount()                                   
{
    return m_nThreadCount;
}
/************************************************************************/
/*
开发日期:  2012-6-21 xiaopig
版本:      1.0
参数:      略
返回值:    略
功能描述:  返回内部处理器
注意:      略
*/
/************************************************************************/
ACE_Proactor* CPigAceProactor::GetProactor()                           
{
    return m_pProactor;
}
/************************************************************************/
/*
开发日期:  2012-6-21 xiaopig
版本:      1.0
参数:      略
返回值:    略
功能描述:  当前线程是否正在运行.
注意:      略
*/
/************************************************************************/
bool CPigAceProactor::IsRun()                                          
{
    return m_blRun;
}
/************************************************************************/
/*
开发日期:  2012-6-21 xiaopig
版本:      1.0
参数:      略
返回值:    略
功能描述:  设置内部处理器的ID
注意:      略
*/
/************************************************************************/
void CPigAceProactor::SetProactorID(uint32 u4ProactorID)              
{
    m_u4ProactorID = u4ProactorID;
}
/************************************************************************/
/*
开发日期:  2012-6-21 xiaopig
版本:      1.0
参数:      略
返回值:    PIG_PROACTOR_CLIENTDEFINE,PIG_PROACTOR_POSTDEFINE,PIG_PROACTOR_UDPDEFINE
功能描述:  返回内部处理器的ID
注意:      略
*/
/************************************************************************/
uint32 CPigAceProactor::GetProactorID()                                 
{
    return m_u4ProactorID;
}
/************************************************************************/
/*
开发日期:  2012-6-21 xiaopig
版本:      1.0
参数:      略
返回值:    -1 = 启动不成功 0 = 启动成功
功能描述:  继承自父类的启动线程函数.
            启动预定义数量的线程,来执行ACE Proactor反应器
注意:      THR_CANCEL_DISABLE : 不允许这个线程被取消;
            THR_CANCEL_ENABLE  : 允许这个线程被取消;
            THR_CANCEL_DEFERRED: 只允许延迟的取消;
            THR_BOUND          : 创建一个线程,并绑定到一个可由内核调度的实体上;
            THR_NEW_LWP        : 创建一个内核级线程;该标志影响进程的并发属性;对"未绑定线程"来说,其预想的并发级别是增1,也就是添加一个新的内核线程到可用的线程池中,以运行用户线程;在不支持N:M混合线程模型的OS平台上,该标志会被忽略;
            THR_DETACHED       : 创建一个分离的线程;这就意味着这个线程的退出状态不能被其它线程访问;当这个线程退出的时候,其线程ID和它所占用的资源会被OS自动回收;
            THR_JOINABLE       : 允许新创建的线程被"会合(join)";这就意味着这个线程的退出状态能够被其它线程访问,可以通过join()方法来访问它的退出状态,但是它的线程ID和它所占用的资源不会被OS回收;所有ACE线程创建方法的默认行为都是THR_JOINABLE;
            THR_SUSPENDED      : 创建一个线程,但让其处在挂起状态;
            THR_DAEMON         : 创建一个看守(daemon)线程;
            THR_SCHED_FIFO     : 如果可用,使用FIFO政策调度新创建的线程;
            THR_SCHED_RR       : 如果可用,使用round-robin方案调度新创建的线程;
            THR_SCHED_DEFAULT  : 使用操作系统上可用的无论哪种默认调度方案;
            THR_SCOPE_SYSTEM   : 新线程在系统调度争用空间中创建,永久绑定于新创建的内核线程;
            THR_SCOPE_PROCESS  : 新线程在进程调度争用空间中创建,也就是说,它将作为一个用户线程运行;
            线程的这些属性标志通过逻辑或运算符"|"串在一起,并把它作为ACE_Task_Base::active()方法的第一个参数传递给该方法;
*/
/************************************************************************/
int CPigAceProactor::open(void *args/* = 0*/)
{
    if(activate(THR_NEW_LWP | THR_BOUND | THR_DETACHED, m_nThreadCount)  == -1)
    {
        m_blRun = false;
        m_strError = "[CPigAceProactor::Open]执行[activate]出错]";
        return -1;
    }
    else
    {
        m_blRun = true;
        return 0;
    }
}
/************************************************************************/
/*
开发日期:  2012-6-21 xiaopig
版本:      1.0
参数:      略
返回值:    略
功能描述:  线程执行体
            以中断的方式运行ACE Proactor中的消息,线程启动后将中断在m_pProactor->proactor_run_event_loop()上
            直到m_pProactor->proactor_end_event_loop();调用才退出.
注意:      略
*/
/************************************************************************/
int CPigAceProactor::svc(void)
{
    if(NULL == m_pProactor)
    {
        m_strError = "[CPigAceProactor::Svc]m_pProactor为空.\n";
        return -1;
    }
    else
    {
        m_blRun = true;
        if (m_pProactor != NULL)
        {
            m_nProactorIO++;
            m_pProactor->proactor_run_event_loop();             //中断的方式运行,直到用户调用proactor_end_event_loop()才返回.
            m_nProactorIO--;
        }
        m_blRun = false;
        return 0;
    }
}

//////////////////////////////////////////////////////////////////////////
/*
    开发日期:  2012-6-21 xiaopig
    描述:
    处理器线程管理器
*/
//////////////////////////////////////////////////////////////////////////
//构造函数
CPigAceProactorManager::CPigAceProactorManager(void)
{
    m_mapPigAceProactor.clear();
    m_strError = "";
}
//析构函数
CPigAceProactorManager::~CPigAceProactorManager(void)
{
    Fini();
}
/************************************************************************/
/*
开发日期:  2012-6-21 xiaopig
版本:      1.0
参数:      略
返回值:    略
功能描述:  创建三个反应器
            TCP客户端-服务器之间的反应器,远程客户端连接到当前服务器所使用的反应器
            #define PIG_PROACTOR_CLIENTDEFINE   0
            TCP服务器-服务器之间的反应器,当前服务器连接到其它服务器时的反应器
            #define PIG_PROACTOR_POSTDEFINE     1
            UDP客户端-服务器,服务器-服务器之间的反应器(共用一个反应器)
            #define PIG_PROACTOR_UDPDEFINE      2
注意:      三个反应器必需按0,1,2三个顺序创建.
*/
/************************************************************************/
bool CPigAceProactorManager::Init()
{
    Fini();
    CPigAceProactor* pPigAceProactor = new CPigAceProactor();
    if (pPigAceProactor == NULL)
    {
        m_strError = "[CPigAceProactorManager::Init]创建pPigAceProactor失败.";
        return false;
    }
    pPigAceProactor->Init(PIG_PROACTOR_CLIENTDEFINE, 1);
    m_mapPigAceProactor.insert(mapPigAceProactor::value_type(PIG_PROACTOR_CLIENTDEFINE, pPigAceProactor));
    pPigAceProactor = new CPigAceProactor();
    if (pPigAceProactor == NULL)
    {
        m_strError = "[CPigAceProactorManager::Init]创建pPigAceProactor失败.";
        return false;
    }
    pPigAceProactor->Init(PIG_PROACTOR_POSTDEFINE, 1);
    m_mapPigAceProactor.insert(mapPigAceProactor::value_type(PIG_PROACTOR_POSTDEFINE, pPigAceProactor));
    pPigAceProactor = new CPigAceProactor();
    if (pPigAceProactor == NULL)
    {
        m_strError = "[CPigAceProactorManager::Init]创建pPigAceProactor失败.";
        return false;
    }
    pPigAceProactor->Init(PIG_PROACTOR_UDPDEFINE, 1);
    m_mapPigAceProactor.insert(mapPigAceProactor::value_type(PIG_PROACTOR_UDPDEFINE, pPigAceProactor));
    return true;
}
/************************************************************************/
/*
开发日期:  2012-6-21 xiaopig
版本:      1.0
参数:      略
返回值:    略
功能描述:  清理资源.
            1)停止所有Proactor运行
            2)回收Proactor内存.
            3)初始化参数
注意:      略
*/
/************************************************************************/
void CPigAceProactorManager::Fini()
{
    StopProactor();
    mapPigAceProactor::iterator b = m_mapPigAceProactor.begin();
    mapPigAceProactor::iterator e = m_mapPigAceProactor.end();
    for(b; b!= e; b++)
    {
        int nProactorID           = (int)b->first;
        CPigAceProactor* pPigAceProactor = (CPigAceProactor* )b->second;
        PIG_SAFE_DELETE(pPigAceProactor);
    }
    m_mapPigAceProactor.clear();
    m_strError = "";
}
/************************************************************************/
/*
开发日期:  2012-6-21 xiaopig
版本:      1.0
参数:      略
返回值:    略
功能描述:  启动所有反应器
注意:      略
*/
/************************************************************************/
bool CPigAceProactorManager::StartProactor()
{
    StopProactor();
    mapPigAceProactor::iterator b = m_mapPigAceProactor.begin();
    mapPigAceProactor::iterator e = m_mapPigAceProactor.end();
    for(b; b!= e; b++)
    {
        int nProactorID           = (int)b->first;
        CPigAceProactor* pPigAceProactor = (CPigAceProactor* )b->second;
        if(NULL != pPigAceProactor)
        {
            if(!pPigAceProactor->Start())
            {
                m_strError = pPigAceProactor->GetError();
                return false;
            }
        }
    }
    return true;
}
/************************************************************************/
/*
开发日期:  2012-6-21 xiaopig
版本:      1.0
参数:      略
返回值:    略
功能描述:  停止所有Proactor处理器运行,即停止线程运行.
注意:      略
*/
/************************************************************************/
bool CPigAceProactorManager::StopProactor()
{
    mapPigAceProactor::iterator b = m_mapPigAceProactor.begin();
    mapPigAceProactor::iterator e = m_mapPigAceProactor.end();
    for(b; b!= e; b++)
    {
        int nProactorID           = (int)b->first;
        CPigAceProactor* pPigAceProactor = (CPigAceProactor* )b->second;
        if(NULL != pPigAceProactor)
        {
            pPigAceProactor->Stop();
        }
    }
    return true;
}
/************************************************************************/
/*
开发日期:  2012-6-21 xiaopig
版本:      1.0
参数:      int nProactorID
返回值:    略
功能描述:  返回指定类型的处理器.
注意:      略
*/
/************************************************************************/
CPigAceProactor* CPigAceProactorManager::GetPigAceProactor(int nProactorID)
{
    mapPigAceProactor::iterator b = m_mapPigAceProactor.begin();
    mapPigAceProactor::iterator e = m_mapPigAceProactor.end();
    for(b; b!= e; b++)
    {
        int nProactorID           = (int)b->first;
        CPigAceProactor* pPigAceProactor = (CPigAceProactor* )b->second;
        if(NULL != pPigAceProactor && pPigAceProactor->GetProactorID() == nProactorID)
        {
            return pPigAceProactor;
        }
    }
    return NULL;
}
/************************************************************************/
/*
开发日期:  2012-6-21 xiaopig
版本:      1.0
参数:      略
返回值:    略
功能描述:  返回最近一次错误信息
注意:      略
*/
/************************************************************************/
CString CPigAceProactorManager::GetError()
{
    return m_strError;
}

 楼主| 发表于 2012-6-25 15:14:43 | 显示全部楼层
有时错误又会出现在:
int
ACE_Proactor::handle_events (void)
{
  return this->implementation ()->handle_events ();
}
 楼主| 发表于 2012-6-25 15:15:01 | 显示全部楼层
错误出现的地方不一定,真心不知道怎么办了.
发表于 2012-6-25 18:47:33 | 显示全部楼层
在什么平台?这种内存存取错误,多从变量操作入手查找,如果难以查到,借助工具。比如boundschecker等。
发表于 2012-6-25 18:48:01 | 显示全部楼层
另外还有一个常见的原因,就是多线程存取变量同步问题。
 楼主| 发表于 2012-6-25 19:09:29 | 显示全部楼层
我把代码发给你.win7 + visual stdio 2010
基于MFC的
我的qq:279690558
 楼主| 发表于 2012-6-25 19:15:11 | 显示全部楼层
你发我一个邮箱,我把代码发给你.
279690558@qq.com
 楼主| 发表于 2012-6-26 06:35:31 | 显示全部楼层
这个问题终于解决了.现在把解决方案分享一下.
原来在ACE_Task中启动了n个线程来执行ACE_Proactor,而后会异步执行SVC函数,这就是关键!SVC函数不一定是马上执行,这就带来问题,在调用Stop时,可能某个线程的SVC入口还没有进行,却被delete了,出现了内存泄露了.
我在CPigAceProactor::Stop()中加了这么一条
    while(m_nProactorIO < m_nThreadCount)       //所有线程都启动后才可进行退出操作,即所有线程都已进入svc线程入口.
    {
        wait();
        ACE_OS::sleep(1);
    }
您需要登录后才可以回帖 登录 | 用户注册

本版积分规则

Archiver|手机版|小黑屋|ACE Developer ( 京ICP备06055248号 )

GMT+8, 2024-5-1 23:27 , Processed in 0.014775 second(s), 5 queries , Redis On.

Powered by Discuz! X3.5

© 2001-2023 Discuz! Team.

快速回复 返回顶部 返回列表