找回密码
 用户注册

QQ登录

只需一步,快速开始

查看: 9663|回复: 10

初学者提问...

[复制链接]
发表于 2008-6-19 10:52:46 | 显示全部楼层 |阅读模式
1) 服务器 用 recv_n函数 程序就无法向下执行了??? recv函数就可以正常执行?
2) 客户端连接后,发送消息,然后马上关闭...服务器会出错???--服务器也是ACE架构,线程池方式实现的...


有时间的朋友麻烦进来解答一下...谢谢了...
发表于 2008-6-19 11:22:20 | 显示全部楼层
1、recv_n是要求一直收到指定的字节数后才能返回,程序收不到,当然要等待了。
2、服务器代码有错误,不健壮。
发表于 2008-6-19 11:22:58 | 显示全部楼层
用ACE框架做程序,不代表不会出错。相反,它要求你对基础的东西理解深刻,否则一定错误很多。
 楼主| 发表于 2008-6-19 11:33:31 | 显示全部楼层
int Client_Handler::process (char *rdbuf,int rdbuf_len)
{
        //此次服务器接收到的字节数
        ssize_t                bytes_read;
        char*                buf = new char[ 1024 ];   

        //程序在这处理接收数据和发送数据
        switch ( (bytes_read = this->peer().recv(buf, 1024 )))
           {
        /***********************************************************************/
        //如果数据读取错误
        /***********************************************************************/
    case -1:
                ACE_ERROR_RETURN ((LM_ERROR,"(%P|%t) %p bad read\n","client"),-1);
                break;
        /***********************************************************************/
        //如果服务器关闭
        /***********************************************************************/
    case 0:
                {//注1.
                        ACE_ERROR_RETURN ((LM_ERROR,"(%P|%t) closing daemon (fd = %d)\n",this->peer().get_handle()),-1);
                }
                break;
        /***********************************************************************/
        //数据正常接收
        /***********************************************************************/
    default:
                {//注2.
                        ACE_DEBUG ((LM_DEBUG,"(%P|%t):client: %s\n",buf));
                        ACE_DEBUG ((LM_DEBUG,"(%P|%t):client: %d\n", nTestNum++ ));//记录正确收到的消息数量.

                        MsgHandle( buf ); //处理消息函数.

                }break;
        }
        return 100;
}

其他的地方都没有动...只是这里添加了对收到数据的处理.
但是在执行的时候(客户端发送 先正常的数据包,紧接着马上断开连接)...会先进入到 //注2 分支...但是在执行第一行后...马上会再跳到 //注1 里面去执行...

这样执行的顺序 是正常的吗?
发表于 2008-6-19 14:15:18 | 显示全部楼层
代码不全面,看不出问题。
 楼主| 发表于 2008-6-19 15:16:31 | 显示全部楼层

client_handler类...

client_handler.h

#ifndef __CLIENT_HANDLER_H__
#define __CLIENT_HANDLER_H__

#include "ace/Svc_Handler.h"
#include "ace/SOCK_Dgram.h"
#include "ace/SOCK_Stream.h"
#include "ace/SOCK_Connector.h"   
#include "ace/mutex.h"
#include "ace/Hash_Map_Manager.h"

class Client_Acceptor;
class Thread_Pool;

typedef ACE_thread_t  THREAD;

typedef ACE_Hash_Map_Manager_Ex<THREAD,int,ACE_Hash<THREAD>,ACE_Equal_To<THREAD>,ACE_Null_Mutex>   HASH_MAP_PTCL_THREAD;

class Client_Handler : public ACE_Svc_Handler <ACE_SOCK_STREAM, ACE_MT_SYNCH>
{
public:
        typedef ACE_Svc_Handler <ACE_SOCK_STREAM, ACE_MT_SYNCH> inherited;
        Client_Handler (void);
        virtual ~Client_Handler (void);
        void destroy (void);
        int open (void *acceptor);
        int close (u_long flags = 0);
        int handle_close (ACE_HANDLE handle,ACE_Reactor_Mask mask);
        int handle_input (ACE_HANDLE handle);

        inline int GetThread() { return thread_; }
        inline void SetThread(int thread){thread_ = thread;}
        int get_ptcl_thread(ACE_thread_t thread);

        //根据消息类型分派消息
        int                        MsgHandle( char* pBuf );

protected:
        //每连接每线程方式下会执行svc
        int svc (void);
        //分离出逻辑处理函数,方便后续的多线程并发加锁
        int process (char *rdbuf, int rdbuf_len);

        //返回acceptor
        Client_Acceptor *client_acceptor (void)
        {
                return this->client_acceptor_;
        }
        void client_acceptor (Client_Acceptor *_client_acceptor)
        {
                this->client_acceptor_ = _client_acceptor;
        }
        int concurrency(void);
        Thread_Pool                                        * thread_pool (void);
        Client_Acceptor                                *client_acceptor_;
        ACE_thread_t                                creator_;

        ACE_Mutex                                        mutex;

        HASH_MAP_PTCL_THREAD                map_ptcl_thread_;//线程安全

        int                                                        thread_;         //线程序号

        int                                                        index_;                         //线程编号
};
#endif

----------------------------------------------------------------------------------------------------------------
client_handler.cpp:

#include "client_acceptor.h"
#include "client_handler.h"
#include "../Common/config.h"
#include "mmsystem.h"
extern ACE_INET_Addr                        *LOGIN_SERVER_ADDR;                //导入逻辑服务器ADDR
extern        SERVER                                        connect_server;                        //导入连接服务器

int                                                nTestNum = 0;

//构造函数
Client_Handler::Client_Handler (void)
  : client_acceptor_(0),
    creator_ (ACE_Thread::self ())
{
        thread_=0;
        index_=0;
}
//析够函数
Client_Handler::~Client_Handler (void)
{
        this->peer().close();
}

//获得并行方式,同时检测acceptor是否已存在,并非优雅的方法。
int Client_Handler::concurrency(void)
{
        return this->client_acceptor()->concurrency();
}
//返回当前使用的线程池
Thread_Pool*  Client_Handler::thread_pool (void)
{
        return this->client_acceptor ()->thread_pool ();
}
//打开
int Client_Handler::open (void *acceptor)
{
        client_acceptor ((Client_Acceptor *) acceptor);
        if (concurrency () == Client_Acceptor::thread_per_connection_)
        {
                return this->activate (THR_DETACHED);
        }
        //连接
        this->reactor (client_acceptor()->reactor ());
        ACE_INET_Addr addr;
        //获得客户端的ADDR
        if (this->peer ().get_remote_addr (addr) == -1)
        {
                return -1;
        }
        //打印IP和端口号
        ACE_DEBUG((LM_DEBUG,"用户的IP为:%s;用户的端口号为:%d\n",addr.get_host_addr(),addr.get_port_number()));

        //注册句炳
        if (this->reactor ()->register_handler (this,REGISTER_MASK) == -1)
        {
            ACE_ERROR_RETURN ((LM_ERROR,"(%P|%t) can't register with reactor\n"),-1);
        }

        return 0;
}
//WINDOWS下应该使用DONT_CALL标志防止事件再次分发过来
void Client_Handler::destroy (void)
{
        this->reactor ()->remove_handler (this, REMOVE_MASK);
        delete this;
        //关闭并释放资源
}
int Client_Handler::close (u_long flags)
{
        ACE_UNUSED_ARG(flags);
        //从reactor清除自身并释放内存
        this->destroy ();
        //不用显示调用父类的函数,已经自动调用了。
        return 0;
}
//当handle_input返回-1时,会调用handle_close来防止内存泄漏。
int Client_Handler::handle_close (ACE_HANDLE handle,ACE_Reactor_Mask mask)
{
        ACE_UNUSED_ARG (handle);
        ACE_UNUSED_ARG (mask);
        delete this;
        return 0;
}

int Client_Handler::handle_input (ACE_HANDLE handle)
{
        ACE_UNUSED_ARG (handle);
        //并发方式如果是线程池,并且创建此handler的线程,reactor肯定是调用了此handler
        if (concurrency () == Client_Acceptor::thread_pool_)
        {
                if (ACE_OS::thr_equal (ACE_Thread::self(),creator_))
                {
                        this->reactor ()->remove_handler (this, REMOVE_MASK);
                        return this->thread_pool ()->enqueue (this);         
                }
        }
        //如果不是单连接单线程的方式,都会执行这段代码
        char         buf[MAX_BUFFER_SIZE];
        ACE_OS::memset( buf, 0, sizeof(buf) );

        int rval=0;

        ACE_SEH_TRY
        {
                //保存对数据的处理结果,单不立刻返回
                rval = this->process ( buf, MAX_BUFFER_SIZE );
        }
        ACE_SEH_EXCEPT (EXCEPTION_EXECUTE_HANDLER)
        {
                ACE_DEBUG((LM_DEBUG,"服务器处理客户端逻辑出现错误--1\n"));
        }

        if (concurrency () == Client_Acceptor::thread_pool_)
    {
                if (rval != -1)
                {
                this->reactor ()->register_handler (this, REGISTER_MASK);
                }
    }
        return rval;
}
//如果svc方法返回,close方法要注意释放handler。
int Client_Handler::svc (void)
{
        char buf[MAX_BUFFER_SIZE];
        ACE_OS::memset( buf, 0, sizeof(buf) );
        ACE_SEH_TRY
        {
                while (true)
                {
                        if (this->process (buf, sizeof (buf)) == -1)
                        {
                                return -1;
                        }
                }
        }
        ACE_SEH_EXCEPT (EXCEPTION_EXECUTE_HANDLER)
        {
                ACE_DEBUG((LM_DEBUG,"服务器处理客户端逻辑出现错误--2\n"));
        }

        return 0;
}
//网络数据包的实际处理,线程间并没有共享任何数据。
int Client_Handler::process (char *rdbuf,int rdbuf_len)
{
        //此次服务器接收到的字节数
        ssize_t                bytes_read;
        char                buf[ MAX_BUFFER_SIZE ];
        //ACE_OS::memset( buf, 0, sizeof(buf) );

        //程序在这处理接收数据和发送数据
        switch ( ( bytes_read = this->peer().recv(buf, MAX_BUFFER_SIZE) ) )
           {
    case -1:
                {
                        ACE_ERROR_RETURN ( ( LM_ERROR, "(%P|%t) 数据接收错误!--1\n" ), -1 );
                }break;
    case 0:
                {
                        ACE_ERROR_RETURN ( ( LM_ERROR, "(%P|%t) 客户端断开连接--2\n" ), -1 );
                }break;
    default:
                {
                        ACE_DEBUG ((LM_DEBUG,"(%P|%t):client: %s\n", buf ));
                        ACE_DEBUG ((LM_DEBUG,"(%P|%t):client: %d\n", nTestNum++ ));
                        
                        MsgHandle( buf );
//                        this->peer().send( buf, sizeof(buf) );
                }break;
        }
        return 100;
}
//转换线程编号
int Client_Handler::get_ptcl_thread(ACE_thread_t thread)
{
        //定义返回值和索引
        int ret=0,found_index=0;
        //
        if(!map_ptcl_thread_.find(thread,found_index))
        {
                ret = found_index;
        }
        else
        {
                map_ptcl_thread_.rebind(thread,index_);
                ret = index_;
                index_++;
        }
        return ret;
}
/***********************************************************************/
//        处理登陆消息
//        参数:客户端发过来信息.
/***********************************************************************/
int Client_Handler::MsgHandle( char* pBuf )
{
        this->peer().send( pBuf, sizeof(pBuf) );
        return 0;
}
 楼主| 发表于 2008-6-19 15:18:20 | 显示全部楼层

晕...open "o"变成表情了...

-----------------------------------------------------------------------------------------------
client_acceptor.h

/*
连接服务器
*/
#ifndef __CLIENT_ACCEPTOR_H__
#define __CLIENT_ACCEPTOR_H__
//包含ACE头文件
#include "ace/Acceptor.h"
#include "ace/SOCK_Acceptor.h"
//包含线程池头文件
#include "thread_pool.h"
//包含逻辑头文件
#include "client_handler.h"

//关联acceptor和handler,非常简洁的方式。

typedef ACE_Acceptor Client_Acceptor_Base;

//维护私有线程池的好处是可以根据acceptor目的的不同,对线程池进行配置。
//如果一定要使用共享线程池,则可以通过外部线程池来实现。

class Client_Acceptor : public Client_Acceptor_Base
{
public:
  typedef Client_Acceptor_Base inherited;

  //枚举可供使用的并发策略
  
  enum concurrency_t
  {
    single_threaded_,
    thread_per_connection_,
    thread_pool_
  };

  //缺省设置为线程池方式
  
  Client_Acceptor (int concurrency = thread_pool_);

  //也可以使用外部线程池

  Client_Acceptor (Thread_Pool &thread_pool);

  //析构函数要释放线程池

  ~Client_Acceptor (void);

  //使用已经存在的reactor,也可以指定线程池大小

  int open (const ACE_INET_Addr &addr,ACE_Reactor *reactor,int pool_size = Thread_Pool::default_pool_size_);

  //关闭时要释放线程池

  int close (void);

  //返回当前并发策略

        int concurrency (void)
        {
                return this->concurrency_; //程序会断在这里. concurrency_成空值了.
        }

  //获得线程池,这样handle_input方法才能将自身放置在线程池中。
  //也可以使用全局的线程池,用ACE_Singleton来实现。

  inline Thread_Pool *thread_pool (void) { return &this->the_thread_pool_; }

  //查询是私有线程池还是外部线程池

  inline int thread_pool_is_private (void) { return &the_thread_pool_ == &private_thread_pool_; }

protected:
  int concurrency_;
  Thread_Pool private_thread_pool_;
  Thread_Pool &the_thread_pool_;
};

#endif /* CLIENT_ACCEPTOR_H */

---------------------------------------------------------------------
client_acceptor.cpp

#include "client_acceptor.h"
//选择并行策略,缺省使用私有线程池。

Client_Acceptor::Client_Acceptor (int concurrency)
  : concurrency_ (concurrency),
    the_thread_pool_ (private_thread_pool_)
{
}

//当然,也可以显式指定外部线程池。

Client_Acceptor::Client_Acceptor (Thread_Pool &thread_pool)
  : concurrency_ (thread_pool_),
    the_thread_pool_ (thread_pool)
{
}

//析构函数中,需要关闭私有线程池。

Client_Acceptor::~Client_Acceptor (void)
{
        if (this->concurrency() == thread_pool_ && thread_pool_is_private ())
                thread_pool ()->close ();
}

//私有线程池需要初始化,父类的open方法被调用。

int Client_Acceptor::open (const ACE_INET_Addr &addr,ACE_Reactor *reactor,int pool_size)
{
        if (this->concurrency() == thread_pool_ && thread_pool_is_private ())
                thread_pool ()->start (pool_size);

        return inherited::open (addr, reactor);
}

//这里同样需要令私有线程池关闭,父类的close方法被调用。

int Client_Acceptor::close (void)
{
        if (this->concurrency() == thread_pool_ && thread_pool_is_private ())   
          thread_pool ()->stop ();

        return inherited::close ();
}

[ 本帖最后由 minglong717 于 2008-6-19 15:20 编辑 ]
 楼主| 发表于 2008-6-19 15:19:09 | 显示全部楼层
thread_pool.h

/*
线程池类
*/
#ifndef __THREAD_POOL_H__
#define __THREAD_POOL_H__

#include "ace/Task.h"
#include "ace/Mutex.h"
#include "ace/Atomic_Op.h"
//声明ACE的事件处理类
class ACE_Event_Handler;
//线程池类
class Thread_Pool : public ACE_Task
{
public:

  typedef ACE_Task inherited;

  //提供缺省的线程池大小

  enum size_t { default_pool_size_ = 5 };

  Thread_Pool (void);


  //开始线程,线程会执行svc方法

  int start (int pool_size = default_pool_size_);

  //关闭线程池

  virtual int stop (void);

  //将handler放入缓冲队列,并在后续调用其handle_input方法

  int enqueue (ACE_Event_Handler *handler);

  //ACE_Atomic_Op保证多线程环境下的安全操作
  typedef ACE_Atomic_Op counter_t;

protected:

        //从队列中提取handler并调用其handle_input方法
        int svc (void);

        //使用原子操作确保多个svc调用对活动线程数的保护,这对close方法很重要
        counter_t active_threads_;

};

#endif /* THREAD_POOL_H */

--------------------------------------------------------------------------------
thread_pool.cpp


#include "thread_pool.h"

//用线程从队列中取出handler,再调用handler中handle_input方法。

#include "ace/Event_Handler.h"

//初始时活动线程数为0

Thread_Pool::Thread_Pool (void)
  : active_threads_ (0)
{
}

//封装ACE_Task的active方法为start方法

int Thread_Pool::start (int pool_size)
{
        return this->activate (THR_NEW_LWP|THR_DETACHED, pool_size);
}

//向队列中加入隐蔽消息

int Thread_Pool::stop (void)
{
        //取得当前活动线程数
        int counter = active_threads_.value ();

  /* For each one of the active threads, enqueue a "null" event
    handler.  Below, we'll teach our svc() method that "null" means
    "shutdown".  */
        //对每个活动线程,加入NULL事件指针。
  while (counter--)
    this->enqueue (0);

  //每个svc方法退出时都会减少一个活动线程,等活动线程数到零时,线程需要等待。
  while (active_threads_.value ())
    ACE_OS::sleep (ACE_Time_Value (0, 250000));

  return(0);
}

//当handler可以工作时,首先要入队列。在这里使用了ACE_Message_Block。

int Thread_Pool::enqueue (ACE_Event_Handler *handler)
{
        //ACE_Message_Block是存在于ACE_Message_Queue中的一段数据,ACE_Task
        //内建了ACE_Message_Queue,并且设置了线程同步标志。

        //ACE_Message_Block使用char*类型的数据。这里传递了ACE_Event_Handler
        //句柄。指针转换是件危险的事情,所以首先将handler指针转换成void指针。
        //然后将void指针转换成char指针供ACE_Message_Block使用。

        void *v_data = (void *) handler;
        char *c_data = (char *) v_data;

        ACE_Message_Block *mb;

        //创建新的ACE_Message_Block。为了高效使用内存,应该是用缓存以便重用的,这里简化了实现。

        ACE_NEW_RETURN (mb,ACE_Message_Block (c_data),-1);

        //加入队列

        if (this->putq (mb) == -1)
    {
                //根据引用数来确定是否释放

                mb->release ();
                return -1;
    }
        return 0;
}

//使用Guard锁,跟踪线程池活动线程数。

class Counter_Guard
{
public:
  Counter_Guard (Thread_Pool::counter_t &counter)
    : counter_ (counter)
  {
    ++counter_;
  }

  ~Counter_Guard (void)
  {
    --counter_;
  }

protected:
  Thread_Pool::counter_t &counter_;
};

//使用Guard锁,保证消息块被彻底释放,防止内存泄漏。

class Message_Block_Guard
{
public:
  Message_Block_Guard (ACE_Message_Block *&mb)
    : mb_ (mb)
  {
  }

  ~Message_Block_Guard (void)
  {
    mb_->release ();
  }

protected:
  ACE_Message_Block *&mb_;
};

//线程池中每个线程都会执行的方法。这里用ACE_Message_Queue来缓存请求。

int Thread_Pool::svc (void)
{
        //getq需要一个指针。
        ACE_Message_Block *mb;

        //创建Guard对象确保多线程情况下的正确计数。

        Counter_Guard counter_guard (active_threads_);

        //从队列中取得消息,一旦异常,立刻退出。

        while (this->getq (mb) != -1)
    {
                //getq成功执行后会增加对"mb"的引用。使用Guard来确保释放此引用。

                Message_Block_Guard message_block_guard (mb);

                //取得数据指针。

                char *c_data = mb->base ();

                //NULL为从队列中退出,如果指针非NULL,则继续处理。

                if (c_data)
                {

                          void *v_data = (void *) c_data;

                          ACE_Event_Handler *handler = (ACE_Event_Handler *) v_data;

                          if (handler->handle_input (ACE_INVALID_HANDLE) == -1)
                            {
                                      handler->handle_close (handler->get_handle (), 0);
                                                  //如果这里直接退出,会导致线程池得不到释放
                            }
                }
                      else
                           //如果收到NULL标志,退出
                          return 0;

                //message_block_guard在这里失效,释放message_block实例
        }

        return 0;
}
发表于 2008-6-20 12:17:01 | 显示全部楼层
好多哦。我稍后看看。。。
发表于 2008-6-20 16:13:56 | 显示全部楼层
虽然不能调试程序,但是我查看了一下,感觉有些问题:
1、 return this->concurrency_; //程序会断在这里. concurrency_成空值了.
这个concurrency_没有线程保护,会在多线程条件下造成错误。
2、线程池使用方法怪异,从没见过这样用的。另外你怎么保证Client_Handler的svc方法,在对象删除的时候,会自动退出?
建议你修改一下,不要这样使用svc处理器,不要一个链接一个线程处理,也不要这样用线程池。
您需要登录后才可以回帖 登录 | 用户注册

本版积分规则

Archiver|手机版|小黑屋|ACE Developer ( 京ICP备06055248号 )

GMT+8, 2024-5-1 14:44 , Processed in 0.015380 second(s), 6 queries , Redis On.

Powered by Discuz! X3.5

© 2001-2023 Discuz! Team.

快速回复 返回顶部 返回列表