找回密码
 用户注册

QQ登录

只需一步,快速开始

查看: 12266|回复: 2

ACE Readers/Writer 锁介绍

[复制链接]
发表于 2014-3-12 17:04:39 | 显示全部楼层 |阅读模式
本帖最后由 yunh 于 2014-3-13 09:17 编辑

        Readers/Writer 锁,顾名思义就是读写锁,它允许多个读线程同时访问资源,写线程之间、写线程与读线程之间则是互斥的。当你的应用中有大量的线程只是读取,少量线程负责更新,那么使用读写锁将比一般的 Mutex 高效不少 (如果写线程与读线程不相上下,那就另当别论,毕竟读写锁实现起来更复杂)。
        ACE 提供了两个层次上的读写锁:进程内与进程间。前者由 ACE_RW_Thread_Mutex 实现;后者由 ACE_RW_Mutex / ACE_RW_Process_Mutex 实现。ACE_RW_(Thread_)Mutex 在 Unix like 平台上都是基于平台原生的同步机制 (pthread_)rwlock_t,所以这里只是简单的对它们的封装,而在 Windows 平台上,由于没有原生的读写锁,ACE 自己通过 Semaphore 与 Event 模拟了它的实现,但这个模拟只能在进程内工作,所以当你使用进程内的读写锁 ACE_RW_Thread_Mutex 时,可以将它们看作是跨平台的,而你使用进程间的读写锁 ACE_RW_Mutex 时,则只在 Unix like 平台上可以工作 (Semaphore 与 Event 虽然可以跨进程通知,但还有一堆内部变量是怎么在进程间共享的,我实在想象不出来)。
        下面是一个使用 ACE_RW_Thread_Mutex 的例子:
  1. #include "stdafx.h"
  2. #include "ace/OS_NS_unistd.h"
  3. #include "ace/Task.h"
  4. #define RD_CNT 2
  5. #define WR_CNT 1
  6. #define RDWR_CNT 1
  7. #define USE_THR
  8. #if defined (USE_THR)
  9. #include "ace/RW_Thread_Mutex.h"
  10. typedef ACE_RW_Thread_Mutex MUTEX_TYPE;
  11. #else
  12. #include "ace/RW_Mutex.h"
  13. typedef ACE_RW_Mutex MUTEX_TYPE;
  14. #endif
  15. int g_counter = 0;
  16. class rd_task : public ACE_Task_Base
  17. {
  18. public:
  19.     rd_task(MUTEX_TYPE& rdlk)
  20.         : rdlk_(rdlk)
  21.     {
  22.     }
  23.     virtual int svc()
  24.     {
  25.         ACE_DEBUG((LM_DEBUG, "(%T %t) rd_task running.\n"));
  26.         while(thr_mgr()->testcancel(ACE_Thread::self()) == 0)
  27.         {
  28.             ACE_OS::sleep (ACE_Time_Value (0, 100000));
  29.             //ACE_READ_GUARD_RETURN(MUTEX_TYPE, mon, rdlk_, -1);
  30.             rdlk_.acquire_read ();
  31.             ACE_DEBUG((LM_DEBUG, "(%T %t) acquire the read lock, counter = %u.\n", g_counter));
  32.             rdlk_.release ();
  33.         }
  34.         return 0;
  35.     }
  36. private:
  37.     MUTEX_TYPE& rdlk_;
  38. };
  39. class wr_task : public ACE_Task_Base
  40. {
  41. public:
  42.     wr_task(MUTEX_TYPE& wrlk)
  43.         : wrlk_(wrlk)
  44.     {
  45.     }
  46.     virtual int svc()
  47.     {
  48.         ACE_DEBUG((LM_DEBUG, "(%T %t) wr_task running.\n"));
  49.         while(thr_mgr()->testcancel(ACE_Thread::self()) == 0)
  50.         {
  51.             ACE_OS::sleep (ACE_Time_Value (0, 200000));
  52.             //ACE_WRITE_GUARD_RETURN(MUTEX_TYPE, mon, wrlk_, -1);
  53.             wrlk_.acquire_write ();
  54.             ACE_DEBUG((LM_DEBUG, "(%T %t) acquire the write lock, set counter = %u.\n", ++g_counter));
  55.             wrlk_.release ();
  56.         }
  57.         return 0;
  58.     }
  59. private:
  60.     MUTEX_TYPE& wrlk_;
  61. };
  62. class rdwr_task : public ACE_Task_Base
  63. {
  64. public:
  65.     rdwr_task(MUTEX_TYPE& lk)
  66.         : lk_(lk)
  67.     {
  68.     }
  69.     virtual int svc()
  70.     {
  71.         ACE_DEBUG((LM_DEBUG, "(%T %t) rdwr_task running.\n"));
  72.         while(thr_mgr()->testcancel(ACE_Thread::self()) == 0)
  73.         {
  74.             ACE_OS::sleep (ACE_Time_Value (0, 400000));
  75.             //ACE_WRITE_GUARD_RETURN(MUTEX_TYPE, mon, lk_, -1);
  76.             lk_.acquire_read ();
  77.             ACE_DEBUG((LM_DEBUG, "(%T %t) acquire the read lock, counter = %u.\n", g_counter));
  78.             ACE_OS::sleep (ACE_Time_Value (0, 400000));
  79.             lk_.tryacquire_write_upgrade ();
  80.             ACE_DEBUG((LM_DEBUG, "(%T %t) update the read lock to write, set counter = %u.\n", ++g_counter));
  81.             lk_.release ();
  82.         }
  83.         return 0;
  84.     }
  85. private:
  86.     MUTEX_TYPE& lk_;
  87. };
  88. int ACE_TMAIN(int argc, ACE_TCHAR* argv[])
  89. {
  90.     MUTEX_TYPE mutex;
  91.     rd_task rdt(mutex);
  92.     wr_task wrt(mutex);
  93.     rdwr_task rdwrt (mutex);
  94.     rdt.activate(THR_JOINABLE | THR_NEW_LWP, RD_CNT);
  95.     wrt.activate(THR_JOINABLE | THR_NEW_LWP, WR_CNT);
  96.     rdwrt.activate(THR_JOINABLE | THR_NEW_LWP, RDWR_CNT);
  97.     ACE_OS::sleep(3);
  98.     ACE_Thread_Manager::instance()->cancel_all();
  99.     wrt.wait();
  100.     rdt.wait();
  101.     rdwrt.wait ();
  102.     return 0;
  103. }
复制代码
       默认情况下开启 2 个读线程不停的在锁上调用 acquire_read,开启 1 个写线程不停的在锁上调用 acquire_write,还开启了一个读写线程,它先 acquire_read 进行读,之后调用 try_acquire_write_upgrade 将它“升级”为写线程,进行写入。程序输出如下:
  1. ( 16:43:05.227000 5264) rd_task running.
  2. ( 16:43:05.227000 6444) rd_task running.
  3. ( 16:43:05.227000 2432) wr_task running.
  4. ( 16:43:05.227000 6152) rdwr_task running.
  5. ( 16:43:05.336000 5264) acquire the read lock, counter = 0.
  6. ( 16:43:05.336000 6444) acquire the read lock, counter = 0.
  7. ( 16:43:05.429000 2432) acquire the write lock, set counter = 1.
  8. ( 16:43:05.445000 5264) acquire the read lock, counter = 1.
  9. ( 16:43:05.445000 6444) acquire the read lock, counter = 1.
  10. ( 16:43:05.554000 6444) acquire the read lock, counter = 1.
  11. ( 16:43:05.554000 5264) acquire the read lock, counter = 1.
  12. ( 16:43:05.632000 2432) acquire the write lock, set counter = 2.
  13. ( 16:43:05.632000 6152) acquire the read lock, counter = 2.
  14. ( 16:43:05.663000 6444) acquire the read lock, counter = 2.
  15. ( 16:43:05.663000 5264) acquire the read lock, counter = 2.
  16. ( 16:43:05.773000 5264) acquire the read lock, counter = 2.
  17. ( 16:43:05.773000 6444) acquire the read lock, counter = 2.
  18. ( 16:43:06.038000 6152) update the read lock to write, set counter = 3.
  19. ( 16:43:06.038000 2432) acquire the write lock, set counter = 4.
  20. ( 16:43:06.038000 5264) acquire the read lock, counter = 4.
  21. ( 16:43:06.038000 6444) acquire the read lock, counter = 4.
  22. ( 16:43:06.147000 6444) acquire the read lock, counter = 4.
  23. ( 16:43:06.147000 5264) acquire the read lock, counter = 4.
  24. ( 16:43:06.241000 2432) acquire the write lock, set counter = 5.
  25. ( 16:43:06.256000 5264) acquire the read lock, counter = 5.
  26. ( 16:43:06.256000 6444) acquire the read lock, counter = 5.
  27. ( 16:43:06.365000 5264) acquire the read lock, counter = 5.
  28. ( 16:43:06.365000 6444) acquire the read lock, counter = 5.
  29. ( 16:43:06.443000 6152) acquire the read lock, counter = 5.
  30. ( 16:43:06.849000 6152) update the read lock to write, set counter = 6.
  31. ( 16:43:06.849000 2432) acquire the write lock, set counter = 7.
  32. ( 16:43:06.849000 5264) acquire the read lock, counter = 7.
  33. ( 16:43:06.849000 6444) acquire the read lock, counter = 7.
  34. ( 16:43:06.958000 5264) acquire the read lock, counter = 7.
  35. ( 16:43:06.958000 6444) acquire the read lock, counter = 7.
  36. ( 16:43:07.052000 2432) acquire the write lock, set counter = 8.
  37. ( 16:43:07.067000 5264) acquire the read lock, counter = 8.
  38. ( 16:43:07.067000 6444) acquire the read lock, counter = 8.
  39. ( 16:43:07.177000 5264) acquire the read lock, counter = 8.
  40. ( 16:43:07.177000 6444) acquire the read lock, counter = 8.
  41. ( 16:43:07.255000 6152) acquire the read lock, counter = 8.
  42. ( 16:43:07.660000 6152) update the read lock to write, set counter = 9.
  43. ( 16:43:07.660000 2432) acquire the write lock, set counter = 10.
  44. ( 16:43:07.660000 6444) acquire the read lock, counter = 10.
  45. ( 16:43:07.660000 5264) acquire the read lock, counter = 10.
  46. ( 16:43:07.769000 6444) acquire the read lock, counter = 10.
  47. ( 16:43:07.769000 5264) acquire the read lock, counter = 10.
  48. ( 16:43:07.863000 2432) acquire the write lock, set counter = 11.
  49. ( 16:43:07.879000 5264) acquire the read lock, counter = 11.
  50. ( 16:43:07.879000 6444) acquire the read lock, counter = 11.
  51. ( 16:43:07.988000 5264) acquire the read lock, counter = 11.
  52. ( 16:43:07.988000 6444) acquire the read lock, counter = 11.
  53. ( 16:43:08.066000 6152) acquire the read lock, counter = 11.
  54. ( 16:43:08.471000 6152) update the read lock to write, set counter = 12.
  55. ( 16:43:08.471000 2432) acquire the write lock, set counter = 13.
  56. ( 16:43:08.471000 5264) acquire the read lock, counter = 13.
  57. ( 16:43:08.471000 6444) acquire the read lock, counter = 13.
复制代码
       程序输出表明读线程确实可以“并行”的读取数据,而写线程与它们是互斥的。可以通过 USE_THR 编译开关在 ACE_RW_Thread_Mutex 与 ACE_RW_Mutex 之间切换,输出是一样的,如果开启多个进程,可能要给构造器一个名字来在进程间标识它,这里我没有再进一步测试。
        如果想在进程间使用 Readers/Writer 锁,则最好选择 ACE_RW_Process_Mutex,在底层它使用的是 ace_flock_t,在所有平台上,它都是一个文件锁,所以可以在进程间使用是毫无疑问的。下面是一个使用它的测试程序:
  1. #include "stdafx.h"
  2. #include "ace/RW_Process_Mutex.h"
  3. #include "ace/OS_NS_unistd.h"
  4. #include "ace/Log_Msg.h"
  5. //#define READER
  6. int ACE_TMAIN(int argc, ACE_TCHAR* argv[])
  7. {
  8.     ACE_RW_Process_Mutex mutex ("C:/lock.txt");
  9. #if defined (READER)
  10.     int ret = mutex.acquire_read ();
  11. #else
  12.     int ret = mutex.acquire_write ();
  13. #endif
  14.     if (ret == 0)
  15.     {
  16. #if defined (READER)
  17.         ACE_DEBUG ((LM_DEBUG, "(%P/%t %T) reader acquire the mutex.\n"));
  18. #else
  19.         ACE_DEBUG ((LM_DEBUG, "(%P/%t %T) writer acquire the mutex.\n"));
  20. #endif
  21.         ACE_OS::sleep (20);
  22.         mutex.release ();
  23.         ACE_DEBUG ((LM_DEBUG, "(%P/%t %T) release the mutex.\n"));
  24.         //ACE_OS::sleep (3);
  25.     }
  26.     return 0;
  27. }
复制代码
       这个程序一启动就获取写锁,并随后 sleep 20 秒,如果此时有其它进程想获取写锁,必定被阻塞在 acquire 调用处,当第一个进程醒来调用 release 后,之后的进程才有可能从 acquire_write 阻塞中醒来再继续执行。编译完成后,开启两个这样的程序,结果正如我们所预料的一样:


进程 B 只有在进程 A 释放锁后才获得了写锁

        可以打开编译开关 READER 来使进程获取读锁,重新编译后启动两个进程,发现它们几乎同时获取了锁:


进程 A 与 B 同时获取了读锁

        进程分别使用读、写锁的情况没有测试,但现象应该可以从上面的测试中推测出来。实际观察确实在 C:\ 产生了一个名为 lock.txt 的临时文件,两个进程退出后,文件自动消失。
        由于 ACE_RW_Process_Mutex 是通过文件锁实现的,所以如果你仅仅在进程内使用读写锁,应该使用 ACE_RW_Thread_Mutex 取代它,因为后者在进程内更加高效。

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?用户注册

×
发表于 2014-3-17 13:10:43 | 显示全部楼层
ace 的锁的实现远远不够。
 楼主| 发表于 2014-3-17 15:21:29 | 显示全部楼层
话怎么只说一半儿
您需要登录后才可以回帖 登录 | 用户注册

本版积分规则

Archiver|手机版|小黑屋|ACE Developer ( 京ICP备06055248号 )

GMT+8, 2024-4-26 08:06 , Processed in 0.013434 second(s), 6 queries , Redis On.

Powered by Discuz! X3.5

© 2001-2023 Discuz! Team.

快速回复 返回顶部 返回列表