glen_dai 发表于 2008-8-6 16:02:30

服务器内存线性增长的问题,附工程~

ftp://glenndai@go3.icpcn.com/   用户名: glenndai密码: qazwsx123
用的是ace静态库,在xp下用vs2005 +ace5.6或者5.6.5都可以编译。

出现的问题是一旦有新连接,内存就会增长, 关闭连接之后内存也没有降。:(

wishel 发表于 2008-8-6 16:54:31

等不上去,看不到程序。
ACE_Asynch_Acceptor<Handler>的话注意,每次accept都会新产生一个Handler,需要自己释放。不像reactor可以在handle_*函数返回-1就会由reactor自动remove注册的handler。proactor是没有remove的,因为异步I/O,proactor framework无法确定何时I/O complete,然后安全释放handler。所以一定要用户自己在handle_*函数中自己判断是否结束并释放资源。

比如
if (!result.success () || result.bytes_transferred () == 0)
    delete this;

winston 发表于 2008-8-6 23:44:41

wishel 正解!就是这样用的。

glen_dai 发表于 2008-8-7 09:22:22

不会吧,应该能登上去的。
我是这样做的
if (!result.success () || result.bytes_transferred () == 0)
    HandlerMgr::instance()->ReleaseHandler(this);
将这个Handler回收,Acceptor里面的make_handler方法我重写成从HandlerMgr中取出一个空闲的Handler。

应该不是这里出的问题,我用Visual Leak Detector也没有发现这部分的内存泄露。

[ 本帖最后由 glen_dai 于 2008-8-7 09:30 编辑 ]

wishel 发表于 2008-8-7 11:31:12

我在公司的网络登不上去,等我晚上回家看看能不能上
你做了个Handler的池,用个singleton的HandlerMgr统一管理,有必要搞这么复杂么?新建和释放handler的overhead也不是很严重吧。
这个池有多大?太大的话会占用过多内存,小的话同时服务的用户数就受限制了。
accepter和handler的代码很大么,不太大的话贴上来看看吧

[ 本帖最后由 wishel 于 2008-8-7 11:38 编辑 ]

glen_dai 发表于 2008-8-7 12:18:17

#ifndef CLIENT_ACCEPTOR_H
#define CLIENT_ACCEPTOR_H
#include <ace/Asynch_Acceptor.h>
#include <ace/Recursive_Thread_Mutex.h>
#include "ClientHandler.h"

class ClientAcceptor : public ACE_Asynch_Acceptor<ClientHandler>
{
public:
ClientAcceptor();
~ClientAcceptor();
int Start(int port);
protected:
virtual ClientHandler* make_handler();
//private:
// ACE_Recursive_Thread_MutexmLock;
};
#endif

#include <ace/Null_Mutex.h>
#include "ClientAcceptor.h"
#include "ClientManager.h"

ClientAcceptor::ClientAcceptor()
:ACE_Asynch_Acceptor<ClientHandler> ()
{
ACE_DEBUG ((LM_TRACE, ACE_TEXT ("<%P|%t>ClientAcceptor::ClientAcceptor\n")));
}
ClientAcceptor::~ClientAcceptor()
{
ACE_DEBUG ((LM_TRACE, ACE_TEXT ("<%P|%t>ClientAcceptor::~ClientAcceptor\n")));
cancel();
ACE_OS::closesocket(handle());
}
int ClientAcceptor::Start(int port)
{
ACE_DEBUG ((LM_TRACE, ACE_TEXT ("<%P|%t>ClientAcceptor::Listen\n")));
ACE_INET_Addr addr(port);
if ( open(addr,//addr
0,//bytes_to_read
1,//pass addr
10
) == -1)
{
ACE_ERROR((LM_ERROR, "Acceptor Open Failure : %P\n"));
return -1;
}
return 0;
}
ClientHandler* ClientAcceptor::make_handler()
{
return ClientManagerSingleton::instance()->AllocClientHandler();
}

glen_dai 发表于 2008-8-7 12:21:32

ClientHandler.h
#ifndef CLIENT_HANDLER_H
#define CLIENT_HANDLER_H
#include <ace/OS.h>
#include <ace/Asynch_IO.h>
#include <ace/Atomic_Op.h>
#include <ace/Message_Block.h>
#include <ace/INet_Addr.h>
#include <ace/Recursive_Thread_Mutex.h>

class ClientHandler : public ACE_Service_Handler
{
public:
ClientHandler(){}
~ClientHandler(){}
void Init();
//回收前重置
void Reset();
//主动关闭连接
void Close();
// 当新连接进入时,由ACCEPTOR调用用于传送地址
virtual void addresses(const ACE_INET_Addr &remote_address, const ACE_INET_Addr &local_address);
// 当客户端发起连接请求被接受时被调用
virtual void open(ACE_HANDLE new_handle, ACE_Message_Block &message_block);
virtual void handle_read_stream(const ACE_Asynch_Read_Stream::Result &result);
virtual void handle_write_stream(const ACE_Asynch_Write_Stream::Result &result);
virtual void handle_time_out(const ACE_Time_Value &tv, const void *act = 0);
int initiate_read_stream();
int initiate_write_stream(ACE_Message_Block &mb, size_t numBytes);

private:
ACE_Asynch_Read_Stream mReader;
ACE_Asynch_Write_Stream mWriter;
char   mRemoteAddress;
u_short   mRemotePort;
ACE_Message_Block*mReadMsgBlock;
ACE_Recursive_Thread_Mutex mLock;
};
#endif

glen_dai 发表于 2008-8-7 12:25:56

ClientHandler.cpp
#include <ace/CDR_Stream.h>
#include "ClientHandler.h"
#include "Packet.h"
#include "ClientManager.h"
#include "MsgBlockManager.h"
#include "MsgProcessTask.h"
void ClientHandler::Init()
{
mReadMsgBlock = MsgBlockManagerSingleton::instance()->AllocMsgBlock();
mReadMsgBlock->rd_ptr(mReadMsgBlock->base());
mReadMsgBlock->wr_ptr(mReadMsgBlock->base());
this->handle(ACE_INVALID_HANDLE);
}
//回收前重置
void ClientHandler::Reset()
{
MsgBlockManagerSingleton::instance()->ReleaseMsgBlock(mReadMsgBlock);
mReadMsgBlock = NULL;
ACE_OS::shutdown(this->handle(), ACE_SHUTDOWN_BOTH);
ACE_OS::closesocket(this->handle());
this->handle(ACE_INVALID_HANDLE);
}
void ClientHandler::addresses(const ACE_INET_Addr &remote_address, const ACE_INET_Addr &local_address)
{
remote_address.addr_to_string (mRemoteAddress, MAXHOSTNAMELEN);
mRemotePort = remote_address.get_port_number();
}
//主动关闭连接
void ClientHandler::Close()
{
ClientManagerSingleton::instance()->ReleaseClientHandler(this);
ACE_OS::shutdown(this->handle(), ACE_SHUTDOWN_BOTH);
ACE_OS::closesocket(this->handle());
}
void ClientHandler::open(ACE_HANDLE new_handle, ACE_Message_Block &message_block)
{
if (mReader.open(*this, this->handle()) == -1)
{
ACE_ERROR((LM_ERROR, ACE_TEXT("<%P|%t>ACE_Asynch_Read_Stream::open\n")));
}
else if (mWriter.open(*this, this->handle()) == -1)
{
ACE_ERROR((LM_ERROR, ACE_TEXT("<%P|%t>ACE_Asynch_Write_Stream::open\n")));
}
else
{
ACE_DEBUG((LM_DEBUG, ACE_TEXT("<%P|%t>ClientHandler::open, Remote ip is : %s\n"), mRemoteAddress));
initiate_read_stream();
}
ClientManagerSingleton::instance()->ActiveClient(this->handle(), this);
}
void ClientHandler::handle_read_stream(const ACE_Asynch_Read_Stream::Result &result)
{
size_t bytesRecved = result.bytes_transferred();
if (result.success() && (bytesRecved != 0))
{
ACE_DEBUG((LM_DEBUG, ACE_TEXT("<%P|%t>ClientHandler::handle_read_stream handle=%d complete : %d\n"),
   handle(), bytesRecved));
if (mReadMsgBlock->length() < sizeof(PacketHeader)) //消息头尚未接收完全
{
   ACE_Guard<ACE_Recursive_Thread_Mutex> locker(mLock);
   mReader.read(*mReadMsgBlock, sizeof(PacketHeader) - mReadMsgBlock->length());
   return;
}
PacketHeader *header = (PacketHeader *) (mReadMsgBlock->rd_ptr());
ACE_Message_Block * mb = mReadMsgBlock->cont(); //cont()为数据信息
if (!mb)
{
   // 只是接收完长度信息
   ACE_Guard<ACE_Recursive_Thread_Mutex> locker(mLock);
   mb = MsgBlockManagerSingleton::instance()->AllocMsgBlock(); //这个mb在MsgProcessTask处理完数据后回收
   mReadMsgBlock->cont (mb);
}
if (mb->length() == header->mClientHeader.mDataLength) // 数据已接收完,再继续接收下一个数据包
{
   ACE_Guard<ACE_Recursive_Thread_Mutex> locker(mLock);
   MsgProcessTaskSingleton::instance()->putq(mb);
   mReadMsgBlock->reset();
   mReadMsgBlock->cont(NULL);
   initiate_read_stream();
   return;
}
else // 否则继续接收该数据包
{
   mReader.read (*mb, header->mClientHeader.mDataLength);
}
}
else
{
ACE_DEBUG((LM_DEBUG,
   ACE_TEXT("<%P|%t>ClientHandler, remote: %s handle=%d shutdown \n"), mRemoteAddress, handle()));
ClientManagerSingleton::instance()->ReleaseClientHandler(this);
}
}
void ClientHandler::handle_write_stream(const ACE_Asynch_Write_Stream::Result &result)
{
}
int ClientHandler::initiate_read_stream()
{
ACE_Guard<ACE_Recursive_Thread_Mutex> locker(mLock);
//先接收包头
int headerSize = sizeof(PacketHeader);
ACE_HANDLE handle = this->handle();
mReadMsgBlock->copy((const char*)&handle, sizeof(ACE_HANDLE));
if (mReader.read(*mReadMsgBlock, sizeof(PacketHeader::PacketHeaderFromClient)) == -1)
{
ACE_ERROR_RETURN((LM_ERROR, ACE_TEXT("<%P|%t>ACE_Asynch_Read_Stream::read")), -1);
}

return 0;
}
int ClientHandler::initiate_write_stream(ACE_Message_Block &mb, size_t numBytes)
{
ACE_Guard<ACE_Recursive_Thread_Mutex> locker(mLock);
if (mWriter.write(mb, numBytes) == -1)
{
ACE_ERROR_RETURN((LM_ERROR, ACE_TEXT("<%P|%t>ACE_Asynch_Write_Stream::write")), -1);
}
return 0;
}
void ClientHandler::handle_time_out(const ACE_Time_Value &tv, const void *act)
{
ACE_DEBUG((LM_DEBUG, ACE_TEXT("<%P|%t>ClientHandler::handle_time_out")));
}

glen_dai 发表于 2008-8-7 12:50:27

Handler预先new了5000, msgblock也是5000

wishel 发表于 2008-8-7 14:19:16


HandlerMgr::instance()->ReleaseHandler(this);
前面再加一句:
MsgBlockManagerSingleton::instance()->ReleaseMsgBlock(&result.message_block());
把message block也释放了。
如果还不行的话,要看看你设计的几个缓冲池对资源的分配和释放是否正确了。

另外,个人对内存池的做法保留意见。
pool的作用是用空间换时间。通常用在较大overhead的场合,比如数据库连接池,线程池,因为新建数据库连接和线程都会花较长时间。预先建好的花可以缩短系统响应时间。代价是空间的提前分配,预分配太大有可能用不到,如果pool的太小且难以动态扩大的话,会限制系统并发处理能力。
而内存访问显然是很迅速的,没有较大overhead。所以内存池不大适用。这样无谓的增加了系统的复杂性,也就是所谓的over engineering。
页: [1] 2 3
查看完整版本: 服务器内存线性增长的问题,附工程~