|
楼主 |
发表于 2008-8-7 12:25:56
|
显示全部楼层
ClientHandler.cpp
- #include <ace/CDR_Stream.h>
- #include "ClientHandler.h"
- #include "Packet.h"
- #include "ClientManager.h"
- #include "MsgBlockManager.h"
- #include "MsgProcessTask.h"
- void ClientHandler::Init()
- {
- mReadMsgBlock = MsgBlockManagerSingleton::instance()->AllocMsgBlock();
- mReadMsgBlock->rd_ptr(mReadMsgBlock->base());
- mReadMsgBlock->wr_ptr(mReadMsgBlock->base());
- this->handle(ACE_INVALID_HANDLE);
- }
- //回收前重置
- void ClientHandler::Reset()
- {
- MsgBlockManagerSingleton::instance()->ReleaseMsgBlock(mReadMsgBlock);
- mReadMsgBlock = NULL;
- ACE_OS::shutdown(this->handle(), ACE_SHUTDOWN_BOTH);
- ACE_OS::closesocket(this->handle());
- this->handle(ACE_INVALID_HANDLE);
- }
- void ClientHandler::addresses(const ACE_INET_Addr &remote_address, const ACE_INET_Addr &local_address)
- {
- remote_address.addr_to_string (mRemoteAddress, MAXHOSTNAMELEN);
- mRemotePort = remote_address.get_port_number();
- }
- //主动关闭连接
- void ClientHandler::Close()
- {
- ClientManagerSingleton::instance()->ReleaseClientHandler(this);
- ACE_OS::shutdown(this->handle(), ACE_SHUTDOWN_BOTH);
- ACE_OS::closesocket(this->handle());
- }
- void ClientHandler::open(ACE_HANDLE new_handle, ACE_Message_Block &message_block)
- {
- if (mReader.open(*this, this->handle()) == -1)
- {
- ACE_ERROR((LM_ERROR, ACE_TEXT("<%P|%t>ACE_Asynch_Read_Stream::open\n")));
- }
- else if (mWriter.open(*this, this->handle()) == -1)
- {
- ACE_ERROR((LM_ERROR, ACE_TEXT("<%P|%t>ACE_Asynch_Write_Stream::open\n")));
- }
- else
- {
- ACE_DEBUG((LM_DEBUG, ACE_TEXT("<%P|%t>ClientHandler::open, Remote ip is : %s\n"), mRemoteAddress));
- initiate_read_stream();
- }
- ClientManagerSingleton::instance()->ActiveClient(this->handle(), this);
- }
- void ClientHandler::handle_read_stream(const ACE_Asynch_Read_Stream::Result &result)
- {
- size_t bytesRecved = result.bytes_transferred();
- if (result.success() && (bytesRecved != 0))
- {
- ACE_DEBUG((LM_DEBUG, ACE_TEXT("<%P|%t>ClientHandler::handle_read_stream handle=%d complete : %d\n"),
- handle(), bytesRecved));
- if (mReadMsgBlock->length() < sizeof(PacketHeader)) //消息头尚未接收完全
- {
- ACE_Guard<ACE_Recursive_Thread_Mutex> locker(mLock);
- mReader.read(*mReadMsgBlock, sizeof(PacketHeader) - mReadMsgBlock->length());
- return;
- }
- PacketHeader *header = (PacketHeader *) (mReadMsgBlock->rd_ptr());
- ACE_Message_Block * mb = mReadMsgBlock->cont(); //cont()为数据信息
- if (!mb)
- {
- // 只是接收完长度信息
- ACE_Guard<ACE_Recursive_Thread_Mutex> locker(mLock);
- mb = MsgBlockManagerSingleton::instance()->AllocMsgBlock(); //这个mb在MsgProcessTask处理完数据后回收
- mReadMsgBlock->cont (mb);
- }
- if (mb->length() == header->mClientHeader.mDataLength) // 数据已接收完,再继续接收下一个数据包
- {
- ACE_Guard<ACE_Recursive_Thread_Mutex> locker(mLock);
- MsgProcessTaskSingleton::instance()->putq(mb);
- mReadMsgBlock->reset();
- mReadMsgBlock->cont(NULL);
- initiate_read_stream();
- return;
- }
- else // 否则继续接收该数据包
- {
- mReader.read (*mb, header->mClientHeader.mDataLength);
- }
- }
- else
- {
- ACE_DEBUG((LM_DEBUG,
- ACE_TEXT("<%P|%t>ClientHandler, remote: %s handle=%d shutdown \n"), mRemoteAddress, handle()));
- ClientManagerSingleton::instance()->ReleaseClientHandler(this);
- }
- }
- void ClientHandler::handle_write_stream(const ACE_Asynch_Write_Stream::Result &result)
- {
- }
- int ClientHandler::initiate_read_stream()
- {
- ACE_Guard<ACE_Recursive_Thread_Mutex> locker(mLock);
- //先接收包头
- int headerSize = sizeof(PacketHeader);
- ACE_HANDLE handle = this->handle();
- mReadMsgBlock->copy((const char*)&handle, sizeof(ACE_HANDLE));
- if (mReader.read(*mReadMsgBlock, sizeof(PacketHeader::PacketHeaderFromClient)) == -1)
- {
- ACE_ERROR_RETURN((LM_ERROR, ACE_TEXT("<%P|%t>ACE_Asynch_Read_Stream::read")), -1);
- }
- return 0;
- }
- int ClientHandler::initiate_write_stream(ACE_Message_Block &mb, size_t numBytes)
- {
- ACE_Guard<ACE_Recursive_Thread_Mutex> locker(mLock);
- if (mWriter.write(mb, numBytes) == -1)
- {
- ACE_ERROR_RETURN((LM_ERROR, ACE_TEXT("<%P|%t>ACE_Asynch_Write_Stream::write")), -1);
- }
- return 0;
- }
- void ClientHandler::handle_time_out(const ACE_Time_Value &tv, const void *act)
- {
- ACE_DEBUG((LM_DEBUG, ACE_TEXT("<%P|%t>ClientHandler::handle_time_out")));
- }
复制代码 |
|