peakzhang 发表于 2007-12-24 23:34:08

我的Dev的Reactor与普通的连接数没区别

ACE_Dev_Poll_Reactor *dp_reactor;
dp_reactor=new ACE_Dev_Poll_Reactor;
ACE_Reactor::instance(new ACE_Reactor(dp_reactor))->run_reactor_event_loop();
我做的ACE_Dev_Poll_Reactor和普通的ACE_Reactor的用户连接数是一样多的。

peakzhang 发表于 2007-12-24 23:34:21

我做了服务器,又写了测试程序,当有1016个客户端接入服务后,就不能再有客户端连上了,因为看资料说ACE_Dev_Pool_Reactor可以实现大用户量接入的。但测试结果不像所说的那样。

peakzhang 发表于 2007-12-24 23:34:45

我怎么没发现这个问题呢?
你按照ACE的测试代码改写试试。

//=============================================================================
/**
*@file    Dev_Poll_Reactor_Test.cpp
*
*Dev_Poll_Reactor_Test.cpp,v 4.19 2005/11/15 14:32:27 shuston Exp
*
*This test verifies that the Dev_Poll_Reactor is functioning
*properly, and demonstrates how "speculative reads" can be
*performed."Speculative reads" are necessary when using event
*demultiplexing mechanisms that use a "state change" interface.
*Similarly, "speculative writes" may be necessary, i.e. keep
*writing until the connection is flow controlled.An example of a
*demuxing mechanism with such an interface is Linux's `/dev/epoll'
*character device.Mechansims with state change interfaces are
*also said to be "edge triggered," versus "level triggered"
*mechanisms such as select().
*
*@author Ossama Othman <ossama@dre.vanderbilt.edu>
*/
//=============================================================================


#include "test_config.h"

ACE_RCSID (tests,
         Dev_Poll_Reactor_Test,
         "Dev_Poll_Reactor_Test.cpp,v 4.19 2005/11/15 14:32:27 shuston Exp")


#if defined (ACE_HAS_DEV_POLL) || defined (ACE_HAS_EVENT_POLL)

#include "ace/OS_NS_signal.h"
#include "ace/Reactor.h"
#include "ace/Dev_Poll_Reactor.h"

#include "ace/Acceptor.h"
#include "ace/Connector.h"

#include "ace/SOCK_Acceptor.h"
#include "ace/SOCK_Connector.h"
#include "ace/SOCK_Stream.h"

#include "ace/OS_NS_unistd.h"
#include "ace/OS_NS_netdb.h"


typedef ACE_Svc_Handler<ACE_SOCK_STREAM, ACE_NULL_SYNCH> SVC_HANDLER;

// ----------------------------------------------------

class Client : public SVC_HANDLER
{
public:

Client (void);

virtual int open (void * = 0);

virtual int handle_output (ACE_HANDLE handle);

virtual int handle_timeout (const ACE_Time_Value &current_time,
                              const void *act);

virtual int handle_close (ACE_HANDLE handle,
                            ACE_Reactor_Mask mask);

private:

unsigned int call_count_;

};


class Server : public SVC_HANDLER
{
public:

Server (void);

virtual int handle_input (ACE_HANDLE handle);

virtual int handle_timeout (const ACE_Time_Value &current_time,
                              const void *act);

virtual int handle_close (ACE_HANDLE handle,
                            ACE_Reactor_Mask mask);

private:

unsigned int call_count_;

};

// ----------------------------------------------------

Client::Client (void)
: call_count_ (0)
{
}

int
Client::open (void *)
{
//ACE_ASSERT (this->reactor () != 0);

if (this->reactor ()
      && this->reactor ()->register_handler (
         this,
         ACE_Event_Handler::WRITE_MASK) == -1)
    ACE_ERROR_RETURN ((LM_ERROR,
                     ACE_TEXT ("(%t) %p\n"),
                     ACE_TEXT ("unable to register client handler")),
                      -1);

return 0;
}

int
Client::handle_output (ACE_HANDLE)
{
for (int i = 1; i <= 5; ++i)
    {
      char buffer = { 0 };

      ACE_OS::sprintf (buffer, "test message %d.\n", i);

      ssize_t bytes_sent =
      this->peer ().send (buffer, ACE_OS::strlen (buffer));

      if (bytes_sent == -1)
      {
          if (errno == EWOULDBLOCK)
            return 0;// Flow control kicked in.
          else if (errno == EPIPE)
            {
            ACE_DEBUG ((LM_DEBUG,
                        ACE_TEXT ("(%t) Client::handle_output; server ")
                        ACE_TEXT ("closed handle %d\n"),
                        this->peer ().get_handle ()));
            return -1;
            }
          else
            ACE_ERROR_RETURN ((LM_ERROR,
                               ACE_TEXT ("(%t) %p\n"),
                               ACE_TEXT ("Client::handle_output")),
                              -1);
      }
      else if (bytes_sent == 0)
      return -1;
      else
      ACE_DEBUG ((LM_INFO, ACE_TEXT ("(%t) Sent %s"), buffer));
    }

return 0;
}

int
Client::handle_timeout (const ACE_Time_Value &, const void *)
{
ACE_DEBUG ((LM_INFO,
            ACE_TEXT ("(%t) Expected client timeout occured at: %T\n")));

this->call_count_++;

int status = this->handle_output (this->get_handle ());
if (status == -1 || this->call_count_ > 10)
    {
      if (this->reactor ()->end_reactor_event_loop () == 0)
      ACE_DEBUG ((LM_INFO,
                  ACE_TEXT ("(%t) Successful client reactor shutdown.\n")));
      else
      ACE_ERROR ((LM_ERROR,
                  ACE_TEXT ("(%t) %p\n"),
                  ACE_TEXT ("Failed client reactor shutdown")));

      // Force this service handler to be closed in either case.
      return -1;
    }

return 0;
}

int
Client::handle_close (ACE_HANDLE handle,
                      ACE_Reactor_Mask mask)
{
ACE_DEBUG ((LM_INFO,
            ACE_TEXT ("(%t) Client Svc_Handler closed ")
            ACE_TEXT ("handle <%d> with reactor mask <0x%x>.\n"),
            handle,
            mask));

return 0;
}

// ----------------------------------------------------

Server::Server (void)
: call_count_ (0)
{
}

int
Server::handle_input (ACE_HANDLE /* handle */)
{
char buffer = { 0 };    // Insure a trailing nul
ssize_t bytes_read = 0;

char * const begin = buffer;
char * const end   = buffer + BUFSIZ;

for (char * buf = begin; buf < end; buf += bytes_read)
    {
      // Keep reading until it is no longer possible to do so.
      //
      // This is done since the underlying event demultiplexing
      // mechanism may have a "state change" interface (as opposed to
      // "state monitoring"), in which case a "speculative" read is
      // done.
      bytes_read = this->peer ().recv (buf, end - buf);

      ACE_DEBUG ((LM_DEBUG,
                  ACE_TEXT ("****** bytes_read = %d\n"),
                  bytes_read));

      if (bytes_read == -1)
      {
          if (errno == EWOULDBLOCK)
            {

//               ACE_HEX_DUMP ((LM_DEBUG,
//                              buf,
//                              80,
//                              "BUFFER CONTENTS"));
            if (buf == buffer)
                return 0;
            else
                break;
            }
          else
            ACE_ERROR_RETURN ((LM_ERROR,
                               ACE_TEXT ("(%t) %p\n"),
                               ACE_TEXT ("Server::handle_input")),
                              -1);
      }
      else if (bytes_read == 0)
      return -1;
    }

ACE_DEBUG ((LM_INFO, ACE_TEXT ("(%t) Message received: %s\n"), buffer));

return 0;
}

int
Server::handle_timeout (const ACE_Time_Value &,
                        const void *)
{
ACE_DEBUG ((LM_INFO,
            ACE_TEXT ("(%t) Expected server timeout occured at: %T\n")));

//   if (this->call_count_ == 0
//       && this->handle_input (this->get_handle ()) != 0
//       && errno != EWOULDBLOCK)
//   return -1;

//   ACE_DEBUG ((LM_INFO,
//               "SERVER HANDLE = %d\n",
//               this->get_handle ()));


this->call_count_++;

if (this->call_count_ > 10)
    {
      if (this->reactor ()->end_reactor_event_loop () == 0)
      ACE_DEBUG ((LM_INFO,
                  ACE_TEXT ("(%t) Successful server reactor shutdown.\n")));
      else
      ACE_ERROR ((LM_ERROR,
                  ACE_TEXT ("(%t) %p\n"),
                  ACE_TEXT ("Failed server reactor shutdown")));

      // Force this service handler to be closed in either case.
      return -1;
    }

return 0;
}

int
Server::handle_close (ACE_HANDLE handle,
                      ACE_Reactor_Mask mask)
{
if (this->call_count_ > 4)
    {
      ACE_DEBUG ((LM_INFO,
                  ACE_TEXT ("(%t) Server Svc_Handler closing ")
                  ACE_TEXT ("handle <%d,%d> with reactor mask <0x%x>.\n"),
                  handle,
                  this->get_handle (),
                  mask));

      return this->peer ().close ();
    }

return 0;
}

// ----------------------------------------------------

typedef ACE_Acceptor<Server, ACE_SOCK_ACCEPTOR>   ACCEPTOR;
typedef ACE_Connector<Client, ACE_SOCK_CONNECTOR> CONNECTOR;

// ----------------------------------------------------

class TestAcceptor : public ACCEPTOR
{
public:

virtual int accept_svc_handler (Server * handler)
{
    int result = this->ACCEPTOR::accept_svc_handler (handler);

    if (result != 0)
      {
      if (errno != EWOULDBLOCK)
          ACE_ERROR ((LM_ERROR,
                      ACE_TEXT ("(%t) %p\n"),
                      ACE_TEXT ("Unable to accept connection")));

      return result;
      }

    ACE_DEBUG ((LM_DEBUG,
                ACE_TEXT ("(%t) Accepted connection.")
                ACE_TEXT ("Stream handle: <%d>\n"),
                handler->get_handle ()));

//   if (handler->handle_input (handler->get_handle ()) == -1
//         && errno != EWOULDBLOCK)
//       return -1;

// #if 0
    ACE_Time_Value delay (2, 0);
    ACE_Time_Value restart (2, 0);
    if (handler->reactor ()->schedule_timer (handler,
                                             0,
                                             delay,
                                             restart) == -1)
      {
      ACE_ERROR_RETURN ((LM_ERROR,
                           ACE_TEXT ("(%t) %p\n"),
                           ACE_TEXT ("Unable to schedule server side ")
                           ACE_TEXT ("timer in ACE_Dev_Poll_Reactor")),
                        -1);
      }
// #endif/* 0 */

    return result;
}

};

// ----------------------------------------------------

class TestConnector : public CONNECTOR
{
public:

virtual int connect_svc_handler (
    CONNECTOR::handler_type *& handler,
    const CONNECTOR::addr_type &remote_addr,
    ACE_Time_Value *timeout,
    const CONNECTOR::addr_type &local_addr,
    int reuse_addr,
    int flags,
    int perms)
{
    const int result = this->CONNECTOR::connect_svc_handler (handler,
                                                             remote_addr,
                                                             timeout,
                                                             local_addr,
                                                             reuse_addr,
                                                             flags,
                                                             perms);

    if (result != 0)
      return result;

    ACE_TCHAR hostname;
    if (remote_addr.get_host_name (hostname,
                                 sizeof (hostname)) != 0)
      {
      ACE_ERROR_RETURN ((LM_ERROR,
                           ACE_TEXT ("(%t) %p\n"),
                           ACE_TEXT ("Unable to retrieve hostname")),
                        -1);
      }

    ACE_DEBUG ((LM_DEBUG,
                ACE_TEXT ("(%t) Connected to <%s:%d>.\n"),
                hostname,
                (int) remote_addr.get_port_number ()));

// #if 0
    ACE_Time_Value delay (4, 0);
    ACE_Time_Value restart (3, 0);
    if (handler->reactor ()->schedule_timer (handler,
                                             0,
                                             delay,
                                             restart) == -1)
      {
      ACE_ERROR_RETURN ((LM_ERROR,
                           ACE_TEXT ("(%t) %p\n"),
                           ACE_TEXT ("Unable to schedule client side ")
                           ACE_TEXT ("timer in ACE_Dev_Poll_Reactor")),
                        -1);
      }
// #endif/* 0 */

    return result;
}

virtual int connect_svc_handler (
    CONNECTOR::handler_type *& handler,
    CONNECTOR::handler_type *& sh_copy,
    const CONNECTOR::addr_type &remote_addr,
    ACE_Time_Value *timeout,
    const CONNECTOR::addr_type &local_addr,
    int reuse_addr,
    int flags,
    int perms) {
    sh_copy = handler;
    return this->connect_svc_handler (handler, remote_addr, timeout,
                                    local_addr, reuse_addr, flags,
                                    perms);
}
};

// ----------------------------------------------------

ACE_THR_FUNC_RETURN
server_worker (void *p)
{
const unsigned short port = *(static_cast<unsigned short *> (p));

ACE_INET_Addr addr;

if (addr.set (port, INADDR_LOOPBACK) != 0)
    {
      ACE_ERROR ((LM_ERROR,
                  ACE_TEXT ("(%t) %p\n"),
                  ACE_TEXT ("server_worker - ACE_INET_Addr::set")));

      return (void *) -1;
    }

ACE_Dev_Poll_Reactor dp_reactor;
dp_reactor.restart (1);   // Restart on EINTR
ACE_Reactor reactor (&dp_reactor);

TestAcceptor server;

int flags = 0;
ACE_SET_BITS (flags, ACE_NONBLOCK);// Enable non-blocking in the
                                       // Svc_Handlers.

if (server.open (addr, &reactor, flags) != 0)
    {
      ACE_ERROR ((LM_ERROR,
                  ACE_TEXT ("(%t) %p\n"),
                  ACE_TEXT ("Unable to open server service handler")));

      return (void *) -1;
    }

if (reactor.run_reactor_event_loop () != 0)
    {
      ACE_ERROR ((LM_ERROR,
                  ACE_TEXT ("(%t) %p\n"),
                  ACE_TEXT ("Error when running server ")
                  ACE_TEXT ("reactor event loop")));

      return (void *) -1;
    }

ACE_DEBUG ((LM_INFO,
            ACE_TEXT ("(%t) Reactor event loop finished ")
            ACE_TEXT ("successfully.\n")));

return 0;
}

// ----------------------------------------------------

// struct server_arg
// {
//   unsigned short port;

//   ACE_Condition<ACE_SYNCH_MUTEX> * cv;
// };

// ----------------------------------------------------

int
run_main (int, ACE_TCHAR *[])
{
ACE_START_TEST (ACE_TEXT ("Dev_Poll_Reactor_Test"));

// Make sure we ignore SIGPIPE
sigset_t sigsetNew;
sigset_t sigsetOld;
ACE_OS::sigemptyset (sigsetNew);
ACE_OS::sigaddset (sigsetNew, SIGPIPE);
ACE_OS::sigprocmask (SIG_BLOCK, sigsetNew, sigsetOld);

ACE_Dev_Poll_Reactor dp_reactor;
dp_reactor.restart (1);          // Restart on EINTR
ACE_Reactor reactor (&dp_reactor);

TestConnector client;

int flags = 0;
ACE_SET_BITS (flags, ACE_NONBLOCK);// Enable non-blocking in the
                                       // Svc_Handlers.

if (client.open (&reactor, flags) != 0)
    ACE_ERROR_RETURN ((LM_ERROR,
                     ACE_TEXT ("(%t) %p\n"),
                     ACE_TEXT ("Unable to open client service handler")),
                      -1);

//   ACE_SYNCH_MUTEX mutex;
//   ACE_Condition<ACE_SYNCH_MUTEX> cv (mutex);

//   server_arg arg;
//   arg.port = 54678;// Port the server will listen on.
//   arg.cv = &cv;

unsigned short port = 54678;

if (ACE_Thread_Manager::instance ()->spawn (server_worker, &port) == -1)
    ACE_ERROR_RETURN ((LM_ERROR,
                     ACE_TEXT ("(%t) %p\n"),
                     ACE_TEXT ("Unable to spawn server thread")),
                      -1);

ACE_OS::sleep (5);// Wait for the listening endpoint to be set up.

ACE_INET_Addr addr;
if (addr.set (port, INADDR_LOOPBACK) != 0)
    ACE_ERROR_RETURN ((LM_ERROR,
                     ACE_TEXT ("(%t) %p\n"),
                     ACE_TEXT ("ACE_INET_Addr::set")),
                      -1);

Client *client_handler = 0;

if (client.connect (client_handler, addr) != 0)
    ACE_ERROR_RETURN ((LM_ERROR,
                     ACE_TEXT ("(%t) %p\n"),
                     ACE_TEXT ("Unable to connect to server")),
                      -1);

if (reactor.run_reactor_event_loop () != 0)
    ACE_ERROR_RETURN ((LM_ERROR,
                     ACE_TEXT ("(%t) %p\n"),
                     ACE_TEXT ("Error when running client ")
                     ACE_TEXT ("reactor event loop")),
                      -1);

if (ACE_Thread_Manager::instance ()->wait () != 0)
    ACE_ERROR_RETURN ((LM_ERROR,
                     ACE_TEXT ("(%t) %p\n"),
                     ACE_TEXT ("Error waiting for threads to complete")),
                      -1);

ACE_END_TEST;

return 0;
}


#if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION)

template class ACE_Svc_Handler<ACE_SOCK_STREAM, ACE_NULL_SYNCH>;
template class ACE_Acceptor<Server, ACE_SOCK_ACCEPTOR>;
template class ACE_Connector_Base<Client>;
template class ACE_Connector<Client, ACE_SOCK_CONNECTOR>;
template class ACE_NonBlocking_Connect_Handler<Client>;

#elif defined (ACE_HAS_TEMPLATE_INSTANTIATION_PRAGMA)

#pragma instantiate ACE_Svc_Handler<ACE_SOCK_STREAM, ACE_NULL_SYNCH>
#pragma instantiate ACE_Acceptor<Server, ACE_SOCK_ACCEPTOR>
#pragma instantiate ACE_Connector_Base<Client>
#pragma instantiate ACE_Connector<Client, ACE_SOCK_CONNECTOR>
#pragma instantiate ACE_NonBlocking_Connect_Handler<Client>

#endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */

#else

int
run_main (int, ACE_TCHAR *[])
{
ACE_START_TEST (ACE_TEXT ("Dev_Poll_Reactor_Test"));
ACE_ERROR ((LM_INFO,
            ACE_TEXT ("Dev Poll and Event Poll are not supported ")
            ACE_TEXT ("on this platform\n")));
ACE_END_TEST;
return 0;
}

#endif/* ACE_HAS_DEV_POLL || ACE_HAS_EVENT_POLL */

peakzhang 发表于 2007-12-24 23:35:00

echo 4096>max_fd试试。

demon 发表于 2008-1-13 16:55:14

系统默认handle是1024...,需要改下。
页: [1]
查看完整版本: 我的Dev的Reactor与普通的连接数没区别