独来读网 发表于 2012-6-25 15:02:54

ACE_Proactor在ACE_Task<ACE_MT_SYNCH>中正常停止的问题

PigProactorManager.cpp PigProactorManager.h
我在主线程中使用
   for (int i = 0; i < 400; i++)
   {
         App_PigIPManager::instance()->Init();
         App_PigAceProactorManager::instance()->Init();
         App_PigAceProactorManager::instance()->StartProactor();
         App_PigAceProactorManager::instance()->StopProactor();
         App_PigAceProactorManager::instance()->Fini();
         App_PigIPManager::instance()->Fini();
   }
出现了Unhandled exception at 0x56dd19f8 (ACEd.dll) in CenterServer.exe: 0xC0000005: Access violation reading location 0xfeeeff56.
断点:// Call the Task's svc() hook method.
int const svc_status = t->svc ();
出现错误.不知道是哪里出了问题.


<PigProactorManager.h>
      #ifndef _PIG_PROACTOR_MANAGER_H_
#define _PIG_PROACTOR_MANAGER_H_
//
// 描述:各类ACE事件反应器及反应器管理器
//          CPigAceProactor:反应器消息循环线程,用来循环触发ACE_Proactor的消息.
//          CPigAceProactorManager:用来管理CPigAceProactor线程队列.
#include "PigDefine.h"
#include "ace/Synch_Traits.h"
#include "ace/Task.h"
#include "ace/Singleton.h"
#include "ace/Proactor.h"
#include "ace/WIN32_Proactor.h"
#include <map>
using namespace std;
class CPigAceProactor : public ACE_Task<ACE_MT_SYNCH>
{
public:
    CPigAceProactor();
    ~CPigAceProactor();
    bool Init(int nProactorID, int nThreadCount);         //初始化处理器类型,处理器使用的线程数
    bool Start();                                           //根据配置好的参数启动处理器
    void Stop();                                          //停止处理器
    void Fini();                                          //释放资源,初始化参数
    CString GetError();                                     //返回最近一次错误信息
    int GetThreadCount();                                 //内部处理器使用的线程数
    ACE_Proactor* GetProactor();                            //返回内部处理器
    bool IsRun();                                           //当前线程是否正在运行.
    void   SetProactorID(uint32 u4ProactorID);            //设置内部处理器的ID
    uint32 GetProactorID();                                 //返回内部处理器的ID
protected:
    virtual int open(void *args = 0);
    virtual int svc(void);

private:
    ACE_Proactor* m_pProactor;                      //内部消息处理器
    int         m_nThreadCount;                   //处理器使用的线程数,即开了m_nThreadCount来反应m_pProactor消息处理器.
    CString       m_strError;                     //最近一次产生的错误消息
    bool          m_blRun;                        //反应器是否在运行标记
    uint32      m_u4ProactorID;                   //反应器的编号:PIG_PROACTOR_CLIENTDEFINE,PIG_PROACTOR_POSTDEFINE,PIG_PROACTOR_UDPDEFINE
    int         m_nProactorIO;                  //Poractor被当前线程池调用次数,当为0时对m_pProactor进行清理.
};
//
// 描述:处理器线程管理器
//          用来管理处理器线程资源.
//          此实例只在主线程中使用,所以不需要使用锁.
//          初始化三个Proactor:
class CPigAceProactorManager
{
public:
    CPigAceProactorManager(void);
    ~CPigAceProactorManager(void);
    bool Init();
    void Fini();

    bool StartProactor();
    bool StopProactor();
    CPigAceProactor* GetPigAceProactor(int nProactorID);
   
    CString GetError();
private:
    typedef map<int, CPigAceProactor*> mapPigAceProactor;
    mapPigAceProactor m_mapPigAceProactor;
    CString m_strError;
};
// ACE_Recursive_Thread_Mutex:多于一个线程调用App_ProactorManager实例
// ACE_Null_Mutex:单线程调用App_ProactorManager实例
typedef ACE_Singleton<CPigAceProactorManager, ACE_Null_Mutex> App_PigAceProactorManager;
#endif





<PigProactorManager.cpp>
#include "StdAfx.h"
#include "PigProactorManager.h"
//构造函数
CPigAceProactor::CPigAceProactor()
{
    m_pProactor = NULL;                           //内部消息处理器
    m_nThreadCount = 1;                           //处理器使用的线程数,即开了m_nThreadCount来反应m_pProactor消息处理器.
    m_strError = "";                              //最近一次产生的错误消息
    m_blRun = false;                              //反应器是否在运行标记
    m_u4ProactorID = PIG_PROACTOR_CLIENTDEFINE;   //反应器的编号:PIG_PROACTOR_CLIENTDEFINE,PIG_PROACTOR_POSTDEFINE,PIG_PROACTOR_UDPDEFINE
    m_nProactorIO = 0;
}
//析构函数
CPigAceProactor::~CPigAceProactor()
{
    Fini();
}
/************************************************************************/
/*
开发日期:2012-6-21 xiaopig
版本:      1.0
参数:      nProactorID: 反应器ID = PIG_PROACTOR_CLIENTDEFINE,PIG_PROACTOR_POSTDEFINE,PIG_PROACTOR_UDPDEFINE
            int nThreadCount:启动当前反应器的线程数
返回值:    略
功能描述:初始化处理器类型,处理器使用的线程数
注意:      略
*/
/************************************************************************/
bool CPigAceProactor::Init(int nProactorID, int nThreadCount)         //
{
    PIG_SAFE_DELETE(m_pProactor);
    m_u4ProactorID = nProactorID;
    m_nThreadCount = nThreadCount;
    if (nProactorID == PIG_PROACTOR_CLIENTDEFINE)   //默认反应器,采用CPU * 2个数线程执行
    {
      //得到CPU的个数,监听线程根据CPU数量来创建多个事件回调。
      m_nThreadCount = ACE_OS::num_processors_online() * 2;
    }
    //请在stdafx.h的最开始的地方定义一下;#define FD_SETSIZE 1024
    //注意:要与ACE_wrappers\ace\config-win32-common.h中定义的或者你自己定义的保持一致
    //否则下面调用将出现错误.
    ACE_WIN32_Proactor* pWin32Proactor = new ACE_WIN32_Proactor(m_nThreadCount);
    if(NULL == pWin32Proactor)
    {
      m_u4ProactorID = PIG_PROACTOR_CLIENTDEFINE;
      m_nThreadCount = 1;
      m_strError = "CPigAceProactor::Init()调用new ACE_WIN32_Proactor(m_nThreadCount)失败";
      return false;
    }
    //第二个参数为TRUE,表示m_pProactor->close()调用时,pWin32Proactor会自动删除.
    m_pProactor = new ACE_Proactor(pWin32Proactor, true);
    if(NULL == m_pProactor)
    {
      PIG_SAFE_DELETE(pWin32Proactor);
      m_u4ProactorID = PIG_PROACTOR_CLIENTDEFINE;
      m_nThreadCount = 1;
      m_strError = "CPigAceProactor::Init()调用new ACE_Proactor(pWin32Proactor, true)失败";
      return false;
    }
    return true;
}
/************************************************************************/
/*
开发日期:2012-6-21 xiaopig
版本:      1.0
参数:      略
返回值:    略
功能描述:根据配置好的参数启动处理器,即启动线程.
注意:      略
*/
/************************************************************************/
bool CPigAceProactor::Start()                                          
{
    if(m_nThreadCount > 0)
    {
      if(0 == open())
      {
            return true;
      }
      else
      {
            return false;
      }
    }
    else
    {
      if(0 == svc())
      {
            return true;
      }
      else
      {
            return false;
      }
    }
}
/************************************************************************/
/*
开发日期:2012-6-21 xiaopig
版本:      1.0
参数:      略
返回值:    略
功能描述:停止反应器,等待停止.
注意:      略
*/
/************************************************************************/
void CPigAceProactor::Stop()                                          
{
    if(NULL == m_pProactor)
    {
      m_strError = "CPigAceProactor::Stop() m_pProactor为空.";
      return ;
    }
    while (m_nProactorIO > 0)
    {
      m_pProactor->proactor_end_event_loop();
      m_blRun = false;
      wait();
    }
    return ;
}
/************************************************************************/
/*
开发日期:2012-6-21 xiaopig
版本:      1.0
参数:      略
返回值:    略
功能描述:停止反应器,清理资源,初始化参数
注意:      略
*/
/************************************************************************/
void CPigAceProactor::Fini()                                       
{
    Stop();
    PIG_SAFE_DELETE(m_pProactor);
    m_strError = "";
    m_nThreadCount = 1;
    m_blRun = false;
    m_u4ProactorID = PIG_PROACTOR_CLIENTDEFINE;
    m_nProactorIO = 0;
}
/************************************************************************/
/*
开发日期:2012-6-21 xiaopig
版本:      1.0
参数:      略
返回值:    略
功能描述:返回最近一次错误信息
注意:      略
*/
/************************************************************************/
CString CPigAceProactor::GetError()
{
    return m_strError;
}
/************************************************************************/
/*
开发日期:2012-6-21 xiaopig
版本:      1.0
参数:      略
返回值:    略
功能描述:内部处理器使用的线程数
注意:      略
*/
/************************************************************************/
int CPigAceProactor::GetThreadCount()                                 
{
    return m_nThreadCount;
}
/************************************************************************/
/*
开发日期:2012-6-21 xiaopig
版本:      1.0
参数:      略
返回值:    略
功能描述:返回内部处理器
注意:      略
*/
/************************************************************************/
ACE_Proactor* CPigAceProactor::GetProactor()                           
{
    return m_pProactor;
}
/************************************************************************/
/*
开发日期:2012-6-21 xiaopig
版本:      1.0
参数:      略
返回值:    略
功能描述:当前线程是否正在运行.
注意:      略
*/
/************************************************************************/
bool CPigAceProactor::IsRun()                                          
{
    return m_blRun;
}
/************************************************************************/
/*
开发日期:2012-6-21 xiaopig
版本:      1.0
参数:      略
返回值:    略
功能描述:设置内部处理器的ID
注意:      略
*/
/************************************************************************/
void CPigAceProactor::SetProactorID(uint32 u4ProactorID)            
{
    m_u4ProactorID = u4ProactorID;
}
/************************************************************************/
/*
开发日期:2012-6-21 xiaopig
版本:      1.0
参数:      略
返回值:    PIG_PROACTOR_CLIENTDEFINE,PIG_PROACTOR_POSTDEFINE,PIG_PROACTOR_UDPDEFINE
功能描述:返回内部处理器的ID
注意:      略
*/
/************************************************************************/
uint32 CPigAceProactor::GetProactorID()                                 
{
    return m_u4ProactorID;
}
/************************************************************************/
/*
开发日期:2012-6-21 xiaopig
版本:      1.0
参数:      略
返回值:    -1 = 启动不成功 0 = 启动成功
功能描述:继承自父类的启动线程函数.
            启动预定义数量的线程,来执行ACE Proactor反应器
注意:      THR_CANCEL_DISABLE : 不允许这个线程被取消;
            THR_CANCEL_ENABLE: 允许这个线程被取消;
            THR_CANCEL_DEFERRED: 只允许延迟的取消;
            THR_BOUND          : 创建一个线程,并绑定到一个可由内核调度的实体上;
            THR_NEW_LWP      : 创建一个内核级线程;该标志影响进程的并发属性;对"未绑定线程"来说,其预想的并发级别是增1,也就是添加一个新的内核线程到可用的线程池中,以运行用户线程;在不支持N:M混合线程模型的OS平台上,该标志会被忽略;
            THR_DETACHED       : 创建一个分离的线程;这就意味着这个线程的退出状态不能被其它线程访问;当这个线程退出的时候,其线程ID和它所占用的资源会被OS自动回收;
            THR_JOINABLE       : 允许新创建的线程被"会合(join)";这就意味着这个线程的退出状态能够被其它线程访问,可以通过join()方法来访问它的退出状态,但是它的线程ID和它所占用的资源不会被OS回收;所有ACE线程创建方法的默认行为都是THR_JOINABLE;
            THR_SUSPENDED      : 创建一个线程,但让其处在挂起状态;
            THR_DAEMON         : 创建一个看守(daemon)线程;
            THR_SCHED_FIFO   : 如果可用,使用FIFO政策调度新创建的线程;
            THR_SCHED_RR       : 如果可用,使用round-robin方案调度新创建的线程;
            THR_SCHED_DEFAULT: 使用操作系统上可用的无论哪种默认调度方案;
            THR_SCOPE_SYSTEM   : 新线程在系统调度争用空间中创建,永久绑定于新创建的内核线程;
            THR_SCOPE_PROCESS: 新线程在进程调度争用空间中创建,也就是说,它将作为一个用户线程运行;
            线程的这些属性标志通过逻辑或运算符"|"串在一起,并把它作为ACE_Task_Base::active()方法的第一个参数传递给该方法;
*/
/************************************************************************/
int CPigAceProactor::open(void *args/* = 0*/)
{
    if(activate(THR_NEW_LWP | THR_BOUND | THR_DETACHED, m_nThreadCount)== -1)
    {
      m_blRun = false;
      m_strError = "执行出错]";
      return -1;
    }
    else
    {
      m_blRun = true;
      return 0;
    }
}
/************************************************************************/
/*
开发日期:2012-6-21 xiaopig
版本:      1.0
参数:      略
返回值:    略
功能描述:线程执行体
            以中断的方式运行ACE Proactor中的消息,线程启动后将中断在m_pProactor->proactor_run_event_loop()上
            直到m_pProactor->proactor_end_event_loop();调用才退出.
注意:      略
*/
/************************************************************************/
int CPigAceProactor::svc(void)
{
    if(NULL == m_pProactor)
    {
      m_strError = "m_pProactor为空.\n";
      return -1;
    }
    else
    {
      m_blRun = true;
      if (m_pProactor != NULL)
      {
            m_nProactorIO++;
            m_pProactor->proactor_run_event_loop();             //中断的方式运行,直到用户调用proactor_end_event_loop()才返回.
            m_nProactorIO--;
      }
      m_blRun = false;
      return 0;
    }
}

//////////////////////////////////////////////////////////////////////////
/*
    开发日期:2012-6-21 xiaopig
    描述:
    处理器线程管理器
*/
//////////////////////////////////////////////////////////////////////////
//构造函数
CPigAceProactorManager::CPigAceProactorManager(void)
{
    m_mapPigAceProactor.clear();
    m_strError = "";
}
//析构函数
CPigAceProactorManager::~CPigAceProactorManager(void)
{
    Fini();
}
/************************************************************************/
/*
开发日期:2012-6-21 xiaopig
版本:      1.0
参数:      略
返回值:    略
功能描述:创建三个反应器
            TCP客户端-服务器之间的反应器,远程客户端连接到当前服务器所使用的反应器
            #define PIG_PROACTOR_CLIENTDEFINE   0
            TCP服务器-服务器之间的反应器,当前服务器连接到其它服务器时的反应器
            #define PIG_PROACTOR_POSTDEFINE   1
            UDP客户端-服务器,服务器-服务器之间的反应器(共用一个反应器)
            #define PIG_PROACTOR_UDPDEFINE      2
注意:      三个反应器必需按0,1,2三个顺序创建.
*/
/************************************************************************/
bool CPigAceProactorManager::Init()
{
    Fini();
    CPigAceProactor* pPigAceProactor = new CPigAceProactor();
    if (pPigAceProactor == NULL)
    {
      m_strError = "创建pPigAceProactor失败.";
      return false;
    }
    pPigAceProactor->Init(PIG_PROACTOR_CLIENTDEFINE, 1);
    m_mapPigAceProactor.insert(mapPigAceProactor::value_type(PIG_PROACTOR_CLIENTDEFINE, pPigAceProactor));
    pPigAceProactor = new CPigAceProactor();
    if (pPigAceProactor == NULL)
    {
      m_strError = "创建pPigAceProactor失败.";
      return false;
    }
    pPigAceProactor->Init(PIG_PROACTOR_POSTDEFINE, 1);
    m_mapPigAceProactor.insert(mapPigAceProactor::value_type(PIG_PROACTOR_POSTDEFINE, pPigAceProactor));
    pPigAceProactor = new CPigAceProactor();
    if (pPigAceProactor == NULL)
    {
      m_strError = "创建pPigAceProactor失败.";
      return false;
    }
    pPigAceProactor->Init(PIG_PROACTOR_UDPDEFINE, 1);
    m_mapPigAceProactor.insert(mapPigAceProactor::value_type(PIG_PROACTOR_UDPDEFINE, pPigAceProactor));
    return true;
}
/************************************************************************/
/*
开发日期:2012-6-21 xiaopig
版本:      1.0
参数:      略
返回值:    略
功能描述:清理资源.
            1)停止所有Proactor运行
            2)回收Proactor内存.
            3)初始化参数
注意:      略
*/
/************************************************************************/
void CPigAceProactorManager::Fini()
{
    StopProactor();
    mapPigAceProactor::iterator b = m_mapPigAceProactor.begin();
    mapPigAceProactor::iterator e = m_mapPigAceProactor.end();
    for(b; b!= e; b++)
    {
      int nProactorID         = (int)b->first;
      CPigAceProactor* pPigAceProactor = (CPigAceProactor* )b->second;
      PIG_SAFE_DELETE(pPigAceProactor);
    }
    m_mapPigAceProactor.clear();
    m_strError = "";
}
/************************************************************************/
/*
开发日期:2012-6-21 xiaopig
版本:      1.0
参数:      略
返回值:    略
功能描述:启动所有反应器
注意:      略
*/
/************************************************************************/
bool CPigAceProactorManager::StartProactor()
{
    StopProactor();
    mapPigAceProactor::iterator b = m_mapPigAceProactor.begin();
    mapPigAceProactor::iterator e = m_mapPigAceProactor.end();
    for(b; b!= e; b++)
    {
      int nProactorID         = (int)b->first;
      CPigAceProactor* pPigAceProactor = (CPigAceProactor* )b->second;
      if(NULL != pPigAceProactor)
      {
            if(!pPigAceProactor->Start())
            {
                m_strError = pPigAceProactor->GetError();
                return false;
            }
      }
    }
    return true;
}
/************************************************************************/
/*
开发日期:2012-6-21 xiaopig
版本:      1.0
参数:      略
返回值:    略
功能描述:停止所有Proactor处理器运行,即停止线程运行.
注意:      略
*/
/************************************************************************/
bool CPigAceProactorManager::StopProactor()
{
    mapPigAceProactor::iterator b = m_mapPigAceProactor.begin();
    mapPigAceProactor::iterator e = m_mapPigAceProactor.end();
    for(b; b!= e; b++)
    {
      int nProactorID         = (int)b->first;
      CPigAceProactor* pPigAceProactor = (CPigAceProactor* )b->second;
      if(NULL != pPigAceProactor)
      {
            pPigAceProactor->Stop();
      }
    }
    return true;
}
/************************************************************************/
/*
开发日期:2012-6-21 xiaopig
版本:      1.0
参数:      int nProactorID
返回值:    略
功能描述:返回指定类型的处理器.
注意:      略
*/
/************************************************************************/
CPigAceProactor* CPigAceProactorManager::GetPigAceProactor(int nProactorID)
{
    mapPigAceProactor::iterator b = m_mapPigAceProactor.begin();
    mapPigAceProactor::iterator e = m_mapPigAceProactor.end();
    for(b; b!= e; b++)
    {
      int nProactorID         = (int)b->first;
      CPigAceProactor* pPigAceProactor = (CPigAceProactor* )b->second;
      if(NULL != pPigAceProactor && pPigAceProactor->GetProactorID() == nProactorID)
      {
            return pPigAceProactor;
      }
    }
    return NULL;
}
/************************************************************************/
/*
开发日期:2012-6-21 xiaopig
版本:      1.0
参数:      略
返回值:    略
功能描述:返回最近一次错误信息
注意:      略
*/
/************************************************************************/
CString CPigAceProactorManager::GetError()
{
    return m_strError;
}

独来读网 发表于 2012-6-25 15:14:43

有时错误又会出现在:
int
ACE_Proactor::handle_events (void)
{
return this->implementation ()->handle_events ();
}

独来读网 发表于 2012-6-25 15:15:01

错误出现的地方不一定,真心不知道怎么办了.

winston 发表于 2012-6-25 18:47:33

在什么平台?这种内存存取错误,多从变量操作入手查找,如果难以查到,借助工具。比如boundschecker等。

winston 发表于 2012-6-25 18:48:01

另外还有一个常见的原因,就是多线程存取变量同步问题。

独来读网 发表于 2012-6-25 19:09:29

我把代码发给你.win7 + visual stdio 2010
基于MFC的
我的qq:279690558

独来读网 发表于 2012-6-25 19:15:11

你发我一个邮箱,我把代码发给你.
279690558@qq.com

独来读网 发表于 2012-6-26 06:35:31

这个问题终于解决了.现在把解决方案分享一下.
原来在ACE_Task中启动了n个线程来执行ACE_Proactor,而后会异步执行SVC函数,这就是关键!SVC函数不一定是马上执行,这就带来问题,在调用Stop时,可能某个线程的SVC入口还没有进行,却被delete了,出现了内存泄露了.
我在CPigAceProactor::Stop()中加了这么一条
    while(m_nProactorIO < m_nThreadCount)       //所有线程都启动后才可进行退出操作,即所有线程都已进入svc线程入口.
    {
      wait();
      ACE_OS::sleep(1);
    }
页: [1]
查看完整版本: ACE_Proactor在ACE_Task<ACE_MT_SYNCH>中正常停止的问题