找回密码
 用户注册

QQ登录

只需一步,快速开始

查看: 5117|回复: 1

ACE笔记(6) Proactor机制下的异步SOCKET开发

[复制链接]
发表于 2008-7-15 23:53:41 | 显示全部楼层 |阅读模式
from [url=http://blog.csdn.net/anyjack/]http://blog.csdn.net/anyjack/[/url]
ACE笔记(6) Proactor机制下的异步SOCKET开发
Proactor机制和reactor机制的不同
1、在reactor机制下,所有I/O请求是同步的,即接到信号请求后,立即执行信号处理,
执行完后才开始继续监听信号请求,其接收信号请求的机制是被动的
而在Proactor机制下,I/O请求是异步的,即接到信号请求后,不立即执行信号处理(而是在莫个时刻执行该处理),
然后再继续监听信号请求,其接收信号请求的机制是主动的
2、要想符合Proactor机制的信号处理,需要从 ACE_Service_Handler 派生,而reactor机制信号处理类要从ACE_Event_Handler派生
ACE_Service_Handler 中以定义的常见回调方法:
   /// 异步读完成时会被调用
  virtual void handle_read_stream (const ACE_Asynch_Read_Stream::Result &result);
  /// 在UDP SOCKET中,当异步写完成时会被调用
  virtual void handle_write_dgram (const ACE_Asynch_Write_Dgram::Result &result);
  ///  在UDP SOCKET中,当异步读完成时会被调用
  virtual void handle_read_dgram (const ACE_Asynch_Read_Dgram::Result &result);
  /// 当异步写完成时会被调用
  virtual void handle_write_stream (const ACE_Asynch_Write_Stream::Result &result);
  /// 当异步读文件完成时会被调用
  virtual void handle_read_file (const ACE_Asynch_Read_File::Result &result);
  /// 当异步写文件完成时会被调用
  virtual void handle_write_file (const ACE_Asynch_Write_File::Result &result);
  ///当异步接收完成时会被调用
  virtual void handle_accept (const ACE_Asynch_Accept::Result &result);
  ///当异步连接完成时会被调用
  virtual void handle_connect (const ACE_Asynch_Connect::Result &result);
  ///当异步传输文件完成时会被调用
  virtual void handle_transmit_file (const ACE_Asynch_Transmit_File::Result &result);
  ///超时时会被调用
  virtual void handle_time_out (const ACE_Time_Value &tv,
                                const void *act = 0);
ACE_Service_Handler 类OPEN方法使用注意:
方法定义:open (ACE_HANDLE handle,ACE_Message_Block &message_block)
当客户端连接时会触发此方法
message_block 参数附带了伴随客户端连接发送过来的消息块
所以在实现OPEN方法中,要注意判断message_block 参数是否附带了消息,如果附带了,如果不想改变现有的事件数据统一处理模式,则需要自己模拟一个读完成动作,如下:
   if (message_block.length () != 0)
    {
      // 复制消息块(引用)
      ACE_Message_Block &duplicate =*message_block.duplicate ();
      // 伪装一个事件读完成对象
      ACE_Asynch_Read_Stream_Result_Impl *fake_result =
        ACE_Proactor::instance ()->create_asynch_read_stream_result (*this,
                                                                     this->handle_,
                                                                     duplicate,
                                                                     initial_read_size,
                                                                     0,
                                                                     ACE_INVALID_HANDLE,
                                                                     0,
                                                                     0);
   //移动写指针到未写入的位置,因为读完成动作中会自动移动写指针   
      size_t bytes_transferred = message_block.length ();
      duplicate.wr_ptr (duplicate.wr_ptr () - bytes_transferred);
      //发出事件完成回调命令
      fake_result->complete (message_block.length (),
                             1,
                             0);
      // 删除伪装的对象
      delete fake_result;
    }
ACE_Asynch_Read_Stream 类常见方法
open 方法:初始化读操作
read 方法:读操作,把数据存放在一个 ACE_Message_Block 数据结构上,该结构会自动移动写指针(wr_ptr)
ACE_Asynch_Write_Stream 类常见方法
open 方法:初始化写操作
write方法:写操作,会把存在 ACE_Message_Block 数据结构上写入指定的handle中,该结构会自动移动读指针(rd_ptr)
ACE_Message_Block 类常见方法
构造函数:ACE_Message_Block (长度)
rd_ptr():返回读指针
wr_ptr(): 返回写指针
release():释放内存
init(data,len):分配内存
wr_ptr(len):把写指针向前移动LEN个位置
wr_ptr(×):把写指针指向当前指针
duplicate():复制当前消息块

ACE_Asynch_Read_Stream::Result 类常见方法、属性
用于在回调完成时获得相关完成信息的类
bytes_to_read ():想读取的字节数
bytes_transferred ():有多少个字节被接收
handle ():作用在那个handle上
success():操作是否成功
message_block ():返回消息块

下面附带一个异步I/O处理的例子(例子来源于ACE自带例子,稍有改动),该例子用来异步接收客户请求,并把客户请求的信息显示在控制台上

#include "ace/OS_main.h"
#include "ace/Service_Config.h"
#include "ace/Proactor.h"
#include "ace/Asynch_IO.h"
#include "ace/Asynch_IO_Impl.h"
#include "ace/Asynch_Acceptor.h"
#include "ace/INET_Addr.h"
#include "ace/SOCK_Connector.h"
#include "ace/SOCK_Acceptor.h"
#include "ace/SOCK_Stream.h"
#include "ace/Message_Block.h"
#include "ace/Get_Opt.h"
#include "ace/OS_NS_sys_stat.h"

static u_short port = ACE_DEFAULT_SERVER_PORT;
static int done = 0;
static int initial_read_size = BUFSIZ;
class Receiver : public ACE_Service_Handler
{
public:
  Receiver (void);
  ~Receiver (void);
  virtual void open (ACE_HANDLE handle,
                     ACE_Message_Block &message_block);
protected:
  virtual void handle_read_stream (const ACE_Asynch_Read_Stream::Result &result);
  virtual void handle_write_file (const ACE_Asynch_Write_File::Result &result);
private:
  int initiate_read_stream (void);
  ACE_Asynch_Read_Stream rs_;
  ACE_HANDLE handle_;
  // Handle for IO to remote peer.
};
Receiver::Receiver (void)
  : dump_file_ (ACE_INVALID_HANDLE),
    handle_ (ACE_INVALID_HANDLE)
{}
Receiver::~Receiver (void){}
void
Receiver::open (ACE_HANDLE handle,
                ACE_Message_Block &message_block)
{
  ACE_DEBUG ((LM_DEBUG,
              "%N:%l:Receiver::open called\n"));
  this->handle_ = handle;
  // 打开SOCKET读取流
  if (this->rs_.open (*this, this->handle_) == -1)
    {
      ACE_ERROR ((LM_ERROR,
                  "%p\n",
                  "ACE_Asynch_Read_Stream::open"));
      return;
    }
   if (message_block.length () != 0)
    {
      // 复制消息块(引用)
      ACE_Message_Block &duplicate =*message_block.duplicate ();
      // 伪装一个事件读完成对象
      ACE_Asynch_Read_Stream_Result_Impl *fake_result =
        ACE_Proactor::instance ()->create_asynch_read_stream_result (*this,
                                                                     this->handle_,
                                                                     duplicate,
                                                                     initial_read_size,
                                                                     0,
                                                                     ACE_INVALID_HANDLE,
                                                                     0,
                                                                     0);
   //移动写指针到未写入的位置,因为读完成动作中会自动移动写指针   
      size_t bytes_transferred = message_block.length ();
      duplicate.wr_ptr (duplicate.wr_ptr () - bytes_transferred);
      //发出事件完成回调命令
      fake_result->complete (message_block.length (),
                             1,
                             0);
      // 删除伪装的对象
      delete fake_result;
    }
  else
    // 没有附带数据,则开始一个新的读操作
    if (this->initiate_read_stream () == -1)
      return;
}
int
Receiver::initiate_read_stream (void)
{
  ACE_Message_Block *mb = 0;
  ACE_NEW_RETURN (mb,
                  ACE_Message_Block (BUFSIZ + 1),
                  -1);
  // 开始读操作
  if (this->rs_.read (*mb,
                      mb->size () - 1) == -1)
    ACE_ERROR_RETURN ((LM_ERROR,
                       "%p\n",
                       "ACE_Asynch_Read_Stream::read"),
                      -1);
  return 0;
}
void
Receiver::handle_read_stream (const ACE_Asynch_Read_Stream::Result &result)
{
//开始读操作
  ACE_DEBUG ((LM_DEBUG,
              "handle_read_stream called\n"));
//显示读取的信息              
  result.message_block ().rd_ptr ()[result.bytes_transferred ()] = '\0';
  ACE_DEBUG ((LM_DEBUG, "********************\n"));
  ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "bytes_to_read", result.bytes_to_read ()));
  ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "handle", result.handle ()));
  ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "bytes_transfered", result.bytes_transferred ()));
  ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "act", (u_long) result.act ()));
  ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "success", result.success ()));
  ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "completion_key", (u_long) result.completion_key ()));
  ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "error", result.error ()));
  ACE_DEBUG ((LM_DEBUG, "%s = %s\n", "message:", result.message_block ().rd_ptr ()));
  ACE_DEBUG ((LM_DEBUG, "********************\n"));
  if (result.success () && result.bytes_transferred () != 0)
    {
     result.message_block ().release();
      // 如果还存在未读取数据,则继续读取
      if (this->initiate_read_stream () == -1)
        return;
    }
  else
    {
     //不存在,则释放消息块并关闭SOCKET连接
      ACE_DEBUG ((LM_DEBUG,
                  "Receiver completed\n"));
    result.message_block ().release ();
    done = 0;
   ACE_OS::closesocket (this->handle_);
    }
}
int
ACE_TMAIN (int argc, ACE_TCHAR *argv[])
{
  ACE_Asynch_Acceptor<Receiver> acceptor;
//打开SOCKET端口
     if (acceptor.open (ACE_INET_Addr (port),
                        initial_read_size,
                        1) == -1)
       return -1;
  int success = 1;
  while (success > 0  && !done)
    // 处理和分发事件
    success = ACE_Proactor::instance ()->handle_events ();
  return 0;
}
//下面的代码时帮助编译器解析上面的模板
  1. #if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION)
  2. template class ACE_Asynch_Acceptor<Receiver>;
  3. #elif defined (ACE_HAS_TEMPLATE_INSTANTIATION_PRAGMA)
  4. #pragma instantiate ACE_Asynch_Acceptor<Receiver>
  5. #endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */
  6.   
复制代码
发表于 2009-3-20 10:23:25 | 显示全部楼层
这代码是从CSDN上拷贝下来的,只接受,但是说道超时,确是只字未提

哎!
您需要登录后才可以回帖 登录 | 用户注册

本版积分规则

Archiver|手机版|小黑屋|ACE Developer ( 京ICP备06055248号 )

GMT+8, 2024-11-23 19:53 , Processed in 0.019052 second(s), 5 queries , Redis On.

Powered by Discuz! X3.5

© 2001-2023 Discuz! Team.

快速回复 返回顶部 返回列表