peakzhang 发表于 2008-9-10 22:40:18

Reactor框架写的一个简单udp服务器框架的问题

我用ACE的Reactor框架写的一个简单udp服务器框架如下:我对其进行压力测试,结果发现问题很大。
当每秒接受的数据包到11000个(每个包1400字节,总共42K字节)左右时,服务器耗尽了所有的cpu,但内存占用非常少不到5%。pc是赛阳2.6,512M内存,系统为redhead9.0,2.6内核。各位看看问题出在什么地方!
以下内容为程序代码:


#include "ace/OS_main.h"
#include "ace/OS_NS_string.h"
#include "ace/OS_NS_unistd.h"
#include "ace/Reactor.h"
#include "ace/Process.h"
#include "ace/SOCK_Dgram.h"
#include "ace/INET_Addr.h"
#include "ace/Log_Msg.h"
#include "ace/Thread_Manager.h"
#include "ace/Task_T.h"
#include "ace/Shared_Memory_Pool.h"
//#include "ace/Process_Mutex.h"
#include "ace/Malloc.h"
#include "ace/Malloc_T.h"
#include "ace/Shared_Memory_MM.h"

#include "client.h"
#define SERVER_PORT 10101
#define POOLNAME "mypoll"
typedef ACE_Malloc<ACE_SHARED_MEMORY_POOL, ACE_SYNCH_MUTEX> myalloc;
static const size_t TASK_THREAD_POOL_SIZE = 5;
myalloc shm_malloc(POOLNAME);
static int packages = 0;
static int total_bytes = 0;
class Task_Worker: public ACE_Task<ACE_MT_SYNCH>
{
    public:
      virtual int svc(void)
      {
            while(1)
            {
                ACE_Message_Block *mb = NULL;
                if(this->getq(mb) == -1)
                {
                  continue;
                }
                process_task(mb);
            }
            return 0;
      }
    private:
      void process_task(ACE_Message_Block *mb)
      {
            PKG_HEADER header;
            memcpy(&header, mb->rd_ptr(), sizeof(PKG_HEADER));
            header.magic_num = ntohl(header.magic_num);
            header.index = ntohl(header.index);
            header.check_sum = ntohl(header.check_sum);
            //printf("magic[%d]index[%d]checksum[%d]\n", header.magic_num, header.index, header.check_sum);
            mb->release();
      }
};
class Task_Manager: public ACE_Task<ACE_MT_SYNCH>
{
    public:
      virtual int svc(void)
      {
            Task_Worker task_tp;
            task_tp.activate(THR_NEW_LWP | THR_JOINABLE, TASK_THREAD_POOL_SIZE);
            while(1)
            {
                ACE_Message_Block *mb = NULL;
                if(this->getq(mb) < 0)
                {
                  task_tp.msg_queue()->deactivate();
                  task_tp.wait();
                }
                task_tp.putq(mb);
            }
            return 0;
      }
};
class Dgram_Endpoint : public ACE_Event_Handler
{
public:
Task_Manager task_mgr;
Dgram_Endpoint (const ACE_INET_Addr &local_addr);
virtual ACE_HANDLE get_handle (void) const;
virtual int handle_input (ACE_HANDLE handle);
virtual int handle_timeout (const ACE_Time_Value & tv,const void *arg = 0);
virtual int handle_close (ACE_HANDLE handle,ACE_Reactor_Mask close_mask);
virtual int handle_signal (int signum, siginfo_t*, ucontext_t*);
int send (const char *buf, size_t len, const ACE_INET_Addr &);
private:
ACE_SOCK_Dgram endpoint_;
};

int Dgram_Endpoint::send (const char *buf,size_t len,const ACE_INET_Addr &addr)
{
return this->endpoint_.send (buf, len, addr);
}
Dgram_Endpoint::Dgram_Endpoint (const ACE_INET_Addr &local_addr)
: endpoint_(local_addr)
{
    task_mgr.activate();
}

ACE_HANDLE Dgram_Endpoint::get_handle (void) const
{
return this->endpoint_.get_handle();
}

int Dgram_Endpoint::handle_close (ACE_HANDLE handle,ACE_Reactor_Mask)
{
    ACE_DEBUG((LM_DEBUG, "************handle_close***********\n"));
    ACE_UNUSED_ARG (handle);
    this->endpoint_.close();
    delete this;
    return 0;
}
static int is_heart_package(char *buf, size_t size)
{
    unsigned long data;
    if (size<4)
      return -1;
    memcpy(&data, buf, sizeof(data));
    data = ntohl(data);
    if (data == 0x12345678)
    {
      //printf("yes it's heart beat package\n");
      return 0;
    }
    else
      return -1;
}
int Dgram_Endpoint::handle_input (ACE_HANDLE)
{
char buf;
ACE_INET_Addr from_addr;
char address;
ssize_t nbytes = this->endpoint_.recv (buf, sizeof(buf), from_addr);
packages++;
total_bytes += nbytes;

#if 0
if (nbytes == -1)
    ACE_ERROR ((LM_ERROR,"%p","handle_input error\n"));
else
    ACE_DEBUG ((LM_DEBUG, "[%d]bytes from[%s] received:%s\n", nbytes, address, buf));
#endif
if (0 == is_heart_package(buf, nbytes))
{
      return 0;
}
ACE_Message_Block *mb = NULL;
//mb = shm_malloc.malloc(sizeof(ACE_Message_Block));
ACE_NEW_RETURN(mb, ACE_Message_Block(nbytes, ACE_Message_Block::MB_DATA, 0, buf), -1);
mb->wr_ptr(nbytes);
this->task_mgr.putq(mb);
return 0;
}
int Dgram_Endpoint::handle_timeout (const ACE_Time_Value &,const void *)
{
ACE_DEBUG ((LM_DEBUG,"[%d] packages [%d]K bytes are received per secondt\n", packages, total_bytes/1024 ));
packages = 0;
total_bytes = 0;
return 0;
}
int Dgram_Endpoint::handle_signal (int signum, siginfo_t* siginfo, ucontext_t* context)
{
    return ACE_Event_Handler::handle_signal (signum, siginfo, context);
}
int ACE_TMAIN (int argc, ACE_TCHAR *argv[])
{
    ACE_INET_Addr local_addr(SERVER_PORT);
    Dgram_Endpoint *endpoint;
    ACE_NEW_RETURN (endpoint,Dgram_Endpoint (local_addr),-1);
    if (ACE_Reactor::instance ()->register_handler(endpoint,ACE_Event_Handler::READ_MASK) == -1)
    {
      ACE_ERROR_RETURN ((LM_ERROR,"ACE_Reactor::register_handler"),-1);
    }
    ACE_Time_Value time_out(1);
    ACE_Reactor::instance()->schedule_timer(endpoint, (void *)"time out",ACE_Time_Value::zero, time_out);
#if 0
    if (-1 == ACE_Reactor::instance()->register_handler(SIGINT, endpoint))
    {
      ACE_ERROR_RETURN((LM_ERROR, "fail to register SIGINT handler"), -1);
    }
#endif
    ACE_Reactor::instance()->run_event_loop();
return 0;
}

peakzhang 发表于 2008-9-10 22:40:27

除非用ACE_Dev_Poll_Reactor,各个平台的服务器端,都不推荐用Reactor框架。效率不高,不是ACE效率不高,而是各个操作系统实现中,这种模式下(select)效率很差。除非不用,否则没办法。

如果你要用Reactor,推荐你用ACE_Dev_Poll_Reactor,很简单,初始化的时候,替换默认的select_reactor即可。你可以试试EPoll的效率。或者如果能用AIO,试试Proactor(兼容性不够好)。

至于如何编译使用ACE_Dev_Poll_Reactor,本站有说明,照着做就可以了。
页: [1]
查看完整版本: Reactor框架写的一个简单udp服务器框架的问题