peakzhang 发表于 2008-9-9 22:43:30

实现一个可限制最大连接数的Proactor服务器

作者:Qinglan
在服务器程序实现中,一般要求能够限制服务器的最大连接数,这主要是从服务器的性能方面考虑,当过多的连接到来时,服务器虽然能够处理,但效率非常低下,也就会出现卡机的现象。

在用Proactor框架实现的服务器中可以很容易地做到这一点。ACE_Asynch_Acceptor类有一个虚方法:make_handler(),默认情况下是new一个服务处理对象出来,我们可以让他在适当的时候返回一个空值,这样,新的连接便被拒绝了。

另外为了避免每次都完全构造一个全新的服务处理对象,这里还可以使用预分配的方法,一开始便创建出所有的service对象,当有连接到来时,直接取一个未使用的分配给该连接,当连接关闭时,同时也不delete该对象,而是加到空闲队列中,等下一次有新连接到来时再使用。

实现的代码如下:


#include <ace/Os_main.h>
#include <ace/Asynch_Acceptor.h>
#include <ace/Proactor.h>
#include <list>

#define LISTEN_PORT 5222            // 服务器监听端口
#define MAX_CONNS 2                     // 最大连接数

class HA_Proactive_Acceptor;

class HA_Proactive_Service : public ACE_Service_Handler
{
public:
       virtual ~HA_Proactive_Service ();

       void init( HA_Proactive_Acceptor * acceptor );

       virtual void open (ACE_HANDLE h, ACE_Message_Block&);
       virtual void handle_read_stream (const ACE_Asynch_Read_Stream::Result &result);
       virtual void handle_write_stream (const ACE_Asynch_Write_Stream::Result &result);

private:
       ACE_Asynch_Read_Stream reader_;
       ACE_Asynch_Write_Stream writer_;

       HA_Proactive_Acceptor * acceptor_;
};


class HA_Proactive_Acceptor : public ACE_Asynch_Acceptor<HA_Proactive_Service>
{
public:
       HA_Proactive_Acceptor();

       void free_handler( HA_Proactive_Service * service );

private:
       virtual HA_Proactive_Service *make_handler (void);
       void init_handlers();

private:
       typedef std::list<HA_Proactive_Service *> T_Handler_List;
       T_Handler_List handler_list_;
};


HA_Proactive_Service::~HA_Proactive_Service ()
{
       if (this->handle () != ACE_INVALID_HANDLE)
            ACE_OS::closesocket (this->handle ());
}

void HA_Proactive_Service::init( HA_Proactive_Acceptor * acceptor )
{
       this->acceptor_ = acceptor;
}

void HA_Proactive_Service::open (ACE_HANDLE h, ACE_Message_Block&)
{
       this->handle (h);
       if (this->reader_.open (*this) != 0 || this->writer_.open (*this) != 0   )
       {
            ACE_OS::closesocket (this->handle ());
            this->acceptor_->free_handler( this );
            return;
       }

       ACE_Message_Block *mb;
       ACE_NEW_NORETURN (mb, ACE_Message_Block (1024));
       if (this->reader_.read (*mb, mb->space ()) != 0)
       {
            mb->release ();
            ACE_OS::closesocket (this->handle ());
            this->acceptor_->free_handler( this );
       }
}

void HA_Proactive_Service::handle_read_stream (const ACE_Asynch_Read_Stream::Result &result)
{
       ACE_Message_Block &mb = result.message_block ();
       if (!result.success () || result.bytes_transferred () == 0)
       {
            mb.release ();
            ACE_OS::closesocket (this->handle ());
            this->acceptor_->free_handler( this );
       }
       else
       {
            ACE_DEBUG ((LM_DEBUG, ACE_TEXT("Received Data : %c\n"), *mb.rd_ptr()));
            mb.release();

            ACE_Message_Block *new_mb;
            ACE_NEW_NORETURN (new_mb, ACE_Message_Block (1024));
            this->reader_.read (*new_mb, new_mb->space ());
       }
}

void HA_Proactive_Service::handle_write_stream (const ACE_Asynch_Write_Stream::Result &result)
{
       result.message_block ().release ();
}


HA_Proactive_Acceptor::HA_Proactive_Acceptor() : ACE_Asynch_Acceptor<HA_Proactive_Service>()
{
       init_handlers();
}

void HA_Proactive_Acceptor::free_handler( HA_Proactive_Service * service )
{
       this->handler_list_.push_back( service );
}

HA_Proactive_Service * HA_Proactive_Acceptor::make_handler (void)
{
       if( this->handler_list_.empty() )
            return 0;

       HA_Proactive_Service * service = this->handler_list_.front();
       this->handler_list_.pop_front();

       return service;
}

void HA_Proactive_Acceptor::init_handlers()
{
       for( int i = 0; i < MAX_CONNS; ++i )
       {
            HA_Proactive_Service * service;
            ACE_NEW( service, HA_Proactive_Service );
            service->init( this );
            this->handler_list_.push_back( service );
       }
}


int ACE_TMAIN (int, ACE_TCHAR *[])
{
       ACE_INET_Addr listen_addr( LISTEN_PORT );
       HA_Proactive_Acceptor aio_acceptor;
       if (0 != aio_acceptor.open (listen_addr, 0, 0, 5, 1, 0, 0))
            ACE_ERROR_RETURN ((LM_ERROR, ACE_TEXT ("%p\n"), ACE_TEXT ("acceptor open")), 1);

       ACE_Proactor::instance ()->proactor_run_event_loop ();

       return 0;
}

modern 发表于 2009-3-4 13:54:24

连接池?如果直接返回0的话,客户端是应该无法知道连接被拒绝的原因吧。

winston 发表于 2009-3-4 14:12:16

上面只是一个思路,实际应用,考虑的比这些多得多。

wishel 发表于 2009-6-15 15:06:51

原帖由 modern 于 2009-3-4 13:54 发表 http://www.acejoy.com/bbs/images/common/back.gif
连接池?如果直接返回0的话,客户端是应该无法知道连接被拒绝的原因吧。

可以在满了以后关闭listen的socket,客户连不上。
如果客户端一定要知道连接失败原因(服务器当机还是忙),就在应用层做个协议,连接成功后服务器发个buzy的信息,然后关闭连接。

modern 发表于 2009-6-15 17:03:24

呵呵,楼上没仔细看题目哦~
我的意思是,如果重载了make_handler,让其直接返回0了的话。
新连接根本就不会被建立的,socket直接就被框架给关闭了。
哪还会有给你传协议的机会呀。

HANDLER *new_handler = 0;
if (!error)
    {
      // The Template method
      new_handler = this->make_handler ();
      if (new_handler == 0)
      {
          error = 1;
      }
    }

// If no errors
if (!error)
    {
      // Update the Proactor.
      new_handler->proactor (this->proactor ());

      // Pass the addresses
      if (this->pass_addresses_)
      new_handler->addresses (remote_address,
                              local_address);

      // Pass the ACT
      if (result.act () != 0)
      new_handler->act (result.act ());

      // Set up the handler's new handle value
      new_handler->handle (result.accept_handle ());

      // Initiate the handler
      new_handler->open (result.accept_handle (),
                         result.message_block ());
    }

// On failure, no choice but to close the socket
if (error &&
      result.accept_handle() != ACE_INVALID_HANDLE )
    ACE_OS::closesocket (result.accept_handle ());

wishel 发表于 2009-6-16 15:09:42

汗。。有段时间没看ace的connector了,晚上回去再看看。unix api的connect是不用等server accept就返回成功的。ace的connector需要等acceptor传回信息才成功返回么?

我说的用协议约定拒绝连接理由当然不是返回0,拒绝连接了还怎么传?
“在应用层做个协议,连接成功后服务器发个buzy的信息,然后关闭连接。”

modern 发表于 2009-6-16 15:32:08

1.我贴的这段是Asyn_Acceptor的代码,我没有提到connector的问题,没看懂楼上的意思,可能是咱俩说的不是一回事。
2.楼上还是没有仔细看题目,你说的通过协议通知对端关闭连接当然是正确的而且得体的做法。
不过我说的make_handler返回0之后连接无从建立,因此没有机会通知客户端关闭连接的原因,这是顺着楼主的帖子的第二段说的,就这个单点问题,确实没有机会通知客户端连接被拒绝的原因。
呵呵,咱们说的不矛盾呀~

wishel 发表于 2009-6-16 15:44:33

可能是我没说明我不仅针对的是引用的你的那句话,还有winston 的那句“上面只是一个思路”。我的意思是给出另一个思路。所以你觉得我偏题。
我的意思是make_handler不返回0,而是正常返回,建立连接,然后写回一个协议约定的字节,client就知道服务器忙了。甚至也可在协议约定下次尝试时间等。

另外,拒绝连接与拒绝服务是不同的概念。按我现在的理解client申请连接,server只要open(listen)就可以连接成功。accept返回0只能是拒绝服务,这时tcp3次握手已经成功,连接已经成立,client可以发数据,处理与否由server决定。这个我晚上看下书确认下。

modern 发表于 2009-6-16 16:16:59

对头,根据具体情况,你的思路是可以考虑的,我说了,我们针对这个问题理解不矛盾嘛。

accept返回0只能是拒绝服务,这个指的是什么?是指make_handler函数返回0么?
如果是的话,那么三次握手确实已经成功了,不过由于make_handler返回,
很遗憾,框架替我们调用了ACE_OS::closesocket (result.accept_handle ());
如果另有所值的话,需要详细说明一下。

wishel 发表于 2009-6-16 16:29:09

“框架替我们调用了ACE_OS::closesocket (result.accept_handle ());”

呵呵,如果是这样的话,实现我所说的上面的那个协议就方便了。
make_handler如果要返回0,就先发一个buzy消息通知client,然后再ACE_OS::closesocket (result.accept_handle ())

说起来很绕,其实我的意思就是server并没有拒绝连接,而是接收连接之后因为忙,又关闭了连接。称为拒绝服务。可能用词不当,造成了误解。
页: [1]
查看完整版本: 实现一个可限制最大连接数的Proactor服务器