找回密码
 用户注册

QQ登录

只需一步,快速开始

查看: 6796|回复: 0

ACE 示例中的一个多线程问题分析

[复制链接]
发表于 2014-4-3 16:56:33 | 显示全部楼层 |阅读模式
本帖最后由 yunh 于 2014-4-3 16:59 编辑

        前段时间看 《ACE Programmer's Guide》中线程池一章中领导者/跟随者模型一节 (P.263),照着书上的代码写了个例子,运行起来以后,发现偶尔会有线程挂死,不能正常结束的情况。下面先把我写的代码贴上来。
  1. // lf.cpp : 定义控制台应用程序的入口点。
  2. //
  3. #include "stdafx.h"
  4. #include "ace/Task.h"
  5. #include "ace/OS_NS_unistd.h"
  6. #define ON_STACK
  7. //#define FOLLOWER_LOCK  // I think this lock is redundant
  8. class Follower_Cond : public ACE_Condition_Thread_Mutex
  9. {
  10. public:
  11.     Follower_Cond (ACE_Thread_Mutex &mutex)
  12.         : ACE_Condition_Thread_Mutex (mutex)
  13.         , owner_ (ACE_Thread::self ())
  14.     {
  15.     }
  16.     ACE_thread_t owner () const { return owner_; }
  17. private:
  18.     ACE_thread_t owner_;
  19. };
  20. class LF_ThreadPool : public ACE_Task <ACE_MT_SYNCH>
  21. {
  22. public:
  23.     enum {
  24.         MAX_WAITTIME = 200,
  25.     };
  26.     LF_ThreadPool ()
  27.         : shutdown_ (0)
  28.         , current_leader_ (0)
  29.     {
  30.         //msg_queue ()->open (1024*1024, 1024*1024);
  31.     }
  32.     void shutdown () { shutdown_ = 1; }
  33.     int done () { return shutdown_; }
  34.     int leader_active () { return current_leader_ != 0; }
  35.     void leader_active (ACE_thread_t leader) { current_leader_ = leader; }
  36.     void process_message (ACE_Message_Block *mb)
  37.     {
  38.         ACE_DEBUG ((LM_DEBUG, "(%t) process message: %s\n", mb->base ()));
  39.         mb->release ();
  40.         ACE_OS::sleep (ACE_Time_Value (0, 10));
  41.     }
  42.     virtual int svc ()
  43.     {
  44.         ACE_DEBUG ((LM_DEBUG, "(%t) start up\n"));
  45.         while (!done ())
  46.         {
  47.             become_leader ();
  48.             ACE_Message_Block *mb = 0;
  49.             ACE_Time_Value tv (0, MAX_WAITTIME);
  50.             tv += ACE_OS::gettimeofday ();
  51.             if(getq (mb, &tv) == -1)
  52.             {
  53.                 // notify the follower to exit too.
  54.                 elect_new_leader ();
  55.                 break;
  56.             }
  57.             elect_new_leader ();
  58.             process_message (mb);
  59.         }
  60.         ACE_DEBUG ((LM_DEBUG, "(%t) exit\n"));
  61.         return 0;
  62.     }
  63.     int become_leader ()
  64.     {
  65.         ACE_GUARD_RETURN (ACE_Thread_Mutex, mon, leader_lock_, -1);
  66.         if (leader_active ())
  67.         {
  68. #if defined (ON_STACK)
  69.             Follower_Cond cond_s (leader_lock_), *cond (&cond_s);
  70. #else
  71.             Follower_Cond *cond = 0;
  72.             ACE_NEW_RETURN (cond, Follower_Cond (leader_lock_), -1);
  73. #endif
  74.             {
  75. #if defined (FOLLOWER_LOCK)
  76.                 ACE_GUARD_RETURN (ACE_Thread_Mutex, guard, followers_lock_, -1);
  77. #endif
  78.                 followers_.enqueue_tail (cond);
  79.             }
  80.             while (leader_active ())
  81.                 cond->wait ();
  82. #if !defined (ON_STACK)
  83.             delete cond;
  84. #endif
  85.         }
  86.         ACE_DEBUG ((LM_DEBUG, "(%t) Becoming the leader.\n"));
  87.         leader_active (ACE_Thread::self ());
  88.         return 0;
  89.     }
  90.     int elect_new_leader ()
  91.     {
  92.         ACE_GUARD_RETURN (ACE_Thread_Mutex, mon, leader_lock_, -1);
  93.         leader_active (0);
  94.         if(!followers_.is_empty ())
  95.         {
  96.             Follower_Cond *cond = 0;
  97. #if defined (FOLLOWER_LOCK)
  98.             ACE_GUARD_RETURN (ACE_Thread_Mutex, guard, followers_lock_, -1);
  99. #endif
  100.             int ret = followers_.dequeue_head (cond);
  101.             ACE_ASSERT (ret == 0);
  102.             ACE_DEBUG ((LM_DEBUG, "(%t) Resigning and Electing %d\n", cond->owner ()));
  103.             cond->signal ();
  104.             return 0;
  105.         }
  106.         else
  107.             ACE_DEBUG ((LM_DEBUG, "(%t) Oops no followers left, size = %u\n", followers_.size ()));
  108.         
  109.         return -1;
  110.     }
  111. private:
  112.     int shutdown_;
  113.     ACE_thread_t current_leader_;
  114.     ACE_Thread_Mutex leader_lock_;
  115.     ACE_Unbounded_Queue <Follower_Cond *> followers_;
  116. #if defined (FOLLOWER_LOCK)
  117.     ACE_Thread_Mutex followers_lock_;
  118. #endif
  119. };
  120. int ACE_TMAIN(int argc, ACE_TCHAR* argv[])
  121. {
  122.     LF_ThreadPool pool;
  123.     ACE_Message_Block *mb = 0;
  124.     for (int i=0; i<10; ++ i)
  125.     {
  126.         ACE_NEW_RETURN (mb, ACE_Message_Block (64), -1);
  127.         ACE_OS::sprintf (mb->base (), "%d", i+1);
  128.         mb->wr_ptr (ACE_OS::strlen (mb->base ()));
  129.         pool.putq (mb);
  130.     }
  131.     pool.activate (THR_NEW_LWP | THR_JOINABLE, 3);
  132.     ACE_Thread_Manager::instance ()->wait ();
  133.     return 0;
  134. }
复制代码
       这个例子使用的ACE版本与书中使用的版本一致,都是 5.4.1,其实在目录 “ACE_wrappers\examples\APG\ThreadPools\LF_ThreadPool.cpp” 中有它完整的代码,我是根据书上的片断写成,稍有出入但原理相同。这个程序演示了 Leader-Follower 线程模型,主线程在 task 的队列上入队一定量的请求,再启动几个子线程,以这种模型处理队列上的请求。所有子线程都运行 LF_ThreadPool::svc 方法,它首先调用  become_leader 试图成为领导者线程,如果当前没有领导者,它就会直接获得这个角色,继续向下执行;否则会阻塞在等待上。当领导者处理完请求后,它调用 elect_new_leader,通过激发信号,释放之前等待的线程,领导者回归线程池,变为普通线程,再次等待在 become_leader 之上。当队列中没有更多请求时,线程直接退出循环,在退出之前,会调用 elect_new_leader 通知下一个等待者也退出,这样主线程在会合的所有子线程后,整个进程就可以得体的退出了。
        大部分情况下,这个例子可以正常运转,但是偶尔,有些线程无法正常退出,导致整个进程挂死在那里。下面是一次典型的挂死情况的输出:
  1. (8060) start up
  2. (8060) Becoming the leader.
  3. (8060) Oops no followers left, size = 0
  4. (10176) start up
  5. (10176) Becoming the leader.
  6. (8060) process message: 1
  7. (6804) start up
  8. (10176) Oops no followers left, size = 0
  9. (10176) process message: 2
  10. (8060) Becoming the leader.
  11. (8060) Resigning and Electing 6804
  12. (8060) process message: 3
  13. (6804) Becoming the leader.
  14. (6804) Resigning and Electing 10176
  15. (6804) process message: 4
  16. (10176) Becoming the leader.
  17. (10176) Resigning and Electing 8060
  18. (10176) process message: 5
  19. (8060) Becoming the leader.
  20. (8060) Resigning and Electing 6804
  21. (8060) process message: 6
  22. (10176) Becoming the leader.
  23. (10176) Oops no followers left, size = 0
  24. (10176) process message: 7
  25. (8060) Becoming the leader.
  26. (8060) Resigning and Electing 10176
  27. (8060) process message: 8
  28. (10176) Becoming the leader.
  29. (10176) Resigning and Electing 8060
  30. (10176) process message: 9
  31. (8060) Becoming the leader.
  32. (8060) Oops no followers left, size = 0
  33. (8060) process message: 10
  34. (10176) Becoming the leader.
  35. (10176) Oops no followers left, size = 0
  36. (10176) exit
  37. (8060) Becoming the leader.
  38. (8060) Oops no followers left, size = 0
  39. (8060) exit
复制代码
       启动了三个线程:8060、10176 与 6804,最后的 exit 输出表明 8060 与 10176 正常退出了,但 6804 的最后输出一直停留在 “process message: 4”。挂上调度器,检查线程,发现果然只有主线程与 6804 还在。
  1. >    9276    __tmainCRTStartup    ACE_OS::cond_wait    正常    0
  2.      6804    ace_thread_adapter    ACE_OS::cond_wait    正常    0
复制代码
       主线程 9276 的堆栈表示,它是在正常的 ACE_Thread_Manager::wait 处等待其它子线程退出:
  1. >    ACE5.4.1d.dll!ACE_OS::cond_wait(ACE_cond_t * cv=0x013dbdfc, _RTL_CRITICAL_SECTION * external_mutex=0x013dbde0)  行1712 + 0x11 字节    C++
  2.      ACE5.4.1d.dll!ACE_OS::cond_timedwait(ACE_cond_t * cv=0x013dbdfc, _RTL_CRITICAL_SECTION * external_mutex=0x013dbde0, ACE_Time_Value * timeout=0x00000000)  行1605 + 0xd 字节    C++
  3.      ACE5.4.1d.dll!ACE_Condition_Thread_Mutex::wait(ACE_Thread_Mutex & mutex={...}, const ACE_Time_Value * abstime=0x00000000)  行112 + 0x11 字节    C++
  4.      ACE5.4.1d.dll!ACE_Condition_Thread_Mutex::wait(const ACE_Time_Value * abstime=0x00000000)  行120    C++
  5.      ACE5.4.1d.dll!ACE_Thread_Manager::wait(const ACE_Time_Value * timeout=0x00000000, int abandon_detached_threads=0)  行1779 + 0xf 字节    C++
  6.      lf.exe!ace_main_i(int argc=1, char * * argv=0x013fe828)  行174 + 0x1d 字节    C++
  7.      lf.exe!ACE_Main::run_i(int argc=1, char * * argv=0x013fe828)  行161 + 0x30 字节    C++
  8.      ACE5.4.1d.dll!ACE_Main_Base::run(int argc=1, char * * argv=0x013fe828)  行94 + 0x16 字节    C++
  9.      ACE5.4.1d.dll!ace_os_main_i(ACE_Main_Base & mbase={...}, int argc=1, char * * argv=0x013fe828)  行101 + 0x10 字节    C++
  10.      lf.exe!main(int argc=1, char * * argv=0x013fe828)  行161 + 0x3a 字节    C++
  11.      lf.exe!__tmainCRTStartup()  行597 + 0x19 字节    C
  12.      lf.exe!mainCRTStartup()  行414    C
复制代码
       看来它就是在等待 6804,但这个线程又在做什么呢? 查看它的堆栈:
  1. >    ACE5.4.1d.dll!ACE_OS::cond_wait(ACE_cond_t * cv=0x01aafcd8, _RTL_CRITICAL_SECTION * external_mutex=0x0012fc88)  行1712 + 0x11 字节    C++
  2.      ACE5.4.1d.dll!ACE_Condition_Thread_Mutex::wait()  行102 + 0x10 字节    C++
  3.      lf.exe!LF_ThreadPool::become_leader()  行100 + 0xb 字节    C++
  4.      lf.exe!LF_ThreadPool::svc()  行58    C++
  5.      ACE5.4.1d.dll!ACE_Task_Base::svc_run(void * args=0x0012fc28)  行203 + 0xf 字节    C++
  6.      ACE5.4.1d.dll!ACE_Thread_Adapter::invoke_i()  行150 + 0x9 字节    C++
  7.      ACE5.4.1d.dll!ACE_Thread_Adapter::invoke()  行93 + 0xf 字节    C++
  8.      ACE5.4.1d.dll!ace_thread_adapter(void * args=0x013dc808)  行131 + 0xe 字节    C++
  9.      msvcr80d.dll!_callthreadstartex()  行348 + 0xf 字节    C
  10.      msvcr80d.dll!_threadstartex(void * ptd=0x013df618)  行331    C
复制代码
       它居然也在等待!点击 become_leader 调用,可以发现光标停留在 100 行:
  1.             {
  2. #if defined (FOLLOWER_LOCK)
  3.                 ACE_GUARD_RETURN (ACE_Thread_Mutex, guard, followers_lock_, -1);
  4. #endif
  5.                 followers_.enqueue_tail (cond);
  6.             }
  7.             while (leader_active ())
  8.                 cond->wait ();
复制代码
       就是上面的 cond->wait。这段代码的意思是,如果发现当前有领导者,就创建一个 condition,把它加入到全局队列,然后在上面等待,当领导者完成处理后,调用 elect_new_leader 会从队列中取出一个 condition,并激发它,从而让我可以继续执行。而且这里为了防止线程竞争,当从 condition 唤醒时,它第一件要做的事是去检查条件是否已经满足,如果不满足,则继续等待。因为有这种情况,当 leader 线程激发信号后,它很快的回到循环开始,再次进入 become_leader,又一次取得领导权,那么当这个线程唤醒要执行时,leader_active 条件还是假,所以它此时最好的选择还是继续等待,难道这有什么问题吗? 对了!它并没有再次将 condition 加入队列,导致其它线程无法通知它!从而形成一个消失的线程,虽然存在,却不能提供任何服务。为了验证这一点,将上面的代码段改为如下:
  1.             int n = 0;
  2.             while (leader_active ())
  3.             {
  4.                 ACE_DEBUG ((LM_DEBUG, "(%t) waiting to become leader, %u times\n", ++n));
  5.                 // NOTE: this re-enqueue very important,
  6.                 // as when we signalled by other thread,
  7.                 // we have dequeued, if someone else
  8.                 // become leader during the gap,
  9.                 // and we go to wait again without enqueue,
  10.                 // we will never get notified.
  11.                 followers_.enqueue_tail (cond);
  12.                 cond->wait ();
  13.                 ACE_DEBUG ((LM_DEBUG, "(%t) get signalled\n"));
  14.             }
复制代码
       这一次,不仅在每次等待前入队 condition,还打了一些日志帮我们更好的看清这一现象。如果输出的 times > 0,就表示发生过类似这样的情况,而这种情况绝对是导致上述问题的根源。编译重运行,输出如下:
  1. (10012) start up
  2. (10012) Becoming the leader.
  3. (10012) Oops no followers left, size = 0
  4. (10012) process message: 1
  5. (10012) Becoming the leader.
  6. (10012) Oops no followers left, size = 0
  7. (10012) process message: 2
  8. (9896) start up
  9. (3688) start up
  10. (10012) Becoming the leader.
  11. (9896) waiting to become leader, 1 times
  12. (3688) waiting to become leader, 1 times
  13. (10012) Resigning and Electing 9896
  14. (10012) process message: 3
  15. (9896) get signalled
  16. (9896) Becoming the leader.
  17. (10012) waiting to become leader, 1 times
  18. (9896) Resigning and Electing 3688
  19. (9896) process message: 4
  20. (3688) get signalled
  21. (3688) Becoming the leader.
  22. (9896) waiting to become leader, 1 times
  23. (3688) Resigning and Electing 10012
  24. (3688) process message: 5
  25. (10012) get signalled
  26. (10012) Becoming the leader.
  27. (3688) waiting to become leader, 1 times
  28. (10012) Resigning and Electing 9896
  29. (10012) process message: 6
  30. (9896) get signalled
  31. (9896) Becoming the leader.
  32. (10012) waiting to become leader, 1 times
  33. (9896) Resigning and Electing 3688
  34. (9896) process message: 7
  35. (3688) get signalled
  36. (3688) Becoming the leader.
  37. (3688) Resigning and Electing 10012
  38. (9896) Becoming the leader.
  39. (3688) process message: 8
  40. (9896) Oops no followers left, size = 0
  41. (9896) process message: 9
  42. (10012) get signalled
  43. (10012) Becoming the leader.
  44. (3688) waiting to become leader, 1 times
  45. (9896) waiting to become leader, 1 times
  46. (10012) Resigning and Electing 3688
  47. (10012) process message: 10
  48. (3688) get signalled
  49. (3688) Becoming the leader.
  50. (3688) Resigning and Electing 9896
  51. (3688) exit
  52. (10012) Becoming the leader.
  53. (9896) get signalled
  54. (9896) waiting to become leader, 2 times
  55. (10012) Resigning and Electing 9896
  56. (10012) exit
  57. (9896) get signalled
  58. (9896) Becoming the leader.
  59. (9896) Oops no followers left, size = 0
  60. (9896) exit
  61. 请按任意键继续. . .
复制代码
       为了得到这个现象,我试了很多次,这是多线程调试的一个复杂性之一——并不是每次都出问题。如果你希望看到更明显的输出,可以将初始入队的消息个数与线程个数都增加一个数量级。这次的情况还是3个线程,其中 3688 在选择 9896 当领导者后,10012 突然插进来当了领导者,导致 9896 虽然被唤醒,都不得不再次等待,当 10012 完成其角色后,这才通知 9896 继续工作,整个程序正常退出,没有卡死现象了。看来问题就是刚才我们分析的没有再次入队 condition,使用 condition 习惯了以后,容易写出这样的代码:  
  1.           while (leader_active ())
  2.                 cond->wait ();
复制代码
       但是这里背景有稍许不同,结果就大相径庭了,如果用在工业环境中,你会发现你的线程池中真正工作的线程越来越少,原因呢却不是那么明显,足以让一个程序员为之焦头烂额,不过这只是 ACE 提供的一个例子,并不是库代码,所以我们大可以放心好了,库代码还是经过千锤百炼滴~
        最后,ACE 本身提供的那个例子也有这个问题,而且据我目测到 6.1.9 版本这个问题还存在,所以如果你在项目中使用这个例子,还是把它改掉为好。

您需要登录后才可以回帖 登录 | 用户注册

本版积分规则

Archiver|手机版|小黑屋|ACE Developer ( 京ICP备06055248号 )

GMT+8, 2024-3-29 15:39 , Processed in 0.012402 second(s), 7 queries , Redis On.

Powered by Discuz! X3.5

© 2001-2023 Discuz! Team.

快速回复 返回顶部 返回列表