找回密码
 用户注册

QQ登录

只需一步,快速开始

查看: 5232|回复: 5

proactor连接数4000+出现拒绝连接的问题

[复制链接]
发表于 2007-12-13 23:46:25 | 显示全部楼层 |阅读模式
  1. #ifndef _terminal_processor_h
  2. #define _terminal_processor_h
  3. #include "ace/OS_NS_string.h"
  4. #include "ace/OS_main.h"
  5. #include "ace/Service_Config.h"
  6. #include "ace/Proactor.h"
  7. #include "ace/Asynch_IO.h"
  8. #include "ace/Asynch_IO_Impl.h"
  9. #include "ace/Asynch_Acceptor.h"
  10. #include "ace/INET_Addr.h"
  11. #include "ace/SOCK_Connector.h"
  12. #include "ace/SOCK_Acceptor.h"
  13. #include "ace/SOCK_Stream.h"
  14. #include "ace/SOCK_SEQPACK_Association.h"
  15. #include "ace/Message_Block.h"
  16. #include "ace/Get_Opt.h"
  17. #include "ace/Log_Msg.h"
  18. #include "ace/OS_NS_sys_stat.h"
  19. #include "ace/OS_NS_sys_socket.h"
  20. #include "ace/OS_NS_unistd.h"
  21. #include "ace/OS_NS_fcntl.h"
  22. #include "ace/message_queue_t.h"
  23. #include "ace/task_t.h"
  24. #include "../include/log_imp.h"
  25. #include "../include/data_pool_imp.h"
  26. #include "./terminal_message.h"
  27. ACE_Recursive_Thread_Mutex        map_lock;
  28. // 消息类型
  29. #define MB_NORMAL_PACKET    0x201        // 普通
  30. #define MB_CONNECT_PACKET    0x202        // 连接建立
  31. #define MB_CLOSE_PACKET        0x203        // 连接关闭
  32. typedef std::map<ACE_HANDLE, void*>                                client_connection;
  33. typedef ACE_Singleton<client_connection, ACE_SYNCH_MUTEX>        _conn;
  34. class PacketHeader{
  35. public:
  36.     void* handler;
  37.     CommHead data_;
  38. };
  39. class NetworkThread;
  40. class LogicThread;
  41. NetworkThread        *io_thread;
  42. LogicThread            *logic_thread;
  43. class arg_{
  44. public:
  45.     arg_(){
  46.     }
  47.     void*                p_this;
  48.     ACE_Message_Block*    mb;
  49.     size_t                len;
  50.     typedef int (*_fun)(arg_);
  51.     _fun    fun;
  52.     void set_fun(int (*fun)(arg_)){
  53.         this->fun = fun;
  54.     }
  55.     int write_(){
  56.         return fun(*this);
  57.     }
  58. };
  59. typedef std::map<void*, arg_>                                map_conns;
  60. typedef ACE_Singleton<map_conns, ACE_Null_Mutex>            conns;
  61. ACE_Recursive_Thread_Mutex                                    lock_conns;
  62. class LogicThread : public ACE_Task<ACE_MT_SYNCH>
  63. {
  64. public:
  65.     LogicThread()
  66.     {
  67.     }
  68.     ~LogicThread()
  69.     {
  70.     }
  71.     int open(){
  72.         return this->activate(THR_NEW_LWP, (int)para::instance()->thread_count);
  73.     }
  74.     int close(){
  75.         ACE_Message_Block * hangup;
  76.         for(int i=0; i<(int)para::instance()->thread_count; i++){
  77.             ACE_NEW_RETURN (hangup, ACE_Message_Block(0, ACE_Message_Block::MB_HANGUP), -1);
  78.             this->putq (hangup);
  79.         }
  80.         this->wait ();
  81.         return 0;
  82.     }
  83.     int svc()
  84.     {
  85.         ACE_Message_Block * message;
  86.         for (message = 0; ; )
  87.         {
  88.             if (this->getq (message) == -1)
  89.                 ACE_ERROR_RETURN ((LM_ERROR,ACE_TEXT("%p\n"), ACE_TEXT("getq")), -1);
  90.             if (message->msg_type () == ACE_Message_Block::MB_HANGUP){
  91.                 //ACE_DEBUG ((LM_DEBUG, ACE_TEXT("LogicThread::svc hangup\n")));
  92.                 write_log("Process thread with parsing input exit.");
  93.                 message->release ();
  94.                 break;
  95.             }
  96.             // ...
  97.             switch (message->msg_type())
  98.             {
  99.             case MB_NORMAL_PACKET:
  100.                 {
  101.                     PacketHeader * hdr = reinterpret_cast<PacketHeader *> (message->rd_ptr ());
  102.                     ACE_Message_Block * data_mb = message->cont();
  103.                     //...校验处理,省略
  104.                     // 回包
  105.                     {
  106.                     ACE_Guard<ACE_Recursive_Thread_Mutex>    _lock(lock_conns);
  107.                     map_conns::iterator it = conns::instance()->find(hdr->handler);
  108.                     if(it != conns::instance()->end()){
  109.                         arg_ arg = it->second;
  110.                         if((size_t)process_cont.CommSend.commHead.wLength <= sizeof(CommMessage)){
  111.                             ACE_Message_Block* mb;
  112.                             ACE_NEW_NORETURN(mb, ACE_Message_Block((size_t)process_cont.CommSend.commHead.wLength));
  113.                             mb->copy((char*)&process_cont.CommSend, mb->space());
  114.                             arg.p_this = hdr->handler;
  115.                             arg.mb = mb;
  116.                             arg.len = (size_t)process_cont.CommSend.commHead.wLength;
  117.                             arg.write_();
  118.                         }
  119.                     }
  120.                     }
  121.                 }
  122.                 break;
  123.             default:
  124.                 ACE_ERROR ((LM_ERROR, ACE_TEXT("LogicThread::svc unkown msg_type %d"), message->msg_type()));
  125.                 break;
  126.             }
  127.             message->release();
  128.         }
  129.         return 0;
  130.     }
  131. };
  132. class HA_Proactive_Service : public ACE_Service_Handler
  133. {
  134. public:
  135.     HA_Proactive_Service() {
  136.     }
  137.     ~HA_Proactive_Service (){
  138.         if (this->handle () != ACE_INVALID_HANDLE)
  139.             ACE_OS::closesocket (this->handle ());
  140.     }
  141.     void open (ACE_HANDLE h, ACE_Message_Block&){
  142.         this->handle (h);
  143.         if (this->reader_.open (*this) != 0 || this->writer_.open (*this) != 0 ){
  144.             delete this;
  145.             return;
  146.         }
  147.         //write_log("Current died conn amounts is %lu.", conns::instance()->size());
  148.         arg_ arg;
  149.         arg.set_fun(init_write_stream);
  150.         {
  151.         ACE_Guard<ACE_Recursive_Thread_Mutex>    _lock(lock_conns);
  152.         conns::instance()->insert(map_conns::value_type((void*)this, arg));
  153.         }
  154.         if(this->init_read_stream() != 0){
  155.             return;
  156.         }
  157.     }
  158.     void handle_read_stream (const ACE_Asynch_Read_Stream::Result &result){
  159.         ACE_Message_Block &mb = result.message_block ();
  160.         if (!result.success () || result.bytes_transferred () == 0){
  161.             ACE_Guard<ACE_Recursive_Thread_Mutex>    _lock(lock_conns);
  162.             if(conns::instance()->erase((void*)this) == 1){
  163.                 recv_data_->release ();
  164.                 delete this;
  165.             }
  166.             return;
  167.         }
  168.         if(recv_data_->space() != 0){
  169.             // 数据包长度信息还未接收完
  170.             if(this->reader_.read (*recv_data_, recv_data_->space ())){
  171.                 ACE_Guard<ACE_Recursive_Thread_Mutex>    _lock(lock_conns);
  172.                 if(conns::instance()->erase((void*)this) == 1){
  173.                     recv_data_->release ();
  174.                     delete this;
  175.                 }
  176.                 return;
  177.             }
  178.             return;
  179.         }
  180.         if(para::instance()->b_debug){
  181.             ACE_INET_Addr addr;
  182.             ACE_SOCK_SEQPACK_Association ass(this->handle());
  183.             size_t addr_size = 1;
  184.             ass.get_local_addrs(&addr, addr_size);
  185.             ACE_TCHAR tmp[128];
  186.             ACE_OS::sprintf(tmp, "Recv from %s:%u", addr.get_host_addr(), addr.get_port_number());
  187.             ACE_HEX_DUMP((LM_DEBUG, result.message_block().rd_ptr(), result.bytes_transferred(), tmp));
  188.         }
  189.         PacketHeader * hdr = reinterpret_cast<PacketHeader *> (recv_data_->rd_ptr());
  190.         if(hdr->data_.wMessageType != MessageType ||
  191.             (size_t)hdr->data_.wLength >= sizeof(CommMessage)){
  192.             recv_data_->release ();
  193.             this->init_read_stream();
  194.             return;
  195.         }
  196.         ACE_Message_Block * data_mb = recv_data_->cont();
  197.         if (!data_mb){
  198.             // 刚刚接收完长度信息
  199.             ACE_NEW (data_mb, ACE_Message_Block((size_t)hdr->data_.wLength-sizeof(hdr->data_)));
  200.             recv_data_->cont (data_mb);
  201.         }
  202.         if(data_mb->space() == 0){
  203.             //write_log("read %s.", recv_data_->rd_ptr());
  204.             logic_thread->putq (recv_data_);
  205.             this->init_read_stream();
  206.             return;
  207.         }
  208.         // 否则继续接收该数据包
  209.         if(this->reader_.read (*data_mb, data_mb->space ()) == -1){
  210.             ACE_Guard<ACE_Recursive_Thread_Mutex>    _lock(lock_conns);
  211.             if(conns::instance()->erase((void*)this) == 1){
  212.                 recv_data_->release ();
  213.                 delete this;
  214.             }
  215.             return;
  216.         }
  217.     }
  218.     void handle_write_stream (const ACE_Asynch_Write_Stream::Result &result){
  219.         if(para::instance()->b_debug){
  220.             ACE_INET_Addr addr;
  221.             ACE_SOCK_SEQPACK_Association ass(this->handle());
  222.             size_t addr_size = 1;
  223.             ass.get_local_addrs(&addr, addr_size);
  224.             ACE_TCHAR tmp[128];
  225.             ACE_OS::sprintf(tmp, "Recv from %s:%u", addr.get_host_addr(), addr.get_port_number());
  226.             ACE_HEX_DUMP((LM_DEBUG, result.message_block().base(), result.bytes_transferred(), tmp));
  227.         }
  228.         result.message_block ().release ();
  229.     }
  230. private:
  231.     int init_read_stream(){
  232.         ACE_NEW_NORETURN (recv_data_, ACE_Message_Block (sizeof(PacketHeader), MB_NORMAL_PACKET));
  233.         //ACE_HANDLE handle = this->handle ();
  234.         //void* p_this = (void*)this;
  235.         recv_data_->copy ((const char *)(this), sizeof(this));
  236.         if(this->reader_.read (*recv_data_, recv_data_->space ()) == -1){
  237.             recv_data_->release();
  238.             ACE_ERROR_RETURN((LM_ERROR, "%p\n", "ACE_Asynch_Read_Stream::read"), -1);
  239.             {
  240.             ACE_Guard<ACE_Recursive_Thread_Mutex>    _lock(lock_conns);
  241.             if(conns::instance()->erase((void*)this) == 1)
  242.                 delete this;
  243.             }
  244.             return -1;
  245.         }
  246.         return 0;
  247.     }
  248.     int init_write_stream (ACE_Message_Block & mb, size_t nBytes){
  249.         if (this->writer_.write (mb , nBytes ) == -1){
  250.             mb.release ();
  251.             ACE_ERROR_RETURN((LM_ERROR, "%p\n", "ACE_Asynch_Write_File::write"), -1);
  252.             {
  253.             ACE_Guard<ACE_Recursive_Thread_Mutex>    _lock(lock_conns);
  254.             if(conns::instance()->erase((void*)this) == 1)
  255.                 delete this;
  256.             }
  257.             return -1;
  258.         }
  259.         return 0;
  260.     }
  261. private:
  262.     ACE_Asynch_Read_Stream        reader_;
  263.     ACE_Asynch_Write_Stream        writer_;
  264.     ACE_Message_Block*            recv_data_;
  265. };
  266. class NetworkThread : public ACE_Task_Base
  267. {
  268. public:
  269.     NetworkThread(){
  270.     }
  271.     ~NetworkThread(){
  272.     }
  273.     int open(){
  274.         return this->activate(THR_NEW_LWP, 4);
  275.     }
  276.     int close(){
  277.         ACE_Proactor::instance()->proactor_end_event_loop();
  278.         this->wait();
  279.         return 0;
  280.     }
  281.     int svc(){
  282.         ACE_Proactor::instance ()->proactor_run_event_loop ();
  283.         write_log("Process thread with recving input exit.");
  284.         return 0;
  285.     }
  286. };
  287. #endif
复制代码
代码如上,缺省了几个具体业务流程模块,基本还全。
思路是仿造论坛上面的一个例子,唯一不同的是增加了关于连接句柄的保存,目的是在处理数据之后回包时判断连接是否有效,如果有别的方式给个意见
目前的问题是:客户端用tp_reactor测试连接,发现链接数在2000以下CPU占用率10%-,4000+以后客户端连接失败,原因是拒绝连接,同时服务端已有连接出现个别掉线现象,反复几次都是如此。:(
 楼主| 发表于 2007-12-13 23:46:59 | 显示全部楼层
  1. int
  2. ACE_TMAIN (int argc, ACE_TCHAR *argv[])
  3. {
  4.     //ACE_LOG_MSG->open(".\\tp.log", ACE_Log_Msg::LOGGER, ACE_DEFAULT_LOGGER_KEY);
  5.     if (parse_args (argc, argv) == -1){
  6.         char x;
  7.         std::cin >> x;
  8.         return -1;
  9.     }
  10.     if(1){
  11.         io_thread = new NetworkThread ();
  12.         logic_thread = new LogicThread ();
  13.         ACE_INET_Addr listen_addr(para::instance()->port, para::instance()->ip.c_str());
  14.         ACE_Asynch_Acceptor<HA_Proactive_Service> aio_acceptor;
  15.         if (0 != aio_acceptor.open (listen_addr, 0, 0, 5, 1, 0, 0))
  16.             ACE_ERROR_RETURN ((LM_ERROR, ACE_TEXT ("%p\n"), ACE_TEXT ("NetworkThread acceptor open")), 1);
  17.         logic_thread->open ();
  18.         io_thread->open ();
  19.         int x;
  20.         std::cin >> x;
  21.         io_thread->close ();
  22.         logic_thread->close ();
  23.         delete io_thread;
  24.         delete logic_thread;
  25.     }
  26.     else{
  27.     }
  28.     return 0;
  29. }
复制代码
这个是主函数,估计是没有多少帮助
 楼主| 发表于 2007-12-13 23:47:10 | 显示全部楼层
aio_acceptor.open (listen_addr, 0, 0, 5, 1, 0, 0) 中的5 改成200或者2000,再试试。
还得看你机器的内存、OS情况。
 楼主| 发表于 2007-12-13 23:47:28 | 显示全部楼层
改成aio_acceptor.open (listen_addr, 0, 0, SOMAXCONN, 1, 0, 0, 1, 5)效果好多了,连接数到达1.5w了...
看了代码,是open时候的一个bug,如果换个写法或许就没事了:(
客户端用的tp_reactor模型
谢谢大家
发表于 2009-6-1 14:33:22 | 显示全部楼层
你好,我们现在做一个项目,服务器肯定要用proactor框架,我想请问客户端用什么最好,我们的流程是,客户端发送请求数据的信息给服务器端,服务器端分析后,发给客户端他所需要的数据。。。可能数据会很大,比如1个G的量。。。。
发表于 2009-7-14 16:47:25 | 显示全部楼层
不知道楼主是怎么把这句话编通的 arg.set_fun(init_write_stream);
我的编译环境(VS2008)一直提示“HA_Proactive_Service::init_write_stream”: 函数调用缺少参数列表;请使用“&HA_Proactive_Service::init_write_stream”创建指向成员的指针。。
您需要登录后才可以回帖 登录 | 用户注册

本版积分规则

Archiver|手机版|小黑屋|ACE Developer ( 京ICP备06055248号 )

GMT+8, 2024-11-23 00:22 , Processed in 0.013984 second(s), 5 queries , Redis On.

Powered by Discuz! X3.5

© 2001-2023 Discuz! Team.

快速回复 返回顶部 返回列表