|
楼主 |
发表于 2008-6-19 15:16:31
|
显示全部楼层
client_handler类...
client_handler.h
#ifndef __CLIENT_HANDLER_H__
#define __CLIENT_HANDLER_H__
#include "ace/Svc_Handler.h"
#include "ace/SOCK_Dgram.h"
#include "ace/SOCK_Stream.h"
#include "ace/SOCK_Connector.h"
#include "ace/mutex.h"
#include "ace/Hash_Map_Manager.h"
class Client_Acceptor;
class Thread_Pool;
typedef ACE_thread_t THREAD;
typedef ACE_Hash_Map_Manager_Ex<THREAD,int,ACE_Hash<THREAD>,ACE_Equal_To<THREAD>,ACE_Null_Mutex> HASH_MAP_PTCL_THREAD;
class Client_Handler : public ACE_Svc_Handler <ACE_SOCK_STREAM, ACE_MT_SYNCH>
{
public:
typedef ACE_Svc_Handler <ACE_SOCK_STREAM, ACE_MT_SYNCH> inherited;
Client_Handler (void);
virtual ~Client_Handler (void);
void destroy (void);
int open (void *acceptor);
int close (u_long flags = 0);
int handle_close (ACE_HANDLE handle,ACE_Reactor_Mask mask);
int handle_input (ACE_HANDLE handle);
inline int GetThread() { return thread_; }
inline void SetThread(int thread){thread_ = thread;}
int get_ptcl_thread(ACE_thread_t thread);
//根据消息类型分派消息
int MsgHandle( char* pBuf );
protected:
//每连接每线程方式下会执行svc
int svc (void);
//分离出逻辑处理函数,方便后续的多线程并发加锁
int process (char *rdbuf, int rdbuf_len);
//返回acceptor
Client_Acceptor *client_acceptor (void)
{
return this->client_acceptor_;
}
void client_acceptor (Client_Acceptor *_client_acceptor)
{
this->client_acceptor_ = _client_acceptor;
}
int concurrency(void);
Thread_Pool * thread_pool (void);
Client_Acceptor *client_acceptor_;
ACE_thread_t creator_;
ACE_Mutex mutex;
HASH_MAP_PTCL_THREAD map_ptcl_thread_;//线程安全
int thread_; //线程序号
int index_; //线程编号
};
#endif
----------------------------------------------------------------------------------------------------------------
client_handler.cpp:
#include "client_acceptor.h"
#include "client_handler.h"
#include "../Common/config.h"
#include "mmsystem.h"
extern ACE_INET_Addr *LOGIN_SERVER_ADDR; //导入逻辑服务器ADDR
extern SERVER connect_server; //导入连接服务器
int nTestNum = 0;
//构造函数
Client_Handler::Client_Handler (void)
: client_acceptor_(0),
creator_ (ACE_Thread::self ())
{
thread_=0;
index_=0;
}
//析够函数
Client_Handler::~Client_Handler (void)
{
this->peer().close();
}
//获得并行方式,同时检测acceptor是否已存在,并非优雅的方法。
int Client_Handler::concurrency(void)
{
return this->client_acceptor()->concurrency();
}
//返回当前使用的线程池
Thread_Pool* Client_Handler::thread_pool (void)
{
return this->client_acceptor ()->thread_pool ();
}
//打开
int Client_Handler::open (void *acceptor)
{
client_acceptor ((Client_Acceptor *) acceptor);
if (concurrency () == Client_Acceptor::thread_per_connection_)
{
return this->activate (THR_DETACHED);
}
//连接
this->reactor (client_acceptor()->reactor ());
ACE_INET_Addr addr;
//获得客户端的ADDR
if (this->peer ().get_remote_addr (addr) == -1)
{
return -1;
}
//打印IP和端口号
ACE_DEBUG((LM_DEBUG,"用户的IP为:%s;用户的端口号为:%d\n",addr.get_host_addr(),addr.get_port_number()));
//注册句炳
if (this->reactor ()->register_handler (this,REGISTER_MASK) == -1)
{
ACE_ERROR_RETURN ((LM_ERROR,"(%P|%t) can't register with reactor\n"),-1);
}
return 0;
}
//WINDOWS下应该使用DONT_CALL标志防止事件再次分发过来
void Client_Handler::destroy (void)
{
this->reactor ()->remove_handler (this, REMOVE_MASK);
delete this;
//关闭并释放资源
}
int Client_Handler::close (u_long flags)
{
ACE_UNUSED_ARG(flags);
//从reactor清除自身并释放内存
this->destroy ();
//不用显示调用父类的函数,已经自动调用了。
return 0;
}
//当handle_input返回-1时,会调用handle_close来防止内存泄漏。
int Client_Handler::handle_close (ACE_HANDLE handle,ACE_Reactor_Mask mask)
{
ACE_UNUSED_ARG (handle);
ACE_UNUSED_ARG (mask);
delete this;
return 0;
}
int Client_Handler::handle_input (ACE_HANDLE handle)
{
ACE_UNUSED_ARG (handle);
//并发方式如果是线程池,并且创建此handler的线程,reactor肯定是调用了此handler
if (concurrency () == Client_Acceptor::thread_pool_)
{
if (ACE_OS::thr_equal (ACE_Thread::self(),creator_))
{
this->reactor ()->remove_handler (this, REMOVE_MASK);
return this->thread_pool ()->enqueue (this);
}
}
//如果不是单连接单线程的方式,都会执行这段代码
char buf[MAX_BUFFER_SIZE];
ACE_OS::memset( buf, 0, sizeof(buf) );
int rval=0;
ACE_SEH_TRY
{
//保存对数据的处理结果,单不立刻返回
rval = this->process ( buf, MAX_BUFFER_SIZE );
}
ACE_SEH_EXCEPT (EXCEPTION_EXECUTE_HANDLER)
{
ACE_DEBUG((LM_DEBUG,"服务器处理客户端逻辑出现错误--1\n"));
}
if (concurrency () == Client_Acceptor::thread_pool_)
{
if (rval != -1)
{
this->reactor ()->register_handler (this, REGISTER_MASK);
}
}
return rval;
}
//如果svc方法返回,close方法要注意释放handler。
int Client_Handler::svc (void)
{
char buf[MAX_BUFFER_SIZE];
ACE_OS::memset( buf, 0, sizeof(buf) );
ACE_SEH_TRY
{
while (true)
{
if (this->process (buf, sizeof (buf)) == -1)
{
return -1;
}
}
}
ACE_SEH_EXCEPT (EXCEPTION_EXECUTE_HANDLER)
{
ACE_DEBUG((LM_DEBUG,"服务器处理客户端逻辑出现错误--2\n"));
}
return 0;
}
//网络数据包的实际处理,线程间并没有共享任何数据。
int Client_Handler::process (char *rdbuf,int rdbuf_len)
{
//此次服务器接收到的字节数
ssize_t bytes_read;
char buf[ MAX_BUFFER_SIZE ];
//ACE_OS::memset( buf, 0, sizeof(buf) );
//程序在这处理接收数据和发送数据
switch ( ( bytes_read = this->peer().recv(buf, MAX_BUFFER_SIZE) ) )
{
case -1:
{
ACE_ERROR_RETURN ( ( LM_ERROR, "(%P|%t) 数据接收错误!--1\n" ), -1 );
}break;
case 0:
{
ACE_ERROR_RETURN ( ( LM_ERROR, "(%P|%t) 客户端断开连接--2\n" ), -1 );
}break;
default:
{
ACE_DEBUG ((LM_DEBUG,"(%P|%t):client: %s\n", buf ));
ACE_DEBUG ((LM_DEBUG,"(%P|%t):client: %d\n", nTestNum++ ));
MsgHandle( buf );
// this->peer().send( buf, sizeof(buf) );
}break;
}
return 100;
}
//转换线程编号
int Client_Handler::get_ptcl_thread(ACE_thread_t thread)
{
//定义返回值和索引
int ret=0,found_index=0;
//
if(!map_ptcl_thread_.find(thread,found_index))
{
ret = found_index;
}
else
{
map_ptcl_thread_.rebind(thread,index_);
ret = index_;
index_++;
}
return ret;
}
/***********************************************************************/
// 处理登陆消息
// 参数:客户端发过来信息.
/***********************************************************************/
int Client_Handler::MsgHandle( char* pBuf )
{
this->peer().send( pBuf, sizeof(pBuf) );
return 0;
} |
|