找回密码
 用户注册

QQ登录

只需一步,快速开始

查看: 13352|回复: 25

服务器内存线性增长的问题,附工程~

[复制链接]
发表于 2008-8-6 16:02:30 | 显示全部楼层 |阅读模式
ftp://glenndai@go3.icpcn.com/     用户名: glenndai  密码: qazwsx123  
用的是ace静态库,在xp下用vs2005 +ace5.6或者5.6.5都可以编译。

出现的问题是一旦有新连接,内存就会增长, 关闭连接之后内存也没有降。:(
发表于 2008-8-6 16:54:31 | 显示全部楼层
等不上去,看不到程序。
ACE_Asynch_Acceptor<Handler>的话注意,每次accept都会新产生一个Handler,需要自己释放。不像reactor可以在handle_*函数返回-1就会由reactor自动remove注册的handler。proactor是没有remove的,因为异步I/O,proactor framework无法确定何时I/O complete,然后安全释放handler。所以一定要用户自己在handle_*函数中自己判断是否结束并释放资源。

比如
if (!result.success () || result.bytes_transferred () == 0)
    delete this;
发表于 2008-8-6 23:44:41 | 显示全部楼层
wishel 正解!就是这样用的。
 楼主| 发表于 2008-8-7 09:22:22 | 显示全部楼层
不会吧,应该能登上去的。  
我是这样做的  
if (!result.success () || result.bytes_transferred () == 0)
    HandlerMgr::instance()->ReleaseHandler(this);
将这个Handler回收,Acceptor里面的make_handler方法我重写成从HandlerMgr中取出一个空闲的Handler。

应该不是这里出的问题,我用Visual Leak Detector也没有发现这部分的内存泄露。

[ 本帖最后由 glen_dai 于 2008-8-7 09:30 编辑 ]
发表于 2008-8-7 11:31:12 | 显示全部楼层
我在公司的网络登不上去,等我晚上回家看看能不能上
你做了个Handler的池,用个singleton的HandlerMgr统一管理,有必要搞这么复杂么?新建和释放handler的overhead也不是很严重吧。
这个池有多大?太大的话会占用过多内存,小的话同时服务的用户数就受限制了。
accepter和handler的代码很大么,不太大的话贴上来看看吧

[ 本帖最后由 wishel 于 2008-8-7 11:38 编辑 ]
 楼主| 发表于 2008-8-7 12:18:17 | 显示全部楼层
  1. #ifndef CLIENT_ACCEPTOR_H
  2. #define CLIENT_ACCEPTOR_H
  3. #include <ace/Asynch_Acceptor.h>
  4. #include <ace/Recursive_Thread_Mutex.h>
  5. #include "ClientHandler.h"
  6. class ClientAcceptor : public ACE_Asynch_Acceptor<ClientHandler>
  7. {
  8. public:
  9. ClientAcceptor();
  10. ~ClientAcceptor();
  11. int Start(int port);
  12. protected:
  13. virtual ClientHandler* make_handler();
  14. //private:
  15. // ACE_Recursive_Thread_Mutex  mLock;
  16. };
  17. #endif
复制代码
  1. #include <ace/Null_Mutex.h>
  2. #include "ClientAcceptor.h"
  3. #include "ClientManager.h"
  4. ClientAcceptor::ClientAcceptor()
  5. :ACE_Asynch_Acceptor<ClientHandler> ()
  6. {
  7. ACE_DEBUG ((LM_TRACE, ACE_TEXT ("<%P|%t>ClientAcceptor::ClientAcceptor\n")));
  8. }
  9. ClientAcceptor::~ClientAcceptor()
  10. {
  11. ACE_DEBUG ((LM_TRACE, ACE_TEXT ("<%P|%t>ClientAcceptor::~ClientAcceptor\n")));
  12. cancel();
  13. ACE_OS::closesocket(handle());
  14. }
  15. int ClientAcceptor::Start(int port)
  16. {
  17. ACE_DEBUG ((LM_TRACE, ACE_TEXT ("<%P|%t>ClientAcceptor::Listen\n")));
  18. ACE_INET_Addr addr(port);
  19. if ( open(addr,//addr
  20.   0,//bytes_to_read
  21.   1,//pass addr
  22.   10
  23.   ) == -1)
  24. {
  25.   ACE_ERROR((LM_ERROR, "Acceptor Open Failure : %P\n"));
  26.   return -1;
  27. }
  28. return 0;
  29. }
  30. ClientHandler* ClientAcceptor::make_handler()
  31. {
  32. return ClientManagerSingleton::instance()->AllocClientHandler();
  33. }
复制代码
 楼主| 发表于 2008-8-7 12:21:32 | 显示全部楼层
ClientHandler.h
  1. #ifndef CLIENT_HANDLER_H
  2. #define CLIENT_HANDLER_H
  3. #include <ace/OS.h>
  4. #include <ace/Asynch_IO.h>
  5. #include <ace/Atomic_Op.h>
  6. #include <ace/Message_Block.h>
  7. #include <ace/INet_Addr.h>
  8. #include <ace/Recursive_Thread_Mutex.h>
  9. class ClientHandler : public ACE_Service_Handler
  10. {
  11. public:
  12. ClientHandler(){}
  13. ~ClientHandler(){}
  14. void Init();
  15. //回收前重置
  16. void Reset();
  17. //主动关闭连接
  18. void Close();
  19. // 当新连接进入时,由ACCEPTOR调用用于传送地址
  20. virtual void addresses(const ACE_INET_Addr &remote_address, const ACE_INET_Addr &local_address);
  21. // 当客户端发起连接请求被接受时被调用
  22. virtual void open(ACE_HANDLE new_handle, ACE_Message_Block &message_block);
  23. virtual void handle_read_stream(const ACE_Asynch_Read_Stream::Result &result);
  24. virtual void handle_write_stream(const ACE_Asynch_Write_Stream::Result &result);
  25. virtual void handle_time_out(const ACE_Time_Value &tv, const void *act = 0);
  26. int initiate_read_stream();
  27. int initiate_write_stream(ACE_Message_Block &mb, size_t numBytes);
  28. private:
  29. ACE_Asynch_Read_Stream mReader;
  30. ACE_Asynch_Write_Stream mWriter;
  31. char     mRemoteAddress[MAXHOSTNAMELEN];
  32. u_short     mRemotePort;
  33. ACE_Message_Block*  mReadMsgBlock;
  34. ACE_Recursive_Thread_Mutex mLock;
  35. };
  36. #endif
复制代码
 楼主| 发表于 2008-8-7 12:25:56 | 显示全部楼层
ClientHandler.cpp
  1. #include <ace/CDR_Stream.h>
  2. #include "ClientHandler.h"
  3. #include "Packet.h"
  4. #include "ClientManager.h"
  5. #include "MsgBlockManager.h"
  6. #include "MsgProcessTask.h"
  7. void ClientHandler::Init()
  8. {
  9. mReadMsgBlock = MsgBlockManagerSingleton::instance()->AllocMsgBlock();
  10. mReadMsgBlock->rd_ptr(mReadMsgBlock->base());
  11. mReadMsgBlock->wr_ptr(mReadMsgBlock->base());
  12. this->handle(ACE_INVALID_HANDLE);
  13. }
  14. //回收前重置
  15. void ClientHandler::Reset()
  16. {
  17. MsgBlockManagerSingleton::instance()->ReleaseMsgBlock(mReadMsgBlock);
  18. mReadMsgBlock = NULL;
  19. ACE_OS::shutdown(this->handle(), ACE_SHUTDOWN_BOTH);
  20. ACE_OS::closesocket(this->handle());
  21. this->handle(ACE_INVALID_HANDLE);
  22. }
  23. void ClientHandler::addresses(const ACE_INET_Addr &remote_address, const ACE_INET_Addr &local_address)
  24. {
  25. remote_address.addr_to_string (mRemoteAddress, MAXHOSTNAMELEN);
  26. mRemotePort = remote_address.get_port_number();
  27. }
  28. //主动关闭连接
  29. void ClientHandler::Close()
  30. {
  31. ClientManagerSingleton::instance()->ReleaseClientHandler(this);
  32. ACE_OS::shutdown(this->handle(), ACE_SHUTDOWN_BOTH);
  33. ACE_OS::closesocket(this->handle());
  34. }
  35. void ClientHandler::open(ACE_HANDLE new_handle, ACE_Message_Block &message_block)
  36. {
  37. if (mReader.open(*this, this->handle()) == -1)
  38. {
  39.   ACE_ERROR((LM_ERROR, ACE_TEXT("<%P|%t>ACE_Asynch_Read_Stream::open\n")));
  40. }
  41. else if (mWriter.open(*this, this->handle()) == -1)
  42. {
  43.   ACE_ERROR((LM_ERROR, ACE_TEXT("<%P|%t>ACE_Asynch_Write_Stream::open\n")));
  44. }
  45. else
  46. {
  47.   ACE_DEBUG((LM_DEBUG, ACE_TEXT("<%P|%t>ClientHandler::open, Remote ip is : %s\n"), mRemoteAddress));
  48.   initiate_read_stream();
  49. }
  50. ClientManagerSingleton::instance()->ActiveClient(this->handle(), this);
  51. }
  52. void ClientHandler::handle_read_stream(const ACE_Asynch_Read_Stream::Result &result)
  53. {
  54. size_t bytesRecved = result.bytes_transferred();
  55. if (result.success() && (bytesRecved != 0))
  56. {
  57.   ACE_DEBUG((LM_DEBUG, ACE_TEXT("<%P|%t>ClientHandler::handle_read_stream handle=%d complete : %d\n"),
  58.      handle(), bytesRecved));
  59.   if (mReadMsgBlock->length() < sizeof(PacketHeader)) //消息头尚未接收完全
  60.   {
  61.    ACE_Guard<ACE_Recursive_Thread_Mutex> locker(mLock);
  62.    mReader.read(*mReadMsgBlock, sizeof(PacketHeader) - mReadMsgBlock->length());
  63.    return;
  64.   }
  65.   PacketHeader *header = (PacketHeader *) (mReadMsgBlock->rd_ptr());
  66.   ACE_Message_Block * mb = mReadMsgBlock->cont(); //cont()为数据信息
  67.   if (!mb)
  68.   {
  69.    // 只是接收完长度信息
  70.    ACE_Guard<ACE_Recursive_Thread_Mutex> locker(mLock);
  71.    mb = MsgBlockManagerSingleton::instance()->AllocMsgBlock(); //这个mb在MsgProcessTask处理完数据后回收
  72.    mReadMsgBlock->cont (mb);
  73.   }
  74.   if (mb->length() == header->mClientHeader.mDataLength) // 数据已接收完,再继续接收下一个数据包
  75.   {
  76.    ACE_Guard<ACE_Recursive_Thread_Mutex> locker(mLock);
  77.    MsgProcessTaskSingleton::instance()->putq(mb);
  78.    mReadMsgBlock->reset();
  79.    mReadMsgBlock->cont(NULL);
  80.    initiate_read_stream();
  81.    return;
  82.   }
  83.   else // 否则继续接收该数据包
  84.   {
  85.    mReader.read (*mb, header->mClientHeader.mDataLength);
  86.   }
  87. }
  88. else
  89. {
  90.   ACE_DEBUG((LM_DEBUG,
  91.    ACE_TEXT("<%P|%t>ClientHandler, remote: %s handle=%d shutdown \n"), mRemoteAddress, handle()));
  92.   ClientManagerSingleton::instance()->ReleaseClientHandler(this);
  93. }
  94. }
  95. void ClientHandler::handle_write_stream(const ACE_Asynch_Write_Stream::Result &result)
  96. {
  97. }
  98. int ClientHandler::initiate_read_stream()
  99. {
  100. ACE_Guard<ACE_Recursive_Thread_Mutex> locker(mLock);
  101. //先接收包头
  102. int headerSize = sizeof(PacketHeader);
  103. ACE_HANDLE handle = this->handle();
  104. mReadMsgBlock->copy((const char*)&handle, sizeof(ACE_HANDLE));
  105. if (mReader.read(*mReadMsgBlock, sizeof(PacketHeader::PacketHeaderFromClient)) == -1)
  106. {
  107.   ACE_ERROR_RETURN((LM_ERROR, ACE_TEXT("<%P|%t>ACE_Asynch_Read_Stream::read")), -1);
  108. }
  109. return 0;
  110. }
  111. int ClientHandler::initiate_write_stream(ACE_Message_Block &mb, size_t numBytes)
  112. {
  113. ACE_Guard<ACE_Recursive_Thread_Mutex> locker(mLock);
  114. if (mWriter.write(mb, numBytes) == -1)
  115. {
  116.   ACE_ERROR_RETURN((LM_ERROR, ACE_TEXT("<%P|%t>ACE_Asynch_Write_Stream::write")), -1);
  117. }
  118. return 0;
  119. }
  120. void ClientHandler::handle_time_out(const ACE_Time_Value &tv, const void *act)
  121. {
  122. ACE_DEBUG((LM_DEBUG, ACE_TEXT("<%P|%t>ClientHandler::handle_time_out")));
  123. }
复制代码
 楼主| 发表于 2008-8-7 12:50:27 | 显示全部楼层
Handler预先new了5000, msgblock也是5000
发表于 2008-8-7 14:19:16 | 显示全部楼层

HandlerMgr::instance()->ReleaseHandler(this);
前面再加一句:
MsgBlockManagerSingleton::instance()->ReleaseMsgBlock(&result.message_block());
把message block也释放了。
如果还不行的话,要看看你设计的几个缓冲池对资源的分配和释放是否正确了。

另外,个人对内存池的做法保留意见。
pool的作用是用空间换时间。通常用在较大overhead的场合,比如数据库连接池,线程池,因为新建数据库连接和线程都会花较长时间。预先建好的花可以缩短系统响应时间。代价是空间的提前分配,预分配太大有可能用不到,如果pool的太小且难以动态扩大的话,会限制系统并发处理能力。
而内存访问显然是很迅速的,没有较大overhead。所以内存池不大适用。这样无谓的增加了系统的复杂性,也就是所谓的over engineering。
您需要登录后才可以回帖 登录 | 用户注册

本版积分规则

Archiver|手机版|小黑屋|ACE Developer ( 京ICP备06055248号 )

GMT+8, 2024-11-21 20:56 , Processed in 0.022299 second(s), 5 queries , Redis On.

Powered by Discuz! X3.5

© 2001-2023 Discuz! Team.

快速回复 返回顶部 返回列表