|
ClientHandler.h
//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
#ifndef CLIENT_HANDLER_H
#define CLIENT_HANDLER_H
#include <ace/OS.h>
#include <ace/Asynch_IO.h>
#include <ace/Atomic_Op.h>
#include <ace/Message_Block.h>
#include <ace/INet_Addr.h>
#include <ace/Recursive_Thread_Mutex.h>
class ClientAcceptor;
class ClientHandler : public ACE_Service_Handler
{
public:
ClientHandler(){}
~ClientHandler(){}
void Init();
virtual void open(ACE_HANDLE new_handle, ACE_Message_Block &message_block);
virtual void handle_read_stream(const ACE_Asynch_Read_Stream::Result &result);
virtual void handle_write_stream(const ACE_Asynch_Write_Stream::Result &result);
virtual void handle_time_out(const ACE_Time_Value &tv, const void *act = 0);
int initiate_read_stream();
int initiate_write_stream(ACE_Message_Block &mb, size_t numBytes);
private:
ACE_Asynch_Read_Stream mReader;
ACE_Asynch_Write_Stream mWriter;
ClientAcceptor* mAcceptor;
char mRemoteAddress[MAXHOSTNAMELEN];
u_short mRemotePort;
ACE_Message_Block* mReadMsgBlock;
time_t mLastNetIO;
ACE_Recursive_Thread_Mutex mLock;
};
#endif
ClientHandler.cpp
///////////////////////////////////////////////////////////////////////////////////////////////////
#include <ace/CDR_Stream.h>
#include "ClientHandler.h"
#include "Packet.h"
#include "ClientManager.h"
#include "MsgProcessTask.h"
void ClientHandler::open(ACE_HANDLE new_handle, ACE_Message_Block &message_block)
{
mLastNetIO = ACE_OS::time(NULL);
mIOCount = 0;
if (mReader.open(*this, this->handle()) == -1)
{
ACE_ERROR((LM_ERROR, ACE_TEXT("<%P|%t>ACE_Asynch_Read_Stream::open\n")));
}
else if (mWriter.open(*this, this->handle()) == -1)
{
ACE_ERROR((LM_ERROR, ACE_TEXT("<%P|%t>ACE_Asynch_Write_Stream::open\n")));
}
else
{
ACE_DEBUG((LM_DEBUG, ACE_TEXT("<%P|%t>ClientHandler::open, Remote ip is : %s\n"), mRemoteAddress));
initiate_read_stream();
}
}
void ClientHandler::handle_read_stream(const ACE_Asynch_Read_Stream::Result &result)
{
size_t bytesRecved = result.bytes_transferred();
if (result.success() && (bytesRecved != 0))
{
ACE_DEBUG((LM_DEBUG, ACE_TEXT("<%P|%t>ClientHandler::handle_read_stream complete : %d\n"), bytesRecved));
if (mReadMsgBlock->length() < sizeof(PacketHeader)) //消息头尚未接收完全
{
mReader.read(*mReadMsgBlock, mReadMsgBlock->space());
return;
}
PacketHeader *header = (PacketHeader *) (mReadMsgBlock->rd_ptr());
ACE_Message_Block * mb = mReadMsgBlock->cont(); //cont()为数据信息
if (!mb)
{
// 只是接收完长度信息
ACE_NEW (mb, ACE_Message_Block(header->mClientHeader.mDataLength));
mReadMsgBlock->cont (mb);
}
if (mb->length() == header->mClientHeader.mDataLength) // 数据已接收完,再继续接收下一个数据包
{
MsgProcessTaskSingleton::instance()->putq(mb);
initiate_read_stream();
return;
}
else // 否则继续接收该数据包
{
mReader.read (*mb, mb->space ());
}
}
else
{
}
}
void ClientHandler::handle_write_stream(const ACE_Asynch_Write_Stream::Result &result)
{
}
int ClientHandler::initiate_read_stream()
{
ACE_Guard<ACE_Recursive_Thread_Mutex> locker(mLock);
//先接收包头
int headerSize = sizeof(PacketHeader);
ACE_HANDLE handle = this->handle();
mReadMsgBlock->copy((const char*)&handle, sizeof(ACE_HANDLE));
if (mReader.read(*mReadMsgBlock, mReadMsgBlock->space()) == -1)
{
ACE_ERROR_RETURN((LM_ERROR, ACE_TEXT("<%P|%t>ACE_Asynch_Read_Stream::read")), -1);
}
mIOCount ++;
return 0;
}
int ClientHandler::initiate_write_stream(ACE_Message_Block &mb, size_t numBytes)
{
ACE_Guard<ACE_Recursive_Thread_Mutex> locker(mLock);
if (mWriter.write(mb, numBytes) == -1)
{
ACE_ERROR_RETURN((LM_ERROR, ACE_TEXT("<%P|%t>ACE_Asynch_Write_Stream::write")), -1);
}
return 0;
}
void ClientHandler::handle_time_out(const ACE_Time_Value &tv, const void *act)
{
ACE_DEBUG((LM_DEBUG, ACE_TEXT("<%P|%t>ClientHandler::handle_time_out")));
}
感觉handle_read_stream有问题,客户端循环发100次消息,服务器端只收到4,5次,而且客户端关闭了handle_read_stream也不会断点,请高人指点~ |
|