yunh 发表于 2014-4-3 16:56:33

ACE 示例中的一个多线程问题分析

本帖最后由 yunh 于 2014-4-3 16:59 编辑

      前段时间看 《ACE Programmer's Guide》中线程池一章中领导者/跟随者模型一节 (P.263),照着书上的代码写了个例子,运行起来以后,发现偶尔会有线程挂死,不能正常结束的情况。下面先把我写的代码贴上来。
// lf.cpp : 定义控制台应用程序的入口点。
//

#include "stdafx.h"
#include "ace/Task.h"
#include "ace/OS_NS_unistd.h"

#define ON_STACK
//#define FOLLOWER_LOCK// I think this lock is redundant

class Follower_Cond : public ACE_Condition_Thread_Mutex
{
public:
    Follower_Cond (ACE_Thread_Mutex &mutex)
      : ACE_Condition_Thread_Mutex (mutex)
      , owner_ (ACE_Thread::self ())
    {
    }

    ACE_thread_t owner () const { return owner_; }

private:
    ACE_thread_t owner_;
};

class LF_ThreadPool : public ACE_Task <ACE_MT_SYNCH>
{
public:
    enum {
      MAX_WAITTIME = 200,
    };

    LF_ThreadPool ()
      : shutdown_ (0)
      , current_leader_ (0)
    {
      //msg_queue ()->open (1024*1024, 1024*1024);
    }

    void shutdown () { shutdown_ = 1; }
    int done () { return shutdown_; }
    int leader_active () { return current_leader_ != 0; }
    void leader_active (ACE_thread_t leader) { current_leader_ = leader; }
    void process_message (ACE_Message_Block *mb)
    {
      ACE_DEBUG ((LM_DEBUG, "(%t) process message: %s\n", mb->base ()));
      mb->release ();
      ACE_OS::sleep (ACE_Time_Value (0, 10));
    }

    virtual int svc ()
    {
      ACE_DEBUG ((LM_DEBUG, "(%t) start up\n"));
      while (!done ())
      {
            become_leader ();

            ACE_Message_Block *mb = 0;
            ACE_Time_Value tv (0, MAX_WAITTIME);
            tv += ACE_OS::gettimeofday ();

            if(getq (mb, &tv) == -1)
            {
                // notify the follower to exit too.
                elect_new_leader ();

                break;
            }

            elect_new_leader ();
            process_message (mb);
      }

      ACE_DEBUG ((LM_DEBUG, "(%t) exit\n"));
      return 0;
    }


    int become_leader ()
    {
      ACE_GUARD_RETURN (ACE_Thread_Mutex, mon, leader_lock_, -1);
      if (leader_active ())
      {
#if defined (ON_STACK)
            Follower_Cond cond_s (leader_lock_), *cond (&cond_s);
#else
            Follower_Cond *cond = 0;
            ACE_NEW_RETURN (cond, Follower_Cond (leader_lock_), -1);
#endif

            {
#if defined (FOLLOWER_LOCK)
                ACE_GUARD_RETURN (ACE_Thread_Mutex, guard, followers_lock_, -1);
#endif
                followers_.enqueue_tail (cond);
            }

            while (leader_active ())
                cond->wait ();

#if !defined (ON_STACK)
            delete cond;
#endif
      }

      ACE_DEBUG ((LM_DEBUG, "(%t) Becoming the leader.\n"));
      leader_active (ACE_Thread::self ());
      return 0;
    }

    int elect_new_leader ()
    {
      ACE_GUARD_RETURN (ACE_Thread_Mutex, mon, leader_lock_, -1);
      leader_active (0);
      if(!followers_.is_empty ())
      {
            Follower_Cond *cond = 0;
#if defined (FOLLOWER_LOCK)
            ACE_GUARD_RETURN (ACE_Thread_Mutex, guard, followers_lock_, -1);
#endif
            int ret = followers_.dequeue_head (cond);
            ACE_ASSERT (ret == 0);
            ACE_DEBUG ((LM_DEBUG, "(%t) Resigning and Electing %d\n", cond->owner ()));
            cond->signal ();
            return 0;
      }
      else
            ACE_DEBUG ((LM_DEBUG, "(%t) Oops no followers left, size = %u\n", followers_.size ()));
      
      return -1;
    }

private:
    int shutdown_;
    ACE_thread_t current_leader_;
    ACE_Thread_Mutex leader_lock_;
    ACE_Unbounded_Queue <Follower_Cond *> followers_;
#if defined (FOLLOWER_LOCK)
    ACE_Thread_Mutex followers_lock_;
#endif
};


int ACE_TMAIN(int argc, ACE_TCHAR* argv[])
{
    LF_ThreadPool pool;
    ACE_Message_Block *mb = 0;
    for (int i=0; i<10; ++ i)
    {
      ACE_NEW_RETURN (mb, ACE_Message_Block (64), -1);
      ACE_OS::sprintf (mb->base (), "%d", i+1);
      mb->wr_ptr (ACE_OS::strlen (mb->base ()));
      pool.putq (mb);
    }

    pool.activate (THR_NEW_LWP | THR_JOINABLE, 3);
    ACE_Thread_Manager::instance ()->wait ();
    return 0;
}
      这个例子使用的ACE版本与书中使用的版本一致,都是 5.4.1,其实在目录 “ACE_wrappers\examples\APG\ThreadPools\LF_ThreadPool.cpp” 中有它完整的代码,我是根据书上的片断写成,稍有出入但原理相同。这个程序演示了 Leader-Follower 线程模型,主线程在 task 的队列上入队一定量的请求,再启动几个子线程,以这种模型处理队列上的请求。所有子线程都运行 LF_ThreadPool::svc 方法,它首先调用become_leader 试图成为领导者线程,如果当前没有领导者,它就会直接获得这个角色,继续向下执行;否则会阻塞在等待上。当领导者处理完请求后,它调用 elect_new_leader,通过激发信号,释放之前等待的线程,领导者回归线程池,变为普通线程,再次等待在 become_leader 之上。当队列中没有更多请求时,线程直接退出循环,在退出之前,会调用 elect_new_leader 通知下一个等待者也退出,这样主线程在会合的所有子线程后,整个进程就可以得体的退出了。
      大部分情况下,这个例子可以正常运转,但是偶尔,有些线程无法正常退出,导致整个进程挂死在那里。下面是一次典型的挂死情况的输出:
(8060) start up
(8060) Becoming the leader.
(8060) Oops no followers left, size = 0
(10176) start up
(10176) Becoming the leader.
(8060) process message: 1
(6804) start up
(10176) Oops no followers left, size = 0
(10176) process message: 2
(8060) Becoming the leader.
(8060) Resigning and Electing 6804
(8060) process message: 3
(6804) Becoming the leader.
(6804) Resigning and Electing 10176
(6804) process message: 4
(10176) Becoming the leader.
(10176) Resigning and Electing 8060
(10176) process message: 5
(8060) Becoming the leader.
(8060) Resigning and Electing 6804
(8060) process message: 6
(10176) Becoming the leader.
(10176) Oops no followers left, size = 0
(10176) process message: 7
(8060) Becoming the leader.
(8060) Resigning and Electing 10176
(8060) process message: 8
(10176) Becoming the leader.
(10176) Resigning and Electing 8060
(10176) process message: 9
(8060) Becoming the leader.
(8060) Oops no followers left, size = 0
(8060) process message: 10
(10176) Becoming the leader.
(10176) Oops no followers left, size = 0
(10176) exit
(8060) Becoming the leader.
(8060) Oops no followers left, size = 0
(8060) exit      启动了三个线程:8060、10176 与 6804,最后的 exit 输出表明 8060 与 10176 正常退出了,但 6804 的最后输出一直停留在 “process message: 4”。挂上调度器,检查线程,发现果然只有主线程与 6804 还在。
>    9276    __tmainCRTStartup    ACE_OS::cond_wait    正常    0
   6804    ace_thread_adapter    ACE_OS::cond_wait    正常    0      主线程 9276 的堆栈表示,它是在正常的 ACE_Thread_Manager::wait 处等待其它子线程退出:
>    ACE5.4.1d.dll!ACE_OS::cond_wait(ACE_cond_t * cv=0x013dbdfc, _RTL_CRITICAL_SECTION * external_mutex=0x013dbde0)行1712 + 0x11 字节    C++
   ACE5.4.1d.dll!ACE_OS::cond_timedwait(ACE_cond_t * cv=0x013dbdfc, _RTL_CRITICAL_SECTION * external_mutex=0x013dbde0, ACE_Time_Value * timeout=0x00000000)行1605 + 0xd 字节    C++
   ACE5.4.1d.dll!ACE_Condition_Thread_Mutex::wait(ACE_Thread_Mutex & mutex={...}, const ACE_Time_Value * abstime=0x00000000)行112 + 0x11 字节    C++
   ACE5.4.1d.dll!ACE_Condition_Thread_Mutex::wait(const ACE_Time_Value * abstime=0x00000000)行120    C++
   ACE5.4.1d.dll!ACE_Thread_Manager::wait(const ACE_Time_Value * timeout=0x00000000, int abandon_detached_threads=0)行1779 + 0xf 字节    C++
   lf.exe!ace_main_i(int argc=1, char * * argv=0x013fe828)行174 + 0x1d 字节    C++
   lf.exe!ACE_Main::run_i(int argc=1, char * * argv=0x013fe828)行161 + 0x30 字节    C++
   ACE5.4.1d.dll!ACE_Main_Base::run(int argc=1, char * * argv=0x013fe828)行94 + 0x16 字节    C++
   ACE5.4.1d.dll!ace_os_main_i(ACE_Main_Base & mbase={...}, int argc=1, char * * argv=0x013fe828)行101 + 0x10 字节    C++
   lf.exe!main(int argc=1, char * * argv=0x013fe828)行161 + 0x3a 字节    C++
   lf.exe!__tmainCRTStartup()行597 + 0x19 字节    C
   lf.exe!mainCRTStartup()行414    C      看来它就是在等待 6804,但这个线程又在做什么呢? 查看它的堆栈:
>    ACE5.4.1d.dll!ACE_OS::cond_wait(ACE_cond_t * cv=0x01aafcd8, _RTL_CRITICAL_SECTION * external_mutex=0x0012fc88)行1712 + 0x11 字节    C++
   ACE5.4.1d.dll!ACE_Condition_Thread_Mutex::wait()行102 + 0x10 字节    C++
   lf.exe!LF_ThreadPool::become_leader()行100 + 0xb 字节    C++
   lf.exe!LF_ThreadPool::svc()行58    C++
   ACE5.4.1d.dll!ACE_Task_Base::svc_run(void * args=0x0012fc28)行203 + 0xf 字节    C++
   ACE5.4.1d.dll!ACE_Thread_Adapter::invoke_i()行150 + 0x9 字节    C++
   ACE5.4.1d.dll!ACE_Thread_Adapter::invoke()行93 + 0xf 字节    C++
   ACE5.4.1d.dll!ace_thread_adapter(void * args=0x013dc808)行131 + 0xe 字节    C++
   msvcr80d.dll!_callthreadstartex()行348 + 0xf 字节    C
   msvcr80d.dll!_threadstartex(void * ptd=0x013df618)行331    C      它居然也在等待!点击 become_leader 调用,可以发现光标停留在 100 行:
            {
#if defined (FOLLOWER_LOCK)
                ACE_GUARD_RETURN (ACE_Thread_Mutex, guard, followers_lock_, -1);
#endif
                followers_.enqueue_tail (cond);
            }

            while (leader_active ())
                cond->wait ();       就是上面的 cond->wait。这段代码的意思是,如果发现当前有领导者,就创建一个 condition,把它加入到全局队列,然后在上面等待,当领导者完成处理后,调用 elect_new_leader 会从队列中取出一个 condition,并激发它,从而让我可以继续执行。而且这里为了防止线程竞争,当从 condition 唤醒时,它第一件要做的事是去检查条件是否已经满足,如果不满足,则继续等待。因为有这种情况,当 leader 线程激发信号后,它很快的回到循环开始,再次进入 become_leader,又一次取得领导权,那么当这个线程唤醒要执行时,leader_active 条件还是假,所以它此时最好的选择还是继续等待,难道这有什么问题吗? 对了!它并没有再次将 condition 加入队列,导致其它线程无法通知它!从而形成一个消失的线程,虽然存在,却不能提供任何服务。为了验证这一点,将上面的代码段改为如下:
            int n = 0;
            while (leader_active ())
            {
                ACE_DEBUG ((LM_DEBUG, "(%t) waiting to become leader, %u times\n", ++n));
                // NOTE: this re-enqueue very important,
                // as when we signalled by other thread,
                // we have dequeued, if someone else
                // become leader during the gap,
                // and we go to wait again without enqueue,
                // we will never get notified.
                followers_.enqueue_tail (cond);
                cond->wait ();
                ACE_DEBUG ((LM_DEBUG, "(%t) get signalled\n"));
            }      这一次,不仅在每次等待前入队 condition,还打了一些日志帮我们更好的看清这一现象。如果输出的 times > 0,就表示发生过类似这样的情况,而这种情况绝对是导致上述问题的根源。编译重运行,输出如下:
(10012) start up
(10012) Becoming the leader.
(10012) Oops no followers left, size = 0
(10012) process message: 1
(10012) Becoming the leader.
(10012) Oops no followers left, size = 0
(10012) process message: 2
(9896) start up
(3688) start up
(10012) Becoming the leader.
(9896) waiting to become leader, 1 times
(3688) waiting to become leader, 1 times
(10012) Resigning and Electing 9896
(10012) process message: 3
(9896) get signalled
(9896) Becoming the leader.
(10012) waiting to become leader, 1 times
(9896) Resigning and Electing 3688
(9896) process message: 4
(3688) get signalled
(3688) Becoming the leader.
(9896) waiting to become leader, 1 times
(3688) Resigning and Electing 10012
(3688) process message: 5
(10012) get signalled
(10012) Becoming the leader.
(3688) waiting to become leader, 1 times
(10012) Resigning and Electing 9896
(10012) process message: 6
(9896) get signalled
(9896) Becoming the leader.
(10012) waiting to become leader, 1 times
(9896) Resigning and Electing 3688
(9896) process message: 7
(3688) get signalled
(3688) Becoming the leader.
(3688) Resigning and Electing 10012
(9896) Becoming the leader.
(3688) process message: 8
(9896) Oops no followers left, size = 0
(9896) process message: 9
(10012) get signalled
(10012) Becoming the leader.
(3688) waiting to become leader, 1 times
(9896) waiting to become leader, 1 times
(10012) Resigning and Electing 3688
(10012) process message: 10
(3688) get signalled
(3688) Becoming the leader.
(3688) Resigning and Electing 9896
(3688) exit
(10012) Becoming the leader.
(9896) get signalled
(9896) waiting to become leader, 2 times
(10012) Resigning and Electing 9896
(10012) exit
(9896) get signalled
(9896) Becoming the leader.
(9896) Oops no followers left, size = 0
(9896) exit
请按任意键继续. . .      为了得到这个现象,我试了很多次,这是多线程调试的一个复杂性之一——并不是每次都出问题。如果你希望看到更明显的输出,可以将初始入队的消息个数与线程个数都增加一个数量级。这次的情况还是3个线程,其中 3688 在选择 9896 当领导者后,10012 突然插进来当了领导者,导致 9896 虽然被唤醒,都不得不再次等待,当 10012 完成其角色后,这才通知 9896 继续工作,整个程序正常退出,没有卡死现象了。看来问题就是刚才我们分析的没有再次入队 condition,使用 condition 习惯了以后,容易写出这样的代码:
          while (leader_active ())
                cond->wait ();       但是这里背景有稍许不同,结果就大相径庭了,如果用在工业环境中,你会发现你的线程池中真正工作的线程越来越少,原因呢却不是那么明显,足以让一个程序员为之焦头烂额,不过这只是 ACE 提供的一个例子,并不是库代码,所以我们大可以放心好了,库代码还是经过千锤百炼滴~
      最后,ACE 本身提供的那个例子也有这个问题,而且据我目测到 6.1.9 版本这个问题还存在,所以如果你在项目中使用这个例子,还是把它改掉为好。

页: [1]
查看完整版本: ACE 示例中的一个多线程问题分析