|
问题描述:使用异步IO(ACE Proactor)多次提交Recv请求后,再结束事件循环(end_event_loop),就会出现内存泄漏!请大家帮忙看一下。
附件中有VC6和VC8的工程文件。
提交Recv的次数越多,泄漏的内存就越多;而我自己内存池中分配的内存都已经全部释放了,实在不知道问题出在什么地方。
附件中有完整的工程代码,不过担心附件上传不成功,这里也把主要源代码贴出来简单说明一下。
Receiver类是在ACE Proactor的DGram示例基础上修改而来的,增加了发送操作:
////////////////////////////////////////////////
ACE_RCSID(Proactor, test_udp_proactor, "test_proactor.cpp,v 1.29 2001/02/02 23:41:16 shuston Exp")
//#if defined (ACE_WIN32) && !defined (ACE_HAS_WINCE) || defined (ACE_HAS_AIO_CALLS)
// This only works on Win32 platforms.
// Receiver: abstract base for a UDP endpoint driven by asynchronous
// (proactor-style) datagram reads and writes. It owns the datagram socket,
// one buffer pool for receives and one for sends, and the two asynchronous
// operation objects. Derived classes implement OnReceived() to consume
// incoming datagrams.
class Receiver : public ACE_Handler /*ACE_Service_Handler*/
{
public:
// Keep track of when we're done.
bool m_bDone ;
// = Initialization and termination.
Receiver (void);
~Receiver (void);
// Set the number of receives to pre-post. (Original author's note: not
// really used for now because the memory management around it is messy.)
// Returns false when nCount is 0 or larger than the receive pool size.
bool InitPostRecvCount (size_t nCount = 50);
protected:
// Initialize the receive buffer pool with nCount blocks of nDefaultSize.
bool InitRecvMsgPoll(size_t nCount = 100, size_t nDefaultSize = 1024);
// Initialize the send buffer pool with nCount blocks of nDefaultSize.
bool InitSendMsgPoll(size_t nCount = 50, size_t nDefaultSize = 1024);
// Open the UDP socket on localAddr and attach the asynchronous read/write
// objects to it. Returns 0 on success, -1 on failure.
int InitSock(ACE_INET_Addr &localAddr);
// Release both buffer pools and close the socket.
void fini();
// These methods are called by the framework
/// This method will be called when an asynchronous read completes on
/// a UDP socket.
virtual void handle_read_dgram (const ACE_Asynch_Read_Dgram::Result &result);
/// Called when an asynchronous write on the UDP socket completes.
virtual void handle_write_dgram (const ACE_Asynch_Write_Dgram::Result &result);
// User-defined handler for a received datagram: sender address and port,
// then the payload pointer and its length.
virtual BOOL OnReceived(DWORD dwRemote,WORD wPort,BYTE * pData,DWORD dwDataLen) = 0;
// Submit an asynchronous send of dwDataLen bytes to aiaRemote.
int PostSend(const BYTE *pData, DWORD dwDataLen, ACE_INET_Addr & aiaRemote);
// Submit one asynchronous read.
int PostRecv();
protected:
WORD m_wRecvPort; // port used for receiving
ACE_SOCK_Dgram m_sockDgram;
size_t m_nPostRecvCount; // number of receives to pre-post
size_t m_nRecvCount; // receive count; InitRecvMsgPoll stores the pool's block count here
private:
//ACE_Message_Block* m_pmbRecv; // receive buffer (disabled)
CMsgBlockMgr m_pMbmRecv; // receive buffer pool
// create a message block for the message
CMsgBlockMgr m_pMbmSend ; // send buffer pool
//ACE_Message_Block* m_pMsgRecv ; // message pointer used while receiving (disabled)
//ACE_Message_Block* m_pMsgSend ; // message pointer used while sending (disabled)
ACE_Asynch_Write_Dgram m_wd_; // asynchronous datagram writer
protected:
// rd (read dgram): for reading from a UDP socket.
ACE_Asynch_Read_Dgram m_rd_; // asynchronous datagram reader
// Not referenced by any of the member functions visible in this file.
vector <ACE_Message_Block*> m_vctMsgPointArr;
};
/////////////////////////////////////////CPP实现文件///////////////////////////////////////
// UDPBaseACE.cpp: implementation of the CUDPBaseACE class.
//
//////////////////////////////////////////////////////////////////////
#include "stdafx.h"
#include "UDPBaseACE.h"
#include <assert.h>
#include <ace/Thread.h>
#include <ace/Proactor.h>
#include <ace/Asynch_IO.h>
#include <ace/INET_Addr.h>
#include <ace/SOCK_Dgram.h>
#include <ace/Message_Block.h>
#include <ace/Get_Opt.h>
ACE_RCSID(Proactor, test_udp_proactor, "UDPBaseACE.cpp")
//
// Construct an idle receiver: not done, no port chosen yet, nothing
// pre-posted and no receive pool sized.
//
// The initializers are listed in member declaration order (the order in
// which they actually run), and m_wRecvPort is now given a defined value
// instead of being left indeterminate until a derived class assigns it.
Receiver::Receiver (void)
    : m_bDone (false),
      m_wRecvPort (0),
      m_nPostRecvCount (0),
      m_nRecvCount (0)
{
}
// Destructor. Intentionally performs no cleanup of its own: the buffer pools
// and the socket are released by fini(), which has to be called explicitly.
Receiver::~Receiver (void)
{
//m_bDone = true;
}
void Receiver::fini()
{
ACE_Guard<ACE_Recursive_Thread_Mutex> locker2(m_pMbmRecv.m_csLock);
//释放内存
m_pMbmRecv.fini();
m_pMbmSend.fini();
m_sockDgram.close ();
}
////////////////////////////////////////////////////////////////////////////////
// 函数名 : Receiver::PostRecv
// 描述 : 提交异步接收
// 参数 :
// 返回值 : 0,表示成功
// 作者 : lihui
// 日期 : 2007-1-15
////////////////////////////////////////////////////////////////////////////////
int Receiver::PostRecv()
{
ACE_Message_Block* pMsg =NULL;
//CCriticalSection locker(&m_pMbmRecv.m_csLock);
pMsg = m_pMbmRecv.get_msg_block();
//ACE_Guard<ACE_Recursive_Thread_Mutex> locker(m_pMbmRecv.m_csLock);
if (NULL == pMsg)
{
assert(false);
return -1;
//pMsg = m_pMbmRecv.get_msg_block();
}
//m_pMsgRecv = pMsg;
// ok lets do the asynch read
size_t number_of_bytes_recvd = 0;
int res = m_rd_.recv (pMsg, number_of_bytes_recvd, 0); //,PF_INET, "Recever act"
switch (res)
{
case 0:
// this is a good error. The proactor will call our handler when the
// read has completed.
break;
case 1:
// actually read something, we will handle it in the handler callback
ACE_DEBUG ((LM_DEBUG, "********************\n"));
ACE_DEBUG ((LM_DEBUG,
"%s = %d\n",
"bytes recieved immediately",
number_of_bytes_recvd));
ACE_DEBUG ((LM_DEBUG, "********************\n"));
res = 0;
break;
case -1:
// Something else went wrong.
ACE_ERROR ((LM_ERROR,
"%p\n",
"ACE_Asynch_Read_Dgram::recv"));
// the handler will not get called in this case so lets clean up our msg
m_pMbmRecv.release_msg_block(pMsg);
break;
default:
// Something undocumented really went wrong.
ACE_ERROR ((LM_ERROR,
"%p\n",
"ACE_Asynch_Read_Dgram::recv"));
m_pMbmRecv.release_msg_block(pMsg);
break;
}
return res;
}
////////////////////////////////////////////////////////////////////////////////
// Function : Receiver::InitSock
// Purpose  : Open the local UDP port, attach the asynchronous reader and
//            writer to it, and set up the receive pool and pre-post count.
// Params   : localAddr - address to bind. Port 0 means "any free port".
//            On return it holds the address that was actually bound.
// Returns  : 0 on success; -1 when the port (and the next 10 ports) cannot
//            be opened, or when any later initialization step fails.
// Author   : lihui
// Date     : 2007-1-15
////////////////////////////////////////////////////////////////////////////////
int Receiver::InitSock(ACE_INET_Addr &localAddr)
{
    ACE_DEBUG ((LM_DEBUG,
                "%N:%l:Receiver::open_addr called\n"));

    // Create a local UDP socket to receive datagrams.
    if (0 == localAddr.get_port_number())
    {
        // No port requested: let the system choose one and record it.
        if (this->m_sockDgram.open (ACE_Addr::sap_any) == -1)
        {
            ACE_ERROR_RETURN ((LM_ERROR,
                               "%p\n",
                               "ACE_SOCK_Dgram::open"), -1);
        }
        m_sockDgram.get_local_addr (localAddr);
        m_wRecvPort = localAddr.get_port_number();
    }
    else
    {
        // Start from the requested port, so the retry loop below increments
        // the right value even if the caller never set m_wRecvPort.
        m_wRecvPort = localAddr.get_port_number();

        if (this->m_sockDgram.open (localAddr) == -1)
        {
            // The port is presumably in use: try the next 10 ports in turn.
            bool bBound = false;
            for (int i = 0; i < 10 && !bBound; ++i)
            {
                ++m_wRecvPort;
                localAddr.set_port_number(m_wRecvPort);
                bBound = (-1 != m_sockDgram.open (localAddr));
            }

            // Fail only when every retry failed. (The original test was
            // inverted: it returned -1 after a successful retry and carried
            // on with an unopened socket after ten failures.)
            if (!bBound)
            {
                ACE_ERROR_RETURN ((LM_ERROR,
                                   "%p\n",
                                   "ACE_SOCK_Dgram::open"), -1);
            }
        }
    }

    // Initialize the asynchronous read.
    if (this->m_rd_.open (*this,
                          this->m_sockDgram.get_handle (),
                          "Receiver Completion Key",
                          ACE_Proactor::instance ()) == -1)
    {
        ACE_ERROR_RETURN ((LM_ERROR,
                           "%p\n",
                           "ACE_Asynch_Read_Dgram::open"), -1);
    }

    // Initialize the asynchronous write.
    if (this->m_wd_.open (*this,
                          this->m_sockDgram.get_handle (),
                          "Write Completion Key",
                          ACE_Proactor::instance ()) == -1)
    {
        ACE_ERROR_RETURN ((LM_ERROR,
                           "%p\n",
                           "ACE_Asynch_Write_Dgram::open"), -1);
    }

    // Set up the receive buffer pool, then the number of reads to pre-post.
    // Both report failure through their return value; do not ignore it.
    if (!InitRecvMsgPoll() || !InitPostRecvCount())
    {
        ACE_ERROR_RETURN ((LM_ERROR,
                           "%N:%l:Receiver::InitSock buffer pool setup failed\n"), -1);
    }
    return 0;
}
//提交数据发送请求
int Receiver::PostSend(const BYTE *pData, DWORD dwDataLen, ACE_INET_Addr & aiaRemote)
{
ACE_Message_Block* pMsg =NULL;
pMsg = m_pMbmSend.get_msg_block();
if (NULL == pMsg )
{
this->InitSendMsgPoll();
pMsg = m_pMbmSend.get_msg_block();
}
//ACE_Guard<ACE_Recursive_Thread_Mutex> locker(m_pMbmSend.m_csLock);
// Copy buf into the Message_Block and update the wr_ptr ().
pMsg ->copy((const char *)pData, dwDataLen);
// do the asynch send
size_t number_of_bytes_sent = 0;
// ACE_INET_Addr serverAddr(port);
int res = this->m_wd_.send(pMsg , number_of_bytes_sent, 0, aiaRemote, "send act");
switch (res)
{
case 0:
//ERROR_IO_PENDING
// this is a good error. The proactor will call our handler when the
// send has completed.
break;
case 1:
// actually sent something, we will handle it in the handler callback
res = 0;
break;
case -1:
// Something else went wrong.
ACE_ERROR ((LM_ERROR,
"%p\n",
"ACE_Asynch_Write_Dgram::recv"));
// the handler will not get called in this case so lets clean up our msg
m_pMbmSend.release_msg_block(pMsg );
break;
default:
// Something undocumented really went wrong.
ACE_ERROR ((LM_ERROR,
"%p\n",
"ACE_Asynch_Write_Dgram::recv"));
m_pMbmSend.release_msg_block(pMsg );
break;
}
return res;
}
////////////////////////////////////////////////////////////////////////////////
// Function : Receiver::handle_read_dgram
// Purpose  : Completion callback for an asynchronous datagram read. Re-arms
//            the read, hands the payload to OnReceived(), then returns the
//            buffer to the receive pool.
// Params   : result - completion result carrying the message block, the
//            byte count and the sender's address.
// Returns  : nothing
// Author   : lihui
// Date     : 2007-1-15
////////////////////////////////////////////////////////////////////////////////
void Receiver::handle_read_dgram (const ACE_Asynch_Read_Dgram::Result &result)
{
    // Keep the number of outstanding reads constant by posting a replacement
    // before processing this one -- but not once shutdown has been requested.
    // Otherwise every completion delivered during teardown would start yet
    // another read that needs a pool buffer and will never be consumed.
    if (!m_bDone)
    {
        PostRecv();
    }

    ACE_Message_Block* pBlock = result.message_block();
    const size_t nBytes = result.bytes_transferred ();

    // Only successful, non-empty datagrams are passed to the user handler.
    if (result.success () && nBytes != 0)
    {
        ACE_INET_Addr peerAddr;
        result.remote_address (peerAddr);
        // The IP is converted with htonl() before being handed to the callback.
        OnReceived(htonl(peerAddr.get_ip_address()), peerAddr.get_port_number(),
                   (BYTE *)pBlock->rd_ptr(), pBlock->length ());
    }

    // The read is finished either way: give the buffer back to the pool.
    m_pMbmRecv.release_msg_block(pBlock);
}
// Completion callback for an asynchronous datagram send. Whether the send
// succeeded or not, the message block goes back to the send pool.
void Receiver::handle_write_dgram (const ACE_Asynch_Write_Dgram::Result &result)
{
    ACE_Message_Block * const pSent = result.message_block();

    if (!result.success ())
    {
        // Deliberately empty. The author wanted to resend here, but the
        // destination address is no longer known at this point, so a failed
        // send is simply dropped.
    }

    m_pMbmSend.release_msg_block(pSent);
}
// Initialize the receive buffer pool with nCount blocks of nDefaultSize and
// remember nCount in m_nRecvCount. Returns false, leaving everything
// untouched, when either argument is zero.
bool Receiver::InitRecvMsgPoll(size_t nCount, size_t nDefaultSize )
{
    const bool bUsable = (nCount > 0) && (nDefaultSize > 0);
    if (bUsable)
    {
        m_pMbmRecv.init(nCount, nDefaultSize);
        m_nRecvCount = nCount;
    }
    return bUsable;
}
// Initialize the send buffer pool with nCount blocks of nDefaultSize.
// Returns false, without touching the pool, when either argument is zero.
bool Receiver::InitSendMsgPoll(size_t nCount , size_t nDefaultSize)
{
    const bool bUsable = (nCount > 0) && (nDefaultSize > 0);
    if (bUsable)
    {
        m_pMbmSend.init(nCount, nDefaultSize);
    }
    return bUsable;
}
// Set how many receives are pre-posted when the network is started.
//
// nCount must be at least 1 and must not exceed m_nRecvCount (the size the
// receive pool was initialized with), because every posted receive holds one
// pool block. Returns true and stores nCount when it is valid; returns false
// and leaves the current value unchanged otherwise. (The original stored
// nCount before validating it, so a rejected count was still used.)
bool Receiver::InitPostRecvCount (size_t nCount )
{
    if (0 == nCount || nCount > m_nRecvCount)
    {
        return false;
    }
    m_nPostRecvCount = nCount;
    return true;
}
/////////////////////////////////// Derived class used by the application ///////////////
/////////////////////////////////////////////////////////////////////////////////
// Class  : CUDPBaseACE
// Purpose: Adapted from CUdpBase; the callback
//          "virtual BOOL OnEnd(int iError) = 0;" was removed.
//          Concrete Receiver that binds a UDP port, pre-posts receives and
//          runs the proactor event loop on a worker thread.
// Interface :
// Callers   :
// Author : lihui
/////////////////////////////////////////////////////////////////////////////////
//class CThreadObject;
class CUDPBaseACE : public Receiver
{
//ACE_Thread::spawn();
public:
// Returns m_dwErrorTimes.
DWORD GetErrorTimes();
// Returns m_dwExitCode.
DWORD GetErrorCode();
CUDPBaseACE();
// Ends the proactor event loop and calls fini().
~CUDPBaseACE();
// Send dwDataLen bytes to IP dwSendTo (stored into s_addr as-is) and port
// wPort (converted with htons). Returns TRUE when the send was queued.
BOOL Send(BYTE *pData, DWORD dwDataLen, DWORD dwSendTo, WORD wPort);
// Bind the port (0 = any), pre-post the receives and start the worker thread.
void StartNetwork(WORD wPort = 0);
// Returns m_wRecvPort.
WORD GetPort();
// Currently always returns TRUE.
BOOL IsValidSocket();
// Currently only calls Stop(); it does not close the socket itself.
void CloseSocket();
//BOOL CreateSocket(WORD wPort = 0);
// Sets m_bDone to true.
void Stop();
// Convert a dotted address or host name to an IP address; 0 on failure.
DWORD GetHost(const char FAR * szAddress);
// Spawn the detached worker thread. szThreadName is not used.
BOOL Start(const char * szThreadName);
protected:
// Default receive handler: ignores the datagram and returns false.
virtual BOOL OnReceived(DWORD dwRemote,WORD wPort,BYTE * pData,DWORD dwDataLen)
{
return false;
}
//virtual BOOL OnEnd(int iError) = 0;
private:
// Body of the worker thread: runs the proactor event loop.
virtual DWORD ThreadWorkFunc();
// Static thread entry point; lpParam is the CUDPBaseACE instance.
static DWORD ThreadBaseFunc(LPVOID lpParam);
DWORD m_dwExitCode; // only ever set to 0 in the code visible in this file
DWORD m_dwErrorTimes; // number of errors; only ever set to 0 in the code visible in this file
//HANDLE m_hWorkQuit; // worker-thread quit event (disabled)
};
/////////////////////////////////派生类实现CPP文件/////////////////////////////////////////////
//////////////////////////////////////////////////////////////////////
// Construction/Destruction
//////////////////////////////////////////////////////////////////////
// Start with no port chosen and both error counters cleared.
CUDPBaseACE::CUDPBaseACE()
    : m_dwExitCode (0),
      m_dwErrorTimes (0)
{
    // m_wRecvPort belongs to the base class, so it is assigned here rather
    // than in the initializer list.
    m_wRecvPort = 0;
}
// Destructor: flag shutdown, ask the proactor singleton to end its event
// loop, then release the buffer pools and close the socket through fini().
//
// NOTE(review): the worker thread is spawned detached in Start() and nothing
// here waits for it, so it may presumably still be inside run_event_loop()
// (or inside one of this object's handlers) while fini() runs -- confirm.
// NOTE(review): reads pre-posted by PostRecv() are presumably still pending
// when the event loop is ended; their completions would then never be
// dispatched to handle_read_dgram(). That looks like the likely source of the
// leak that grows with the number of posted receives -- verify by closing the
// socket and draining the completions before calling end_event_loop().
CUDPBaseACE::~CUDPBaseACE()
{
m_bDone = TRUE;
//Send((BYTE*)"0",1,0x0100007f,m_wRecvPort);
// (disabled idea: post several receives, m_nPostRecvCount of them)
ACE_Proactor::instance ()->end_event_loop();
fini();
// Stop();
}
// Convert a string to an IP address.
// The string is first parsed with inet_addr(); if that yields INADDR_NONE it
// is looked up with gethostbyname() and the first address of the entry is
// used. Returns the address on success, or 0 when both steps fail.
DWORD CUDPBaseACE::GetHost(const char * szAddress)
{
    DWORD dwAddr = inet_addr(szAddress);

    if (INADDR_NONE == dwAddr)
    {
        struct hostent *pEntry = gethostbyname(szAddress);
        if (NULL != pEntry)
        {
            memcpy(&dwAddr, pEntry->h_addr_list[0], sizeof(DWORD));
        }
    }

    // INADDR_NONE is reported to the caller as 0.
    return (INADDR_NONE == dwAddr) ? 0 : dwAddr;
}
/*//
Worker-thread body: runs the proactor event loop until it ends.
Return values listed by the original author:
0 thread finished successfully
1 socket creation failed
2 receive failed
(the code below only ever returns 0)
//*/
DWORD CUDPBaseACE::ThreadWorkFunc()
{
    ACE_Proactor::instance ()->run_event_loop();
    return 0;
}
/*//
Send a datagram.
pData     data buffer
dwDataLen data length
dwSendTo  destination host IP (stored into s_addr unchanged)
wPort     destination host port (converted with htons)
Returns TRUE when PostSend() reported 0, FALSE otherwise.
//*/
BOOL CUDPBaseACE::Send(BYTE *pData, DWORD dwDataLen, DWORD dwSendTo, WORD wPort)
{
    // Build the destination as a zeroed IPv4 socket address.
    SOCKADDR_IN dest;
    memset(&dest, 0, sizeof(dest));
    dest.sin_addr.s_addr = dwSendTo;
    dest.sin_port = htons(wPort);
    dest.sin_family = AF_INET;

    ACE_INET_Addr aiaDest(&dest, sizeof(SOCKADDR_IN));
    return (0 == PostSend(pData, dwDataLen, aiaDest)) ? TRUE : FALSE;
}
void CUDPBaseACE::Stop()
{
m_bDone = true;
}
void CUDPBaseACE::CloseSocket()
{
Stop();
}
// Always reports TRUE; the state of the socket is not inspected.
// NOTE(review): presumably kept only for compatibility with the CUdpBase
// interface this class was adapted from -- confirm before relying on it.
BOOL CUDPBaseACE::IsValidSocket()
{
return TRUE;
}
// Return the receive port currently stored in m_wRecvPort.
WORD CUDPBaseACE::GetPort()
{
    return this->m_wRecvPort;
}
// Bring the endpoint up: bind wPort (0 lets the system choose a port),
// pre-post m_nPostRecvCount asynchronous receives, then start the worker
// thread that runs the event loop. Returns silently when the socket cannot
// be initialized.
void CUDPBaseACE::StartNetwork(WORD wPort)
{
    m_wRecvPort = wPort;

    // InitSock() takes a non-const reference and writes the bound address
    // back, so it needs a named object; binding a temporary to that reference
    // is a compiler extension, not standard C++.
    ACE_INET_Addr localAddr (m_wRecvPort);
    if (InitSock (localAddr) == -1)
    {
        return ;
    }

    // Pre-post the receives. The counter is a size_t, so the index is too
    // (avoids a signed/unsigned comparison).
    for (size_t i = 0; i < m_nPostRecvCount; ++i)
    {
        this->PostRecv();
    }

    // Start the worker thread that does the receiving.
    Start("CUDPBaseACE thread");
}
// Return the value stored in m_dwExitCode.
DWORD CUDPBaseACE::GetErrorCode()
{
    return this->m_dwExitCode;
}
// Return the number of errors recorded in m_dwErrorTimes (per the original
// author, errors are usually caused by receiving a zero-length packet).
DWORD CUDPBaseACE::GetErrorTimes()
{
    return this->m_dwErrorTimes;
}
// Start the detached worker thread, which enters ThreadBaseFunc() with this
// object as its argument.
//
// szThreadName is accepted for interface compatibility but is not used.
// Returns TRUE when the thread was spawned, FALSE when spawning failed.
// (The original ignored the spawn result and always returned FALSE.)
BOOL CUDPBaseACE::Start(const char * szThreadName)
{
    const int rc = ACE_Thread::spawn((ACE_THR_FUNC)ThreadBaseFunc,
                                     (void *)this,
                                     THR_DETACHED);
    if (-1 == rc)
    {
        ACE_ERROR ((LM_ERROR,
                    "%p\n",
                    "ACE_Thread::spawn"));
        return FALSE;
    }
    return TRUE;
}
// Static thread entry point: lpParam is the CUDPBaseACE instance passed to
// the spawn call; forward to its ThreadWorkFunc() and return that result.
DWORD CUDPBaseACE::ThreadBaseFunc(LPVOID lpParam)
{
    CUDPBaseACE * const pSelf = static_cast<CUDPBaseACE *> (lpParam);
    return pSelf->ThreadWorkFunc();
}
|