PigProactorManager.cpp PigProactorManager.h
我在主线程中使用
for (int i = 0; i < 400; i++)
{
App_PigIPManager::instance()->Init();
App_PigAceProactorManager::instance()->Init();
App_PigAceProactorManager::instance()->StartProactor();
App_PigAceProactorManager::instance()->StopProactor();
App_PigAceProactorManager::instance()->Fini();
App_PigIPManager::instance()->Fini();
}
出现了Unhandled exception at 0x56dd19f8 (ACEd.dll) in CenterServer.exe: 0xC0000005: Access violation reading location 0xfeeeff56.
断点: // Call the Task's svc() hook method.
int const svc_status = t->svc ();
出现错误.不知道是哪里出了问题.
<PigProactorManager.h>
#ifndef _PIG_PROACTOR_MANAGER_H_
#define _PIG_PROACTOR_MANAGER_H_
// [6/21/2012 xiaopig]
// 描述: 各类ACE事件反应器及反应器管理器
// CPigAceProactor:反应器消息循环线程,用来循环触发ACE_Proactor的消息.
// CPigAceProactorManager:用来管理CPigAceProactor线程队列.
#include "PigDefine.h"
#include "ace/Synch_Traits.h"
#include "ace/Task.h"
#include "ace/Singleton.h"
#include "ace/Proactor.h"
#include "ace/WIN32_Proactor.h"
#include <map>
using namespace std;
class CPigAceProactor : public ACE_Task<ACE_MT_SYNCH>
{
public:
CPigAceProactor();
~CPigAceProactor();
bool Init(int nProactorID, int nThreadCount); //初始化处理器类型,处理器使用的线程数
bool Start(); //根据配置好的参数启动处理器
void Stop(); //停止处理器
void Fini(); //释放资源,初始化参数
CString GetError(); //返回最近一次错误信息
int GetThreadCount(); //内部处理器使用的线程数
ACE_Proactor* GetProactor(); //返回内部处理器
bool IsRun(); //当前线程是否正在运行.
void SetProactorID(uint32 u4ProactorID); //设置内部处理器的ID
uint32 GetProactorID(); //返回内部处理器的ID
protected:
virtual int open(void *args = 0);
virtual int svc(void);
private:
ACE_Proactor* m_pProactor; //内部消息处理器
int m_nThreadCount; //处理器使用的线程数,即开了m_nThreadCount来反应m_pProactor消息处理器.
CString m_strError; //最近一次产生的错误消息
bool m_blRun; //反应器是否在运行标记
uint32 m_u4ProactorID; //反应器的编号:PIG_PROACTOR_CLIENTDEFINE,PIG_PROACTOR_POSTDEFINE,PIG_PROACTOR_UDPDEFINE
int m_nProactorIO; //Poractor被当前线程池调用次数,当为0时对m_pProactor进行清理.
};
// [6/21/2012 xiaopig]
// 描述: 处理器线程管理器
// 用来管理处理器线程资源.
// 此实例只在主线程中使用,所以不需要使用锁.
// 初始化三个Proactor:
class CPigAceProactorManager
{
public:
CPigAceProactorManager(void);
~CPigAceProactorManager(void);
bool Init();
void Fini();
bool StartProactor();
bool StopProactor();
CPigAceProactor* GetPigAceProactor(int nProactorID);
CString GetError();
private:
typedef map<int, CPigAceProactor*> mapPigAceProactor;
mapPigAceProactor m_mapPigAceProactor;
CString m_strError;
};
// ACE_Recursive_Thread_Mutex:多于一个线程调用App_ProactorManager实例
// ACE_Null_Mutex:单线程调用App_ProactorManager实例
typedef ACE_Singleton<CPigAceProactorManager, ACE_Null_Mutex> App_PigAceProactorManager;
#endif
<PigProactorManager.cpp>
#include "StdAfx.h"
#include "PigProactorManager.h"
//构造函数
CPigAceProactor::CPigAceProactor()
{
m_pProactor = NULL; //内部消息处理器
m_nThreadCount = 1; //处理器使用的线程数,即开了m_nThreadCount来反应m_pProactor消息处理器.
m_strError = ""; //最近一次产生的错误消息
m_blRun = false; //反应器是否在运行标记
m_u4ProactorID = PIG_PROACTOR_CLIENTDEFINE; //反应器的编号:PIG_PROACTOR_CLIENTDEFINE,PIG_PROACTOR_POSTDEFINE,PIG_PROACTOR_UDPDEFINE
m_nProactorIO = 0;
}
//析构函数
CPigAceProactor::~CPigAceProactor()
{
Fini();
}
/************************************************************************/
/*
开发日期: 2012-6-21 xiaopig
版本: 1.0
参数: nProactorID: 反应器ID = PIG_PROACTOR_CLIENTDEFINE,PIG_PROACTOR_POSTDEFINE,PIG_PROACTOR_UDPDEFINE
int nThreadCount:启动当前反应器的线程数
返回值: 略
功能描述: 初始化处理器类型,处理器使用的线程数
注意: 略
*/
/************************************************************************/
bool CPigAceProactor::Init(int nProactorID, int nThreadCount) //
{
PIG_SAFE_DELETE(m_pProactor);
m_u4ProactorID = nProactorID;
m_nThreadCount = nThreadCount;
if (nProactorID == PIG_PROACTOR_CLIENTDEFINE) //默认反应器,采用CPU * 2个数线程执行
{
//得到CPU的个数,监听线程根据CPU数量来创建多个事件回调。
m_nThreadCount = ACE_OS::num_processors_online() * 2;
}
//请在stdafx.h的最开始的地方定义一下;#define FD_SETSIZE 1024
//注意:要与ACE_wrappers\ace\config-win32-common.h中定义的或者你自己定义的保持一致
//否则下面调用将出现错误.
ACE_WIN32_Proactor* pWin32Proactor = new ACE_WIN32_Proactor(m_nThreadCount);
if(NULL == pWin32Proactor)
{
m_u4ProactorID = PIG_PROACTOR_CLIENTDEFINE;
m_nThreadCount = 1;
m_strError = "CPigAceProactor::Init()调用new ACE_WIN32_Proactor(m_nThreadCount)失败";
return false;
}
//第二个参数为TRUE,表示m_pProactor->close()调用时,pWin32Proactor会自动删除.
m_pProactor = new ACE_Proactor(pWin32Proactor, true);
if(NULL == m_pProactor)
{
PIG_SAFE_DELETE(pWin32Proactor);
m_u4ProactorID = PIG_PROACTOR_CLIENTDEFINE;
m_nThreadCount = 1;
m_strError = "CPigAceProactor::Init()调用new ACE_Proactor(pWin32Proactor, true)失败";
return false;
}
return true;
}
/************************************************************************/
/*
开发日期: 2012-6-21 xiaopig
版本: 1.0
参数: 略
返回值: 略
功能描述: 根据配置好的参数启动处理器,即启动线程.
注意: 略
*/
/************************************************************************/
bool CPigAceProactor::Start()
{
if(m_nThreadCount > 0)
{
if(0 == open())
{
return true;
}
else
{
return false;
}
}
else
{
if(0 == svc())
{
return true;
}
else
{
return false;
}
}
}
/************************************************************************/
/*
开发日期: 2012-6-21 xiaopig
版本: 1.0
参数: 略
返回值: 略
功能描述: 停止反应器,等待停止.
注意: 略
*/
/************************************************************************/
void CPigAceProactor::Stop()
{
if(NULL == m_pProactor)
{
m_strError = "CPigAceProactor::Stop() m_pProactor为空.";
return ;
}
while (m_nProactorIO > 0)
{
m_pProactor->proactor_end_event_loop();
m_blRun = false;
wait();
}
return ;
}
/************************************************************************/
/*
开发日期: 2012-6-21 xiaopig
版本: 1.0
参数: 略
返回值: 略
功能描述: 停止反应器,清理资源,初始化参数
注意: 略
*/
/************************************************************************/
void CPigAceProactor::Fini()
{
Stop();
PIG_SAFE_DELETE(m_pProactor);
m_strError = "";
m_nThreadCount = 1;
m_blRun = false;
m_u4ProactorID = PIG_PROACTOR_CLIENTDEFINE;
m_nProactorIO = 0;
}
/************************************************************************/
/*
开发日期: 2012-6-21 xiaopig
版本: 1.0
参数: 略
返回值: 略
功能描述: 返回最近一次错误信息
注意: 略
*/
/************************************************************************/
CString CPigAceProactor::GetError()
{
return m_strError;
}
/************************************************************************/
/*
开发日期: 2012-6-21 xiaopig
版本: 1.0
参数: 略
返回值: 略
功能描述: 内部处理器使用的线程数
注意: 略
*/
/************************************************************************/
int CPigAceProactor::GetThreadCount()
{
return m_nThreadCount;
}
/************************************************************************/
/*
开发日期: 2012-6-21 xiaopig
版本: 1.0
参数: 略
返回值: 略
功能描述: 返回内部处理器
注意: 略
*/
/************************************************************************/
ACE_Proactor* CPigAceProactor::GetProactor()
{
return m_pProactor;
}
/************************************************************************/
/*
开发日期: 2012-6-21 xiaopig
版本: 1.0
参数: 略
返回值: 略
功能描述: 当前线程是否正在运行.
注意: 略
*/
/************************************************************************/
bool CPigAceProactor::IsRun()
{
return m_blRun;
}
/************************************************************************/
/*
开发日期: 2012-6-21 xiaopig
版本: 1.0
参数: 略
返回值: 略
功能描述: 设置内部处理器的ID
注意: 略
*/
/************************************************************************/
void CPigAceProactor::SetProactorID(uint32 u4ProactorID)
{
m_u4ProactorID = u4ProactorID;
}
/************************************************************************/
/*
开发日期: 2012-6-21 xiaopig
版本: 1.0
参数: 略
返回值: PIG_PROACTOR_CLIENTDEFINE,PIG_PROACTOR_POSTDEFINE,PIG_PROACTOR_UDPDEFINE
功能描述: 返回内部处理器的ID
注意: 略
*/
/************************************************************************/
uint32 CPigAceProactor::GetProactorID()
{
return m_u4ProactorID;
}
/************************************************************************/
/*
开发日期: 2012-6-21 xiaopig
版本: 1.0
参数: 略
返回值: -1 = 启动不成功 0 = 启动成功
功能描述: 继承自父类的启动线程函数.
启动预定义数量的线程,来执行ACE Proactor反应器
注意: THR_CANCEL_DISABLE : 不允许这个线程被取消;
THR_CANCEL_ENABLE : 允许这个线程被取消;
THR_CANCEL_DEFERRED: 只允许延迟的取消;
THR_BOUND : 创建一个线程,并绑定到一个可由内核调度的实体上;
THR_NEW_LWP : 创建一个内核级线程;该标志影响进程的并发属性;对"未绑定线程"来说,其预想的并发级别是增1,也就是添加一个新的内核线程到可用的线程池中,以运行用户线程;在不支持N:M混合线程模型的OS平台上,该标志会被忽略;
THR_DETACHED : 创建一个分离的线程;这就意味着这个线程的退出状态不能被其它线程访问;当这个线程退出的时候,其线程ID和它所占用的资源会被OS自动回收;
THR_JOINABLE : 允许新创建的线程被"会合(join)";这就意味着这个线程的退出状态能够被其它线程访问,可以通过join()方法来访问它的退出状态,但是它的线程ID和它所占用的资源不会被OS回收;所有ACE线程创建方法的默认行为都是THR_JOINABLE;
THR_SUSPENDED : 创建一个线程,但让其处在挂起状态;
THR_DAEMON : 创建一个看守(daemon)线程;
THR_SCHED_FIFO : 如果可用,使用FIFO政策调度新创建的线程;
THR_SCHED_RR : 如果可用,使用round-robin方案调度新创建的线程;
THR_SCHED_DEFAULT : 使用操作系统上可用的无论哪种默认调度方案;
THR_SCOPE_SYSTEM : 新线程在系统调度争用空间中创建,永久绑定于新创建的内核线程;
THR_SCOPE_PROCESS : 新线程在进程调度争用空间中创建,也就是说,它将作为一个用户线程运行;
线程的这些属性标志通过逻辑或运算符"|"串在一起,并把它作为ACE_Task_Base::active()方法的第一个参数传递给该方法;
*/
/************************************************************************/
int CPigAceProactor::open(void *args/* = 0*/)
{
if(activate(THR_NEW_LWP | THR_BOUND | THR_DETACHED, m_nThreadCount) == -1)
{
m_blRun = false;
m_strError = "[CPigAceProactor::Open]执行[activate]出错]";
return -1;
}
else
{
m_blRun = true;
return 0;
}
}
/************************************************************************/
/*
开发日期: 2012-6-21 xiaopig
版本: 1.0
参数: 略
返回值: 略
功能描述: 线程执行体
以中断的方式运行ACE Proactor中的消息,线程启动后将中断在m_pProactor->proactor_run_event_loop()上
直到m_pProactor->proactor_end_event_loop();调用才退出.
注意: 略
*/
/************************************************************************/
int CPigAceProactor::svc(void)
{
if(NULL == m_pProactor)
{
m_strError = "[CPigAceProactor::Svc]m_pProactor为空.\n";
return -1;
}
else
{
m_blRun = true;
if (m_pProactor != NULL)
{
m_nProactorIO++;
m_pProactor->proactor_run_event_loop(); //中断的方式运行,直到用户调用proactor_end_event_loop()才返回.
m_nProactorIO--;
}
m_blRun = false;
return 0;
}
}
//////////////////////////////////////////////////////////////////////////
/*
开发日期: 2012-6-21 xiaopig
描述:
处理器线程管理器
*/
//////////////////////////////////////////////////////////////////////////
//构造函数
CPigAceProactorManager::CPigAceProactorManager(void)
{
m_mapPigAceProactor.clear();
m_strError = "";
}
//析构函数
CPigAceProactorManager::~CPigAceProactorManager(void)
{
Fini();
}
/************************************************************************/
/*
开发日期: 2012-6-21 xiaopig
版本: 1.0
参数: 略
返回值: 略
功能描述: 创建三个反应器
TCP客户端-服务器之间的反应器,远程客户端连接到当前服务器所使用的反应器
#define PIG_PROACTOR_CLIENTDEFINE 0
TCP服务器-服务器之间的反应器,当前服务器连接到其它服务器时的反应器
#define PIG_PROACTOR_POSTDEFINE 1
UDP客户端-服务器,服务器-服务器之间的反应器(共用一个反应器)
#define PIG_PROACTOR_UDPDEFINE 2
注意: 三个反应器必需按0,1,2三个顺序创建.
*/
/************************************************************************/
bool CPigAceProactorManager::Init()
{
Fini();
CPigAceProactor* pPigAceProactor = new CPigAceProactor();
if (pPigAceProactor == NULL)
{
m_strError = "[CPigAceProactorManager::Init]创建pPigAceProactor失败.";
return false;
}
pPigAceProactor->Init(PIG_PROACTOR_CLIENTDEFINE, 1);
m_mapPigAceProactor.insert(mapPigAceProactor::value_type(PIG_PROACTOR_CLIENTDEFINE, pPigAceProactor));
pPigAceProactor = new CPigAceProactor();
if (pPigAceProactor == NULL)
{
m_strError = "[CPigAceProactorManager::Init]创建pPigAceProactor失败.";
return false;
}
pPigAceProactor->Init(PIG_PROACTOR_POSTDEFINE, 1);
m_mapPigAceProactor.insert(mapPigAceProactor::value_type(PIG_PROACTOR_POSTDEFINE, pPigAceProactor));
pPigAceProactor = new CPigAceProactor();
if (pPigAceProactor == NULL)
{
m_strError = "[CPigAceProactorManager::Init]创建pPigAceProactor失败.";
return false;
}
pPigAceProactor->Init(PIG_PROACTOR_UDPDEFINE, 1);
m_mapPigAceProactor.insert(mapPigAceProactor::value_type(PIG_PROACTOR_UDPDEFINE, pPigAceProactor));
return true;
}
/************************************************************************/
/*
开发日期: 2012-6-21 xiaopig
版本: 1.0
参数: 略
返回值: 略
功能描述: 清理资源.
1)停止所有Proactor运行
2)回收Proactor内存.
3)初始化参数
注意: 略
*/
/************************************************************************/
void CPigAceProactorManager::Fini()
{
StopProactor();
mapPigAceProactor::iterator b = m_mapPigAceProactor.begin();
mapPigAceProactor::iterator e = m_mapPigAceProactor.end();
for(b; b!= e; b++)
{
int nProactorID = (int)b->first;
CPigAceProactor* pPigAceProactor = (CPigAceProactor* )b->second;
PIG_SAFE_DELETE(pPigAceProactor);
}
m_mapPigAceProactor.clear();
m_strError = "";
}
/************************************************************************/
/*
开发日期: 2012-6-21 xiaopig
版本: 1.0
参数: 略
返回值: 略
功能描述: 启动所有反应器
注意: 略
*/
/************************************************************************/
bool CPigAceProactorManager::StartProactor()
{
StopProactor();
mapPigAceProactor::iterator b = m_mapPigAceProactor.begin();
mapPigAceProactor::iterator e = m_mapPigAceProactor.end();
for(b; b!= e; b++)
{
int nProactorID = (int)b->first;
CPigAceProactor* pPigAceProactor = (CPigAceProactor* )b->second;
if(NULL != pPigAceProactor)
{
if(!pPigAceProactor->Start())
{
m_strError = pPigAceProactor->GetError();
return false;
}
}
}
return true;
}
/************************************************************************/
/*
开发日期: 2012-6-21 xiaopig
版本: 1.0
参数: 略
返回值: 略
功能描述: 停止所有Proactor处理器运行,即停止线程运行.
注意: 略
*/
/************************************************************************/
bool CPigAceProactorManager::StopProactor()
{
mapPigAceProactor::iterator b = m_mapPigAceProactor.begin();
mapPigAceProactor::iterator e = m_mapPigAceProactor.end();
for(b; b!= e; b++)
{
int nProactorID = (int)b->first;
CPigAceProactor* pPigAceProactor = (CPigAceProactor* )b->second;
if(NULL != pPigAceProactor)
{
pPigAceProactor->Stop();
}
}
return true;
}
/************************************************************************/
/*
开发日期: 2012-6-21 xiaopig
版本: 1.0
参数: int nProactorID
返回值: 略
功能描述: 返回指定类型的处理器.
注意: 略
*/
/************************************************************************/
CPigAceProactor* CPigAceProactorManager::GetPigAceProactor(int nProactorID)
{
mapPigAceProactor::iterator b = m_mapPigAceProactor.begin();
mapPigAceProactor::iterator e = m_mapPigAceProactor.end();
for(b; b!= e; b++)
{
int nProactorID = (int)b->first;
CPigAceProactor* pPigAceProactor = (CPigAceProactor* )b->second;
if(NULL != pPigAceProactor && pPigAceProactor->GetProactorID() == nProactorID)
{
return pPigAceProactor;
}
}
return NULL;
}
/************************************************************************/
/*
开发日期: 2012-6-21 xiaopig
版本: 1.0
参数: 略
返回值: 略
功能描述: 返回最近一次错误信息
注意: 略
*/
/************************************************************************/
CString CPigAceProactorManager::GetError()
{
return m_strError;
}
|