找回密码
 用户注册

QQ登录

只需一步,快速开始

查看: 4201|回复: 4

我的Dev的Reactor与普通的连接数没区别

[复制链接]
发表于 2007-12-24 23:34:08 | 显示全部楼层 |阅读模式
ACE_Dev_Poll_Reactor *dp_reactor;
dp_reactor=new ACE_Dev_Poll_Reactor;
ACE_Reactor::instance(new ACE_Reactor(dp_reactor))->run_reactor_event_loop();
我做的ACE_Dev_Poll_Reactor和普通的ACE_Reactor的用户连接数是一样多的。
 楼主| 发表于 2007-12-24 23:34:21 | 显示全部楼层
我做了服务器,又写了测试程序,当有1016个客户端接入服务后,就不能再有客户端连上了,因为看资料说ACE_Dev_Pool_Reactor可以实现大用户量接入的。但测试结果不像所说的那样。
 楼主| 发表于 2007-12-24 23:34:45 | 显示全部楼层
我怎么没发现这个问题呢?
你按照ACE的测试代码改写试试。
  1. //=============================================================================
  2. /**
  3. *  @file    Dev_Poll_Reactor_Test.cpp
  4. *
  5. *  Dev_Poll_Reactor_Test.cpp,v 4.19 2005/11/15 14:32:27 shuston Exp
  6. *
  7. *  This test verifies that the Dev_Poll_Reactor is functioning
  8. *  properly, and demonstrates how "speculative reads" can be
  9. *  performed.  "Speculative reads" are necessary when using event
  10. *  demultiplexing mechanisms that use a "state change" interface.
  11. *  Similarly, "speculative writes" may be necessary, i.e. keep
  12. *  writing until the connection is flow controlled.  An example of a
  13. *  demuxing mechanism with such an interface is Linux's `/dev/epoll'
  14. *  character device.  Mechansims with state change interfaces are
  15. *  also said to be "edge triggered," versus "level triggered"
  16. *  mechanisms such as select().
  17. *
  18. *  @author Ossama Othman <[email]ossama@dre.vanderbilt.edu[/email]>
  19. */
  20. //=============================================================================
  21. #include "test_config.h"
  22. ACE_RCSID (tests,
  23.            Dev_Poll_Reactor_Test,
  24.            "Dev_Poll_Reactor_Test.cpp,v 4.19 2005/11/15 14:32:27 shuston Exp")
  25. #if defined (ACE_HAS_DEV_POLL) || defined (ACE_HAS_EVENT_POLL)
  26. #include "ace/OS_NS_signal.h"
  27. #include "ace/Reactor.h"
  28. #include "ace/Dev_Poll_Reactor.h"
  29. #include "ace/Acceptor.h"
  30. #include "ace/Connector.h"
  31. #include "ace/SOCK_Acceptor.h"
  32. #include "ace/SOCK_Connector.h"
  33. #include "ace/SOCK_Stream.h"
  34. #include "ace/OS_NS_unistd.h"
  35. #include "ace/OS_NS_netdb.h"
  36. typedef ACE_Svc_Handler<ACE_SOCK_STREAM, ACE_NULL_SYNCH> SVC_HANDLER;
  37. // ----------------------------------------------------
  38. class Client : public SVC_HANDLER
  39. {
  40. public:
  41.   Client (void);
  42.   virtual int open (void * = 0);
  43.   virtual int handle_output (ACE_HANDLE handle);
  44.   virtual int handle_timeout (const ACE_Time_Value &current_time,
  45.                               const void *act);
  46.   virtual int handle_close (ACE_HANDLE handle,
  47.                             ACE_Reactor_Mask mask);
  48. private:
  49.   unsigned int call_count_;
  50. };
  51. class Server : public SVC_HANDLER
  52. {
  53. public:
  54.   Server (void);
  55.   virtual int handle_input (ACE_HANDLE handle);
  56.   virtual int handle_timeout (const ACE_Time_Value &current_time,
  57.                               const void *act);
  58.   virtual int handle_close (ACE_HANDLE handle,
  59.                             ACE_Reactor_Mask mask);
  60. private:
  61.   unsigned int call_count_;
  62. };
  63. // ----------------------------------------------------
  64. Client::Client (void)
  65.   : call_count_ (0)
  66. {
  67. }
  68. int
  69. Client::open (void *)
  70. {
  71.   //  ACE_ASSERT (this->reactor () != 0);
  72.   if (this->reactor ()
  73.       && this->reactor ()->register_handler (
  74.            this,
  75.            ACE_Event_Handler::WRITE_MASK) == -1)
  76.     ACE_ERROR_RETURN ((LM_ERROR,
  77.                        ACE_TEXT ("(%t) %p\n"),
  78.                        ACE_TEXT ("unable to register client handler")),
  79.                       -1);
  80.   return 0;
  81. }
  82. int
  83. Client::handle_output (ACE_HANDLE)
  84. {
  85.   for (int i = 1; i <= 5; ++i)
  86.     {
  87.       char buffer[BUFSIZ] = { 0 };
  88.       ACE_OS::sprintf (buffer, "test message %d.\n", i);
  89.       ssize_t bytes_sent =
  90.         this->peer ().send (buffer, ACE_OS::strlen (buffer));
  91.       if (bytes_sent == -1)
  92.         {
  93.           if (errno == EWOULDBLOCK)
  94.             return 0;  // Flow control kicked in.
  95.           else if (errno == EPIPE)
  96.             {
  97.               ACE_DEBUG ((LM_DEBUG,
  98.                           ACE_TEXT ("(%t) Client::handle_output; server ")
  99.                           ACE_TEXT ("closed handle %d\n"),
  100.                           this->peer ().get_handle ()));
  101.               return -1;
  102.             }
  103.           else
  104.             ACE_ERROR_RETURN ((LM_ERROR,
  105.                                ACE_TEXT ("(%t) %p\n"),
  106.                                ACE_TEXT ("Client::handle_output")),
  107.                               -1);
  108.         }
  109.       else if (bytes_sent == 0)
  110.         return -1;
  111.       else
  112.         ACE_DEBUG ((LM_INFO, ACE_TEXT ("(%t) Sent %s"), buffer));
  113.     }
  114.   return 0;
  115. }
  116. int
  117. Client::handle_timeout (const ACE_Time_Value &, const void *)
  118. {
  119.   ACE_DEBUG ((LM_INFO,
  120.               ACE_TEXT ("(%t) Expected client timeout occured at: %T\n")));
  121.   this->call_count_++;
  122.   int status = this->handle_output (this->get_handle ());
  123.   if (status == -1 || this->call_count_ > 10)
  124.     {
  125.       if (this->reactor ()->end_reactor_event_loop () == 0)
  126.         ACE_DEBUG ((LM_INFO,
  127.                     ACE_TEXT ("(%t) Successful client reactor shutdown.\n")));
  128.       else
  129.         ACE_ERROR ((LM_ERROR,
  130.                     ACE_TEXT ("(%t) %p\n"),
  131.                     ACE_TEXT ("Failed client reactor shutdown")));
  132.       // Force this service handler to be closed in either case.
  133.       return -1;
  134.     }
  135.   return 0;
  136. }
  137. int
  138. Client::handle_close (ACE_HANDLE handle,
  139.                       ACE_Reactor_Mask mask)
  140. {
  141.   ACE_DEBUG ((LM_INFO,
  142.               ACE_TEXT ("(%t) Client Svc_Handler closed ")
  143.               ACE_TEXT ("handle <%d> with reactor mask <0x%x>.\n"),
  144.               handle,
  145.               mask));
  146.   return 0;
  147. }
  148. // ----------------------------------------------------
  149. Server::Server (void)
  150.   : call_count_ (0)
  151. {
  152. }
  153. int
  154. Server::handle_input (ACE_HANDLE /* handle */)
  155. {
  156.   char buffer[BUFSIZ+1] = { 0 };    // Insure a trailing nul
  157.   ssize_t bytes_read = 0;
  158.   char * const begin = buffer;
  159.   char * const end   = buffer + BUFSIZ;
  160.   for (char * buf = begin; buf < end; buf += bytes_read)
  161.     {
  162.       // Keep reading until it is no longer possible to do so.
  163.       //
  164.       // This is done since the underlying event demultiplexing
  165.       // mechanism may have a "state change" interface (as opposed to
  166.       // "state monitoring"), in which case a "speculative" read is
  167.       // done.
  168.       bytes_read = this->peer ().recv (buf, end - buf);
  169.       ACE_DEBUG ((LM_DEBUG,
  170.                   ACE_TEXT ("****** bytes_read = %d\n"),
  171.                   bytes_read));
  172.       if (bytes_read == -1)
  173.         {
  174.           if (errno == EWOULDBLOCK)
  175.             {
  176. //               ACE_HEX_DUMP ((LM_DEBUG,
  177. //                              buf,
  178. //                              80,
  179. //                              "BUFFER CONTENTS"));
  180.               if (buf == buffer)
  181.                 return 0;
  182.               else
  183.                 break;
  184.             }
  185.           else
  186.             ACE_ERROR_RETURN ((LM_ERROR,
  187.                                ACE_TEXT ("(%t) %p\n"),
  188.                                ACE_TEXT ("Server::handle_input")),
  189.                               -1);
  190.         }
  191.       else if (bytes_read == 0)
  192.         return -1;
  193.     }
  194.   ACE_DEBUG ((LM_INFO, ACE_TEXT ("(%t) Message received: %s\n"), buffer));
  195.   return 0;
  196. }
  197. int
  198. Server::handle_timeout (const ACE_Time_Value &,
  199.                         const void *)
  200. {
  201.   ACE_DEBUG ((LM_INFO,
  202.               ACE_TEXT ("(%t) Expected server timeout occured at: %T\n")));
  203. //   if (this->call_count_ == 0
  204. //       && this->handle_input (this->get_handle ()) != 0
  205. //       && errno != EWOULDBLOCK)
  206. //     return -1;
  207. //   ACE_DEBUG ((LM_INFO,
  208. //               "SERVER HANDLE = %d\n",
  209. //               this->get_handle ()));
  210.   this->call_count_++;
  211.   if (this->call_count_ > 10)
  212.     {
  213.       if (this->reactor ()->end_reactor_event_loop () == 0)
  214.         ACE_DEBUG ((LM_INFO,
  215.                     ACE_TEXT ("(%t) Successful server reactor shutdown.\n")));
  216.       else
  217.         ACE_ERROR ((LM_ERROR,
  218.                     ACE_TEXT ("(%t) %p\n"),
  219.                     ACE_TEXT ("Failed server reactor shutdown")));
  220.       // Force this service handler to be closed in either case.
  221.       return -1;
  222.     }
  223.   return 0;
  224. }
  225. int
  226. Server::handle_close (ACE_HANDLE handle,
  227.                       ACE_Reactor_Mask mask)
  228. {
  229.   if (this->call_count_ > 4)
  230.     {
  231.       ACE_DEBUG ((LM_INFO,
  232.                   ACE_TEXT ("(%t) Server Svc_Handler closing ")
  233.                   ACE_TEXT ("handle <%d,%d> with reactor mask <0x%x>.\n"),
  234.                   handle,
  235.                   this->get_handle (),
  236.                   mask));
  237.       return this->peer ().close ();
  238.     }
  239.   return 0;
  240. }
  241. // ----------------------------------------------------
  242. typedef ACE_Acceptor<Server, ACE_SOCK_ACCEPTOR>   ACCEPTOR;
  243. typedef ACE_Connector<Client, ACE_SOCK_CONNECTOR> CONNECTOR;
  244. // ----------------------------------------------------
  245. class TestAcceptor : public ACCEPTOR
  246. {
  247. public:
  248.   virtual int accept_svc_handler (Server * handler)
  249.   {
  250.     int result = this->ACCEPTOR::accept_svc_handler (handler);
  251.     if (result != 0)
  252.       {
  253.         if (errno != EWOULDBLOCK)
  254.           ACE_ERROR ((LM_ERROR,
  255.                       ACE_TEXT ("(%t) %p\n"),
  256.                       ACE_TEXT ("Unable to accept connection")));
  257.         return result;
  258.       }
  259.     ACE_DEBUG ((LM_DEBUG,
  260.                 ACE_TEXT ("(%t) Accepted connection.  ")
  261.                 ACE_TEXT ("Stream handle: <%d>\n"),
  262.                 handler->get_handle ()));
  263. //     if (handler->handle_input (handler->get_handle ()) == -1
  264. //         && errno != EWOULDBLOCK)
  265. //       return -1;
  266. // #if 0
  267.     ACE_Time_Value delay (2, 0);
  268.     ACE_Time_Value restart (2, 0);
  269.     if (handler->reactor ()->schedule_timer (handler,
  270.                                              0,
  271.                                              delay,
  272.                                              restart) == -1)
  273.       {
  274.         ACE_ERROR_RETURN ((LM_ERROR,
  275.                            ACE_TEXT ("(%t) %p\n"),
  276.                            ACE_TEXT ("Unable to schedule server side ")
  277.                            ACE_TEXT ("timer in ACE_Dev_Poll_Reactor")),
  278.                           -1);
  279.       }
  280. // #endif  /* 0 */
  281.     return result;
  282.   }
  283. };
  284. // ----------------------------------------------------
  285. class TestConnector : public CONNECTOR
  286. {
  287. public:
  288.   virtual int connect_svc_handler (
  289.     CONNECTOR::handler_type *& handler,
  290.     const CONNECTOR::addr_type &remote_addr,
  291.     ACE_Time_Value *timeout,
  292.     const CONNECTOR::addr_type &local_addr,
  293.     int reuse_addr,
  294.     int flags,
  295.     int perms)
  296.   {
  297.     const int result = this->CONNECTOR::connect_svc_handler (handler,
  298.                                                              remote_addr,
  299.                                                              timeout,
  300.                                                              local_addr,
  301.                                                              reuse_addr,
  302.                                                              flags,
  303.                                                              perms);
  304.     if (result != 0)
  305.       return result;
  306.     ACE_TCHAR hostname[MAXHOSTNAMELEN];
  307.     if (remote_addr.get_host_name (hostname,
  308.                                    sizeof (hostname)) != 0)
  309.       {
  310.         ACE_ERROR_RETURN ((LM_ERROR,
  311.                            ACE_TEXT ("(%t) %p\n"),
  312.                            ACE_TEXT ("Unable to retrieve hostname")),
  313.                           -1);
  314.       }
  315.     ACE_DEBUG ((LM_DEBUG,
  316.                 ACE_TEXT ("(%t) Connected to <%s:%d>.\n"),
  317.                 hostname,
  318.                 (int) remote_addr.get_port_number ()));
  319. // #if 0
  320.     ACE_Time_Value delay (4, 0);
  321.     ACE_Time_Value restart (3, 0);
  322.     if (handler->reactor ()->schedule_timer (handler,
  323.                                              0,
  324.                                              delay,
  325.                                              restart) == -1)
  326.       {
  327.         ACE_ERROR_RETURN ((LM_ERROR,
  328.                            ACE_TEXT ("(%t) %p\n"),
  329.                            ACE_TEXT ("Unable to schedule client side ")
  330.                            ACE_TEXT ("timer in ACE_Dev_Poll_Reactor")),
  331.                           -1);
  332.       }
  333. // #endif  /* 0 */
  334.     return result;
  335.   }
  336.   virtual int connect_svc_handler (
  337.     CONNECTOR::handler_type *& handler,
  338.     CONNECTOR::handler_type *& sh_copy,
  339.     const CONNECTOR::addr_type &remote_addr,
  340.     ACE_Time_Value *timeout,
  341.     const CONNECTOR::addr_type &local_addr,
  342.     int reuse_addr,
  343.     int flags,
  344.     int perms) {
  345.     sh_copy = handler;
  346.     return this->connect_svc_handler (handler, remote_addr, timeout,
  347.                                       local_addr, reuse_addr, flags,
  348.                                       perms);
  349.   }
  350. };
  351. // ----------------------------------------------------
  352. ACE_THR_FUNC_RETURN
  353. server_worker (void *p)
  354. {
  355.   const unsigned short port = *(static_cast<unsigned short *> (p));
  356.   ACE_INET_Addr addr;
  357.   if (addr.set (port, INADDR_LOOPBACK) != 0)
  358.     {
  359.       ACE_ERROR ((LM_ERROR,
  360.                   ACE_TEXT ("(%t) %p\n"),
  361.                   ACE_TEXT ("server_worker - ACE_INET_Addr::set")));
  362.       return (void *) -1;
  363.     }
  364.   ACE_Dev_Poll_Reactor dp_reactor;
  365.   dp_reactor.restart (1);     // Restart on EINTR
  366.   ACE_Reactor reactor (&dp_reactor);
  367.   TestAcceptor server;
  368.   int flags = 0;
  369.   ACE_SET_BITS (flags, ACE_NONBLOCK);  // Enable non-blocking in the
  370.                                        // Svc_Handlers.
  371.   if (server.open (addr, &reactor, flags) != 0)
  372.     {
  373.       ACE_ERROR ((LM_ERROR,
  374.                   ACE_TEXT ("(%t) %p\n"),
  375.                   ACE_TEXT ("Unable to open server service handler")));
  376.       return (void *) -1;
  377.     }
  378.   if (reactor.run_reactor_event_loop () != 0)
  379.     {
  380.       ACE_ERROR ((LM_ERROR,
  381.                   ACE_TEXT ("(%t) %p\n"),
  382.                   ACE_TEXT ("Error when running server ")
  383.                   ACE_TEXT ("reactor event loop")));
  384.       return (void *) -1;
  385.     }
  386.   ACE_DEBUG ((LM_INFO,
  387.               ACE_TEXT ("(%t) Reactor event loop finished ")
  388.               ACE_TEXT ("successfully.\n")));
  389.   return 0;
  390. }
  391. // ----------------------------------------------------
  392. // struct server_arg
  393. // {
  394. //   unsigned short port;
  395. //   ACE_Condition<ACE_SYNCH_MUTEX> * cv;
  396. // };
  397. // ----------------------------------------------------
  398. int
  399. run_main (int, ACE_TCHAR *[])
  400. {
  401.   ACE_START_TEST (ACE_TEXT ("Dev_Poll_Reactor_Test"));
  402.   // Make sure we ignore SIGPIPE
  403.   sigset_t sigsetNew[1];
  404.   sigset_t sigsetOld[1];
  405.   ACE_OS::sigemptyset (sigsetNew);
  406.   ACE_OS::sigaddset (sigsetNew, SIGPIPE);
  407.   ACE_OS::sigprocmask (SIG_BLOCK, sigsetNew, sigsetOld);
  408.   ACE_Dev_Poll_Reactor dp_reactor;
  409.   dp_reactor.restart (1);          // Restart on EINTR
  410.   ACE_Reactor reactor (&dp_reactor);
  411.   TestConnector client;
  412.   int flags = 0;
  413.   ACE_SET_BITS (flags, ACE_NONBLOCK);  // Enable non-blocking in the
  414.                                        // Svc_Handlers.
  415.   if (client.open (&reactor, flags) != 0)
  416.     ACE_ERROR_RETURN ((LM_ERROR,
  417.                        ACE_TEXT ("(%t) %p\n"),
  418.                        ACE_TEXT ("Unable to open client service handler")),
  419.                       -1);
  420. //   ACE_SYNCH_MUTEX mutex;
  421. //   ACE_Condition<ACE_SYNCH_MUTEX> cv (mutex);
  422. //   server_arg arg;
  423. //   arg.port = 54678;  // Port the server will listen on.
  424. //   arg.cv = &cv;
  425.   unsigned short port = 54678;
  426.   if (ACE_Thread_Manager::instance ()->spawn (server_worker, &port) == -1)
  427.     ACE_ERROR_RETURN ((LM_ERROR,
  428.                        ACE_TEXT ("(%t) %p\n"),
  429.                        ACE_TEXT ("Unable to spawn server thread")),
  430.                       -1);
  431.   ACE_OS::sleep (5);  // Wait for the listening endpoint to be set up.
  432.   ACE_INET_Addr addr;
  433.   if (addr.set (port, INADDR_LOOPBACK) != 0)
  434.     ACE_ERROR_RETURN ((LM_ERROR,
  435.                        ACE_TEXT ("(%t) %p\n"),
  436.                        ACE_TEXT ("ACE_INET_Addr::set")),
  437.                       -1);
  438.   Client *client_handler = 0;
  439.   if (client.connect (client_handler, addr) != 0)
  440.     ACE_ERROR_RETURN ((LM_ERROR,
  441.                        ACE_TEXT ("(%t) %p\n"),
  442.                        ACE_TEXT ("Unable to connect to server")),
  443.                       -1);
  444.   if (reactor.run_reactor_event_loop () != 0)
  445.     ACE_ERROR_RETURN ((LM_ERROR,
  446.                        ACE_TEXT ("(%t) %p\n"),
  447.                        ACE_TEXT ("Error when running client ")
  448.                        ACE_TEXT ("reactor event loop")),
  449.                       -1);
  450.   if (ACE_Thread_Manager::instance ()->wait () != 0)
  451.     ACE_ERROR_RETURN ((LM_ERROR,
  452.                        ACE_TEXT ("(%t) %p\n"),
  453.                        ACE_TEXT ("Error waiting for threads to complete")),
  454.                       -1);
  455.   ACE_END_TEST;
  456.   return 0;
  457. }
  458. #if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION)
  459. template class ACE_Svc_Handler<ACE_SOCK_STREAM, ACE_NULL_SYNCH>;
  460. template class ACE_Acceptor<Server, ACE_SOCK_ACCEPTOR>;
  461. template class ACE_Connector_Base<Client>;
  462. template class ACE_Connector<Client, ACE_SOCK_CONNECTOR>;
  463. template class ACE_NonBlocking_Connect_Handler<Client>;
  464. #elif defined (ACE_HAS_TEMPLATE_INSTANTIATION_PRAGMA)
  465. #pragma instantiate ACE_Svc_Handler<ACE_SOCK_STREAM, ACE_NULL_SYNCH>
  466. #pragma instantiate ACE_Acceptor<Server, ACE_SOCK_ACCEPTOR>
  467. #pragma instantiate ACE_Connector_Base<Client>
  468. #pragma instantiate ACE_Connector<Client, ACE_SOCK_CONNECTOR>
  469. #pragma instantiate ACE_NonBlocking_Connect_Handler<Client>
  470. #endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */
  471. #else
  472. int
  473. run_main (int, ACE_TCHAR *[])
  474. {
  475.   ACE_START_TEST (ACE_TEXT ("Dev_Poll_Reactor_Test"));
  476.   ACE_ERROR ((LM_INFO,
  477.               ACE_TEXT ("Dev Poll and Event Poll are not supported ")
  478.               ACE_TEXT ("on this platform\n")));
  479.   ACE_END_TEST;
  480.   return 0;
  481. }
  482. #endif  /* ACE_HAS_DEV_POLL || ACE_HAS_EVENT_POLL */
复制代码
 楼主| 发表于 2007-12-24 23:35:00 | 显示全部楼层
echo 4096>max_fd试试。
发表于 2008-1-13 16:55:14 | 显示全部楼层
系统默认handle是1024...,需要改下。
您需要登录后才可以回帖 登录 | 用户注册

本版积分规则

Archiver|手机版|小黑屋|ACE Developer ( 京ICP备06055248号 )

GMT+8, 2024-11-23 13:31 , Processed in 0.016815 second(s), 5 queries , Redis On.

Powered by Discuz! X3.5

© 2001-2023 Discuz! Team.

快速回复 返回顶部 返回列表