找回密码
 用户注册

QQ登录

只需一步,快速开始

查看: 11806|回复: 11

Proactor中对异步IO预提交多个Recv出现内存泄漏

[复制链接]
发表于 2008-7-15 22:59:03 | 显示全部楼层 |阅读模式
就是让异步IO多提交几次Recv消息后,结束事件处理(end_event_loop)后,出现内存问题!请大家帮看一下.
有VC6及VC8下面的工程文件!
而且是提交Recv的次数越多,内存泄漏就越多.而且我自己分配的内存池的所有的内存都是已经释放过了.真不知道应该处理在什么地方.
附件中有工程代码.不过担心传不好,这里也简单说一下源代码
一个从Proactor中找的DGram例子改的Receiver 类,增加了发送操作
////////////////////////////////////////////////
ACE_RCSID(Proactor, test_udp_proactor, "test_proactor.cpp,v 1.29 2001/02/02 23:41:16 shuston Exp")
//#if defined (ACE_WIN32) && !defined (ACE_HAS_WINCE) || defined (ACE_HAS_AIO_CALLS)
// This only works on Win32 platforms.
class Receiver : public ACE_Handler /*ACE_Service_Handler*/
{
public:
// Keep track of when we're done.
bool m_bDone ;
// = Initialization and termination.
Receiver (void);
~Receiver (void);

//初始化预接收个数, 暂时不用,有点内存管理混乱
bool InitPostRecvCount (size_t nCount = 50);
protected:
//初始化接收缓冲池
bool InitRecvMsgPoll(size_t nCount = 100, size_t nDefaultSize = 1024);
//初始化发送缓冲池
bool InitSendMsgPoll(size_t nCount = 50, size_t nDefaultSize = 1024);
int InitSock(ACE_INET_Addr &localAddr);
void fini();
// These methods are called by the framework
/// This method will be called when an asynchronous read completes on
/// a UDP socket.
virtual void handle_read_dgram (const ACE_Asynch_Read_Dgram::Result &result);
virtual void handle_write_dgram (const ACE_Asynch_Write_Dgram::Result &result);
//用户定义的接收自处理
virtual BOOL OnReceived(DWORD dwRemote,WORD wPort,BYTE * pData,DWORD dwDataLen) = 0;
//提交异步发送请求
int PostSend(const BYTE *pData, DWORD dwDataLen, ACE_INET_Addr & aiaRemote);
//提交异步读
int PostRecv();
protected:
WORD m_wRecvPort;  //接收的端口
ACE_SOCK_Dgram m_sockDgram;

size_t m_nPostRecvCount; //预接收个数
size_t m_nRecvCount;  //接收数目
private:
//ACE_Message_Block* m_pmbRecv; //接收缓冲区
CMsgBlockMgr m_pMbmRecv;
// create a message block for the message
CMsgBlockMgr m_pMbmSend ;  //发送缓冲
//ACE_Message_Block* m_pMsgRecv ; //接收时读消息指针
//ACE_Message_Block* m_pMsgSend ; //发送时消息指针
ACE_Asynch_Write_Dgram m_wd_; //异步数据写
protected:
// rd (read dgram): for reading from a UDP socket.
ACE_Asynch_Read_Dgram m_rd_; //异步数据读
vector <ACE_Message_Block*> m_vctMsgPointArr;
};
/////////////////////////////////////////CPP实现文件///////////////////////////////////////
// UDPBaseACE.cpp: implementation of the CUDPBaseACE class.
//
//////////////////////////////////////////////////////////////////////
#include "stdafx.h"
#include "UDPBaseACE.h"
#include <assert.h>
#include <ace/Thread.h>
#include <ace/Proactor.h>
#include <ace/Asynch_IO.h>
#include <ace/INET_Addr.h>
#include <ace/SOCK_Dgram.h>
#include <ace/Message_Block.h>
#include <ace/Get_Opt.h>
ACE_RCSID(Proactor, test_udp_proactor, "UDPBaseACE.cpp")
//
Receiver::Receiver (void)
:m_bDone ( false),
//m_pMsgRecv(NULL),
//m_pMsgSend(NULL),
m_nRecvCount(0),
m_nPostRecvCount(0)
{
}
Receiver::~Receiver (void)
{
//m_bDone = true;
}
void Receiver::fini()
{
ACE_Guard<ACE_Recursive_Thread_Mutex> locker2(m_pMbmRecv.m_csLock);
//释放内存
m_pMbmRecv.fini();
m_pMbmSend.fini();
m_sockDgram.close ();
}
////////////////////////////////////////////////////////////////////////////////   
// 函数名 : Receiver::PostRecv
// 描述   : 提交异步接收
// 参数   :
// 返回值 : 0,表示成功
// 作者   : lihui
// 日期   : 2007-1-15
////////////////////////////////////////////////////////////////////////////////   
int Receiver::PostRecv()
{
ACE_Message_Block* pMsg =NULL;
//CCriticalSection locker(&m_pMbmRecv.m_csLock);
pMsg = m_pMbmRecv.get_msg_block();
//ACE_Guard<ACE_Recursive_Thread_Mutex> locker(m_pMbmRecv.m_csLock);
if (NULL == pMsg)
{
  assert(false);
  return -1;
  //pMsg = m_pMbmRecv.get_msg_block();
}
//m_pMsgRecv = pMsg;
// ok lets do the asynch read
size_t number_of_bytes_recvd = 0;
int res = m_rd_.recv (pMsg, number_of_bytes_recvd, 0);  //,PF_INET,   "Recever act"
switch (res)
{
case 0:
  // this is a good error.  The proactor will call our handler when the
  // read has completed.
  break;
case 1:
  // actually read something, we will handle it in the handler callback
  ACE_DEBUG ((LM_DEBUG, "********************\n"));
  ACE_DEBUG ((LM_DEBUG,
   "%s = %d\n",
   "bytes recieved immediately",
   number_of_bytes_recvd));
  ACE_DEBUG ((LM_DEBUG, "********************\n"));
  res = 0;
  break;
case -1:
  // Something else went wrong.
  ACE_ERROR ((LM_ERROR,
   "%p\n",
   "ACE_Asynch_Read_Dgram::recv"));
  // the handler will not get called in this case so lets clean up our msg
  m_pMbmRecv.release_msg_block(pMsg);
  break;
default:
  // Something undocumented really went wrong.
  ACE_ERROR ((LM_ERROR,
   "%p\n",
   "ACE_Asynch_Read_Dgram::recv"));
  m_pMbmRecv.release_msg_block(pMsg);
  break;
}
return res;
}

////////////////////////////////////////////////////////////////////////////////   
// 函数名 : Receiver::open_addr
// 描述   : 打开本地端口
// 参数   : localAddr, 地址
// 返回值 : 0 ,表示成功; -1 表示10次都不能打开相应端口
// 作者   : lihui
// 日期   : 2007-1-15
////////////////////////////////////////////////////////////////////////////////   
int Receiver::InitSock(ACE_INET_Addr &localAddr)
{
ACE_DEBUG ((LM_DEBUG,
  "%N:%l:Receiver::open_addr called\n"));
if (0 == localAddr.get_port_number())
{
  if (this->m_sockDgram.open (ACE_Addr::sap_any) == -1)
   ACE_ERROR_RETURN ((LM_ERROR,
   "%p\n",
   "ACE_SOCK_Dgram::open"), -1);
  m_sockDgram.get_local_addr (localAddr);
  m_wRecvPort = localAddr.get_port_number();
}
else
{
  if (this->m_sockDgram.open (localAddr) == -1)
  {
   int iRet = -1;
   //有错误时表示端口被占用,在原地址上加1测试,进行10次
   for (int i = 0; i < 10; i++)
   {
    m_wRecvPort++;
    localAddr.set_port_number(m_wRecvPort);
    if ( -1  != m_sockDgram.open (localAddr))
    {
     iRet = 0;
     break;
    }
   }
   if ( 0 == iRet)
   {
    ACE_ERROR_RETURN ((LM_ERROR,
     "%p\n",
     "ACE_SOCK_Dgram::open"), -1);
   }
  }
}
// Create a local UDP socket to receive datagrams.
// Initialize the asynchronous read.
if (this->m_rd_.open (*this,
  this->m_sockDgram.get_handle (),
  "Receiver Completion Key",
  ACE_Proactor::instance ()) == -1)
{
  ACE_ERROR_RETURN ((LM_ERROR,
   "%p\n",
   "ACE_Asynch_Read_Dgram::open"), -1);
}

// Initialize the asynchronous write.
if (this->m_wd_.open (*this,
  this->m_sockDgram.get_handle (),
  "Write Completion Key",
  ACE_Proactor::instance ()) == -1)
{
  ACE_ERROR_RETURN ((LM_ERROR,
  "%p\n",
  "ACE_Asynch_Write_Dgram::open"), -1);
}
InitRecvMsgPoll();  //初始化接收缓冲池
InitPostRecvCount(); //初始化预接收个数
return 0;
}
//提交数据发送请求
int Receiver::PostSend(const BYTE *pData, DWORD dwDataLen, ACE_INET_Addr & aiaRemote)
{
ACE_Message_Block* pMsg =NULL;
pMsg  = m_pMbmSend.get_msg_block();
if (NULL == pMsg )
{
  this->InitSendMsgPoll();
  pMsg  = m_pMbmSend.get_msg_block();
}
//ACE_Guard<ACE_Recursive_Thread_Mutex> locker(m_pMbmSend.m_csLock);
// Copy buf into the Message_Block and update the wr_ptr ().
pMsg ->copy((const char *)pData, dwDataLen);
// do the asynch send
size_t number_of_bytes_sent = 0;
// ACE_INET_Addr serverAddr(port);
int res = this->m_wd_.send(pMsg , number_of_bytes_sent, 0, aiaRemote,  "send act");
switch (res)
{
case 0:
  //ERROR_IO_PENDING
  // this is a good error.  The proactor will call our handler when the
  // send has completed.
  break;
case 1:
  // actually sent something, we will handle it in the handler callback
  res = 0;
  break;
case -1:
  // Something else went wrong.
  ACE_ERROR ((LM_ERROR,
   "%p\n",
   "ACE_Asynch_Write_Dgram::recv"));
  // the handler will not get called in this case so lets clean up our msg
  m_pMbmSend.release_msg_block(pMsg );
  break;
default:
  // Something undocumented really went wrong.
  ACE_ERROR ((LM_ERROR,
   "%p\n",
   "ACE_Asynch_Write_Dgram::recv"));
  m_pMbmSend.release_msg_block(pMsg );
  break;
}
return res;
}
////////////////////////////////////////////////////////////////////////////////   
// 函数名 : Receiver::handle_read_dgram
// 描述   : 完成数据接收后,数据处理函数
// 参数   :
// 返回值 : 无
// 作者   : lihui
// 日期   : 2007-1-15
////////////////////////////////////////////////////////////////////////////////   
void Receiver::handle_read_dgram (const ACE_Asynch_Read_Dgram::Result &result)
{
ACE_INET_Addr peerAddr;
PostRecv();
result.remote_address (peerAddr);
ACE_Message_Block* pMsg = result.message_block();
size_t iRet = result.bytes_transferred () ;
if (result.success () && iRet!= 0)
{
  OnReceived(htonl(peerAddr.get_ip_address()), peerAddr.get_port_number(),
   (BYTE *)pMsg->rd_ptr(), pMsg->length ());
  
}
//ACE_Guard<ACE_Recursive_Thread_Mutex> locker(m_pMbmRecv.m_csLock);
m_pMbmRecv.release_msg_block(pMsg);
}
//数据发送完成后的处理
void Receiver::handle_write_dgram (const ACE_Asynch_Write_Dgram::Result &result)
{
ACE_Message_Block * pmb = result.message_block();
if(!result.success ())
{
  //本想重发,但是已经不知道发向的目的端口
  //Sleep(50);
  //pmb->rd_ptr(pmb->base());
  //size_t number_of_bytes_sent = 0;
  //int res = this->m_wd_.send(pmb, number_of_bytes_sent, 0, aiaRemote,  "send act");
}
// else
{
  m_pMbmSend.release_msg_block(pmb);
}
}
//初始化接收缓冲池
bool Receiver::InitRecvMsgPoll(size_t nCount, size_t nDefaultSize )
{
if (nCount <= 0 || nDefaultSize <=0)
{
  return false;
}
m_pMbmRecv.init(nCount,nDefaultSize);
m_nRecvCount  = nCount;
return true;
}
//初始化发送缓冲池
bool Receiver::InitSendMsgPoll(size_t nCount , size_t nDefaultSize)
{
if (nCount <= 0 || nDefaultSize <=0)
{
  return false;
}
m_pMbmSend.init(nCount,nDefaultSize);
return true;
}
//初始化预接收个数,暂不处理
bool Receiver::InitPostRecvCount (size_t nCount )
{
m_nPostRecvCount = nCount;
if (nCount > 0  && nCount <= m_nRecvCount)
{
  m_nPostRecvCount = nCount;
}
else
  return false;
return true;
}

///////////////////////////////////使用的派生类///////////////
/////////////////////////////////////////////////////////////////////////////////   
// 类名      :  CUDPBaseACE
// 描述      :  从CUdpBase改来的,去掉了一个//virtual BOOL OnEnd(int iError) = 0;
// 接口      :  
// 调用者    :  
// 作者      : lihui  
/////////////////////////////////////////////////////////////////////////////////  
//class CThreadObject;
class CUDPBaseACE  : public Receiver
{
//ACE_Thread::spawn();
public:
DWORD GetErrorTimes();
DWORD GetErrorCode();
CUDPBaseACE();
~CUDPBaseACE();
BOOL Send(BYTE *pData, DWORD dwDataLen, DWORD dwSendTo, WORD wPort);

void StartNetwork(WORD wPort = 0);
WORD GetPort();
BOOL IsValidSocket();
void CloseSocket();
//BOOL CreateSocket(WORD wPort = 0);
void Stop();
DWORD GetHost(const char FAR * szAddress);
BOOL Start(const char * szThreadName);
protected:
virtual BOOL OnReceived(DWORD dwRemote,WORD wPort,BYTE * pData,DWORD dwDataLen)
{
  return false;
}
//virtual BOOL OnEnd(int iError) = 0;
private:
virtual DWORD ThreadWorkFunc();
static DWORD ThreadBaseFunc(LPVOID lpParam);
DWORD m_dwExitCode;
DWORD m_dwErrorTimes;//出错的次数
//HANDLE  m_hWorkQuit; //工作线程退出事件
};
/////////////////////////////////派生类实现CPP文件/////////////////////////////////////////////

//////////////////////////////////////////////////////////////////////
// Construction/Destruction
//////////////////////////////////////////////////////////////////////
CUDPBaseACE::CUDPBaseACE()
{
//m_socket = INVALID_SOCKET;
m_wRecvPort = 0;
m_dwExitCode = 0;
m_dwErrorTimes = 0;
//m_hWorkQuit = ::CreateEvent(NULL, TRUE, FALSE, "QuitEvent");
}
CUDPBaseACE::~CUDPBaseACE()
{
m_bDone = TRUE;
//Send((BYTE*)"0",1,0x0100007f,m_wRecvPort);
//提交多个接收 m_nPostRecvCount
ACE_Proactor::instance ()->end_event_loop();
fini();
// Stop();
}
//将一个字符串转换成为IP地址
//如果地址转换成功返回地址,否则返回NULL
DWORD CUDPBaseACE::GetHost(const char * szAddress)
{
DWORD dwHost = INADDR_NONE;
dwHost = inet_addr(szAddress);
if(dwHost == INADDR_NONE)
{
  struct hostent *host=NULL;
  host = gethostbyname(szAddress);
  if(host)
   memcpy(&dwHost,host->h_addr_list[0],sizeof(DWORD));
}
if(dwHost == INADDR_NONE)
  dwHost = NULL;
return dwHost;
}
/*//
线程工作函数
返回值:
0 线程成功完毕
1 Socket创建失败
2 接收失败
//*/
DWORD CUDPBaseACE::ThreadWorkFunc()
{
int iEndCode = 0;
ACE_Proactor::instance ()->run_event_loop();
return iEndCode;
}
/*//
发送数据
pData  数据缓冲区
dwDataLen 数据长度
dwSendTo 目的主机IP
wPort  目的主机端口
//*/
BOOL CUDPBaseACE::Send(BYTE *pData, DWORD dwDataLen, DWORD dwSendTo, WORD wPort)
{
int    iRet=-1;         
SOCKADDR_IN  remote;   
memset(&remote,0 ,sizeof(remote));
remote.sin_family = AF_INET;
remote.sin_port = htons(wPort);
remote.sin_addr.s_addr = dwSendTo;
ACE_INET_Addr aiaRemote(&remote, sizeof(SOCKADDR_IN));
iRet = PostSend(pData, dwDataLen,aiaRemote );
if(iRet)
  return FALSE;
return TRUE;
}
void CUDPBaseACE::Stop()
{
m_bDone = true;
}

void CUDPBaseACE::CloseSocket()
{
Stop();
}
BOOL CUDPBaseACE::IsValidSocket()
{
return TRUE;
}
WORD CUDPBaseACE::GetPort()
{
return m_wRecvPort;
}
void CUDPBaseACE::StartNetwork(WORD wPort)
{
m_wRecvPort = wPort;
if (InitSock (ACE_INET_Addr (m_wRecvPort)) == -1)
{
  return ;
}
//提交多个接收 m_nPostRecvCount
for (int i = 0; i < m_nPostRecvCount; i++)
{
  this->PostRecv();
}
//开始接收的工作线程
Start("CUDPBaseACE thread");
}
DWORD CUDPBaseACE::GetErrorCode()
{
return m_dwExitCode;
}
//返回出错的次数,通常是由于收到长度为0的包导致
DWORD CUDPBaseACE::GetErrorTimes()
{
return m_dwErrorTimes;
}
//启动线程
BOOL CUDPBaseACE::Start(const char * szThreadName)
{
ACE_Thread::spawn((ACE_THR_FUNC)ThreadBaseFunc,(void *)this,THR_DETACHED);
return FALSE;
}
DWORD CUDPBaseACE::ThreadBaseFunc(LPVOID lpParam)
{
DWORD dwRes = 0;
CUDPBaseACE * pTO = (CUDPBaseACE*)lpParam;
dwRes = pTO->ThreadWorkFunc();
return dwRes;
}
 楼主| 发表于 2008-7-15 22:59:20 | 显示全部楼层
如题,服务端也是用同一个类的,是一个Echo ,即发什么消息,把原来的消息反身回来的一个例子,使用的UDP 20002,也是Proactor中例子的端口

有很大的相似的.就不放上来了.

还有两个初始化类:

#ifndef ACEMFCINIT_H
#define ACEMFCINIT_H
#include <ace/Init_ACE.h>
//ACE 初始化
class CACEInit
{
public:
CACEInit()
{
  ACE::init ();
}
~CACEInit()
{
  ACE::fini();
}

};
#endif

///////////////

//初始化socket库
class CWSAInit
{
public:
CWSAInit(BOOL bForceExit = TRUE)
{
  WSADATA wsaData;
  if(WSAStartup(0x0202,&wsaData)!=NULL)
  {   
   if(bForceExit)
   {   
    MessageBox(NULL,(LPCSTR)"网络初始化失败,点击确定后程序将退出!",(LPCSTR)"错误",MB_OK|MB_ICONSTOP);
    WSACleanup();
    ::exit(0);
   }
  }
}
~CWSAInit(){WSACleanup();}
private:
static CWSAInit m_WSAInstance;
};
 楼主| 发表于 2008-7-15 22:59:27 | 显示全部楼层
我大体看了一遍。给你提几个建议:
1、你的代码里面遗留了不少API模式下的操作,是不需要的,ACE处理起来非常便利。
SOCKADDR_IN  remote;   
memset(&remote,0 ,sizeof(remote));
remote.sin_family = AF_INET;
remote.sin_port = htons(wPort);
remote.sin_addr.s_addr = dwSendTo;
ACE_INET_Addr aiaRemote(&remote, sizeof(SOCKADDR_IN));
ACE极力避免的,就是上面的调用,因为socket的地址处理非常容易出错。直接用ACE_INET_Addr是最佳选择。
2、不必调用socket的初始化函数,ACE::init或者ACE_MAIN都帮你做好了。
3、异步投递,退出时候可以认为有内存泄漏,也可以认为没有。
因为什么呢?当你程序运行的时候,这个投递返回结果的时候,就没有内存泄漏了,根本不必担心。只有当你退出,以前的异步投递取消不掉,才有内存泄漏,但是,操作系统会帮助你清理,也不需要担心。
不过你不能投递太多的异步操作,会导致资源耗尽。
 楼主| 发表于 2008-7-15 22:59:36 | 显示全部楼层
遗留了不少API模式下的东西, 是因为以前用的是API的编写的工程.那个派生类主要是为了和原来的接口相同,才做的一个.没有清理得太干净

异步操作投递多几个,怎么有方法取消息一下啊,我用了

异步读的 cancel; 把Proactor的实例cancel_timer; close_singleton

好像效果也一样.真是不知道如何把多余投递的取消掉

主要是退出时VC调试时看到有泄漏,感觉很不爽,而且也很担心.
 楼主| 发表于 2008-7-15 22:59:45 | 显示全部楼层
异步对象的cancel就可以了。你可以限制异步投递的数量呀,比如初始化的时候,默认只投递4个,返回的时候重新投递一个。退出的时候,使用cancel。但是要求是投递的线程进行cancel处理,否则无法达到目的。不必担心VC的报告,心里有数就可以了。
我的工程也是如此的。但在网上的服务器,跑了1个月也没出过异常,非常稳定。
 楼主| 发表于 2008-7-15 22:59:55 | 显示全部楼层
我发现异步接受器投递出去的几个异步请求,就算调用 cancel() 了,也会内存泄露,因为它在里面 new 了一些东西,却没有地方释放,不知各位有没有对这些作处理呢?
 楼主| 发表于 2008-7-15 23:00:02 | 显示全部楼层
不处理。因为需要cancel的时候,基本都是程序退出的时候。处理起来,没啥意义了。
 楼主| 发表于 2008-7-15 23:00:11 | 显示全部楼层
我也是这样想的。但如果接受器要经常的打开和关闭时,我想这时就需要处理了。。。
 楼主| 发表于 2008-7-15 23:00:16 | 显示全部楼层
但是我想不出有什么情况,接收器需要经常的打开和关闭。
 楼主| 发表于 2008-7-15 23:00:27 | 显示全部楼层
可能有这种要求:在一个服务器上,用户需要随时暂停客户端发过来连接请求,然后会继续接收请求!当然我们也可以通过其它方式处理这种问题。再说,我们想不到的情况并不代表没有。。。
您需要登录后才可以回帖 登录 | 用户注册

本版积分规则

Archiver|手机版|小黑屋|ACE Developer ( 京ICP备06055248号 )

GMT+8, 2024-12-22 17:09 , Processed in 0.036062 second(s), 5 queries , Redis On.

Powered by Discuz! X3.5

© 2001-2023 Discuz! Team.

快速回复 返回顶部 返回列表