找回密码
 用户注册

QQ登录

只需一步,快速开始

查看: 13655|回复: 5

ACE_Reactor类学习

[复制链接]
发表于 2012-2-29 00:25:46 | 显示全部楼层 |阅读模式
ACE_Reactor :
      这是一个事件监听分派中心, 通过ACE_Reactor注册需要监控的事件,当事件发生时,ACE_Reactor就会自动调用注册时指定的控制程序进行处理。
转:
ACE Reactor 框架简化了事件驱动程序的开发,而事件驱动是许多网络化应用的特征。该框架实现Reactor模式,允许事件驱动的应用对源自许多不同事件源的事件作出反应,如I/O句柄,定时器,以及信号。应用重新定义框架所定义的挂钩方法,对其进行分派来处理事件。
一、ACE Reactor 框架的责任:
1、检测来自各种事件源的事件的发生。
2、将事件多路分离给其预先登记的事件处理器。
3、分派给处理器所定义的挂钩方法,从而以一种应用定义的方式处理这些事件。

源代码

[cpp] view plaincopyprint?


  • // $Id: Reactor.cpp 91368 2010-08-16 13:03:34Z mhengstmengel $

  • #include "ace/Reactor.h"

  • #if !defined (ACE_LACKS_ACE_SVCCONF)
  • #  include "ace/Service_Config.h"
  • #endif /* !ACE_LACKS_ACE_SVCCONF */

  • /*
  • * Hook to specialize the includes directly with the concrete
  • * Reactor type, e.g., select, thread pool reactor
  • * known at compile time. This hook results in all the
  • * #defines being commented
  • * out and the concrete header file directly included.
  • */
  • //@@ REACTOR_SPL_COMMENT_INCLUDE_START_HOOK
  • // Only include the headers needed to compile.
  • #if !defined (ACE_WIN32) \
  •       || !defined (ACE_HAS_WINSOCK2) || (ACE_HAS_WINSOCK2 == 0) \
  •       || defined (ACE_USE_SELECT_REACTOR_FOR_REACTOR_IMPL) \
  •       || defined (ACE_USE_TP_REACTOR_FOR_REACTOR_IMPL) \
  •       || defined (ACE_USE_DEV_POLL_REACTOR_FOR_REACTOR_IMPL)
  • #  if defined (ACE_USE_TP_REACTOR_FOR_REACTOR_IMPL)
  • #    include "ace/TP_Reactor.h"
  • #  else
  • #    if defined (ACE_USE_DEV_POLL_REACTOR_FOR_REACTOR_IMPL)
  • #      include "ace/Dev_Poll_Reactor.h"
  • #    else
  • #      include "ace/Select_Reactor.h"
  • #    endif /* ACE_USE_DEV_POLL_REACTOR_FOR_REACTOR_IMPL */
  • #  endif /* ACE_USE_TP_REACTOR_FOR_REACTOR_IMPL */
  • #else /* We are on Win32 and we have winsock and ACE_USE_SELECT_REACTOR_FOR_REACTOR_IMPL is not defined */
  • #  if defined (ACE_USE_MSG_WFMO_REACTOR_FOR_REACTOR_IMPL)
  • #    include "ace/Msg_WFMO_Reactor.h"
  • #  else
  • #    include "ace/WFMO_Reactor.h"
  • #  endif /* ACE_USE_MSG_WFMO_REACTOR_FOR_REACTOR_IMPL */
  • #endif /* !defined (ACE_WIN32) || !defined (ACE_HAS_WINSOCK2) || (ACE_HAS_WINSOCK2 == 0) || defined (ACE_USE_SELECT_REACTOR_FOR_REACTOR_IMPL) */

  • /*
  • * End comment hook.
  • */
  • //@@ REACTOR_SPL_COMMENT_INCLUDE_END_HOOK

  • #include "ace/Static_Object_Lock.h"
  • #include "ace/Framework_Component.h"
  • #include "ace/Guard_T.h"
  • #include "ace/Recursive_Thread_Mutex.h"

  • #if !defined (__ACE_INLINE__)
  •   #include "ace/Reactor.inl"
  • #endif /* __ACE_INLINE__ */

  • ACE_BEGIN_VERSIONED_NAMESPACE_DECL

  • ACE_ALLOC_HOOK_DEFINE(ACE_Reactor)

  • ACE_Reactor::ACE_Reactor (ACE_Reactor_Impl *impl,
  •                           bool delete_implementation)
  •   : implementation_ (0),
  •     delete_implementation_ (delete_implementation)
  • {
  •   this->implementation (impl);

  •   if (this->implementation () == 0)
  •     {
  • /*
  • * Hook to specialize the reactor implementation with the concrete
  • * Reactor implementation known at compile time. This hook will
  • * cause the conditionally defined code to be commented out and
  • * the concrete Reactor directly created.
  • */
  • //@@ REACTOR_SPL_CONSTRUCTOR_COMMENT_HOOK_START
  • #if !defined (ACE_WIN32) \
  •       || !defined (ACE_HAS_WINSOCK2) || (ACE_HAS_WINSOCK2 == 0) \
  •       || defined (ACE_USE_SELECT_REACTOR_FOR_REACTOR_IMPL) \
  •       || defined (ACE_USE_TP_REACTOR_FOR_REACTOR_IMPL) \
  •       || defined (ACE_USE_DEV_POLL_REACTOR_FOR_REACTOR_IMPL)
  • #  if defined (ACE_USE_TP_REACTOR_FOR_REACTOR_IMPL)
  •       ACE_NEW (impl,
  •                ACE_TP_Reactor);
  • #  else
  • #    if defined (ACE_USE_DEV_POLL_REACTOR_FOR_REACTOR_IMPL)
  •       ACE_NEW (impl,
  •                ACE_Dev_Poll_Reactor);
  • #    else
  •       ACE_NEW (impl,
  •                ACE_Select_Reactor);
  • #    endif /* ACE_USE_DEV_POLL_REACTOR_FOR_REACTOR_IMPL */
  • #  endif /* ACE_USE_TP_REACTOR_FOR_REACTOR_IMPL */
  • #else /* We are on Win32 and we have winsock and ACE_USE_SELECT_REACTOR_FOR_REACTOR_IMPL is not defined */
  •   #if defined (ACE_USE_MSG_WFMO_REACTOR_FOR_REACTOR_IMPL)
  •       ACE_NEW (impl,
  •                ACE_Msg_WFMO_Reactor);
  •   #else
  •       ACE_NEW (impl,
  •                ACE_WFMO_Reactor);
  •   #endif /* ACE_USE_MSG_WFMO_REACTOR_FOR_REACTOR_IMPL */
  • #endif /* !defined (ACE_WIN32) || !defined (ACE_HAS_WINSOCK2) || (ACE_HAS_WINSOCK2 == 0) || defined (ACE_USE_SELECT_REACTOR_FOR_REACTOR_IMPL) */

  • /*
  • * End hook.
  • */
  • //@@ REACTOR_SPL_CONSTRUCTOR_COMMENT_HOOK_END

  •       this->implementation (impl);
  •       this->delete_implementation_ = true;
  •     }
  • }

  • ACE_Reactor::~ACE_Reactor (void)
  • {
  •   this->implementation ()->close ();
  •   if (this->delete_implementation_)
  •     delete this->implementation ();
  • }

  • // Process-wide ACE_Reactor.
  • ACE_Reactor *ACE_Reactor::reactor_ = 0;

  • // Controls whether the Reactor is deleted when we shut down (we can
  • // only delete it safely if we created it!)
  • bool ACE_Reactor::delete_reactor_ = false;

  • ACE_Reactor *
  • ACE_Reactor::instance (void)
  • {
  •   ACE_TRACE ("ACE_Reactor::instance");

  •   if (ACE_Reactor::reactor_ == 0)
  •     {
  •       // Perform Double-Checked Locking Optimization.
  •       ACE_MT (ACE_GUARD_RETURN (ACE_Recursive_Thread_Mutex, ace_mon,
  •                                 *ACE_Static_Object_Lock::instance (), 0));

  •       if (ACE_Reactor::reactor_ == 0)
  •         {
  •           ACE_NEW_RETURN (ACE_Reactor::reactor_,
  •                           ACE_Reactor,
  •                           0);
  •           ACE_Reactor::delete_reactor_ = true;
  •           ACE_REGISTER_FRAMEWORK_COMPONENT(ACE_Reactor, ACE_Reactor::reactor_)
  •         }
  •     }
  •   return ACE_Reactor::reactor_;
  • }

  • ACE_Reactor *
  • ACE_Reactor::instance (ACE_Reactor *r, bool delete_reactor)
  • {
  •   ACE_TRACE ("ACE_Reactor::instance");

  •   ACE_MT (ACE_GUARD_RETURN (ACE_Recursive_Thread_Mutex, ace_mon,
  •                             *ACE_Static_Object_Lock::instance (), 0));
  •   ACE_Reactor *t = ACE_Reactor::reactor_;
  •   ACE_Reactor::delete_reactor_ = delete_reactor;

  •   ACE_Reactor::reactor_ = r;

  •   // We can't register the Reactor singleton as a framework component twice.
  •   // Therefore we test to see if we had an existing reactor instance, which
  •   // if so means it must have already been registered.
  •   if (t == 0)
  •     ACE_REGISTER_FRAMEWORK_COMPONENT(ACE_Reactor, ACE_Reactor::reactor_);

  •   return t;
  • }

  • void
  • ACE_Reactor::close_singleton (void)
  • {
  •   ACE_TRACE ("ACE_Reactor::close_singleton");

  •   ACE_MT (ACE_GUARD (ACE_Recursive_Thread_Mutex, ace_mon,
  •                      *ACE_Static_Object_Lock::instance ()));

  •   if (ACE_Reactor::delete_reactor_)
  •     {
  •       delete ACE_Reactor::reactor_;
  •       ACE_Reactor::reactor_ = 0;
  •       ACE_Reactor::delete_reactor_ = false;
  •     }
  • }

  • const ACE_TCHAR *
  • ACE_Reactor::dll_name (void)
  • {
  •   return ACE_TEXT ("ACE");
  • }

  • const ACE_TCHAR *
  • ACE_Reactor::name (void)
  • {
  •   return ACE_TEXT ("ACE_Reactor");
  • }

  • int
  • ACE_Reactor::check_reconfiguration (ACE_Reactor *)
  • {
  • #if !defined (ACE_HAS_WINCE)  &&  !defined (ACE_LACKS_ACE_SVCCONF)
  •   if (ACE_Service_Config::reconfig_occurred ())
  •     {
  •       ACE_Service_Config::reconfigure ();
  •       return 1;
  •     }
  • #endif /* ! ACE_HAS_WINCE || ! ACE_LACKS_ACE_SVCCONF */
  •   return 0;
  • }

  • int
  • ACE_Reactor::run_reactor_event_loop (REACTOR_EVENT_HOOK eh)
  • {
  •   ACE_TRACE ("ACE_Reactor::run_reactor_event_loop");

  •   if (this->reactor_event_loop_done ())
  •     return 0;

  •   while (1)
  •     {
  •       int const result = this->implementation_->handle_events ();

  •       if (eh != 0 && (*eh)(this))
  •         continue;
  •       else if (result == -1 && this->implementation_->deactivated ())
  •         return 0;
  •       else if (result == -1)
  •         return -1;
  •     }

  •   ACE_NOTREACHED (return 0;)
  • }

  • int
  • ACE_Reactor::run_alertable_reactor_event_loop (REACTOR_EVENT_HOOK eh)
  • {
  •   ACE_TRACE ("ACE_Reactor::run_alertable_reactor_event_loop");

  •   if (this->reactor_event_loop_done ())
  •     return 0;

  •   while (1)
  •     {
  •       int const result = this->implementation_->alertable_handle_events ();

  •       if (eh != 0 && (*eh)(this))
  •         continue;
  •       else if (result == -1 && this->implementation_->deactivated ())
  •         return 0;
  •       else if (result == -1)
  •         return -1;
  •     }

  •   ACE_NOTREACHED (return 0;)
  • }

  • int
  • ACE_Reactor::run_reactor_event_loop (ACE_Time_Value &tv,
  •                                      REACTOR_EVENT_HOOK eh)
  • {
  •   ACE_TRACE ("ACE_Reactor::run_reactor_event_loop");

  •   if (this->reactor_event_loop_done ())
  •     return 0;

  •   while (1)
  •     {
  •       int result = this->implementation_->handle_events (tv);

  •       if (eh != 0 && (*eh) (this))
  •         continue;
  •       else if (result == -1)
  •         {
  •           if (this->implementation_->deactivated ())
  •             result = 0;
  •           return result;
  •         }
  •       else if (result == 0)
  •         {
  •           // The <handle_events> method timed out without dispatching
  •           // anything.  Because of rounding and conversion errors and
  •           // such, it could be that the wait loop (WFMO, select, etc.)
  •           // timed out, but the timer queue said it wasn't quite ready
  •           // to expire a timer. In this case, the ACE_Time_Value we
  •           // passed into handle_events won't have quite been reduced
  •           // to 0, and we need to go around again. If we are all the
  •           // way to 0, just return, as the entire time the caller
  •           // wanted to wait has been used up.
  •           if (tv.usec () > 0)
  •             continue;
  •           return 0;
  •         }
  •       // Else there were some events dispatched; go around again
  •     }

  •   ACE_NOTREACHED (return 0;)
  • }

  • int
  • ACE_Reactor::run_alertable_reactor_event_loop (ACE_Time_Value &tv,
  •                                                REACTOR_EVENT_HOOK eh)
  • {
  •   ACE_TRACE ("ACE_Reactor::run_alertable_reactor_event_loop");

  •   if (this->reactor_event_loop_done ())
  •     return 0;

  •   for (;;)
  •     {
  •       int result = this->implementation_->alertable_handle_events (tv);

  •       if (eh != 0 && (*eh)(this))
  •         continue;
  •       else if (result == -1 && this->implementation_->deactivated ())
  •         return 0;
  •       else if (result <= 0)
  •         return result;
  •     }
  • }

  • int
  • ACE_Reactor::register_handler (ACE_Event_Handler *event_handler,
  •                                ACE_Reactor_Mask mask)
  • {
  •   // Remember the old reactor.
  •   ACE_Reactor *old_reactor = event_handler->reactor ();

  •   // Assign *this* <Reactor> to the <Event_Handler>.
  •   event_handler->reactor (this);

  •   int result = this->implementation ()->register_handler (event_handler, mask);
  •   if (result == -1)
  •     // Reset the old reactor in case of failures.
  •     event_handler->reactor (old_reactor);

  •   return result;
  • }

  • int
  • ACE_Reactor::register_handler (ACE_HANDLE io_handle,
  •                                ACE_Event_Handler *event_handler,
  •                                ACE_Reactor_Mask mask)
  • {
  •   // Remember the old reactor.
  •   ACE_Reactor *old_reactor = event_handler->reactor ();

  •   // Assign *this* <Reactor> to the <Event_Handler>.
  •   event_handler->reactor (this);

  •   int result = this->implementation ()->register_handler (io_handle,
  •                                                           event_handler,
  •                                                           mask);
  •   if (result == -1)
  •     // Reset the old reactor in case of failures.
  •     event_handler->reactor (old_reactor);

  •   return result;
  • }

  • #if defined (ACE_WIN32)

  • int
  • ACE_Reactor::register_handler (ACE_Event_Handler *event_handler,
  •                                ACE_HANDLE event_handle)
  • {
  •   // Remember the old reactor.
  •   ACE_Reactor *old_reactor = event_handler->reactor ();

  •   // Assign *this* <Reactor> to the <Event_Handler>.
  •   event_handler->reactor (this);

  •   int result = this->implementation ()->register_handler (event_handler,
  •                                                           event_handle);
  •   if (result == -1)
  •     // Reset the old reactor in case of failures.
  •     event_handler->reactor (old_reactor);

  •   return result;
  • }

  • #endif /* ACE_WIN32 */

  • int
  • ACE_Reactor::register_handler (ACE_HANDLE event_handle,
  •                                ACE_HANDLE io_handle,
  •                                ACE_Event_Handler *event_handler,
  •                                ACE_Reactor_Mask mask)
  • {
  •   // Remember the old reactor.
  •   ACE_Reactor *old_reactor = event_handler->reactor ();

  •   // Assign *this* <Reactor> to the <Event_Handler>.
  •   event_handler->reactor (this);

  •   int result = this->implementation ()->register_handler (event_handle,
  •                                                           io_handle,
  •                                                           event_handler,
  •                                                           mask);
  •   if (result == -1)
  •     // Reset the old reactor in case of failures.
  •     event_handler->reactor (old_reactor);

  •   return result;
  • }

  • int
  • ACE_Reactor::register_handler (const ACE_Handle_Set &handles,
  •                                ACE_Event_Handler *event_handler,
  •                                ACE_Reactor_Mask mask)
  • {
  •   // Remember the old reactor.
  •   ACE_Reactor *old_reactor = event_handler->reactor ();

  •   // Assign *this* <Reactor> to the <Event_Handler>.
  •   event_handler->reactor (this);

  •   int result = this->implementation ()->register_handler (handles,
  •                                                           event_handler,
  •                                                           mask);
  •   if (result == -1)
  •     // Reset the old reactor in case of failures.
  •     event_handler->reactor (old_reactor);

  •   return result;
  • }

  • long
  • ACE_Reactor::schedule_timer (ACE_Event_Handler *event_handler,
  •                              const void *arg,
  •                              const ACE_Time_Value &delta,
  •                              const ACE_Time_Value &interval)
  • {
  •   // Remember the old reactor.
  •   ACE_Reactor *old_reactor = event_handler->reactor ();

  •   // Assign *this* <Reactor> to the <Event_Handler>.
  •   event_handler->reactor (this);

  •   long result = this->implementation ()->schedule_timer (event_handler,
  •                                                          arg,
  •                                                          delta,
  •                                                          interval);
  •   if (result == -1)
  •     // Reset the old reactor in case of failures.
  •     event_handler->reactor (old_reactor);

  •   return result;
  • }

  • int
  • ACE_Reactor::schedule_wakeup (ACE_Event_Handler *event_handler,
  •                               ACE_Reactor_Mask masks_to_be_added)
  • {
  •   // Remember the old reactor.
  •   ACE_Reactor *old_reactor = event_handler->reactor ();

  •   // Assign *this* <Reactor> to the <Event_Handler>.
  •   event_handler->reactor (this);

  •   int result = this->implementation ()->schedule_wakeup (event_handler,
  •                                                          masks_to_be_added);
  •   if (result == -1)
  •     // Reset the old reactor in case of failures.
  •     event_handler->reactor (old_reactor);

  •   return result;
  • }

  • int
  • ACE_Reactor::notify (ACE_Event_Handler *event_handler,
  •                      ACE_Reactor_Mask mask,
  •                      ACE_Time_Value *tv)
  • {
  •   // First, try to remember this reactor in the event handler, in case
  •   // the event handler goes away before the notification is delivered.
  •   if (event_handler != 0 && event_handler->reactor () == 0)
  •     {
  •       event_handler->reactor (this);
  •     }
  •   return this->implementation ()->notify (event_handler, mask, tv);
  • }

  • int
  • ACE_Reactor::reset_timer_interval
  •   (long timer_id,
  •    const ACE_Time_Value &interval)
  • {
  •   ACE_TRACE ("ACE_Reactor::reset_timer_interval");

  •   return this->implementation ()->reset_timer_interval (timer_id, interval);
  • }

  • int
  • ACE_Reactor::cancel_timer (ACE_Event_Handler *event_handler,
  •                            int dont_call_handle_close)
  • {
  •   return this->implementation ()->cancel_timer (event_handler,
  •                                                 dont_call_handle_close);
  • }

  • int
  • ACE_Reactor::cancel_timer (long timer_id,
  •                            const void **arg,
  •                            int dont_call_handle_close)
  • {
  •   return this->implementation ()->cancel_timer (timer_id,
  •                                                 arg,
  •                                                 dont_call_handle_close);
  • }

  • ACE_END_VERSIONED_NAMESPACE_DECL


二、ACE Reactor 框架类
1、ACE_Time_Value :提供时间和持续时间的可移植、规范化的表示,使用C++运算符重载来简化与时间有关的算术和关系运算。
2、ACE_Event_Handler :抽象类,其接口定义的挂钩方法是 ACE_Reactor 回调的目标。大多数通过ACE开发的应用事件处理器都是ACE_Event_Handler的后代。
3、ACE_Timer_Queue:抽象类,定义定时器队列的能力和接口。ACE含有多种派生自ACE_Timer_Queue的类,为不同的定时机制提供了灵活的支持。
4、ACE_ Reactor:提供一个接口,用来在 Reactor 框架中管理事件处理器登记,并执行事件循环来驱动事件检测、多路分离的分派。
这些类在 Reactor 模式中扮演了以下角色:
1、事件基础设施层类(Event Infrastructure Layer Classes) 该类同步地检测事件并多路分离给事件处理器,并随即分派与之相关联的事件处理器挂钩方法。 ACE Reactor 框架中的基础设施层组件包括 ACE_Time_Value、ACE_Event_Handler、ACE定时器队列类,以及 ACE_Reactor的各种实现。
2、应用层类(Application Layer Classes) 该类定义事件处理器,以在其挂钩方法中执行应用所定义的处理。在 ACE Reactor 框架中,应用层类都是 ACE_Event_Handler 的后代。
三、ACE Reactor 框架的优点:
1、广泛的可移植性。
可以对框架进行配置,使用多种 OS 事件多路分离机制。
2、使事件检测、多路分离,以及分派自动化。
通过消除对不可移植的本地 OS 事件多路分离 API 的依赖,ACE Reactor 框架为应用提供了统一的面向对象事件检测,多路分离,以及分派机制,可以向 ACE_Reactor 登记事件处理器对象来处理各种类型的事件。
3、透明的可扩展性。
框架通过继承的动态绑定,采用挂钩方法,解除了“较低级的事件机制”(如检测多个I/O句柄上的事件,使定时器到期,以及多路分离和分派适当的事件处理器的方法来处理事件) 与“较高级的应用事件处理策略”(如连接建立策略、数据整编和解整编,以及对客户请求的处理) 的耦合。如此的设计使开发者能在不修改已有应用代码的情况下,对 ACE Reactor 框架进行透明扩展。
4、增加复用并使错误减至最少。
ACE Reactor 框架的事件检测、多路分离,以及分派机制是通用的,因而可被许多网络化应用复用。如此的事务分离使得开发者能够专注于高级的、应用所定义的事件处理器策略,而不是反复地与低级机制进行斗争。
5、高效的事件多路分离。
ACE Reactor 框架可高效地执行其事件多路分离和分派逻辑,如 ACE_Select_Reactor 使用了ACE_Handler_Set_Iteratorwrapper facade(包装外观)。它的 Iterator 模式中的优化实现基于成熟的算法,将运行时复杂度大大降低,从而极大地提高了大型应用的运行时性能。


更多文章欢迎访问:http://blog.csdn.net/wallwind

发表于 2012-8-25 13:36:20 | 显示全部楼层
好文。没人回复?应该能回复的吧、:lol
发表于 2012-9-6 12:36:39 | 显示全部楼层
注释太少
发表于 2012-11-12 14:19:18 | 显示全部楼层
看到这么多代码 ,有点晕了。
发表于 2013-9-16 12:51:00 | 显示全部楼层
好贴,版主应该增加这里代码级说明的帖子,另外,如果能够从总体架构上说明为什么这么设计,以及这样设计优点就更好了
发表于 2013-10-28 17:03:38 | 显示全部楼层
画个图示意下就更好了!好帖!
您需要登录后才可以回帖 登录 | 用户注册

本版积分规则

Archiver|手机版|小黑屋|ACE Developer ( 京ICP备06055248号 )

GMT+8, 2024-12-4 00:46 , Processed in 0.020395 second(s), 6 queries , Redis On.

Powered by Discuz! X3.5

© 2001-2023 Discuz! Team.

快速回复 返回顶部 返回列表