初学者提问...
1) 服务器 用 recv_n函数 程序就无法向下执行了??? recv函数就可以正常执行?2) 客户端连接后,发送消息,然后马上关闭...服务器会出错???--服务器也是ACE架构,线程池方式实现的...
有时间的朋友麻烦进来解答一下...谢谢了... 1、recv_n是要求一直收到指定的字节数后才能返回,程序收不到,当然要等待了。
2、服务器代码有错误,不健壮。 用ACE框架做程序,不代表不会出错。相反,它要求你对基础的东西理解深刻,否则一定错误很多。 int Client_Handler::process (char *rdbuf,int rdbuf_len)
{
//此次服务器接收到的字节数
ssize_t bytes_read;
char* buf = new char[ 1024 ];
//程序在这处理接收数据和发送数据
switch ( (bytes_read = this->peer().recv(buf, 1024 )))
{
/***********************************************************************/
//如果数据读取错误
/***********************************************************************/
case -1:
ACE_ERROR_RETURN ((LM_ERROR,"(%P|%t) %p bad read\n","client"),-1);
break;
/***********************************************************************/
//如果服务器关闭
/***********************************************************************/
case 0:
{//注1.
ACE_ERROR_RETURN ((LM_ERROR,"(%P|%t) closing daemon (fd = %d)\n",this->peer().get_handle()),-1);
}
break;
/***********************************************************************/
//数据正常接收
/***********************************************************************/
default:
{//注2.
ACE_DEBUG ((LM_DEBUG,"(%P|%t):client: %s\n",buf));
ACE_DEBUG ((LM_DEBUG,"(%P|%t):client: %d\n", nTestNum++ ));//记录正确收到的消息数量.
MsgHandle( buf ); //处理消息函数.
}break;
}
return 100;
}
其他的地方都没有动...只是这里添加了对收到数据的处理.
但是在执行的时候(客户端发送 先正常的数据包,紧接着马上断开连接)...会先进入到 //注2 分支...但是在执行第一行后...马上会再跳到 //注1 里面去执行...
这样执行的顺序 是正常的吗? 代码不全面,看不出问题。
client_handler类...
client_handler.h#ifndef __CLIENT_HANDLER_H__
#define __CLIENT_HANDLER_H__
#include "ace/Svc_Handler.h"
#include "ace/SOCK_Dgram.h"
#include "ace/SOCK_Stream.h"
#include "ace/SOCK_Connector.h"
#include "ace/mutex.h"
#include "ace/Hash_Map_Manager.h"
class Client_Acceptor;
class Thread_Pool;
typedef ACE_thread_tTHREAD;
typedef ACE_Hash_Map_Manager_Ex<THREAD,int,ACE_Hash<THREAD>,ACE_Equal_To<THREAD>,ACE_Null_Mutex> HASH_MAP_PTCL_THREAD;
class Client_Handler : public ACE_Svc_Handler <ACE_SOCK_STREAM, ACE_MT_SYNCH>
{
public:
typedef ACE_Svc_Handler <ACE_SOCK_STREAM, ACE_MT_SYNCH> inherited;
Client_Handler (void);
virtual ~Client_Handler (void);
void destroy (void);
int open (void *acceptor);
int close (u_long flags = 0);
int handle_close (ACE_HANDLE handle,ACE_Reactor_Mask mask);
int handle_input (ACE_HANDLE handle);
inline int GetThread() { return thread_; }
inline void SetThread(int thread){thread_ = thread;}
int get_ptcl_thread(ACE_thread_t thread);
//根据消息类型分派消息
int MsgHandle( char* pBuf );
protected:
//每连接每线程方式下会执行svc
int svc (void);
//分离出逻辑处理函数,方便后续的多线程并发加锁
int process (char *rdbuf, int rdbuf_len);
//返回acceptor
Client_Acceptor *client_acceptor (void)
{
return this->client_acceptor_;
}
void client_acceptor (Client_Acceptor *_client_acceptor)
{
this->client_acceptor_ = _client_acceptor;
}
int concurrency(void);
Thread_Pool * thread_pool (void);
Client_Acceptor *client_acceptor_;
ACE_thread_t creator_;
ACE_Mutex mutex;
HASH_MAP_PTCL_THREAD map_ptcl_thread_;//线程安全
int thread_; //线程序号
int index_; //线程编号
};
#endif
----------------------------------------------------------------------------------------------------------------
client_handler.cpp:
#include "client_acceptor.h"
#include "client_handler.h"
#include "../Common/config.h"
#include "mmsystem.h"
extern ACE_INET_Addr *LOGIN_SERVER_ADDR; //导入逻辑服务器ADDR
extern SERVER connect_server; //导入连接服务器
int nTestNum = 0;
//构造函数
Client_Handler::Client_Handler (void)
: client_acceptor_(0),
creator_ (ACE_Thread::self ())
{
thread_=0;
index_=0;
}
//析够函数
Client_Handler::~Client_Handler (void)
{
this->peer().close();
}
//获得并行方式,同时检测acceptor是否已存在,并非优雅的方法。
int Client_Handler::concurrency(void)
{
return this->client_acceptor()->concurrency();
}
//返回当前使用的线程池
Thread_Pool*Client_Handler::thread_pool (void)
{
return this->client_acceptor ()->thread_pool ();
}
//打开
int Client_Handler::open (void *acceptor)
{
client_acceptor ((Client_Acceptor *) acceptor);
if (concurrency () == Client_Acceptor::thread_per_connection_)
{
return this->activate (THR_DETACHED);
}
//连接
this->reactor (client_acceptor()->reactor ());
ACE_INET_Addr addr;
//获得客户端的ADDR
if (this->peer ().get_remote_addr (addr) == -1)
{
return -1;
}
//打印IP和端口号
ACE_DEBUG((LM_DEBUG,"用户的IP为:%s;用户的端口号为:%d\n",addr.get_host_addr(),addr.get_port_number()));
//注册句炳
if (this->reactor ()->register_handler (this,REGISTER_MASK) == -1)
{
ACE_ERROR_RETURN ((LM_ERROR,"(%P|%t) can't register with reactor\n"),-1);
}
return 0;
}
//WINDOWS下应该使用DONT_CALL标志防止事件再次分发过来
void Client_Handler::destroy (void)
{
this->reactor ()->remove_handler (this, REMOVE_MASK);
delete this;
//关闭并释放资源
}
int Client_Handler::close (u_long flags)
{
ACE_UNUSED_ARG(flags);
//从reactor清除自身并释放内存
this->destroy ();
//不用显示调用父类的函数,已经自动调用了。
return 0;
}
//当handle_input返回-1时,会调用handle_close来防止内存泄漏。
int Client_Handler::handle_close (ACE_HANDLE handle,ACE_Reactor_Mask mask)
{
ACE_UNUSED_ARG (handle);
ACE_UNUSED_ARG (mask);
delete this;
return 0;
}
int Client_Handler::handle_input (ACE_HANDLE handle)
{
ACE_UNUSED_ARG (handle);
//并发方式如果是线程池,并且创建此handler的线程,reactor肯定是调用了此handler
if (concurrency () == Client_Acceptor::thread_pool_)
{
if (ACE_OS::thr_equal (ACE_Thread::self(),creator_))
{
this->reactor ()->remove_handler (this, REMOVE_MASK);
return this->thread_pool ()->enqueue (this);
}
}
//如果不是单连接单线程的方式,都会执行这段代码
char buf;
ACE_OS::memset( buf, 0, sizeof(buf) );
int rval=0;
ACE_SEH_TRY
{
//保存对数据的处理结果,单不立刻返回
rval = this->process ( buf, MAX_BUFFER_SIZE );
}
ACE_SEH_EXCEPT (EXCEPTION_EXECUTE_HANDLER)
{
ACE_DEBUG((LM_DEBUG,"服务器处理客户端逻辑出现错误--1\n"));
}
if (concurrency () == Client_Acceptor::thread_pool_)
{
if (rval != -1)
{
this->reactor ()->register_handler (this, REGISTER_MASK);
}
}
return rval;
}
//如果svc方法返回,close方法要注意释放handler。
int Client_Handler::svc (void)
{
char buf;
ACE_OS::memset( buf, 0, sizeof(buf) );
ACE_SEH_TRY
{
while (true)
{
if (this->process (buf, sizeof (buf)) == -1)
{
return -1;
}
}
}
ACE_SEH_EXCEPT (EXCEPTION_EXECUTE_HANDLER)
{
ACE_DEBUG((LM_DEBUG,"服务器处理客户端逻辑出现错误--2\n"));
}
return 0;
}
//网络数据包的实际处理,线程间并没有共享任何数据。
int Client_Handler::process (char *rdbuf,int rdbuf_len)
{
//此次服务器接收到的字节数
ssize_t bytes_read;
char buf[ MAX_BUFFER_SIZE ];
//ACE_OS::memset( buf, 0, sizeof(buf) );
//程序在这处理接收数据和发送数据
switch ( ( bytes_read = this->peer().recv(buf, MAX_BUFFER_SIZE) ) )
{
case -1:
{
ACE_ERROR_RETURN ( ( LM_ERROR, "(%P|%t) 数据接收错误!--1\n" ), -1 );
}break;
case 0:
{
ACE_ERROR_RETURN ( ( LM_ERROR, "(%P|%t) 客户端断开连接--2\n" ), -1 );
}break;
default:
{
ACE_DEBUG ((LM_DEBUG,"(%P|%t):client: %s\n", buf ));
ACE_DEBUG ((LM_DEBUG,"(%P|%t):client: %d\n", nTestNum++ ));
MsgHandle( buf );
// this->peer().send( buf, sizeof(buf) );
}break;
}
return 100;
}
//转换线程编号
int Client_Handler::get_ptcl_thread(ACE_thread_t thread)
{
//定义返回值和索引
int ret=0,found_index=0;
//
if(!map_ptcl_thread_.find(thread,found_index))
{
ret = found_index;
}
else
{
map_ptcl_thread_.rebind(thread,index_);
ret = index_;
index_++;
}
return ret;
}
/***********************************************************************/
// 处理登陆消息
// 参数:客户端发过来信息.
/***********************************************************************/
int Client_Handler::MsgHandle( char* pBuf )
{
this->peer().send( pBuf, sizeof(pBuf) );
return 0;
}
晕...open "o"变成表情了...
-----------------------------------------------------------------------------------------------client_acceptor.h
/*
连接服务器
*/
#ifndef __CLIENT_ACCEPTOR_H__
#define __CLIENT_ACCEPTOR_H__
//包含ACE头文件
#include "ace/Acceptor.h"
#include "ace/SOCK_Acceptor.h"
//包含线程池头文件
#include "thread_pool.h"
//包含逻辑头文件
#include "client_handler.h"
//关联acceptor和handler,非常简洁的方式。
typedef ACE_Acceptor Client_Acceptor_Base;
//维护私有线程池的好处是可以根据acceptor目的的不同,对线程池进行配置。
//如果一定要使用共享线程池,则可以通过外部线程池来实现。
class Client_Acceptor : public Client_Acceptor_Base
{
public:
typedef Client_Acceptor_Base inherited;
//枚举可供使用的并发策略
enum concurrency_t
{
single_threaded_,
thread_per_connection_,
thread_pool_
};
//缺省设置为线程池方式
Client_Acceptor (int concurrency = thread_pool_);
//也可以使用外部线程池
Client_Acceptor (Thread_Pool &thread_pool);
//析构函数要释放线程池
~Client_Acceptor (void);
//使用已经存在的reactor,也可以指定线程池大小
int open (const ACE_INET_Addr &addr,ACE_Reactor *reactor,int pool_size = Thread_Pool::default_pool_size_);
//关闭时要释放线程池
int close (void);
//返回当前并发策略
int concurrency (void)
{
return this->concurrency_; //程序会断在这里. concurrency_成空值了.
}
//获得线程池,这样handle_input方法才能将自身放置在线程池中。
//也可以使用全局的线程池,用ACE_Singleton来实现。
inline Thread_Pool *thread_pool (void) { return &this->the_thread_pool_; }
//查询是私有线程池还是外部线程池
inline int thread_pool_is_private (void) { return &the_thread_pool_ == &private_thread_pool_; }
protected:
int concurrency_;
Thread_Pool private_thread_pool_;
Thread_Pool &the_thread_pool_;
};
#endif /* CLIENT_ACCEPTOR_H */
---------------------------------------------------------------------
client_acceptor.cpp
#include "client_acceptor.h"
//选择并行策略,缺省使用私有线程池。
Client_Acceptor::Client_Acceptor (int concurrency)
: concurrency_ (concurrency),
the_thread_pool_ (private_thread_pool_)
{
}
//当然,也可以显式指定外部线程池。
Client_Acceptor::Client_Acceptor (Thread_Pool &thread_pool)
: concurrency_ (thread_pool_),
the_thread_pool_ (thread_pool)
{
}
//析构函数中,需要关闭私有线程池。
Client_Acceptor::~Client_Acceptor (void)
{
if (this->concurrency() == thread_pool_ && thread_pool_is_private ())
thread_pool ()->close ();
}
//私有线程池需要初始化,父类的open方法被调用。
int Client_Acceptor::open (const ACE_INET_Addr &addr,ACE_Reactor *reactor,int pool_size)
{
if (this->concurrency() == thread_pool_ && thread_pool_is_private ())
thread_pool ()->start (pool_size);
return inherited::open (addr, reactor);
}
//这里同样需要令私有线程池关闭,父类的close方法被调用。
int Client_Acceptor::close (void)
{
if (this->concurrency() == thread_pool_ && thread_pool_is_private ())
thread_pool ()->stop ();
return inherited::close ();
}
[ 本帖最后由 minglong717 于 2008-6-19 15:20 编辑 ] thread_pool.h
/*
线程池类
*/
#ifndef __THREAD_POOL_H__
#define __THREAD_POOL_H__
#include "ace/Task.h"
#include "ace/Mutex.h"
#include "ace/Atomic_Op.h"
//声明ACE的事件处理类
class ACE_Event_Handler;
//线程池类
class Thread_Pool : public ACE_Task
{
public:
typedef ACE_Task inherited;
//提供缺省的线程池大小
enum size_t { default_pool_size_ = 5 };
Thread_Pool (void);
//开始线程,线程会执行svc方法
int start (int pool_size = default_pool_size_);
//关闭线程池
virtual int stop (void);
//将handler放入缓冲队列,并在后续调用其handle_input方法
int enqueue (ACE_Event_Handler *handler);
//ACE_Atomic_Op保证多线程环境下的安全操作
typedef ACE_Atomic_Op counter_t;
protected:
//从队列中提取handler并调用其handle_input方法
int svc (void);
//使用原子操作确保多个svc调用对活动线程数的保护,这对close方法很重要
counter_t active_threads_;
};
#endif /* THREAD_POOL_H */
--------------------------------------------------------------------------------
thread_pool.cpp
#include "thread_pool.h"
//用线程从队列中取出handler,再调用handler中handle_input方法。
#include "ace/Event_Handler.h"
//初始时活动线程数为0
Thread_Pool::Thread_Pool (void)
: active_threads_ (0)
{
}
//封装ACE_Task的active方法为start方法
int Thread_Pool::start (int pool_size)
{
return this->activate (THR_NEW_LWP|THR_DETACHED, pool_size);
}
//向队列中加入隐蔽消息
int Thread_Pool::stop (void)
{
//取得当前活动线程数
int counter = active_threads_.value ();
/* For each one of the active threads, enqueue a "null" event
handler.Below, we'll teach our svc() method that "null" means
"shutdown".*/
//对每个活动线程,加入NULL事件指针。
while (counter--)
this->enqueue (0);
//每个svc方法退出时都会减少一个活动线程,等活动线程数到零时,线程需要等待。
while (active_threads_.value ())
ACE_OS::sleep (ACE_Time_Value (0, 250000));
return(0);
}
//当handler可以工作时,首先要入队列。在这里使用了ACE_Message_Block。
int Thread_Pool::enqueue (ACE_Event_Handler *handler)
{
//ACE_Message_Block是存在于ACE_Message_Queue中的一段数据,ACE_Task
//内建了ACE_Message_Queue,并且设置了线程同步标志。
//ACE_Message_Block使用char*类型的数据。这里传递了ACE_Event_Handler
//句柄。指针转换是件危险的事情,所以首先将handler指针转换成void指针。
//然后将void指针转换成char指针供ACE_Message_Block使用。
void *v_data = (void *) handler;
char *c_data = (char *) v_data;
ACE_Message_Block *mb;
//创建新的ACE_Message_Block。为了高效使用内存,应该是用缓存以便重用的,这里简化了实现。
ACE_NEW_RETURN (mb,ACE_Message_Block (c_data),-1);
//加入队列
if (this->putq (mb) == -1)
{
//根据引用数来确定是否释放
mb->release ();
return -1;
}
return 0;
}
//使用Guard锁,跟踪线程池活动线程数。
class Counter_Guard
{
public:
Counter_Guard (Thread_Pool::counter_t &counter)
: counter_ (counter)
{
++counter_;
}
~Counter_Guard (void)
{
--counter_;
}
protected:
Thread_Pool::counter_t &counter_;
};
//使用Guard锁,保证消息块被彻底释放,防止内存泄漏。
class Message_Block_Guard
{
public:
Message_Block_Guard (ACE_Message_Block *&mb)
: mb_ (mb)
{
}
~Message_Block_Guard (void)
{
mb_->release ();
}
protected:
ACE_Message_Block *&mb_;
};
//线程池中每个线程都会执行的方法。这里用ACE_Message_Queue来缓存请求。
int Thread_Pool::svc (void)
{
//getq需要一个指针。
ACE_Message_Block *mb;
//创建Guard对象确保多线程情况下的正确计数。
Counter_Guard counter_guard (active_threads_);
//从队列中取得消息,一旦异常,立刻退出。
while (this->getq (mb) != -1)
{
//getq成功执行后会增加对"mb"的引用。使用Guard来确保释放此引用。
Message_Block_Guard message_block_guard (mb);
//取得数据指针。
char *c_data = mb->base ();
//NULL为从队列中退出,如果指针非NULL,则继续处理。
if (c_data)
{
void *v_data = (void *) c_data;
ACE_Event_Handler *handler = (ACE_Event_Handler *) v_data;
if (handler->handle_input (ACE_INVALID_HANDLE) == -1)
{
handler->handle_close (handler->get_handle (), 0);
//如果这里直接退出,会导致线程池得不到释放
}
}
else
//如果收到NULL标志,退出
return 0;
//message_block_guard在这里失效,释放message_block实例
}
return 0;
} 好多哦。我稍后看看。。。 虽然不能调试程序,但是我查看了一下,感觉有些问题:
1、 return this->concurrency_; //程序会断在这里. concurrency_成空值了.
这个concurrency_没有线程保护,会在多线程条件下造成错误。
2、线程池使用方法怪异,从没见过这样用的。另外你怎么保证Client_Handler的svc方法,在对象删除的时候,会自动退出?
建议你修改一下,不要这样使用svc处理器,不要一个链接一个线程处理,也不要这样用线程池。
页:
[1]
2