peakzhang 发表于 2007-12-13 23:46:25

proactor连接数4000+出现拒绝连接的问题


#ifndef _terminal_processor_h
#define _terminal_processor_h

#include "ace/OS_NS_string.h"
#include "ace/OS_main.h"
#include "ace/Service_Config.h"
#include "ace/Proactor.h"
#include "ace/Asynch_IO.h"
#include "ace/Asynch_IO_Impl.h"
#include "ace/Asynch_Acceptor.h"
#include "ace/INET_Addr.h"
#include "ace/SOCK_Connector.h"
#include "ace/SOCK_Acceptor.h"
#include "ace/SOCK_Stream.h"
#include "ace/SOCK_SEQPACK_Association.h"
#include "ace/Message_Block.h"
#include "ace/Get_Opt.h"
#include "ace/Log_Msg.h"
#include "ace/OS_NS_sys_stat.h"
#include "ace/OS_NS_sys_socket.h"
#include "ace/OS_NS_unistd.h"
#include "ace/OS_NS_fcntl.h"
#include "ace/message_queue_t.h"
#include "ace/task_t.h"
#include "../include/log_imp.h"
#include "../include/data_pool_imp.h"
#include "./terminal_message.h"

ACE_Recursive_Thread_Mutex      map_lock;

// 消息类型
#define MB_NORMAL_PACKET    0x201      // 普通
#define MB_CONNECT_PACKET    0x202      // 连接建立
#define MB_CLOSE_PACKET      0x203      // 连接关闭

typedef std::map<ACE_HANDLE, void*>                              client_connection;
typedef ACE_Singleton<client_connection, ACE_SYNCH_MUTEX>      _conn;

class PacketHeader{
public:
    void* handler;
    CommHead data_;
};
class NetworkThread;
class LogicThread;
NetworkThread      *io_thread;
LogicThread            *logic_thread;

class arg_{
public:
    arg_(){
    }
    void*                p_this;
    ACE_Message_Block*    mb;
    size_t                len;

    typedef int (*_fun)(arg_);
    _fun    fun;

    void set_fun(int (*fun)(arg_)){
      this->fun = fun;
    }
    int write_(){
      return fun(*this);
    }
};

typedef std::map<void*, arg_>                              map_conns;
typedef ACE_Singleton<map_conns, ACE_Null_Mutex>            conns;
ACE_Recursive_Thread_Mutex                                    lock_conns;

class LogicThread : public ACE_Task<ACE_MT_SYNCH>
{
public:
    LogicThread()
    {
    }

    ~LogicThread()
    {
    }

    int open(){
      return this->activate(THR_NEW_LWP, (int)para::instance()->thread_count);
    }

    int close(){
      ACE_Message_Block * hangup;
      for(int i=0; i<(int)para::instance()->thread_count; i++){
            ACE_NEW_RETURN (hangup, ACE_Message_Block(0, ACE_Message_Block::MB_HANGUP), -1);
            this->putq (hangup);
      }

      this->wait ();
      return 0;
    }

    int svc()
    {
      ACE_Message_Block * message;
      for (message = 0; ; )
      {
            if (this->getq (message) == -1)
                ACE_ERROR_RETURN ((LM_ERROR,ACE_TEXT("%p\n"), ACE_TEXT("getq")), -1);

            if (message->msg_type () == ACE_Message_Block::MB_HANGUP){
                //ACE_DEBUG ((LM_DEBUG, ACE_TEXT("LogicThread::svc hangup\n")));
                write_log("Process thread with parsing input exit.");
                message->release ();
                break;
            }

            // ...
            switch (message->msg_type())
            {
            case MB_NORMAL_PACKET:
                {
                  PacketHeader * hdr = reinterpret_cast<PacketHeader *> (message->rd_ptr ());
                  ACE_Message_Block * data_mb = message->cont();

                  //...校验处理,省略

                  // 回包
                  {
                  ACE_Guard<ACE_Recursive_Thread_Mutex>    _lock(lock_conns);
                  map_conns::iterator it = conns::instance()->find(hdr->handler);
                  if(it != conns::instance()->end()){
                        arg_ arg = it->second;

                        if((size_t)process_cont.CommSend.commHead.wLength <= sizeof(CommMessage)){
                            ACE_Message_Block* mb;
                            ACE_NEW_NORETURN(mb, ACE_Message_Block((size_t)process_cont.CommSend.commHead.wLength));
                            mb->copy((char*)&process_cont.CommSend, mb->space());
                            arg.p_this = hdr->handler;
                            arg.mb = mb;
                            arg.len = (size_t)process_cont.CommSend.commHead.wLength;
                            arg.write_();
                        }
                  }
                  }
                }
                break;

            default:
                ACE_ERROR ((LM_ERROR, ACE_TEXT("LogicThread::svc unkown msg_type %d"), message->msg_type()));
                break;
            }
            message->release();
      }

      return 0;
    }
};

class HA_Proactive_Service : public ACE_Service_Handler
{
public:
    HA_Proactive_Service() {
    }
    ~HA_Proactive_Service (){
      if (this->handle () != ACE_INVALID_HANDLE)
            ACE_OS::closesocket (this->handle ());
    }

    void open (ACE_HANDLE h, ACE_Message_Block&){
      this->handle (h);
      if (this->reader_.open (*this) != 0 || this->writer_.open (*this) != 0 ){
            delete this;
            return;
      }

      //write_log("Current died conn amounts is %lu.", conns::instance()->size());

      arg_ arg;
      arg.set_fun(init_write_stream);
      {
      ACE_Guard<ACE_Recursive_Thread_Mutex>    _lock(lock_conns);
      conns::instance()->insert(map_conns::value_type((void*)this, arg));
      }

      if(this->init_read_stream() != 0){
            return;
      }
    }

    void handle_read_stream (const ACE_Asynch_Read_Stream::Result &result){
      ACE_Message_Block &mb = result.message_block ();
      if (!result.success () || result.bytes_transferred () == 0){
            ACE_Guard<ACE_Recursive_Thread_Mutex>    _lock(lock_conns);
            if(conns::instance()->erase((void*)this) == 1){
                recv_data_->release ();
                delete this;
            }

            return;
      }

      if(recv_data_->space() != 0){
            // 数据包长度信息还未接收完
            if(this->reader_.read (*recv_data_, recv_data_->space ())){
                ACE_Guard<ACE_Recursive_Thread_Mutex>    _lock(lock_conns);
                if(conns::instance()->erase((void*)this) == 1){
                  recv_data_->release ();
                  delete this;
                }

                return;
            }
            return;
      }

      if(para::instance()->b_debug){
            ACE_INET_Addr addr;
            ACE_SOCK_SEQPACK_Association ass(this->handle());
            size_t addr_size = 1;
            ass.get_local_addrs(&addr, addr_size);
            ACE_TCHAR tmp;
            ACE_OS::sprintf(tmp, "Recv from %s:%u", addr.get_host_addr(), addr.get_port_number());

            ACE_HEX_DUMP((LM_DEBUG, result.message_block().rd_ptr(), result.bytes_transferred(), tmp));
      }

      PacketHeader * hdr = reinterpret_cast<PacketHeader *> (recv_data_->rd_ptr());
      if(hdr->data_.wMessageType != MessageType ||
            (size_t)hdr->data_.wLength >= sizeof(CommMessage)){
            recv_data_->release ();
            this->init_read_stream();
            return;
      }

      ACE_Message_Block * data_mb = recv_data_->cont();
      if (!data_mb){
            // 刚刚接收完长度信息
            ACE_NEW (data_mb, ACE_Message_Block((size_t)hdr->data_.wLength-sizeof(hdr->data_)));
            recv_data_->cont (data_mb);
      }

      if(data_mb->space() == 0){
            //write_log("read %s.", recv_data_->rd_ptr());
            logic_thread->putq (recv_data_);

            this->init_read_stream();
            return;
      }

      // 否则继续接收该数据包
      if(this->reader_.read (*data_mb, data_mb->space ()) == -1){
            ACE_Guard<ACE_Recursive_Thread_Mutex>    _lock(lock_conns);
            if(conns::instance()->erase((void*)this) == 1){
                recv_data_->release ();
                delete this;
            }

            return;
      }
    }

    void handle_write_stream (const ACE_Asynch_Write_Stream::Result &result){
      if(para::instance()->b_debug){
            ACE_INET_Addr addr;
            ACE_SOCK_SEQPACK_Association ass(this->handle());
            size_t addr_size = 1;
            ass.get_local_addrs(&addr, addr_size);
            ACE_TCHAR tmp;
            ACE_OS::sprintf(tmp, "Recv from %s:%u", addr.get_host_addr(), addr.get_port_number());

            ACE_HEX_DUMP((LM_DEBUG, result.message_block().base(), result.bytes_transferred(), tmp));
      }

      result.message_block ().release ();
    }

private:
    int init_read_stream(){
      ACE_NEW_NORETURN (recv_data_, ACE_Message_Block (sizeof(PacketHeader), MB_NORMAL_PACKET));
      //ACE_HANDLE handle = this->handle ();
      //void* p_this = (void*)this;
      recv_data_->copy ((const char *)(this), sizeof(this));
      if(this->reader_.read (*recv_data_, recv_data_->space ()) == -1){
            recv_data_->release();
            ACE_ERROR_RETURN((LM_ERROR, "%p\n", "ACE_Asynch_Read_Stream::read"), -1);

            {
            ACE_Guard<ACE_Recursive_Thread_Mutex>    _lock(lock_conns);
            if(conns::instance()->erase((void*)this) == 1)
                delete this;
            }

            return -1;
      }
      return 0;
    }
    int init_write_stream (ACE_Message_Block & mb, size_t nBytes){
      if (this->writer_.write (mb , nBytes ) == -1){
            mb.release ();
            ACE_ERROR_RETURN((LM_ERROR, "%p\n", "ACE_Asynch_Write_File::write"), -1);

            {
            ACE_Guard<ACE_Recursive_Thread_Mutex>    _lock(lock_conns);
            if(conns::instance()->erase((void*)this) == 1)
                delete this;
            }

            return -1;
      }
      return 0;
    }

private:
    ACE_Asynch_Read_Stream      reader_;
    ACE_Asynch_Write_Stream      writer_;

    ACE_Message_Block*            recv_data_;
};

class NetworkThread : public ACE_Task_Base
{
public:
    NetworkThread(){
    }

    ~NetworkThread(){
    }

    int open(){
      return this->activate(THR_NEW_LWP, 4);
    }

    int close(){
      ACE_Proactor::instance()->proactor_end_event_loop();
      this->wait();
      return 0;
    }

    int svc(){
      ACE_Proactor::instance ()->proactor_run_event_loop ();

      write_log("Process thread with recving input exit.");
      return 0;
    }
};

#endif

代码如上,缺省了几个具体业务流程模块,基本还全。
思路是仿造论坛上面的一个例子,唯一不同的是增加了关于连接句柄的保存,目的是在处理数据之后回包时判断连接是否有效,如果有别的方式给个意见
目前的问题是:客户端用tp_reactor测试连接,发现链接数在2000以下CPU占用率10%-,4000+以后客户端连接失败,原因是拒绝连接,同时服务端已有连接出现个别掉线现象,反复几次都是如此。:(

peakzhang 发表于 2007-12-13 23:46:59


int
ACE_TMAIN (int argc, ACE_TCHAR *argv[])
{
    //ACE_LOG_MSG->open(".\\tp.log", ACE_Log_Msg::LOGGER, ACE_DEFAULT_LOGGER_KEY);

    if (parse_args (argc, argv) == -1){
      char x;
      std::cin >> x;
      return -1;
    }

    if(1){
      io_thread = new NetworkThread ();
      logic_thread = new LogicThread ();

      ACE_INET_Addr listen_addr(para::instance()->port, para::instance()->ip.c_str());
      ACE_Asynch_Acceptor<HA_Proactive_Service> aio_acceptor;
      if (0 != aio_acceptor.open (listen_addr, 0, 0, 5, 1, 0, 0))
            ACE_ERROR_RETURN ((LM_ERROR, ACE_TEXT ("%p\n"), ACE_TEXT ("NetworkThread acceptor open")), 1);

      logic_thread->open ();
      io_thread->open ();

      int x;
      std::cin >> x;

      io_thread->close ();
      logic_thread->close ();

      delete io_thread;
      delete logic_thread;
    }
    else{

    }

    return 0;
}

这个是主函数,估计是没有多少帮助

peakzhang 发表于 2007-12-13 23:47:10

aio_acceptor.open (listen_addr, 0, 0, 5, 1, 0, 0) 中的5 改成200或者2000,再试试。
还得看你机器的内存、OS情况。

peakzhang 发表于 2007-12-13 23:47:28

改成aio_acceptor.open (listen_addr, 0, 0, SOMAXCONN, 1, 0, 0, 1, 5)效果好多了,连接数到达1.5w了...
看了代码,是open时候的一个bug,如果换个写法或许就没事了:(
客户端用的tp_reactor模型
谢谢大家

agent 发表于 2009-6-1 14:33:22

你好,我们现在做一个项目,服务器肯定要用proactor框架,我想请问客户端用什么最好,我们的流程是,客户端发送请求数据的信息给服务器端,服务器端分析后,发给客户端他所需要的数据。。。可能数据会很大,比如1个G的量。。。。

agent 发表于 2009-7-14 16:47:25

不知道楼主是怎么把这句话编通的 arg.set_fun(init_write_stream);
我的编译环境(VS2008)一直提示“HA_Proactive_Service::init_write_stream”: 函数调用缺少参数列表;请使用“&HA_Proactive_Service::init_write_stream”创建指向成员的指针。。
页: [1]
查看完整版本: proactor连接数4000+出现拒绝连接的问题