ACE_Select_Reactor 多线程通知机制分析
本帖最后由 yunh 于 2014-3-5 14:04 编辑ACE_Select_Reactor 是最为常用的一种 Reactor 实现,通常在单线程情况下使用,如果希望并发的处理连接上的事件,多使用 ACE_TP_Reactor,但这并不意味着 Select_Reactor 不可以在多线程环境中使用,当多个线程同时调用 Select_Reactor 的接口时,会发生什么情况,是这篇文章分析的主要内容。
这里选取一个典型场景,主线程 M 负责检测并分派 Select_Reactor 上的 IO 事件,它主要调用 run_reactor_event_loop 接口,其中会使用 select 系统调用阻塞在被检测句柄上,直到超时(有定时器)或 IO 事件到达;子线程 N 负责在适当的时候改变 Select_Reactor 的状态,例如注册或反注册 IO 事件、定时器等,为简单起见,这里没有让线程 N 也调用 run_reactor_event_loop 运行事件循环,而只是调用一些改变状态的接口,如 register_handler/schedule_timer 等,实际例子只试验了调用定时器的情况,其它接口情况类似。
首先上述场景是多线程安全的,不会因为线程 N 改变了 Select_Reactor 的状态,导致多线程竞争问题,这是由于 Select_Reactor 所有对外接口都加锁保护的原因。但这也引出了另一个问题,如果主线程在进入 select 后,陷入等待,而此时并没有释放锁,当线程 N 想要调用 schedule_timer 时,进入接口时获取锁必然也会陷入等待,直到主线程 M 从 select 调用中唤醒,并分派完所有事件,再次准备进入事件检测时,线程 N 才有机会获取锁,并进行更新。这样一分析,如果 Select_Reactor 没有任何事件(IO、定时器)到达,线程 N 岂不要一直等待下去? 这绝对是不可忍受的。
写了一个小程序,想要验证下实际情况是否真的如此,这是一个简单的 Acceptor-Connector 实现的服务器,主线程等待连接到达、接收数据直到连接断开,启动一个单独的线程在 reactor 上不停的调度、取消定时器:
while (thr_mgr ()->testcancel (ACE_Thread::self ()) == 0)
{
if(timer_ == 0)
{
timer_ = reactor ()->schedule_timer (this, 0, ACE_Time_Value(1, 0));
ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%t) schedule timer %u.\n", timer_)));
}
else
{
reactor ()->cancel_timer (timer_);
ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%t) cancel timer %u.\n", timer_)));
timer_ = 0;
}
ACE_OS::sleep (3);
} 根据上面的分析,如果在没有连接的情况下,子线程的所有对 Select_Reactor 接口的调用都将是被阻塞的,不会有任何输出,但实际情况却不是这样:
(7108) acceptor opened.
(7960) schedule timer 29490900.
(7108) handle timeout !
(7960) cancel timer 29490900.
(7960) schedule timer 29490900.
(7108) handle timeout !
(7960) cancel timer 29490900. 主线程 7108 打开侦听端口后,并没有连接连上来,也就没有任何 IO 事件,应当阻塞在 select 调用 (虽然没有任何句柄可侦听,但经过调试确实如此,Select_Reactor 有一些内部使用的句柄),而此时子线程 7960 却可以成功的调用 schedule_timer!之后的 handle_timeout 是被主线程 7108 调度的,这可以理解,因为只有主线程在分派事件,但子线程 7960 是如何成功调用 schedule_timer 的呢? 按理说它必需在入口处等待锁呀:
ACE_TRACE ("ACE_Select_Reactor_T::schedule_timer");
ACE_MT (ACE_GUARD_RETURN (ACE_SELECT_REACTOR_TOKEN, ace_mon, this->token_, -1));
return this->timer_queue_->schedule
(handler,
arg,
timer_queue_->gettimeofday () + delay_time,
interval); 看下锁的类型,才发现这里边有玄机,ACE_SELECT_REACTOR_TOKEN 不是一般的 ACE_Thread_Mutex 或 ACE_Recursive_Thread_Mutex,而是这个:
typedef ACE_Select_Reactor_Token_T<ACE_SELECT_TOKEN> ACE_Select_Reactor_Token;
typedef ACE_Select_Reactor_T<ACE_Select_Reactor_Token> ACE_Select_Reactor; ACE_Select_Reactor 本身是个 typedef,是一个使用 ACE_Select_Reactor_Token 的模板,而后者又是一个typedef,是一个使用 ACE_SELECT_TOKEN 的模板,它的定义如下:
#if defined (ACE_MT_SAFE) && (ACE_MT_SAFE != 0)
typedef ACE_Token ACE_SELECT_TOKEN;
#else
typedef ACE_Noop_Token ACE_SELECT_TOKEN;
#endif /* ACE_MT_SAFE && ACE_MT_SAFE != 0 */ 原来就是 ACE_Token,最终的锁类型是:
ACE_Select_Reactor_Token_T <ACE_Token> 经过一番探究,发现当线程 N 在此锁上调用 acquire 之前,它会调用锁的当前拥有者 (owner) 线程 M 之 sleep_hook 方法,而后者会向 Select_Reactor 发送一个通知,从而将主线程 M 从 select 调用中唤醒,在之后的事件分派过程中,当分派通知事件时,主线程会主动放弃拥有权,将自己加入锁的等待队列,并通知线程 N 已经获取了锁,线程 N 从 acquire 中醒来,更新 Select_Reactor 内部状态后 release 锁,再次唤醒等待的主线程 M,M 继续向下分派其它事件,从而完成一次线程安全的更新操作。这里由于代码过于复杂,就不具体罗列了。
通过上面的分析可以得知,Select_Reactor 的这种线程合作机制有赖于 Notify 机制,如果我们将其关闭,上面的通知能力也就不复存在,为了验证这一点,在构造 Select_Reactor 时,传递第三个参数为 1,显示关闭通知机制:
ACE_NEW_RETURN(reactor, ACE_Reactor(new ACE_Select_Reactor(0, 0, 1), 1), -1); 重新编译工程,启动服务器端后,发现线程 N 的输出确实没有了:
(7756) acceptor opened. 如果此时使用一个客户端,连上服务器,由于 select 被 IO 事件唤醒,由线程 N 还可以继续运行并输出信息:
(7756) acceptor opened.
(7756) Connection from 127.0.0.1:57903
(6756) schedule timer 25427668.
(7756) recv message: Iteration 1.
(7756) handle timeout !
(7756) recv message: Iteration 2.
(6756) cancel timer 25427668.
(7756) recv message: Iteration 3.
(7756) recv message: Iteration 4.
(7756) recv message: Iteration 5.
(6756) schedule timer 25427668.
(7756) handle timeout !
(7756) Connection closed.
(2252|7756) handle_close
(6756) cancel timer 25427668. 事实上确实如此,线程 N 的输出被局限在有 IO 事件的时间段,一旦连接断开,它又被阻塞了。
总结一下,ACE_Token实现一种可重入锁,当新的线程在锁上等待时会通知当前 owner,而 ACE_Select_Reactor_Token_T 派生自它,重载了通知机制,用于向 Reactor 发送唤醒通知。如果子线程也运行 Select_Reactor 的事件循环,会怎么样? 如果 ACE 允许用户这样做,那么一个线程在进入 handle_events 时会通知另一个线程从 select 中离开,从而交替通知,无法正常工作。所以目前 ACE 禁止这样做,它会在 handle_events 中判断当前调用线程 ID,如果不是 open 时的线程,就直接返回 -1,用户可以通过 owner() 方法改变运行事件循环的线程为子线程,但主线程在下一次 handle_events 调用时会因为相同的原因退出,从而保证当前只有一个线程调用事件循环。
如果用户在处理器回调(handle_input / handle_signal 等) 中调用 Reactor 接口修改其状态,又会如何呢? 答案是没问题,首先回调函数一般由侦测事件的线程来分派,所以这是同一个线程,不存在多线程竞争问题,其次 ACE_Token 本身是可重入锁,同一个线程在不释放它的情况下,可以再次取得它,只要最终释放的次数与取得的次数匹配即可。但 Select_Reactor 一旦在分派事件的过程中发现用户改变了自己的状态 (register/remove_handler),它将停止继续分派剩余的事件,立即进入下一轮的事件检测,这是需要注意的地方。
通过上面的分析,最后得到一个结论:如果你希望使用其它线程更新 Select_Reactor 的状态,最好不要禁用 Reactor 通知机制。
本文的附件是测试程序,可以使用编译开关 DISABLE_NOTIFY 在两种情景下切换,查看运行效果。
如果Timerout事件里面有一个大的任务在处理,会导致select事件分派的性能问题。比如,你给你的timeout sleep 60秒。这时候,select会等待解锁。
所以我觉得,在设计上,定时器的reactor最好和实际负责IO事件分派的reactor完全独立分开,互不影响比较好。 是的,这确实是 ACE_Select_Reactor 的最大问题,不光是在定时器,就是分派 IO 事件时,也不能在处理函数中堵塞,否则整个线程就挂死了。
页:
[1]