本帖最后由 yunh 于 2014-4-3 16:59 编辑
前段时间看 《ACE Programmer's Guide》中线程池一章中领导者/跟随者模型一节 (P.263),照着书上的代码写了个例子,运行起来以后,发现偶尔会有线程挂死,不能正常结束的情况。下面先把我写的代码贴上来。- // lf.cpp : 定义控制台应用程序的入口点。
- //
- #include "stdafx.h"
- #include "ace/Task.h"
- #include "ace/OS_NS_unistd.h"
- #define ON_STACK
- //#define FOLLOWER_LOCK // I think this lock is redundant
- class Follower_Cond : public ACE_Condition_Thread_Mutex
- {
- public:
- Follower_Cond (ACE_Thread_Mutex &mutex)
- : ACE_Condition_Thread_Mutex (mutex)
- , owner_ (ACE_Thread::self ())
- {
- }
- ACE_thread_t owner () const { return owner_; }
- private:
- ACE_thread_t owner_;
- };
- class LF_ThreadPool : public ACE_Task <ACE_MT_SYNCH>
- {
- public:
- enum {
- MAX_WAITTIME = 200,
- };
- LF_ThreadPool ()
- : shutdown_ (0)
- , current_leader_ (0)
- {
- //msg_queue ()->open (1024*1024, 1024*1024);
- }
- void shutdown () { shutdown_ = 1; }
- int done () { return shutdown_; }
- int leader_active () { return current_leader_ != 0; }
- void leader_active (ACE_thread_t leader) { current_leader_ = leader; }
- void process_message (ACE_Message_Block *mb)
- {
- ACE_DEBUG ((LM_DEBUG, "(%t) process message: %s\n", mb->base ()));
- mb->release ();
- ACE_OS::sleep (ACE_Time_Value (0, 10));
- }
- virtual int svc ()
- {
- ACE_DEBUG ((LM_DEBUG, "(%t) start up\n"));
- while (!done ())
- {
- become_leader ();
- ACE_Message_Block *mb = 0;
- ACE_Time_Value tv (0, MAX_WAITTIME);
- tv += ACE_OS::gettimeofday ();
- if(getq (mb, &tv) == -1)
- {
- // notify the follower to exit too.
- elect_new_leader ();
- break;
- }
- elect_new_leader ();
- process_message (mb);
- }
- ACE_DEBUG ((LM_DEBUG, "(%t) exit\n"));
- return 0;
- }
- int become_leader ()
- {
- ACE_GUARD_RETURN (ACE_Thread_Mutex, mon, leader_lock_, -1);
- if (leader_active ())
- {
- #if defined (ON_STACK)
- Follower_Cond cond_s (leader_lock_), *cond (&cond_s);
- #else
- Follower_Cond *cond = 0;
- ACE_NEW_RETURN (cond, Follower_Cond (leader_lock_), -1);
- #endif
- {
- #if defined (FOLLOWER_LOCK)
- ACE_GUARD_RETURN (ACE_Thread_Mutex, guard, followers_lock_, -1);
- #endif
- followers_.enqueue_tail (cond);
- }
- while (leader_active ())
- cond->wait ();
- #if !defined (ON_STACK)
- delete cond;
- #endif
- }
- ACE_DEBUG ((LM_DEBUG, "(%t) Becoming the leader.\n"));
- leader_active (ACE_Thread::self ());
- return 0;
- }
- int elect_new_leader ()
- {
- ACE_GUARD_RETURN (ACE_Thread_Mutex, mon, leader_lock_, -1);
- leader_active (0);
- if(!followers_.is_empty ())
- {
- Follower_Cond *cond = 0;
- #if defined (FOLLOWER_LOCK)
- ACE_GUARD_RETURN (ACE_Thread_Mutex, guard, followers_lock_, -1);
- #endif
- int ret = followers_.dequeue_head (cond);
- ACE_ASSERT (ret == 0);
- ACE_DEBUG ((LM_DEBUG, "(%t) Resigning and Electing %d\n", cond->owner ()));
- cond->signal ();
- return 0;
- }
- else
- ACE_DEBUG ((LM_DEBUG, "(%t) Oops no followers left, size = %u\n", followers_.size ()));
-
- return -1;
- }
- private:
- int shutdown_;
- ACE_thread_t current_leader_;
- ACE_Thread_Mutex leader_lock_;
- ACE_Unbounded_Queue <Follower_Cond *> followers_;
- #if defined (FOLLOWER_LOCK)
- ACE_Thread_Mutex followers_lock_;
- #endif
- };
- int ACE_TMAIN(int argc, ACE_TCHAR* argv[])
- {
- LF_ThreadPool pool;
- ACE_Message_Block *mb = 0;
- for (int i=0; i<10; ++ i)
- {
- ACE_NEW_RETURN (mb, ACE_Message_Block (64), -1);
- ACE_OS::sprintf (mb->base (), "%d", i+1);
- mb->wr_ptr (ACE_OS::strlen (mb->base ()));
- pool.putq (mb);
- }
- pool.activate (THR_NEW_LWP | THR_JOINABLE, 3);
- ACE_Thread_Manager::instance ()->wait ();
- return 0;
- }
复制代码 这个例子使用的ACE版本与书中使用的版本一致,都是 5.4.1,其实在目录 “ACE_wrappers\examples\APG\ThreadPools\LF_ThreadPool.cpp” 中有它完整的代码,我是根据书上的片断写成,稍有出入但原理相同。这个程序演示了 Leader-Follower 线程模型,主线程在 task 的队列上入队一定量的请求,再启动几个子线程,以这种模型处理队列上的请求。所有子线程都运行 LF_ThreadPool::svc 方法,它首先调用 become_leader 试图成为领导者线程,如果当前没有领导者,它就会直接获得这个角色,继续向下执行;否则会阻塞在等待上。当领导者从队列中取到请求后,它先调用 elect_new_leader,通过激发信号,释放之前等待的线程,然后自己去处理这个请求;处理完成后它回归线程池,变为普通线程,再次等待在 become_leader 之上。当队列中没有更多请求时,线程直接退出循环,在退出之前,会调用 elect_new_leader 通知下一个等待者也退出,这样主线程在会合了所有子线程后,整个进程就可以得体地退出了。
大部分情况下,这个例子可以正常运转,但是偶尔,有些线程无法正常退出,导致整个进程挂死在那里。下面是一次典型的挂死情况的输出:- (8060) start up
- (8060) Becoming the leader.
- (8060) Oops no followers left, size = 0
- (10176) start up
- (10176) Becoming the leader.
- (8060) process message: 1
- (6804) start up
- (10176) Oops no followers left, size = 0
- (10176) process message: 2
- (8060) Becoming the leader.
- (8060) Resigning and Electing 6804
- (8060) process message: 3
- (6804) Becoming the leader.
- (6804) Resigning and Electing 10176
- (6804) process message: 4
- (10176) Becoming the leader.
- (10176) Resigning and Electing 8060
- (10176) process message: 5
- (8060) Becoming the leader.
- (8060) Resigning and Electing 6804
- (8060) process message: 6
- (10176) Becoming the leader.
- (10176) Oops no followers left, size = 0
- (10176) process message: 7
- (8060) Becoming the leader.
- (8060) Resigning and Electing 10176
- (8060) process message: 8
- (10176) Becoming the leader.
- (10176) Resigning and Electing 8060
- (10176) process message: 9
- (8060) Becoming the leader.
- (8060) Oops no followers left, size = 0
- (8060) process message: 10
- (10176) Becoming the leader.
- (10176) Oops no followers left, size = 0
- (10176) exit
- (8060) Becoming the leader.
- (8060) Oops no followers left, size = 0
- (8060) exit
复制代码 启动了三个线程:8060、10176 与 6804,最后的 exit 输出表明 8060 与 10176 正常退出了,但 6804 的最后输出一直停留在 “process message: 4”。挂上调试器,检查线程,发现果然只有主线程与 6804 还在。- > 9276 __tmainCRTStartup ACE_OS::cond_wait 正常 0
- 6804 ace_thread_adapter ACE_OS::cond_wait 正常 0
复制代码 主线程 9276 的堆栈表示,它是在正常的 ACE_Thread_Manager::wait 处等待其它子线程退出:- > ACE5.4.1d.dll!ACE_OS::cond_wait(ACE_cond_t * cv=0x013dbdfc, _RTL_CRITICAL_SECTION * external_mutex=0x013dbde0) 行1712 + 0x11 字节 C++
- ACE5.4.1d.dll!ACE_OS::cond_timedwait(ACE_cond_t * cv=0x013dbdfc, _RTL_CRITICAL_SECTION * external_mutex=0x013dbde0, ACE_Time_Value * timeout=0x00000000) 行1605 + 0xd 字节 C++
- ACE5.4.1d.dll!ACE_Condition_Thread_Mutex::wait(ACE_Thread_Mutex & mutex={...}, const ACE_Time_Value * abstime=0x00000000) 行112 + 0x11 字节 C++
- ACE5.4.1d.dll!ACE_Condition_Thread_Mutex::wait(const ACE_Time_Value * abstime=0x00000000) 行120 C++
- ACE5.4.1d.dll!ACE_Thread_Manager::wait(const ACE_Time_Value * timeout=0x00000000, int abandon_detached_threads=0) 行1779 + 0xf 字节 C++
- lf.exe!ace_main_i(int argc=1, char * * argv=0x013fe828) 行174 + 0x1d 字节 C++
- lf.exe!ACE_Main::run_i(int argc=1, char * * argv=0x013fe828) 行161 + 0x30 字节 C++
- ACE5.4.1d.dll!ACE_Main_Base::run(int argc=1, char * * argv=0x013fe828) 行94 + 0x16 字节 C++
- ACE5.4.1d.dll!ace_os_main_i(ACE_Main_Base & mbase={...}, int argc=1, char * * argv=0x013fe828) 行101 + 0x10 字节 C++
- lf.exe!main(int argc=1, char * * argv=0x013fe828) 行161 + 0x3a 字节 C++
- lf.exe!__tmainCRTStartup() 行597 + 0x19 字节 C
- lf.exe!mainCRTStartup() 行414 C
复制代码 看来它就是在等待 6804,但这个线程又在做什么呢? 查看它的堆栈:- > ACE5.4.1d.dll!ACE_OS::cond_wait(ACE_cond_t * cv=0x01aafcd8, _RTL_CRITICAL_SECTION * external_mutex=0x0012fc88) 行1712 + 0x11 字节 C++
- ACE5.4.1d.dll!ACE_Condition_Thread_Mutex::wait() 行102 + 0x10 字节 C++
- lf.exe!LF_ThreadPool::become_leader() 行100 + 0xb 字节 C++
- lf.exe!LF_ThreadPool::svc() 行58 C++
- ACE5.4.1d.dll!ACE_Task_Base::svc_run(void * args=0x0012fc28) 行203 + 0xf 字节 C++
- ACE5.4.1d.dll!ACE_Thread_Adapter::invoke_i() 行150 + 0x9 字节 C++
- ACE5.4.1d.dll!ACE_Thread_Adapter::invoke() 行93 + 0xf 字节 C++
- ACE5.4.1d.dll!ace_thread_adapter(void * args=0x013dc808) 行131 + 0xe 字节 C++
- msvcr80d.dll!_callthreadstartex() 行348 + 0xf 字节 C
- msvcr80d.dll!_threadstartex(void * ptd=0x013df618) 行331 C
复制代码 它居然也在等待!点击 become_leader 调用,可以发现光标停留在 100 行:- {
- #if defined (FOLLOWER_LOCK)
- ACE_GUARD_RETURN (ACE_Thread_Mutex, guard, followers_lock_, -1);
- #endif
- followers_.enqueue_tail (cond);
- }
- while (leader_active ())
- cond->wait ();
复制代码 就是上面的 cond->wait。这段代码的意思是,如果发现当前有领导者,就创建一个 condition,把它加入到全局队列,然后在上面等待,当领导者完成处理后,调用 elect_new_leader 会从队列中取出一个 condition,并激发它,从而让我可以继续执行。而且这里为了防止线程竞争,当从 condition 唤醒时,它第一件要做的事是去检查条件是否已经满足,如果不满足,则继续等待。因为有这种情况,当 leader 线程激发信号后,它很快地回到循环开始,再次进入 become_leader,又一次取得领导权,那么当这个线程被唤醒要执行时,leader_active () 仍然返回真(也就是“当前没有领导者”这个条件仍不满足),所以它此时最好的选择还是继续等待,难道这有什么问题吗? 对了!它并没有再次将 condition 加入队列,导致其它线程无法通知它!从而形成一个消失的线程,虽然存在,却不能提供任何服务。为了验证这一点,将上面的代码段改为如下:- int n = 0;
- while (leader_active ())
- {
- ACE_DEBUG ((LM_DEBUG, "(%t) waiting to become leader, %u times\n", ++n));
- // NOTE: this re-enqueue very important,
- // as when we signalled by other thread,
- // we have dequeued, if someone else
- // become leader during the gap,
- // and we go to wait again without enqueue,
- // we will never get notified.
- followers_.enqueue_tail (cond);
- cond->wait ();
- ACE_DEBUG ((LM_DEBUG, "(%t) get signalled\n"));
- }
复制代码 这一次,不仅在每次等待前入队 condition,还打了一些日志帮我们更好地看清这一现象。由于计数从 1 开始,如果输出的 times > 1,就表示发生过类似这样的情况(被唤醒后不得不再次等待),而这种情况绝对是导致上述问题的根源。重新编译运行,输出如下:- (10012) start up
- (10012) Becoming the leader.
- (10012) Oops no followers left, size = 0
- (10012) process message: 1
- (10012) Becoming the leader.
- (10012) Oops no followers left, size = 0
- (10012) process message: 2
- (9896) start up
- (3688) start up
- (10012) Becoming the leader.
- (9896) waiting to become leader, 1 times
- (3688) waiting to become leader, 1 times
- (10012) Resigning and Electing 9896
- (10012) process message: 3
- (9896) get signalled
- (9896) Becoming the leader.
- (10012) waiting to become leader, 1 times
- (9896) Resigning and Electing 3688
- (9896) process message: 4
- (3688) get signalled
- (3688) Becoming the leader.
- (9896) waiting to become leader, 1 times
- (3688) Resigning and Electing 10012
- (3688) process message: 5
- (10012) get signalled
- (10012) Becoming the leader.
- (3688) waiting to become leader, 1 times
- (10012) Resigning and Electing 9896
- (10012) process message: 6
- (9896) get signalled
- (9896) Becoming the leader.
- (10012) waiting to become leader, 1 times
- (9896) Resigning and Electing 3688
- (9896) process message: 7
- (3688) get signalled
- (3688) Becoming the leader.
- (3688) Resigning and Electing 10012
- (9896) Becoming the leader.
- (3688) process message: 8
- (9896) Oops no followers left, size = 0
- (9896) process message: 9
- (10012) get signalled
- (10012) Becoming the leader.
- (3688) waiting to become leader, 1 times
- (9896) waiting to become leader, 1 times
- (10012) Resigning and Electing 3688
- (10012) process message: 10
- (3688) get signalled
- (3688) Becoming the leader.
- (3688) Resigning and Electing 9896
- (3688) exit
- (10012) Becoming the leader.
- (9896) get signalled
- (9896) waiting to become leader, 2 times
- (10012) Resigning and Electing 9896
- (10012) exit
- (9896) get signalled
- (9896) Becoming the leader.
- (9896) Oops no followers left, size = 0
- (9896) exit
- 请按任意键继续. . .
复制代码 为了得到这个现象,我试了很多次,这是多线程调试的复杂性之一——并不是每次都出问题。如果你希望看到更明显的输出,可以将初始入队的消息个数与线程个数都增加一个数量级。这次的情况还是3个线程,其中 3688 在选择 9896 当领导者后,10012 突然插进来当了领导者,导致 9896 虽然被唤醒,却不得不再次等待,当 10012 完成其角色后,这才通知 9896 继续工作,整个程序正常退出,没有卡死现象了。看来问题就是刚才我们分析的没有再次入队 condition,使用 condition 习惯了以后,容易写出这样的代码: - while (leader_active ())
- cond->wait ();
复制代码 但是这里背景有稍许不同,结果就大相径庭了,如果用在工业环境中,你会发现你的线程池中真正工作的线程越来越少,原因呢却不是那么明显,足以让一个程序员为之焦头烂额,不过这只是 ACE 提供的一个例子,并不是库代码,所以我们大可以放心好了,库代码还是经过千锤百炼滴~
最后,ACE 本身提供的那个例子也有这个问题,而且据我目测到 6.1.9 版本这个问题还存在,所以如果你在项目中使用这个例子,还是把它改掉为好。
|