找回密码
 用户注册

QQ登录

只需一步,快速开始

查看: 5790|回复: 9

实现一个可限制最大连接数的Proactor服务器

[复制链接]
发表于 2008-9-9 22:43:30 | 显示全部楼层 |阅读模式
作者:Qinglan
在服务器程序实现中,一般要求能够限制服务器的最大连接数,这主要是从服务器的性能方面考虑,当过多的连接到来时,服务器虽然能够处理,但效率非常低下,也就会出现卡机的现象。

在用Proactor框架实现的服务器中可以很容易地做到这一点。ACE_Asynch_Acceptor类有一个虚方法:make_handler(),默认情况下是new一个服务处理对象出来,我们可以让他在适当的时候返回一个空值,这样,新的连接便被拒绝了。

另外为了避免每次都完全构造一个全新的服务处理对象,这里还可以使用预分配的方法,一开始便创建出所有的service对象,当有连接到来时,直接取一个未使用的分配给该连接,当连接关闭时,同时也不delete该对象,而是加到空闲队列中,等下一次有新连接到来时再使用。

实现的代码如下:

  1. #include <ace/Os_main.h>
  2. #include <ace/Asynch_Acceptor.h>
  3. #include <ace/Proactor.h>
  4. #include <list>
  5. #define LISTEN_PORT 5222              // 服务器监听端口
  6. #define MAX_CONNS 2                     // 最大连接数
  7. class HA_Proactive_Acceptor;
  8. class HA_Proactive_Service : public ACE_Service_Handler
  9. {
  10. public:
  11.        virtual ~HA_Proactive_Service ();
  12.        void init( HA_Proactive_Acceptor * acceptor );
  13.        virtual void open (ACE_HANDLE h, ACE_Message_Block&);
  14.        virtual void handle_read_stream (const ACE_Asynch_Read_Stream::Result &result);
  15.        virtual void handle_write_stream (const ACE_Asynch_Write_Stream::Result &result);
  16. private:
  17.        ACE_Asynch_Read_Stream reader_;
  18.        ACE_Asynch_Write_Stream writer_;
  19.        HA_Proactive_Acceptor * acceptor_;
  20. };
  21. class HA_Proactive_Acceptor : public ACE_Asynch_Acceptor<HA_Proactive_Service>
  22. {
  23. public:
  24.        HA_Proactive_Acceptor();
  25.        void free_handler( HA_Proactive_Service * service );
  26. private:
  27.        virtual HA_Proactive_Service *make_handler (void);
  28.        void init_handlers();
  29. private:
  30.        typedef std::list<HA_Proactive_Service *> T_Handler_List;
  31.        T_Handler_List handler_list_;
  32. };
  33. HA_Proactive_Service::~HA_Proactive_Service ()
  34. {
  35.        if (this->handle () != ACE_INVALID_HANDLE)
  36.               ACE_OS::closesocket (this->handle ());
  37. }
  38. void HA_Proactive_Service::init( HA_Proactive_Acceptor * acceptor )
  39. {
  40.        this->acceptor_ = acceptor;
  41. }
  42. void HA_Proactive_Service::open (ACE_HANDLE h, ACE_Message_Block&)
  43. {
  44.        this->handle (h);
  45.        if (this->reader_.open (*this) != 0 || this->writer_.open (*this) != 0   )
  46.        {
  47.               ACE_OS::closesocket (this->handle ());
  48.               this->acceptor_->free_handler( this );
  49.               return;
  50.        }
  51.        ACE_Message_Block *mb;
  52.        ACE_NEW_NORETURN (mb, ACE_Message_Block (1024));
  53.        if (this->reader_.read (*mb, mb->space ()) != 0)
  54.        {
  55.               mb->release ();
  56.               ACE_OS::closesocket (this->handle ());
  57.               this->acceptor_->free_handler( this );
  58.        }
  59. }
  60. void HA_Proactive_Service::handle_read_stream (const ACE_Asynch_Read_Stream::Result &result)
  61. {
  62.        ACE_Message_Block &mb = result.message_block ();
  63.        if (!result.success () || result.bytes_transferred () == 0)
  64.        {
  65.               mb.release ();
  66.               ACE_OS::closesocket (this->handle ());
  67.               this->acceptor_->free_handler( this );
  68.        }
  69.        else
  70.        {
  71.               ACE_DEBUG ((LM_DEBUG, ACE_TEXT("Received Data : %c\n"), *mb.rd_ptr()));
  72.               mb.release();
  73.               ACE_Message_Block *new_mb;
  74.               ACE_NEW_NORETURN (new_mb, ACE_Message_Block (1024));
  75.               this->reader_.read (*new_mb, new_mb->space ());
  76.        }
  77. }
  78. void HA_Proactive_Service::handle_write_stream (const ACE_Asynch_Write_Stream::Result &result)
  79. {
  80.        result.message_block ().release ();
  81. }
  82. HA_Proactive_Acceptor::HA_Proactive_Acceptor() : ACE_Asynch_Acceptor<HA_Proactive_Service>()
  83. {
  84.        init_handlers();
  85. }
  86. void HA_Proactive_Acceptor::free_handler( HA_Proactive_Service * service )
  87. {
  88.        this->handler_list_.push_back( service );
  89. }
  90. HA_Proactive_Service * HA_Proactive_Acceptor::make_handler (void)
  91. {
  92.        if( this->handler_list_.empty() )
  93.               return 0;
  94.        HA_Proactive_Service * service = this->handler_list_.front();
  95.        this->handler_list_.pop_front();
  96.        return service;
  97. }
  98. void HA_Proactive_Acceptor::init_handlers()
  99. {
  100.        for( int i = 0; i < MAX_CONNS; ++i )
  101.        {
  102.               HA_Proactive_Service * service;
  103.               ACE_NEW( service, HA_Proactive_Service );
  104.               service->init( this );
  105.               this->handler_list_.push_back( service );
  106.        }
  107. }
  108. int ACE_TMAIN (int, ACE_TCHAR *[])
  109. {
  110.        ACE_INET_Addr listen_addr( LISTEN_PORT );
  111.        HA_Proactive_Acceptor aio_acceptor;
  112.        if (0 != aio_acceptor.open (listen_addr, 0, 0, 5, 1, 0, 0))
  113.               ACE_ERROR_RETURN ((LM_ERROR, ACE_TEXT ("%p\n"), ACE_TEXT ("acceptor open")), 1);
  114.        ACE_Proactor::instance ()->proactor_run_event_loop ();
  115.        return 0;
  116. }
复制代码
发表于 2009-3-4 13:54:24 | 显示全部楼层
连接池?如果直接返回0的话,客户端是应该无法知道连接被拒绝的原因吧。
发表于 2009-3-4 14:12:16 | 显示全部楼层
上面只是一个思路,实际应用,考虑的比这些多得多。
发表于 2009-6-15 15:06:51 | 显示全部楼层
原帖由 modern 于 2009-3-4 13:54 发表
连接池?如果直接返回0的话,客户端是应该无法知道连接被拒绝的原因吧。

可以在满了以后关闭listen的socket,客户连不上。
如果客户端一定要知道连接失败原因(服务器当机还是忙),就在应用层做个协议,连接成功后服务器发个buzy的信息,然后关闭连接。
发表于 2009-6-15 17:03:24 | 显示全部楼层
呵呵,楼上没仔细看题目哦~
我的意思是,如果重载了make_handler,让其直接返回0了的话。
新连接根本就不会被建立的,socket直接就被框架给关闭了。
哪还会有给你传协议的机会呀。

HANDLER *new_handler = 0;
  if (!error)
    {
      // The Template method
      new_handler = this->make_handler ();
      if (new_handler == 0)
        {
          error = 1;
        }
    }

  // If no errors
  if (!error)
    {
      // Update the Proactor.
      new_handler->proactor (this->proactor ());

      // Pass the addresses
      if (this->pass_addresses_)
        new_handler->addresses (remote_address,
                                local_address);

      // Pass the ACT
      if (result.act () != 0)
        new_handler->act (result.act ());

      // Set up the handler's new handle value
      new_handler->handle (result.accept_handle ());

      // Initiate the handler
      new_handler->open (result.accept_handle (),
                         result.message_block ());
    }

  // On failure, no choice but to close the socket
  if (error &&
      result.accept_handle() != ACE_INVALID_HANDLE )
    ACE_OS::closesocket (result.accept_handle ());
发表于 2009-6-16 15:09:42 | 显示全部楼层
汗。。有段时间没看ace的connector了,晚上回去再看看。unix api的connect是不用等server accept就返回成功的。ace的connector需要等acceptor传回信息才成功返回么?

我说的用协议约定拒绝连接理由当然不是返回0,拒绝连接了还怎么传?
“在应用层做个协议,连接成功后服务器发个buzy的信息,然后关闭连接。”
发表于 2009-6-16 15:32:08 | 显示全部楼层
1.我贴的这段是Asyn_Acceptor的代码,我没有提到connector的问题,没看懂楼上的意思,可能是咱俩说的不是一回事。
2.楼上还是没有仔细看题目,你说的通过协议通知对端关闭连接当然是正确的而且得体的做法。
不过我说的make_handler返回0之后连接无从建立,因此没有机会通知客户端关闭连接的原因,这是顺着楼主的帖子的第二段说的,就这个单点问题,确实没有机会通知客户端连接被拒绝的原因。
呵呵,咱们说的不矛盾呀~
发表于 2009-6-16 15:44:33 | 显示全部楼层
可能是我没说明我不仅针对的是引用的你的那句话,还有winston 的那句“上面只是一个思路”。我的意思是给出另一个思路。所以你觉得我偏题。
我的意思是make_handler不返回0,而是正常返回,建立连接,然后写回一个协议约定的字节,client就知道服务器忙了。甚至也可在协议约定下次尝试时间等。

另外,拒绝连接与拒绝服务是不同的概念。按我现在的理解client申请连接,server只要open(listen)就可以连接成功。accept返回0只能是拒绝服务,这时tcp3次握手已经成功,连接已经成立,client可以发数据,处理与否由server决定。这个我晚上看下书确认下。
发表于 2009-6-16 16:16:59 | 显示全部楼层
对头,根据具体情况,你的思路是可以考虑的,我说了,我们针对这个问题理解不矛盾嘛。

accept返回0只能是拒绝服务,这个指的是什么?是指make_handler函数返回0么?
如果是的话,那么三次握手确实已经成功了,不过由于make_handler返回,
很遗憾,框架替我们调用了ACE_OS::closesocket (result.accept_handle ());
如果另有所值的话,需要详细说明一下。
发表于 2009-6-16 16:29:09 | 显示全部楼层
“框架替我们调用了ACE_OS::closesocket (result.accept_handle ());”

呵呵,如果是这样的话,实现我所说的上面的那个协议就方便了。
make_handler如果要返回0,就先发一个buzy消息通知client,然后再ACE_OS::closesocket (result.accept_handle ())

说起来很绕,其实我的意思就是server并没有拒绝连接,而是接收连接之后因为忙,又关闭了连接。称为拒绝服务。可能用词不当,造成了误解。
您需要登录后才可以回帖 登录 | 用户注册

本版积分规则

Archiver|手机版|小黑屋|ACE Developer ( 京ICP备06055248号 )

GMT+8, 2024-4-29 09:56 , Processed in 0.014142 second(s), 6 queries , Redis On.

Powered by Discuz! X3.5

© 2001-2023 Discuz! Team.

快速回复 返回顶部 返回列表