ACE_TP_Reactor 实现 Leader-Follower 线程模型分析
本帖最后由 yunh 于 2014-3-6 15:53 编辑之前一篇文章分析过 ACE_Select_Reactor 在多线程环境下的运作情况(http://www.acejoy.com/ace/thread-5804-1-1.html),虽然可以使用多个线程在 Select_Reactor 上工作,但如果想要充分利用多线程并发的好处,ACE 推荐使用 ACE_TP_Reactor,其中 TP 即 Thread-Pool 的缩写。与 ACE_WFMO_Reactor 的完全多线程并行处理不同,TP_Reactor 仅实现了部分的并行,这是由于 select 系统调用本身不是多线程安全的,所以在侦测事件时必需保证单线程,但在分派事件时却可以使用多线程,为此 TP_Reactor 实现了一种称为 Leader-Follower 的线程模型:一组线程中,有一个线程充当 Leader 角色,负责侦测事件,其它线程处在空闲待命状态;当事件到达,Leader 线程开始分派事件,在此之前,它选取线程池中的一个线程充当 Leader 角色,而自己在分派完事件后,加入空闲线程池,成为 Follower 角色,如此循环往复,不断推动 Reactor 运行。TP_Reactor 本身派生自 Select_Reactor:
ACE_Select_Reactor 实际是 ACE_Select_Reactor_T 的模板实例化
站在 Select_Reactor 的肩膀上,TP_Reactor 只做了少量修改,就实现了上述线程模型,具体罗列如下:
1. 固定打开 Reactor Notify 机制;
2. 固定压制分派通知事件的 renew 调用 (通过 supress_notify_renew 接口);
3. 侦测到事件后,一个线程只分派一个事件就返回到事件循环开始处;
4. 在分派事件前,先释放 Token;
5. 对于 IO 事件,分派前先 suspend 之;分派后,再 resume 之;
6. 事件循环中使用 ACE_TP_Token_Guard 取代一般的 ACE_GUARD 宏来获取 Token。
下面分两个场景分析,第一个场景是主线程 M 运行 TP_Reactor 事件循环,子线程 N 只调用 Reactor 接口更新其状态,如 schedule_timer,线程函数如下:
while (thr_mgr ()->testcancel (ACE_Thread::self ()) == 0)
{
if(timer_ == 0)
{
timer_ = reactor ()->schedule_timer (this, 0, ACE_Time_Value(1, 0));
ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%t) schedule timer %u.\n", timer_)));
}
else
{
reactor ()->cancel_timer (timer_);
ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%t) cancel timer %u.\n", timer_)));
timer_ = 0;
}
ACE_OS::sleep (3);
} 同 Select_Reactor 的情况一样,线程 N 在调用接口前加锁,ACE_Token 的加锁通知机制会回调当前锁的拥有者线程 M 之 sleep_hook 方法,在 ACE_Select_Reactor_Token_T的重载中,向 Reactor 发送 Notify 事件,导致线程 M 从 select 中唤醒,释放 Token,使得线程 N 成功获取锁,从而修改 Reactor 状态。之后线程 N 释放锁,又导致线程 M 获取锁成功,可以继续运行事件循环。注意这里与 Select_Reactor 不同的是,不在分派 Notify 事件时调用 renew (即使调用也无效,因为设置了 supress_notify_renew_ 标志),所以线程 M 是在回到循环开始处等待,而不是在分派事件时等待。运行测试程序,在没有任何连接的情况下,线程 N 可以正常调用 Reactor 接口并输出:
(8036) acceptor opened.
(2628) schedule timer 30277332.
(8036) handle timeout !
(2628) cancel timer 30277332.
(2628) schedule timer 30277332.
(8036) handle timeout !
(2628) cancel timer 30277332. 第二个场景是主线程 M 与子线程 N 同时运行事件循环,线程函数如下:
ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%t) start reactor event loop !\n")));
//reactor ()->owner (ACE_Thread::self ());
reactor ()->run_reactor_event_loop ();
ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%t) end reactor event loop !\n"))); 与 Select_Reactor 不同,TP_Reactor 在调用事件循环前,不需要指定线程为 owner。假定线程 M 先获取锁,TP_Reactor 的 handle_events 中加锁代码如下:
ACE_TP_Token_Guard guard (this->token_);
int result = guard.grab_token (max_wait_time); 这里没有像 Select_Reactor 一样使用一般的 ACE_GUARD_RETURN 宏,后者间接定义的是 ACE_Guard 模板对象,在构造器中默认会调用 LOCK 的 acquire 方法;而 ACE_TP_Token_Guard 则不会在构造器中调用任何锁的方法,而是对象构造后,使用 grab_token/acquire_token 来显示获取锁,析构器中调用 release 则是一致的。grab_token 的主要调用代码如下:
ACE_MT (result = this->token_.acquire_read (&ACE_TP_Reactor::no_op_sleep_hook)); 这里有两个明显区别,一是调用 acqure_read,ACE_Token 有两个等待队列,读队列与写队列,写队列拥有更高的优先级,其它接口一般是调用 acqure 在写队列上等待,也就是说,如果两个线程,一个想运行事件循环,一个想修改 TP_Reactor 状态,那么后者将先被调度;二是提供锁的 sleep_hook 参数,这样一来,当取得锁失败 (被其它线程占有),它将调用这个传入的参数,而不是锁自己的同名回调函数,而传入的这个函数 no_op_sleep_hook 顾名思义是一个空方法,也就是说,它覆盖了默认向 Reactor 发送通知的实现,当多个线程同时运行事件循环时,保持当前线程处在 select 调用中。否则就如同之前对 Select_Reactor 的分析一样,多个线程会交替通知,导致 Reactor 无法正常工作。
如果此时客户端连接服务器并发送数据,线程 M 从 select 中唤醒,处理 IO 事件,在处理前,先将此句柄从 Reactor 中挂起 (suspend),接着释放 Token,然后开始分派事件,最后从 Reactor 中恢复该句柄。在 Select_Reactor 中挂起句柄就是从侦听的句柄集合中移到备用集合,恢复操作正好相反,这样的处理主要是考虑到当线程 M 还未分派此句柄上的事件时,线程 N 进入 select 发现该句柄仍处在激活状态,并尝试对它继续分派,造成多次重复分派。开始分派事件前,线程 M 的侦测工作就正式结束了,所以它释放锁,这会唤醒在锁上等待的线程 N,线程 N 获取锁后,进入 select 接着再侦测事件,由此完成了一次线程交接班。线程 M 分派完事件后,又会回到事件循环开始,等待进入锁,如果整个线程池只有 M 一个线程,那就只有等它分派完事件后,才可以再次进行事件检测了,这就退化到了和 Select_Reactor 一致的情形。使用 TP_Reactor 后,分派事件的线程确实不在被限制为主线程了:
(7436) acceptor opened.
(5436) start reactor event loop !
(7436) Connection from 127.0.0.1:62283
(5436) recv message: Iteration 1.
(7436) recv message: Iteration 2.
(5436) recv message: Iteration 3.
(7436) recv message: Iteration 4.
(5436) recv message: Iteration 5.
(7436) Connection closed.
(6148|7436) handle_close 可以看到主线程 7436 与子线程 5436 都有机会分派事件(实际上基本是交替进行的)。到这里呢,就基本分析完了,但有一点还需要着重说明一下,就是在处理器回调(handle_input/output/signal) 中调用 Reactor 接口会有什么变化,例如当 Acceptor 的 handle_input 被调用时,它一般会调用 register_handler 将新建立的连接注册到 Reactor 中,在 Select_Reactor 中,由于分派事件的线程与负责事件循环的线程是一个线程,register_handler 在进锁时只增加锁的重入次数,所以这不成问题;但在 TP_Reactor 中,分派事件的线程往往不是当前侦测事件的线程,register_handler 在进锁时,会发送 Notify 事件将当前阻塞在 select 上的线程唤醒,并在锁上等待,由于它进入的是 Writer 队列,比其它所有在 Reader 队列上等待的空闲线程优先级都高,所以可以保证 Leader 线程在release 锁时,它第一个获取锁。其实之前曾经提到,一个线程在派发事件前后,会分别调用 suspend_handler/resume_handler 来临时挂起句柄,代码如下:
// Suspend the handler so that other threads don't start dispatching
// it.
//
// NOTE: This check was performed in older versions of the
// TP_Reactor. Looks like it is a waste..
if (dispatch_info.event_handler_ != this->notify_handler_)
this->suspend_i (dispatch_info.handle_); // SUSPENDED
int resume_flag =
dispatch_info.event_handler_->resume_handler ();
int reference_counting_required =
dispatch_info.event_handler_->reference_counting_policy ().value () ==
ACE_Event_Handler::Reference_Counting_Policy::ENABLED;
// Call add_reference() if needed.
if (reference_counting_required)
{
dispatch_info.event_handler_->add_reference ();
}
// Release the lock.Others threads can start waiting.
guard.release_token ();
int result = 0;
// If there was an event handler ready, dispatch it.
// Decrement the event left
--event_count;
// Dispatched an event
if (this->dispatch_socket_event (dispatch_info) == 0)
++result;
// Resume handler if required.
if (dispatch_info.event_handler_ != this->notify_handler_ &&
resume_flag == ACE_Event_Handler::ACE_REACTOR_RESUMES_HANDLER)
this->resume_handler (dispatch_info.handle_); // RESUMED
看出什么不同来了吗? release_token 前的调用是 suspend_i,即内部版本,不需要加锁,而 release_token 后的调用是 resume_handler,即外部版本,需要加锁。前面的很好理解,当前锁还未释放,所以可以直接调用内部版本,后面的也好理解,锁已经释放,事件已经派发完毕,完成 Leader 到 Follower 角色转变,所以调用接口要使用加锁版本。但这样一来,运行效率必然大大降低,因为 Follower 刚转变成 Leader,就被通知要修改 Reactor 状态,从 select 跳出,进入线程池等待,直到 resume_handler 完成,才可以重新进入事件检测。每次处理事件,都要来这么一下,对性能的影响究竟有多大,我在 Token 的 sleep_hook 中插入一段代码:
template <class ACE_SELECT_REACTOR_MUTEX> void
ACE_Select_Reactor_Token_T<ACE_SELECT_REACTOR_MUTEX>::sleep_hook (void)
{
ACE_TRACE ("ACE_Select_Reactor_Token_T::sleep_hook");
ACE_DEBUG ((LM_DEBUG, "(%t) wake leader thread up\n")); // ADDED
if (this->select_reactor_->notify () == -1)
ACE_ERROR ((LM_ERROR,
ACE_LIB_TEXT ("%p\n"),
ACE_LIB_TEXT ("sleep_hook failed")));
} 重新编译 ACE,观察输出如下:
(7780) acceptor opened.
(4636) start reactor event loop !
(7780) wake leader thread up
(7780) Connection from 127.0.0.1:56831
(4636) recv message: Iteration 1.
(4636) wake leader thread up
(7780) recv message: Iteration 2.
(7780) wake leader thread up
(4636) recv message: Iteration 3.
(4636) wake leader thread up
(7780) recv message: Iteration 4.
(7780) wake leader thread up
(4636) recv message: Iteration 5.
(4636) wake leader thread up
(7780) Connection closed.
(7780) wake leader thread up
(3392|7780) handle_close
(7780) wake leader thread up 与预料的一样,几乎每次事件处理,都附加着对 leader 线程的通知,这不得不说是一个瑕疵。此外,从整个 TP_Reactor 的运行原理来看,可以看出,当一个句柄在分派事件时,它是不在 select 检测的句柄集中的,所以此时发生的任何事件,Reactor 都不再通知,除非事件处理器及时返回,否则会影响对句柄上其它事件的检测。另外句柄上的多种事件的回调函数不存在多线程竞争关系,因为一次只能回调一种,用户需要同步的应当是 handle_signal/timeout 与其它 IO 事件回调函数,IO 之间的回调不需要同步。如果回调函数返回值大于0,还可以持续获得该线程的回调,ACE 代码如下:// Upcall. If the handler returns positive value (requesting a
// reactor callback) don't set the ready-bit because it will be
// ignored if the reactor state has changed. Just call back
// as many times as the handler requests it. Other threads are off
// handling other things.
int status = 1;
while (status > 0)
status = (event_handler->*callback) (handle); 因为当前线程已经脱离了 leader 角色,再次回调并不影响整个事件循环。如果线程池中的线程远远少于并发连接,那么就有可能存在这样一种情景,所有线程都忙于派发事件,而没有线程可以回到线程池并继续检测事件,此时其它链路上有事件发生时,将得不到及时的处理,所以一定要为你的应用提供足够多的线程来运行 TP_Reactor。
至此整个分析就基本完成,附件是测试程序,与 Select_Reactor 的基本相同,只是将实现换为了 ACE_TP_Reactor 而已(可以通过 USE_TP_Reactor 切换),两种情景中线程函数的切换位于 Client_Service::svc 中,通过改变 #if 条件,可以测试不同的情形,默认是情形 2。
页:
[1]