找回密码
 用户注册

QQ登录

只需一步,快速开始

查看: 9568|回复: 0

ACE_TP_Reactor 实现 Leader-Follower 线程模型分析

[复制链接]
发表于 2014-3-6 15:09:31 | 显示全部楼层 |阅读模式
本帖最后由 yunh 于 2014-3-6 15:53 编辑

        之前一篇文章分析过 ACE_Select_Reactor 在多线程环境下的运作情况(http://www.acejoy.com/ace/thread-5804-1-1.html),虽然可以使用多个线程在 Select_Reactor 上工作,但如果想要充分利用多线程并发的好处,ACE 推荐使用 ACE_TP_Reactor,其中 TP 即 Thread-Pool 的缩写。与 ACE_WFMO_Reactor 的完全多线程并行处理不同,TP_Reactor 仅实现了部分的并行,这是由于 select 系统调用本身不是多线程安全的,所以在侦测事件时必需保证单线程,但在分派事件时却可以使用多线程,为此 TP_Reactor 实现了一种称为 Leader-Follower 的线程模型:一组线程中,有一个线程充当 Leader 角色,负责侦测事件,其它线程处在空闲待命状态;当事件到达,Leader 线程开始分派事件,在此之前,它选取线程池中的一个线程充当 Leader 角色,而自己在分派完事件后,加入空闲线程池,成为 Follower 角色,如此循环往复,不断推动 Reactor 运行。TP_Reactor 本身派生自 Select_Reactor:


ACE_Select_Reactor 实际是 ACE_Select_Reactor_T 的模板实例化

        站在 Select_Reactor 的肩膀上,TP_Reactor 只做了少量修改,就实现了上述线程模型,具体罗列如下:
        1. 固定打开 Reactor Notify 机制;
        2. 固定压制分派通知事件的 renew 调用 (通过 supress_notify_renew 接口);
        3. 侦测到事件后,一个线程只分派一个事件就返回到事件循环开始处;
        4. 在分派事件前,先释放 Token;
        5. 对于 IO 事件,分派前先 suspend 之;分派后,再 resume 之;
        6. 事件循环中使用 ACE_TP_Token_Guard 取代一般的 ACE_GUARD 宏来获取 Token。
        下面分两个场景分析,第一个场景是主线程 M 运行 TP_Reactor 事件循环,子线程 N 只调用 Reactor 接口更新其状态,如 schedule_timer,线程函数如下:
  1.     while (thr_mgr ()->testcancel (ACE_Thread::self ()) == 0)
  2.     {
  3.         if(timer_ == 0)
  4.         {
  5.             timer_ = reactor ()->schedule_timer (this, 0, ACE_Time_Value(1, 0));
  6.             ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%t) schedule timer %u.\n", timer_)));
  7.         }
  8.         else
  9.         {
  10.             reactor ()->cancel_timer (timer_);
  11.             ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%t) cancel timer %u.\n", timer_)));
  12.             timer_ = 0;
  13.         }
  14.         ACE_OS::sleep (3);
  15.     }
复制代码
       同 Select_Reactor 的情况一样,线程 N 在调用接口前加锁,ACE_Token 的加锁通知机制会回调当前锁的拥有者线程 M 之 sleep_hook 方法,在 ACE_Select_Reactor_Token_T  的重载中,向 Reactor 发送 Notify 事件,导致线程 M 从 select 中唤醒,释放 Token,使得线程 N 成功获取锁,从而修改 Reactor 状态。之后线程 N 释放锁,又导致线程 M 获取锁成功,可以继续运行事件循环。注意这里与 Select_Reactor 不同的是,不在分派 Notify 事件时调用 renew (即使调用也无效,因为设置了 supress_notify_renew_ 标志),所以线程 M 是在回到循环开始处等待,而不是在分派事件时等待。运行测试程序,在没有任何连接的情况下,线程 N 可以正常调用 Reactor 接口并输出:
  1. (8036) acceptor opened.
  2. (2628) schedule timer 30277332.
  3. (8036) handle timeout !
  4. (2628) cancel timer 30277332.
  5. (2628) schedule timer 30277332.
  6. (8036) handle timeout !
  7. (2628) cancel timer 30277332.
复制代码
       第二个场景是主线程 M 与子线程 N 同时运行事件循环,线程函数如下:
  1.    ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%t) start reactor event loop !\n")));
  2.     //reactor ()->owner (ACE_Thread::self ());
  3.     reactor ()->run_reactor_event_loop ();
  4.     ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%t) end reactor event loop !\n")));
复制代码
       与 Select_Reactor 不同,TP_Reactor 在调用事件循环前,不需要指定线程为 owner。假定线程 M 先获取锁,TP_Reactor 的 handle_events 中加锁代码如下:
  1.   ACE_TP_Token_Guard guard (this->token_);
  2.   int result = guard.grab_token (max_wait_time);
复制代码
       这里没有像 Select_Reactor 一样使用一般的 ACE_GUARD_RETURN 宏,后者间接定义的是 ACE_Guard 模板对象,在构造器中默认会调用 LOCK 的 acquire 方法;而 ACE_TP_Token_Guard 则不会在构造器中调用任何锁的方法,而是对象构造后,使用 grab_token/acquire_token 来显示获取锁,析构器中调用 release 则是一致的。grab_token 的主要调用代码如下:
  1.       ACE_MT (result = this->token_.acquire_read (&ACE_TP_Reactor::no_op_sleep_hook));
复制代码
       这里有两个明显区别,一是调用 acqure_read,ACE_Token 有两个等待队列,读队列与写队列,写队列拥有更高的优先级,其它接口一般是调用 acqure 在写队列上等待,也就是说,如果两个线程,一个想运行事件循环,一个想修改 TP_Reactor 状态,那么后者将先被调度;二是提供锁的 sleep_hook 参数,这样一来,当取得锁失败 (被其它线程占有),它将调用这个传入的参数,而不是锁自己的同名回调函数,而传入的这个函数 no_op_sleep_hook 顾名思义是一个空方法,也就是说,它覆盖了默认向 Reactor 发送通知的实现,当多个线程同时运行事件循环时,保持当前线程处在 select 调用中。否则就如同之前对 Select_Reactor 的分析一样,多个线程会交替通知,导致 Reactor 无法正常工作。
        如果此时客户端连接服务器并发送数据,线程 M 从 select 中唤醒,处理 IO 事件,在处理前,先将此句柄从 Reactor 中挂起 (suspend),接着释放 Token,然后开始分派事件,最后从 Reactor 中恢复该句柄。在 Select_Reactor 中挂起句柄就是从侦听的句柄集合中移到备用集合,恢复操作正好相反,这样的处理主要是考虑到当线程 M 还未分派此句柄上的事件时,线程 N 进入 select 发现该句柄仍处在激活状态,并尝试对它继续分派,造成多次重复分派。开始分派事件前,线程 M 的侦测工作就正式结束了,所以它释放锁,这会唤醒在锁上等待的线程 N,线程 N 获取锁后,进入 select 接着再侦测事件,由此完成了一次线程交接班。线程 M 分派完事件后,又会回到事件循环开始,等待进入锁,如果整个线程池只有 M 一个线程,那就只有等它分派完事件后,才可以再次进行事件检测了,这就退化到了和 Select_Reactor 一致的情形。使用 TP_Reactor 后,分派事件的线程确实不在被限制为主线程了:
  1. (7436) acceptor opened.
  2. (5436) start reactor event loop !
  3. (7436) Connection from 127.0.0.1:62283
  4. (5436) recv message: Iteration 1.
  5. (7436) recv message: Iteration 2.
  6. (5436) recv message: Iteration 3.
  7. (7436) recv message: Iteration 4.
  8. (5436) recv message: Iteration 5.
  9. (7436) Connection closed.
  10. (6148|7436) handle_close
复制代码
       可以看到主线程 7436 与子线程 5436 都有机会分派事件(实际上基本是交替进行的)。到这里呢,就基本分析完了,但有一点还需要着重说明一下,就是在处理器回调(handle_input/output/signal) 中调用 Reactor 接口会有什么变化,例如当 Acceptor 的 handle_input 被调用时,它一般会调用 register_handler 将新建立的连接注册到 Reactor 中,在 Select_Reactor 中,由于分派事件的线程与负责事件循环的线程是一个线程,register_handler 在进锁时只增加锁的重入次数,所以这不成问题;但在 TP_Reactor 中,分派事件的线程往往不是当前侦测事件的线程,register_handler 在进锁时,会发送 Notify 事件将当前阻塞在 select 上的线程唤醒,并在锁上等待,由于它进入的是 Writer 队列,比其它所有在 Reader 队列上等待的空闲线程优先级都高,所以可以保证 Leader 线程在release 锁时,它第一个获取锁。其实之前曾经提到,一个线程在派发事件前后,会分别调用 suspend_handler/resume_handler 来临时挂起句柄,代码如下:
  1.   // Suspend the handler so that other threads don't start dispatching
  2.   // it.
  3.   //
  4.   // NOTE: This check was performed in older versions of the
  5.   // TP_Reactor. Looks like it is a waste..
  6.   if (dispatch_info.event_handler_ != this->notify_handler_)
  7.       this->suspend_i (dispatch_info.handle_);            // SUSPENDED
  8.   int resume_flag =
  9.     dispatch_info.event_handler_->resume_handler ();
  10.   int reference_counting_required =
  11.     dispatch_info.event_handler_->reference_counting_policy ().value () ==
  12.     ACE_Event_Handler::Reference_Counting_Policy::ENABLED;
  13.   // Call add_reference() if needed.
  14.   if (reference_counting_required)
  15.     {
  16.       dispatch_info.event_handler_->add_reference ();
  17.     }
  18.   // Release the lock.  Others threads can start waiting.
  19.   guard.release_token ();
  20.   int result = 0;
  21.   // If there was an event handler ready, dispatch it.
  22.   // Decrement the event left
  23.   --event_count;
  24.   // Dispatched an event
  25. if (this->dispatch_socket_event (dispatch_info) == 0)
  26.     ++result;
  27.   // Resume handler if required.
  28.   if (dispatch_info.event_handler_ != this->notify_handler_ &&
  29.       resume_flag == ACE_Event_Handler::ACE_REACTOR_RESUMES_HANDLER)
  30.       this->resume_handler (dispatch_info.handle_);       // RESUMED
复制代码
       看出什么不同来了吗? release_token 前的调用是 suspend_i,即内部版本,不需要加锁,而 release_token 后的调用是 resume_handler,即外部版本,需要加锁。前面的很好理解,当前锁还未释放,所以可以直接调用内部版本,后面的也好理解,锁已经释放,事件已经派发完毕,完成 Leader 到 Follower 角色转变,所以调用接口要使用加锁版本。但这样一来,运行效率必然大大降低,因为 Follower 刚转变成 Leader,就被通知要修改 Reactor 状态,从 select 跳出,进入线程池等待,直到 resume_handler 完成,才可以重新进入事件检测。每次处理事件,都要来这么一下,对性能的影响究竟有多大,我在 Token 的 sleep_hook 中插入一段代码:
  1. template <class ACE_SELECT_REACTOR_MUTEX> void
  2. ACE_Select_Reactor_Token_T<ACE_SELECT_REACTOR_MUTEX>::sleep_hook (void)
  3. {
  4.   ACE_TRACE ("ACE_Select_Reactor_Token_T::sleep_hook");
  5.   ACE_DEBUG ((LM_DEBUG, "(%t) wake leader thread up\n"));        // ADDED
  6.   if (this->select_reactor_->notify () == -1)
  7.     ACE_ERROR ((LM_ERROR,
  8.                 ACE_LIB_TEXT ("%p\n"),
  9.                 ACE_LIB_TEXT ("sleep_hook failed")));
  10. }
复制代码
       重新编译 ACE,观察输出如下:
  1. (7780) acceptor opened.
  2. (4636) start reactor event loop !
  3. (7780) wake leader thread up
  4. (7780) Connection from 127.0.0.1:56831
  5. (4636) recv message: Iteration 1.
  6. (4636) wake leader thread up
  7. (7780) recv message: Iteration 2.
  8. (7780) wake leader thread up
  9. (4636) recv message: Iteration 3.
  10. (4636) wake leader thread up
  11. (7780) recv message: Iteration 4.
  12. (7780) wake leader thread up
  13. (4636) recv message: Iteration 5.
  14. (4636) wake leader thread up
  15. (7780) Connection closed.
  16. (7780) wake leader thread up
  17. (3392|7780) handle_close
  18. (7780) wake leader thread up
复制代码
       与预料的一样,几乎每次事件处理,都附加着对 leader 线程的通知,这不得不说是一个瑕疵。此外,从整个 TP_Reactor 的运行原理来看,可以看出,当一个句柄在分派事件时,它是不在 select 检测的句柄集中的,所以此时发生的任何事件,Reactor 都不再通知,除非事件处理器及时返回,否则会影响对句柄上其它事件的检测。另外句柄上的多种事件的回调函数不存在多线程竞争关系,因为一次只能回调一种,用户需要同步的应当是 handle_signal/timeout 与其它 IO 事件回调函数,IO 之间的回调不需要同步。如果回调函数返回值大于0,还可以持续获得该线程的回调,ACE 代码如下:
  1.   // Upcall. If the handler returns positive value (requesting a
  2.   // reactor callback) don't set the ready-bit because it will be
  3.   // ignored if the reactor state has changed. Just call back
  4.   // as many times as the handler requests it. Other threads are off
  5.   // handling other things.
  6.   int status = 1;
  7.   while (status > 0)
  8.     status = (event_handler->*callback) (handle);
复制代码
       因为当前线程已经脱离了 leader 角色,再次回调并不影响整个事件循环。如果线程池中的线程远远少于并发连接,那么就有可能存在这样一种情景,所有线程都忙于派发事件,而没有线程可以回到线程池并继续检测事件,此时其它链路上有事件发生时,将得不到及时的处理,所以一定要为你的应用提供足够多的线程来运行 TP_Reactor。
        至此整个分析就基本完成,附件是测试程序,与 Select_Reactor 的基本相同,只是将实现换为了 ACE_TP_Reactor 而已(可以通过 USE_TP_Reactor 切换),两种情景中线程函数的切换位于 Client_Service::svc 中,通过改变 #if 条件,可以测试不同的情形,默认是情形 2。

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?用户注册

×
您需要登录后才可以回帖 登录 | 用户注册

本版积分规则

Archiver|手机版|小黑屋|ACE Developer ( 京ICP备06055248号 )

GMT+8, 2024-4-25 22:00 , Processed in 0.034701 second(s), 10 queries , Redis On.

Powered by Discuz! X3.5

© 2001-2023 Discuz! Team.

快速回复 返回顶部 返回列表