站在 Select_Reactor 的肩膀上,TP_Reactor 只做了少量修改,就实现了上述线程模型,具体罗列如下:
1. 固定打开 Reactor Notify 机制;
2. 固定压制分派通知事件的 renew 调用 (通过 supress_notify_renew 接口);
3. 侦测到事件后,一个线程只分派一个事件就返回到事件循环开始处;
4. 在分派事件前,先释放 Token;
5. 对于 IO 事件,分派前先 suspend 之;分派后,再 resume 之;
6. 事件循环中使用 ACE_TP_Token_Guard 取代一般的 ACE_GUARD 宏来获取 Token。
下面分两个场景分析,第一个场景是主线程 M 运行 TP_Reactor 事件循环,子线程 N 只调用 Reactor 接口更新其状态,如 schedule_timer,线程函数如下:
- while (thr_mgr ()->testcancel (ACE_Thread::self ()) == 0)
- {
- if(timer_ == 0)
- {
- timer_ = reactor ()->schedule_timer (this, 0, ACE_Time_Value(1, 0));
- ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%t) schedule timer %u.\n", timer_)));
- }
- else
- {
- reactor ()->cancel_timer (timer_);
- ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%t) cancel timer %u.\n", timer_)));
- timer_ = 0;
- }
- ACE_OS::sleep (3);
- }
复制代码 同 Select_Reactor 的情况一样,线程 N 在调用接口前加锁,ACE_Token 的加锁通知机制会回调当前锁的拥有者线程 M 之 sleep_hook 方法,在 ACE_Select_Reactor_Token_T 的重载中,向 Reactor 发送 Notify 事件,导致线程 M 从 select 中唤醒,释放 Token,使得线程 N 成功获取锁,从而修改 Reactor 状态。之后线程 N 释放锁,又导致线程 M 获取锁成功,可以继续运行事件循环。注意这里与 Select_Reactor 不同的是,不在分派 Notify 事件时调用 renew (即使调用也无效,因为设置了 supress_notify_renew_ 标志),所以线程 M 是在回到循环开始处等待,而不是在分派事件时等待。运行测试程序,在没有任何连接的情况下,线程 N 可以正常调用 Reactor 接口并输出:
- (8036) acceptor opened.
- (2628) schedule timer 30277332.
- (8036) handle timeout !
- (2628) cancel timer 30277332.
- (2628) schedule timer 30277332.
- (8036) handle timeout !
- (2628) cancel timer 30277332.
复制代码 第二个场景是主线程 M 与子线程 N 同时运行事件循环,线程函数如下:
- ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%t) start reactor event loop !\n")));
- //reactor ()->owner (ACE_Thread::self ());
- reactor ()->run_reactor_event_loop ();
- ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%t) end reactor event loop !\n")));
复制代码 与 Select_Reactor 不同,TP_Reactor 在调用事件循环前,不需要指定线程为 owner。假定线程 M 先获取锁,TP_Reactor 的 handle_events 中加锁代码如下:
- ACE_TP_Token_Guard guard (this->token_);
- int result = guard.grab_token (max_wait_time);
复制代码 这里没有像 Select_Reactor 一样使用一般的 ACE_GUARD_RETURN 宏,后者间接定义的是 ACE_Guard 模板对象,在构造器中默认会调用 LOCK 的 acquire 方法;而 ACE_TP_Token_Guard 则不会在构造器中调用任何锁的方法,而是对象构造后,使用 grab_token/acquire_token 来显示获取锁,析构器中调用 release 则是一致的。grab_token 的主要调用代码如下:
- ACE_MT (result = this->token_.acquire_read (&ACE_TP_Reactor::no_op_sleep_hook));
复制代码 这里有两个明显区别,一是调用 acqure_read,ACE_Token 有两个等待队列,读队列与写队列,写队列拥有更高的优先级,其它接口一般是调用 acqure 在写队列上等待,也就是说,如果两个线程,一个想运行事件循环,一个想修改 TP_Reactor 状态,那么后者将先被调度;二是提供锁的 sleep_hook 参数,这样一来,当取得锁失败 (被其它线程占有),它将调用这个传入的参数,而不是锁自己的同名回调函数,而传入的这个函数 no_op_sleep_hook 顾名思义是一个空方法,也就是说,它覆盖了默认向 Reactor 发送通知的实现,当多个线程同时运行事件循环时,保持当前线程处在 select 调用中。否则就如同之前对 Select_Reactor 的分析一样,多个线程会交替通知,导致 Reactor 无法正常工作。
如果此时客户端连接服务器并发送数据,线程 M 从 select 中唤醒,处理 IO 事件,在处理前,先将此句柄从 Reactor 中挂起 (suspend),接着释放 Token,然后开始分派事件,最后从 Reactor 中恢复该句柄。在 Select_Reactor 中挂起句柄就是从侦听的句柄集合中移到备用集合,恢复操作正好相反,这样的处理主要是考虑到当线程 M 还未分派此句柄上的事件时,线程 N 进入 select 发现该句柄仍处在激活状态,并尝试对它继续分派,造成多次重复分派。开始分派事件前,线程 M 的侦测工作就正式结束了,所以它释放锁,这会唤醒在锁上等待的线程 N,线程 N 获取锁后,进入 select 接着再侦测事件,由此完成了一次线程交接班。线程 M 分派完事件后,又会回到事件循环开始,等待进入锁,如果整个线程池只有 M 一个线程,那就只有等它分派完事件后,才可以再次进行事件检测了,这就退化到了和 Select_Reactor 一致的情形。使用 TP_Reactor 后,分派事件的线程确实不在被限制为主线程了:
- (7436) acceptor opened.
- (5436) start reactor event loop !
- (7436) Connection from 127.0.0.1:62283
- (5436) recv message: Iteration 1.
- (7436) recv message: Iteration 2.
- (5436) recv message: Iteration 3.
- (7436) recv message: Iteration 4.
- (5436) recv message: Iteration 5.
- (7436) Connection closed.
- (6148|7436) handle_close
复制代码 可以看到主线程 7436 与子线程 5436 都有机会分派事件(实际上基本是交替进行的)。到这里呢,就基本分析完了,但有一点还需要着重说明一下,就是在处理器回调(handle_input/output/signal) 中调用 Reactor 接口会有什么变化,例如当 Acceptor 的 handle_input 被调用时,它一般会调用 register_handler 将新建立的连接注册到 Reactor 中,在 Select_Reactor 中,由于分派事件的线程与负责事件循环的线程是一个线程,register_handler 在进锁时只增加锁的重入次数,所以这不成问题;但在 TP_Reactor 中,分派事件的线程往往不是当前侦测事件的线程,register_handler 在进锁时,会发送 Notify 事件将当前阻塞在 select 上的线程唤醒,并在锁上等待,由于它进入的是 Writer 队列,比其它所有在 Reader 队列上等待的空闲线程优先级都高,所以可以保证 Leader 线程在release 锁时,它第一个获取锁。其实之前曾经提到,一个线程在派发事件前后,会分别调用 suspend_handler/resume_handler 来临时挂起句柄,代码如下:
- // Suspend the handler so that other threads don't start dispatching
- // it.
- //
- // NOTE: This check was performed in older versions of the
- // TP_Reactor. Looks like it is a waste..
- if (dispatch_info.event_handler_ != this->notify_handler_)
- this->suspend_i (dispatch_info.handle_); // SUSPENDED
- int resume_flag =
- dispatch_info.event_handler_->resume_handler ();
- int reference_counting_required =
- dispatch_info.event_handler_->reference_counting_policy ().value () ==
- ACE_Event_Handler::Reference_Counting_Policy::ENABLED;
- // Call add_reference() if needed.
- if (reference_counting_required)
- {
- dispatch_info.event_handler_->add_reference ();
- }
- // Release the lock. Others threads can start waiting.
- guard.release_token ();
- int result = 0;
- // If there was an event handler ready, dispatch it.
- // Decrement the event left
- --event_count;
- // Dispatched an event
- if (this->dispatch_socket_event (dispatch_info) == 0)
- ++result;
- // Resume handler if required.
- if (dispatch_info.event_handler_ != this->notify_handler_ &&
- resume_flag == ACE_Event_Handler::ACE_REACTOR_RESUMES_HANDLER)
- this->resume_handler (dispatch_info.handle_); // RESUMED
复制代码 看出什么不同来了吗? release_token 前的调用是 suspend_i,即内部版本,不需要加锁,而 release_token 后的调用是 resume_handler,即外部版本,需要加锁。前面的很好理解,当前锁还未释放,所以可以直接调用内部版本,后面的也好理解,锁已经释放,事件已经派发完毕,完成 Leader 到 Follower 角色转变,所以调用接口要使用加锁版本。但这样一来,运行效率必然大大降低,因为 Follower 刚转变成 Leader,就被通知要修改 Reactor 状态,从 select 跳出,进入线程池等待,直到 resume_handler 完成,才可以重新进入事件检测。每次处理事件,都要来这么一下,对性能的影响究竟有多大,我在 Token 的 sleep_hook 中插入一段代码:
- template <class ACE_SELECT_REACTOR_MUTEX> void
- ACE_Select_Reactor_Token_T<ACE_SELECT_REACTOR_MUTEX>::sleep_hook (void)
- {
- ACE_TRACE ("ACE_Select_Reactor_Token_T::sleep_hook");
- ACE_DEBUG ((LM_DEBUG, "(%t) wake leader thread up\n")); // ADDED
- if (this->select_reactor_->notify () == -1)
- ACE_ERROR ((LM_ERROR,
- ACE_LIB_TEXT ("%p\n"),
- ACE_LIB_TEXT ("sleep_hook failed")));
- }
复制代码 重新编译 ACE,观察输出如下:
- (7780) acceptor opened.
- (4636) start reactor event loop !
- (7780) wake leader thread up
- (7780) Connection from 127.0.0.1:56831
- (4636) recv message: Iteration 1.
- (4636) wake leader thread up
- (7780) recv message: Iteration 2.
- (7780) wake leader thread up
- (4636) recv message: Iteration 3.
- (4636) wake leader thread up
- (7780) recv message: Iteration 4.
- (7780) wake leader thread up
- (4636) recv message: Iteration 5.
- (4636) wake leader thread up
- (7780) Connection closed.
- (7780) wake leader thread up
- (3392|7780) handle_close
- (7780) wake leader thread up
复制代码 与预料的一样,几乎每次事件处理,都附加着对 leader 线程的通知,这不得不说是一个瑕疵。此外,从整个 TP_Reactor 的运行原理来看,可以看出,当一个句柄在分派事件时,它是不在 select 检测的句柄集中的,所以此时发生的任何事件,Reactor 都不再通知,除非事件处理器及时返回,否则会影响对句柄上其它事件的检测。另外句柄上的多种事件的回调函数不存在多线程竞争关系,因为一次只能回调一种,用户需要同步的应当是 handle_signal/timeout 与其它 IO 事件回调函数,IO 之间的回调不需要同步。如果回调函数返回值大于0,还可以持续获得该线程的回调,ACE 代码如下:
- // Upcall. If the handler returns positive value (requesting a
- // reactor callback) don't set the ready-bit because it will be
- // ignored if the reactor state has changed. Just call back
- // as many times as the handler requests it. Other threads are off
- // handling other things.
- int status = 1;
- while (status > 0)
- status = (event_handler->*callback) (handle);
复制代码 因为当前线程已经脱离了 leader 角色,再次回调并不影响整个事件循环。如果线程池中的线程远远少于并发连接,那么就有可能存在这样一种情景,所有线程都忙于派发事件,而没有线程可以回到线程池并继续检测事件,此时其它链路上有事件发生时,将得不到及时的处理,所以一定要为你的应用提供足够多的线程来运行 TP_Reactor。
至此整个分析就基本完成,附件是测试程序,与 Select_Reactor 的基本相同,只是将实现换为了 ACE_TP_Reactor 而已(可以通过 USE_TP_Reactor 切换),两种情景中线程函数的切换位于 Client_Service::svc 中,通过改变 #if 条件,可以测试不同的情形,默认是情形 2。