yunh 发表于 2014-3-12 17:04:39

ACE Readers/Writer 锁介绍

本帖最后由 yunh 于 2014-3-13 09:17 编辑

      Readers/Writer 锁,顾名思义就是读写锁,它允许多个读线程同时访问资源,写线程之间、写线程与读线程之间则是互斥的。当你的应用中有大量的线程只是读取,少量线程负责更新,那么使用读写锁将比一般的 Mutex 高效不少 (如果写线程与读线程不相上下,那就另当别论,毕竟读写锁实现起来更复杂)。
      ACE 提供了两个层次上的读写锁:进程内与进程间。前者由 ACE_RW_Thread_Mutex 实现;后者由 ACE_RW_Mutex / ACE_RW_Process_Mutex 实现。ACE_RW_(Thread_)Mutex 在 Unix like 平台上都是基于平台原生的同步机制 (pthread_)rwlock_t,所以这里只是简单的对它们的封装,而在 Windows 平台上,由于没有原生的读写锁,ACE 自己通过 Semaphore 与 Event 模拟了它的实现,但这个模拟只能在进程内工作,所以当你使用进程内的读写锁 ACE_RW_Thread_Mutex 时,可以将它们看作是跨平台的,而你使用进程间的读写锁 ACE_RW_Mutex 时,则只在 Unix like 平台上可以工作 (Semaphore 与 Event 虽然可以跨进程通知,但还有一堆内部变量是怎么在进程间共享的,我实在想象不出来)。
      下面是一个使用 ACE_RW_Thread_Mutex 的例子:
#include "stdafx.h"
#include "ace/OS_NS_unistd.h"
#include "ace/Task.h"

#define RD_CNT 2
#define WR_CNT 1
#define RDWR_CNT 1
#define USE_THR

#if defined (USE_THR)
#include "ace/RW_Thread_Mutex.h"
typedef ACE_RW_Thread_Mutex MUTEX_TYPE;
#else
#include "ace/RW_Mutex.h"
typedef ACE_RW_Mutex MUTEX_TYPE;
#endif

int g_counter = 0;

class rd_task : public ACE_Task_Base
{
public:
    rd_task(MUTEX_TYPE& rdlk)
      : rdlk_(rdlk)
    {
    }

    virtual int svc()
    {
      ACE_DEBUG((LM_DEBUG, "(%T %t) rd_task running.\n"));
      while(thr_mgr()->testcancel(ACE_Thread::self()) == 0)
      {
            ACE_OS::sleep (ACE_Time_Value (0, 100000));
            //ACE_READ_GUARD_RETURN(MUTEX_TYPE, mon, rdlk_, -1);
            rdlk_.acquire_read ();
            ACE_DEBUG((LM_DEBUG, "(%T %t) acquire the read lock, counter = %u.\n", g_counter));
            rdlk_.release ();
      }

      return 0;
    }

private:
    MUTEX_TYPE& rdlk_;
};

class wr_task : public ACE_Task_Base
{
public:
    wr_task(MUTEX_TYPE& wrlk)
      : wrlk_(wrlk)
    {
    }

    virtual int svc()
    {
      ACE_DEBUG((LM_DEBUG, "(%T %t) wr_task running.\n"));
      while(thr_mgr()->testcancel(ACE_Thread::self()) == 0)
      {
            ACE_OS::sleep (ACE_Time_Value (0, 200000));
            //ACE_WRITE_GUARD_RETURN(MUTEX_TYPE, mon, wrlk_, -1);
            wrlk_.acquire_write ();
            ACE_DEBUG((LM_DEBUG, "(%T %t) acquire the write lock, set counter = %u.\n", ++g_counter));
            wrlk_.release ();
      }

      return 0;
    }

private:
    MUTEX_TYPE& wrlk_;
};

class rdwr_task : public ACE_Task_Base
{
public:
    rdwr_task(MUTEX_TYPE& lk)
      : lk_(lk)
    {
    }

    virtual int svc()
    {
      ACE_DEBUG((LM_DEBUG, "(%T %t) rdwr_task running.\n"));
      while(thr_mgr()->testcancel(ACE_Thread::self()) == 0)
      {
            ACE_OS::sleep (ACE_Time_Value (0, 400000));
            //ACE_WRITE_GUARD_RETURN(MUTEX_TYPE, mon, lk_, -1);
            lk_.acquire_read ();
            ACE_DEBUG((LM_DEBUG, "(%T %t) acquire the read lock, counter = %u.\n", g_counter));
            ACE_OS::sleep (ACE_Time_Value (0, 400000));
            lk_.tryacquire_write_upgrade ();
            ACE_DEBUG((LM_DEBUG, "(%T %t) update the read lock to write, set counter = %u.\n", ++g_counter));
            lk_.release ();
      }

      return 0;
    }

private:
    MUTEX_TYPE& lk_;
};


int ACE_TMAIN(int argc, ACE_TCHAR* argv[])
{
    MUTEX_TYPE mutex;

    rd_task rdt(mutex);
    wr_task wrt(mutex);
    rdwr_task rdwrt (mutex);

    rdt.activate(THR_JOINABLE | THR_NEW_LWP, RD_CNT);
    wrt.activate(THR_JOINABLE | THR_NEW_LWP, WR_CNT);
    rdwrt.activate(THR_JOINABLE | THR_NEW_LWP, RDWR_CNT);

    ACE_OS::sleep(3);
    ACE_Thread_Manager::instance()->cancel_all();

    wrt.wait();
    rdt.wait();
    rdwrt.wait ();
    return 0;
}      默认情况下开启 2 个读线程不停的在锁上调用 acquire_read,开启 1 个写线程不停的在锁上调用 acquire_write,还开启了一个读写线程,它先 acquire_read 进行读,之后调用 try_acquire_write_upgrade 将它“升级”为写线程,进行写入。程序输出如下:
( 16:43:05.227000 5264) rd_task running.
( 16:43:05.227000 6444) rd_task running.
( 16:43:05.227000 2432) wr_task running.
( 16:43:05.227000 6152) rdwr_task running.
( 16:43:05.336000 5264) acquire the read lock, counter = 0.
( 16:43:05.336000 6444) acquire the read lock, counter = 0.
( 16:43:05.429000 2432) acquire the write lock, set counter = 1.
( 16:43:05.445000 5264) acquire the read lock, counter = 1.
( 16:43:05.445000 6444) acquire the read lock, counter = 1.
( 16:43:05.554000 6444) acquire the read lock, counter = 1.
( 16:43:05.554000 5264) acquire the read lock, counter = 1.
( 16:43:05.632000 2432) acquire the write lock, set counter = 2.
( 16:43:05.632000 6152) acquire the read lock, counter = 2.
( 16:43:05.663000 6444) acquire the read lock, counter = 2.
( 16:43:05.663000 5264) acquire the read lock, counter = 2.
( 16:43:05.773000 5264) acquire the read lock, counter = 2.
( 16:43:05.773000 6444) acquire the read lock, counter = 2.
( 16:43:06.038000 6152) update the read lock to write, set counter = 3.
( 16:43:06.038000 2432) acquire the write lock, set counter = 4.
( 16:43:06.038000 5264) acquire the read lock, counter = 4.
( 16:43:06.038000 6444) acquire the read lock, counter = 4.
( 16:43:06.147000 6444) acquire the read lock, counter = 4.
( 16:43:06.147000 5264) acquire the read lock, counter = 4.
( 16:43:06.241000 2432) acquire the write lock, set counter = 5.
( 16:43:06.256000 5264) acquire the read lock, counter = 5.
( 16:43:06.256000 6444) acquire the read lock, counter = 5.
( 16:43:06.365000 5264) acquire the read lock, counter = 5.
( 16:43:06.365000 6444) acquire the read lock, counter = 5.
( 16:43:06.443000 6152) acquire the read lock, counter = 5.
( 16:43:06.849000 6152) update the read lock to write, set counter = 6.
( 16:43:06.849000 2432) acquire the write lock, set counter = 7.
( 16:43:06.849000 5264) acquire the read lock, counter = 7.
( 16:43:06.849000 6444) acquire the read lock, counter = 7.
( 16:43:06.958000 5264) acquire the read lock, counter = 7.
( 16:43:06.958000 6444) acquire the read lock, counter = 7.
( 16:43:07.052000 2432) acquire the write lock, set counter = 8.
( 16:43:07.067000 5264) acquire the read lock, counter = 8.
( 16:43:07.067000 6444) acquire the read lock, counter = 8.
( 16:43:07.177000 5264) acquire the read lock, counter = 8.
( 16:43:07.177000 6444) acquire the read lock, counter = 8.
( 16:43:07.255000 6152) acquire the read lock, counter = 8.
( 16:43:07.660000 6152) update the read lock to write, set counter = 9.
( 16:43:07.660000 2432) acquire the write lock, set counter = 10.
( 16:43:07.660000 6444) acquire the read lock, counter = 10.
( 16:43:07.660000 5264) acquire the read lock, counter = 10.
( 16:43:07.769000 6444) acquire the read lock, counter = 10.
( 16:43:07.769000 5264) acquire the read lock, counter = 10.
( 16:43:07.863000 2432) acquire the write lock, set counter = 11.
( 16:43:07.879000 5264) acquire the read lock, counter = 11.
( 16:43:07.879000 6444) acquire the read lock, counter = 11.
( 16:43:07.988000 5264) acquire the read lock, counter = 11.
( 16:43:07.988000 6444) acquire the read lock, counter = 11.
( 16:43:08.066000 6152) acquire the read lock, counter = 11.
( 16:43:08.471000 6152) update the read lock to write, set counter = 12.
( 16:43:08.471000 2432) acquire the write lock, set counter = 13.
( 16:43:08.471000 5264) acquire the read lock, counter = 13.
( 16:43:08.471000 6444) acquire the read lock, counter = 13.      程序输出表明读线程确实可以“并行”的读取数据,而写线程与它们是互斥的。可以通过 USE_THR 编译开关在 ACE_RW_Thread_Mutex 与 ACE_RW_Mutex 之间切换,输出是一样的,如果开启多个进程,可能要给构造器一个名字来在进程间标识它,这里我没有再进一步测试。
      如果想在进程间使用 Readers/Writer 锁,则最好选择 ACE_RW_Process_Mutex,在底层它使用的是 ace_flock_t,在所有平台上,它都是一个文件锁,所以可以在进程间使用是毫无疑问的。下面是一个使用它的测试程序:
#include "stdafx.h"
#include "ace/RW_Process_Mutex.h"
#include "ace/OS_NS_unistd.h"
#include "ace/Log_Msg.h"

//#define READER

int ACE_TMAIN(int argc, ACE_TCHAR* argv[])
{
    ACE_RW_Process_Mutex mutex ("C:/lock.txt");
#if defined (READER)
    int ret = mutex.acquire_read ();
#else
    int ret = mutex.acquire_write ();
#endif
    if (ret == 0)
    {
#if defined (READER)
      ACE_DEBUG ((LM_DEBUG, "(%P/%t %T) reader acquire the mutex.\n"));
#else
      ACE_DEBUG ((LM_DEBUG, "(%P/%t %T) writer acquire the mutex.\n"));
#endif
      ACE_OS::sleep (20);
      mutex.release ();
      ACE_DEBUG ((LM_DEBUG, "(%P/%t %T) release the mutex.\n"));
      //ACE_OS::sleep (3);
    }

    return 0;
}
      这个程序一启动就获取写锁,并随后 sleep 20 秒,如果此时有其它进程想获取写锁,必定被阻塞在 acquire 调用处,当第一个进程醒来调用 release 后,之后的进程才有可能从 acquire_write 阻塞中醒来再继续执行。编译完成后,开启两个这样的程序,结果正如我们所预料的一样:


进程 B 只有在进程 A 释放锁后才获得了写锁

      可以打开编译开关 READER 来使进程获取读锁,重新编译后启动两个进程,发现它们几乎同时获取了锁:


进程 A 与 B 同时获取了读锁

      进程分别使用读、写锁的情况没有测试,但现象应该可以从上面的测试中推测出来。实际观察确实在 C:\ 产生了一个名为 lock.txt 的临时文件,两个进程退出后,文件自动消失。
      由于 ACE_RW_Process_Mutex 是通过文件锁实现的,所以如果你仅仅在进程内使用读写锁,应该使用 ACE_RW_Thread_Mutex 取代它,因为后者在进程内更加高效。

sevencat 发表于 2014-3-17 13:10:43

ace 的锁的实现远远不够。

yunh 发表于 2014-3-17 15:21:29

话怎么只说一半儿
页: [1]
查看完整版本: ACE Readers/Writer 锁介绍