找回密码
 用户注册

QQ登录

只需一步,快速开始

查看: 5677|回复: 1

ACE前摄器Proactor模式

[复制链接]
发表于 2012-2-17 13:42:17 | 显示全部楼层 |阅读模式

当 OS 平台支持异步操作时,一种高效而方便的实现高性能 Web 服务器的方法是使用前摄式事件分派。使用前摄式事件分派模型设计的 Web 服务器通过一或多个线程控制来处理异步操作的完成。这样,通过集成完成事件多路分离(completion event demultiplexing)和事件处理器分派,前摄器模式简化了异步的 Web 服务器。

异步的 Web 服务器将这样来利用前摄器模式:首先让 Web 服务器向 OS 发出异步操作,并将回调方法登记到 Completion Dispatcher(完成分派器),后者将在操作完成时通知 Web 服务器。于是 OS 代表 Web 服务器执行操作,并随即在一个周知的地方将结果排队。Completion Dispatcher 负责使完成通知出队,并执行适当的、含有应用特有的 Web 服务器代码的回调。

使用前摄器模式的主要优点是可以启动多个并发操作,并可并行运行,而不要求应用必须拥有多个线程。操作被应用异步地启动,它们在 OS 的 I/O 子系统中运行直到完成。发起操作的线程现在可以服务 另外的请求了。

在ACE中,可以通过ACE_Proactor实现前摄器模式。实现方式如下。

1。创建服务处理器:

Proactor框架中服务处理器均派生自ACE_Service_Handler,它和Reactor框架的事件处理器非常类似。当发生IO操作完成事件时,会触发相应的事件完成会调函数。

2。实现服务处理器IO操作

Proactor框架中所有的IO操作都由相应的异步操作类来完成,这些异步操作类都继承自ACE_Asynch_Operation。常用的有以下几种。


  • ACE_Asynch_Read_Stream, 提供从TCP/IP socket连接中进行异步读操作.

  • ACE_Asynch_Write_Stream, 提供从TCP/IP socket连接中进行异步写操作.

使用这些操作类的一般方式如下:


  • 初始化
    将相关的操作注册到服务处理器中,一般可通过调用其open方法实现。
  • 发出IO操作
    发出异步IO操作请求,该操作不会阻塞,具体的IO操作过程由操作系统异步完成。
  • IO操作完成回调处理
    异步IO操作完成后,OS会触发服务处理器中的相应回调函数,可通过该函数的ACE_Asynch_Result参数获取相应的返回值。

3。使用连接器或接受器和远端进行连接

ACE为Proactor框架提供了两个工厂类来建立TCP/IP连接。


  • ACE_Asynch_Acceptor, 用于被动地建立连接

  • ACE_Asynch_Connector 用于主动地建立连接

当远端连接建立时,连接器或接受器便会创建相应的服务处理器,从而可以实现服务处理。

4。启动Proactor事件分发处理

启动事件分发处理只需如下调用:while(true)

  1.         ACE_Proactor::instance
  2. ()->handle_events ();
复制代码

5。程序示例

服务器端:

服务器端简单的实现了一个EchoServer,流程如下:

当客户端建立连接时,首先发出一个异步读的异步请求,当读完成时,将所读的数据打印出来,并发出一个新的异步请求。#include "ace/Message_Queue.h"

  1. #include "ace/Asynch_IO.h"
  2. #include "ace/OS.h"
  3. #include "ace/Proactor.h"
  4. #include "ace/Asynch_Acceptor.h"
  5. class HA_Proactive_Service : public ACE_Service_Handler
  6. {
  7. public:
  8. ~HA_Proactive_Service ()
  9. {
  10. if (this->handle
  11. () != ACE_INVALID_HANDLE)
  12. ACE_OS::closesocket (this->handle ());
  13. }
  14. virtual void open
  15. (ACE_HANDLE h, ACE_Message_Block&)
  16. {
  17.      this->handle (h);
  18.      if (this->reader_.open (*this) != 0 )
  19.      {
  20.          ACE_ERROR
  21. ((LM_ERROR, ACE_TEXT ("%p\n"),
  22.              ACE_TEXT ("HA_Proactive_Service open")));
  23.          delete this;
  24.          return;
  25.      }
  26.      ACE_Message_Block *mb
  27. = new ACE_Message_Block(buffer,1024);
  28.      if (this->reader_.read (*mb, mb->space ()) !=
  29. 0)
  30.      {
  31.          ACE_OS::printf("Begin read
  32. fail\n");
  33.          delete this;
  34.          return;
  35.      }
  36.      return;
  37. }
  38. //异步读完成后会调用此函数
  39. virtual void handle_read_stream
  40. (const ACE_Asynch_Read_Stream::Result
  41. &result)
  42. {
  43.      ACE_Message_Block &mb = result.message_block
  44. ();
  45.      if (!result.success () ||
  46. result.bytes_transferred () == 0)
  47.      {
  48.          mb.release
  49. ();
  50.          delete this;
  51.          return;
  52.      }
  53.      mb.copy("");    //为字符串添加结束标记'\0'
  54.      ACE_OS::printf("rev:\t%s\n",mb.rd_ptr());
  55.      
  56. mb.release();
  57.      ACE_Message_Block *nmb = new ACE_Message_Block(buffer,1024);
  58.      if (this->reader_.read (*nmb, nmb->space ()) !=
  59. 0)
  60.      return;
  61. }
  62. private:
  63. ACE_Asynch_Read_Stream reader_;
  64. char buffer[1024];
  65. };
  66. int main(int argc, char *argv[])
  67. {
  68.     int port=3000;
  69.     ACE_Asynch_Acceptor<HA_Proactive_Service>
  70. acceptor;
  71.    
  72.     if (acceptor.open
  73. (ACE_INET_Addr (port)) == -1)
  74.         return -1;
  75.     while(true)
  76.         ACE_Proactor::instance
  77. ()->handle_events ();
  78.    
  79.     return 0;
  80. }
复制代码

客户端:

客户端代码比较简单,就是每隔1秒钟将当前的系统时间转换为字符串形式通过异步形式发送给服务器,发送完成后,释放时间字符的内存空间。#include "ace/Message_Queue.h"

  1. #include "ace/Asynch_IO.h"
  2. #include "ace/OS.h"
  3. #include "ace/Proactor.h"
  4. #include "ace/Asynch_Connector.h"
  5. class HA_Proactive_Service : public ACE_Service_Handler
  6. {
  7. public:
  8. ~HA_Proactive_Service ()
  9. {
  10. if (this->handle
  11. () != ACE_INVALID_HANDLE)
  12. ACE_OS::closesocket (this->handle ());
  13. }
  14. virtual void open
  15. (ACE_HANDLE h, ACE_Message_Block&)
  16. {
  17.      this->handle (h);
  18.      if (this->writer_.open (*this) != 0 )
  19.      {
  20.          ACE_ERROR
  21. ((LM_ERROR, ACE_TEXT ("%p\n"),
  22.              ACE_TEXT ("HA_Proactive_Service open")));
  23.          delete this;
  24.          return;
  25.      }
  26.      ACE_OS::printf("connceted");
  27.      for(int i=0;i<10;i++)    //每隔秒中发送时间至服务器
  28.      {
  29.          
  30. ACE_OS::sleep(1);
  31.          time_t now =
  32. ACE_OS::gettimeofday().sec();
  33.          char *time = ctime(&now);        //获取当前时间的字符串格式
  34.          ACE_Message_Block *mb = new ACE_Message_Block(100);
  35.          
  36. mb->copy(time);
  37.          if (this->writer_.write(*mb,mb->length())
  38. !=0)
  39.          {
  40.              ACE_OS::printf("Begin read fail\n");
  41.              delete this;
  42.              return;
  43.          }
  44.      }
  45.      return;
  46. }
  47. //异步写完成后会调用此函数
  48. virtual void handle_write_dgram
  49. (const ACE_Asynch_Write_Stream::Result
  50. &result)
  51. {
  52.      ACE_Message_Block &mb = result.message_block
  53. ();
  54.      mb.release();
  55.      return;
  56. }
  57. private:
  58. ACE_Asynch_Write_Stream
  59. writer_;
  60. };
  61. int main(int argc, char *argv[])
  62. {
  63.    
  64.     ACE_INET_Addr
  65. addr(3000,"192.168.1.142");
  66.     HA_Proactive_Service
  67. *client = new HA_Proactive_Service();
  68.     ACE_Asynch_Connector<HA_Proactive_Service>
  69. connector;
  70.    
  71.     connector.open();
  72.     if (connector.connect(addr) == -1)
  73.         return -1;
  74.     while(true)
  75.         ACE_Proactor::instance
  76. ()->handle_events ();
  77.    
  78.     return 0;
  79. }
复制代码

作者:weiqubo 发表于2012-2-16 23:53:44 原文链接

发表于 2012-3-14 21:41:58 | 显示全部楼层
才开始接触ace, 还是有很多地方看不懂啊
您需要登录后才可以回帖 登录 | 用户注册

本版积分规则

Archiver|手机版|小黑屋|ACE Developer ( 京ICP备06055248号 )

GMT+8, 2024-11-23 17:33 , Processed in 0.015846 second(s), 5 queries , Redis On.

Powered by Discuz! X3.5

© 2001-2023 Discuz! Team.

快速回复 返回顶部 返回列表