找回密码
 用户注册

QQ登录

只需一步,快速开始

查看: 6057|回复: 2

proactor 多线程问题

[复制链接]
发表于 2009-6-16 17:24:07 | 显示全部楼层 |阅读模式
小弟是ACE初学者,看完两本教材和一些网上的帖子,自己写了个基于proactor框架的服务器,代码编译通过,如下:
//////////////////////////////////
server.h

class Server:public ACE_Service_Handler
{
public:
  Server (void);
  ~Server (void);
virtual void open (ACE_HANDLE handle,
                ACE_Message_Block &message_block);
protected:
  virtual void handle_write_stream (const ACE_Asynch_Write_Stream::Result &result);
   // This is called when asynchronous writes to the socket complete
  virtual void handle_read_stream (const ACE_Asynch_Read_Stream::Result &result);
  // This is called when asynchronous reads from the socket complete
  
private:
  int init_read_stream(void);

int initWriteStream(ACE_Message_Block &mb ,size_t nBytes);

  ACE_Asynch_Read_Stream rs_;

  ACE_Asynch_Write_Stream ws_;

};
////////////////////////////////////////////////////////////////////////////////
Server.cpp

class LogicThread:public ACE_Task<ACE_MT_SYNCH>
{
public:
enum{POOL_SIZE=5};
LogicThread(){}
int open()
{
  return this->activate(THR_NEW_LWP,POOL_SIZE);
}
int close()
{
        ACE_Message_Block * hangup;
        for(int i=0; i<POOL_SIZE; i++)
  {
            ACE_NEW_RETURN (hangup, ACE_Message_Block(0, ACE_Message_Block::MB_HANGUP), -1);
            this->putq (hangup);
  }
  this->wait ();
        return 0;
}
virtual int svc(void)
{
  while(1)
  {
   ACE_Message_Block *mb=NULL;
   if(this->getq(mb)==-1)
   {
    ACE_ERROR_RETURN ((LM_ERROR,ACE_TEXT("%p\n"), ACE_TEXT("getq")), -1);
    break;
   }
   if (mb->msg_type () == ACE_Message_Block::MB_HANGUP)
   {
                mb->release ();
                break;
   }
   process_message(mb);     //////////消息处理
  }
  return 0;
}
private:
void process_message (ACE_Message_Block *mb)
{
           //消息解析 略
          //根据解析的结果发送数据给客户端实现响应。
}
};
Server::Server (void){}
Server::~Server (void)
{
   if (this->handle () != ACE_INVALID_HANDLE)
     ACE_OS::closesocket (this->handle ());
}
void
Server::open (ACE_HANDLE h,ACE_Message_Block &message_block)
{
  this->handle(h);
  if (this->rs_.open (*this) != 0 || this->ws_.open (*this) != 0 )
  {
            delete this;
            return;
  }
      if(this->init_read_stream() != 0){
            return;
        }

}
void
Server::handle_read_stream (const ACE_Asynch_Read_Stream::Result &result)
{
ACE_Message_Block &mb = result.message_block();
if (!result.success()||result.bytes_transferred()==0)
  {
   mb.release();
   delete this;
  }
LogicThread pool;
pool.putq(&mb);
}
int Server::init_read_stream(void)
{
ACE_Message_Block *mb=0;
ACE_NEW_RETURN(mb,ACE_Message_Block(1024),-1);
if(rs_.read(*mb,mb->space())==-1)
{
  ACE_ERROR_RETURN ((LM_ERROR,
   "%p\n",
   "ACE_Asynch_Read_Stream::read"),
   -1);
}
int Server:: initWriteStream(ACE_Message_Block &mb ,size_t nBytes)
{
  if (this->ws_.write (mb , nBytes ) == -1)
  {
   mb.release ();
   ACE_ERROR_RETURN((LM_ERROR, "%p\n", "ACE_Asynch_Write_File::write"), -1);
   return -1;
  }
  return 0;
}
return 0;
}
void
Server::handle_write_stream(const ACE_Asynch_Write_Stream::Result &result)
{
     ACE_DEBUG ((LM_DEBUG, ACE_TEXT("Write completely\n")));
        //释放消息
        result.message_block().release();
}
class NetThread : public ACE_Task_Base
{
public:
       int open()
       {
              return this->activate();
       }
       int close()
       {
              ACE_Proactor::instance()->proactor_end_event_loop();
              this->wait();
              return 0;
       }
       virtual int svc()
       {
              ACE_INET_Addr listen_addr(3000);
               ACE_Asynch_Acceptor<Server> aio_acceptor;
              if (0 != aio_acceptor.open (listen_addr, 0, 0, 5, 1, 0, 0))
                     ACE_ERROR_RETURN ((LM_ERROR, ACE_TEXT ("%p\n"), ACE_TEXT ("acceptor open")), 1);
              ACE_Proactor::instance ()->proactor_run_event_loop ();
              ACE_DEBUG ((LM_DEBUG, ACE_TEXT("Thread ended\n")));
              return 0;
       }
};
int ACE_TMAIN (int, ACE_TCHAR *[])
{
       NetThread net_thread;
    LogicThread logic_thread;
       net_thread.open();
     logic_thread.open();
       int x;
       std::cin >> x;
    logic_thread.close();
       net_thread.close();
       return 0;
}


线程NetThread和线程池LogicThread实现网络IO和逻辑处理的分离,在handle_read_stream()中,IO线程接收到的数据使用putq()方法将该数据包发送到逻辑线程的消息队列中,逻辑线程接收消息并处理。 请问各位高人,我的这种架构思想没错吧?在逻辑线程收到消息以后在process_message()函数中进行处理,根据不同的结果进行不同的响应。但是有个问题难倒我了,如果向客户端发送响应,怎么调用initWriteStream()发送呢,我该怎么写:'( ,谢谢赐教!其他地方有什么不对的,也可以提醒下我!:)
发表于 2009-6-16 18:28:11 | 显示全部楼层
这个问题有点复杂了。
一句两句说不清,需要参考一下C++网络编程卷2,
在这节:
6.3 The ACE_Task_Class

Sidebar 46: Closing TP_Logging_Handlers Concurrently
A challenge with thread pool servers is closing objects that can be accessed concurrently by multiple threads. In our thread pool logging server, TP_Logging_Handler pointers are used by TP_LOGGING_TASK threads. These service threads are separate from the thread running the reactor event loop that's driving callbacks to TP_Logging_Handler. We must therefore ensure that a TP_Logging_Handler object isn't destroyed while there are still pointers to it in use by TP_LOGGING_TASK.

When a logging client closes a connection, TP_Logging_Handler::handle_input() (page 197) returns -1. The reactor then calls the handler's handle_close() method, which ordinarily cleans up resources and deletes the handler. Unfortunately, that would wreak havoc if one or more pointers to that handler were still enqueued or being used by threads in the TP_LOGGING_TASK pool. We therefore use a reference counting protocol to ensure the handler isn't destroyed while a pointer to it is still in use. The UML activity diagram below illustrates the behavior this protocol enforces:



The protocol counts how many times a handler resides in the TP_LOGGING_TASK singleton's message queue. If the count is greater than 0 when the logging client socket is closed, TP_Logging_Handler::handle_close() can't yet destroy the handler. Later, as the TP_LOGGING_TASK processes each log record, the handler's reference count is decremented. When the count reaches 0, the handler can finish processing the close request that was deferred earlier.
发表于 2009-6-16 19:08:58 | 显示全部楼层
呵呵,首先楼主想往socket里写消息,肯定得需要该socket对应ACE_Service_Handler的指针呀。
至于如何获得一个ACE_Service_Handler有效地指针或者引用呢,顺着winston老大的提示看吧。
您需要登录后才可以回帖 登录 | 用户注册

本版积分规则

Archiver|手机版|小黑屋|ACE Developer ( 京ICP备06055248号 )

GMT+8, 2024-11-23 16:07 , Processed in 0.016502 second(s), 5 queries , Redis On.

Powered by Discuz! X3.5

© 2001-2023 Discuz! Team.

快速回复 返回顶部 返回列表