找回密码
 用户注册

QQ登录

只需一步,快速开始

查看: 6773|回复: 0

利用ACE实现共享内存的管理(一)

[复制链接]
发表于 2013-8-28 15:40:20 | 显示全部楼层 |阅读模式
对于共享内存,我想大家都不陌生,windows和linux下的共享内存有些许差异,但总体而言实现的结构是差不多的。
以前我写过一些关于共享内存的运用和研究。最近结合ACE重新实现了一个更为简便的,纵然ACE的例子有,但是实用性实际并不是很全,更多的是展示API功能而已。
就实际工作来说,共享内存确实实现了跨进程的访问。但是,这样也带来了一些管理的问题,比如,一个程序正在改写共享内存的时候,另一个程序正在访问,那么会不会出问题?还有就是如何保证数据完整性?如果使用锁的话,会不会大幅降低系统的性能?
结合这些实际需求,我一步步来解释如何使用ACE来帮我们解决以上的问题。
比如,我收到了这么一个需求,我有10000量汽车,每时每刻都在给我传定位经纬度数据。我需要根据这些经纬度,去计算这些车是否进入或者驶出了某些用户指定的区域,这些区域包括圆形,矩形,多边形和线路。因为数据上传量非常大,几乎每辆车都有自己的区域设置(一辆车几十个都是有可能的)。那么,光一次从数据库加载到内存里,就要消耗十几分钟。如果我的计算判定程度崩溃了,那么重启的代价是,十几分钟内的所有边界判定都会出问题,这样在实际使用中是不允许的。
那么,怎么办?
使用共享内存。
这里,我需要两个进程,一个是后台维护共享内存的进程,一个是实际计算使用的共享内存进程(可能是若干个)。
我的维护进程作用是,第一次共享内存不存在的时候,从数据库把数据区域加载到共享内存,诚然,这个过程很缓慢,需要实际分钟甚至更多。我们不妨把目光放远一些,100万辆车,加载个把小时都是可能的。
那么,在机器启动的时候(我的实际生产环境是Linux和AIX),我这个进程就会随之启动,第一次加载共享内存。此后,维护进程就只对更新的数据做加载而已。方法有两个,定期加载,比如1小时一次,或者实时加载,用户点击确认的时候,发给维护进程一个消息,让它去加载那些被修改的区域内存。而在实际使用中,在大量用户操作的情况下,定时加载比实时加载更具备实用性。你不可能让i的机器无限制的访问DB。那么别的事情可以不用干了。
而实际计算的进程们,他们负责处理比较数据。他们是不和数据库打交道的。而是直接计算结果,如果有符合用户条件的结果,把数据按照一定格式写成文本文件,交给另外一个读取进程,读取文本文件并同步到数据库,去做。就可以了。
那么,问题来了,因为实际生产环境中,我们的客户是以公司形式存在的。一个公司下往往有几十辆车。一套系统往往有几千个公司,尤其是运输公司,修改进出区域非常频繁,比如快递,进货运等。那么我每次加载的时候,就面临一个问题,加载的时候车辆判定必须还在进行,因为车辆数据处理的非常多。万一正在判定的数据更新了,导致判定结果出现问题,或者程序出现访问错误,那就不行了。
那么,我就得使用进程间的锁来解决问题。
以前的写法是,使用文件作为锁的标记。实际就是文件锁。
最近重新翻读了ACE的代码,把这部分代码用ACE重新实现了一遍,发现代码简洁了不少。
下面来看看,我的代码是如何做的。(以上部分涉及到一些不能公开的逻辑,所以这里只展示例子)
为了兼容windows和linux,我在windows测试环境下,使用FileMapping,在linux下使用SV。
好了,看看,ACE如何解决这样的问题。
  1. #pragma once
  2. #include "ace/OS_main.h"
  3. #include "ace/OS_NS_stdio.h"
  4. #ifdef WIN32
  5. #include "ace/Shared_Memory_MM.h"
  6. #else
  7. #include "ace/Shared_Memory_SV.h"
  8. #endif
  9. #include "ace/Log_Msg.h"
  10. #include "ace/OS_NS_unistd.h"
  11. #include "ace/Process_Semaphore.h"
  12. #include "ace/OS_NS_string.h"
  13. #ifdef win32
  14. #define PRINTF printf_s
  15. #else
  16. #define PRINTF printf
  17. #endif
  18. struct _UserData
  19. {
  20.         char szData[20];
  21.         int  nData;        //当前index
  22.         int  nCount;       //写入次数
  23.         bool blIsUpdate;   //是否已经更新
  24.         _UserData()
  25.         {
  26.                 szData[0]  = '\0';
  27.                 nData      = 0;
  28.                 nCount     = 0;
  29.                 blIsUpdate = false;
  30.         }
  31. };
  32. class CMemWrite
  33. {
  34. public:
  35.         CMemWrite(void);
  36.         ~CMemWrite(void);
  37.         bool OpenMem(key_t nkey, int nSize);
  38.         bool WriteMemory(bool blFirst = true);
  39.         void Close();
  40. private:
  41.         ACE_Process_Semaphore* m_pPeocess;
  42.         char* m_pData;
  43. #ifdef WIN32       
  44.         ACE_Shared_Memory_MM* m_pShareMemory;
  45. #else
  46.   ACE_Shared_Memory_SV* m_pShareMemory;
  47. #endif       
  48. };
复制代码
先说writer, 也就是我的维护进程。
#ifdef WIN32
#include "ace/Shared_Memory_MM.h"
#else
#include "ace/Shared_Memory_SV.h"
#endif
这里是根据windows还是linux自己选择共享内存的模式,SV的启动和MM是不同的,这里一会再说。
比如,我这里的数据结构是_UserData,你可以把这个想象成为车辆的数据信息。
#define SHM_SIZE sizeof(_UserData)*100
我定义一个共享内存的size,比如说就100个_UserData大小吧。
ACE_Process_Semaphore* m_pPeocess;
这个ACE_Process_Semaphore就是全局进程锁,当我修改数据的时候,我需要锁上它。
  1. bool CMemWrite::OpenMem( key_t nkey, int nSize )
  2. {
  3.         bool blIsCreate = false;
  4. #ifdef WIN32
  5.         if(m_pShareMemory == NULL)
  6.         {
  7.                 m_pShareMemory = new ACE_Shared_Memory_MM(nkey);
  8.         }
  9. #else
  10.         if(m_pShareMemory == NULL)
  11.         {
  12.                 m_pShareMemory = new ACE_Shared_Memory_SV();
  13.                 int nError = m_pShareMemory->open(nkey, nSize, ACE_Shared_Memory_SV::ACE_OPEN);
  14.                 if(nError != 0)
  15.                 {
  16.                         //创建共享内存
  17.                         nError = m_pShareMemory->open(nkey, nSize, ACE_Shared_Memory_SV::ACE_CREATE);
  18.                         if(nError != 0)
  19.                         {
  20.                                 PRINTF("[CMemWrite::Init]create share memory fail(%d).\n", errno);
  21.                         }
  22.                         blIsCreate = true;
  23.                 }                       
  24.         }       
  25. #endif
  26.         m_pData = (char *)m_pShareMemory->malloc();
  27.         if(NULL == m_pData)
  28.         {
  29.                 PRINTF("[CMemWrite::OpenMem]open share memory fail.\n");
  30.                 return false;
  31.         }
  32.        
  33.         if(blIsCreate == true)
  34.         {
  35.                 ACE_OS::memset(m_pData, 0, nSize);
  36.                 PRINTF("[CMemWrite::Init]first share memory memset(%d).\n", nSize);
  37.         }       
  38.        
  39.         return true;
  40. }
复制代码
这里就是打开共享内存的方法了。
SV和MM,不同,MM第一次初始化文件的时候,实际都是0,而SV第一次初始化的时候,里面并不总是干净的内存,所以如果确认共享内存是第一次开启,我们需要初始化它。
if(blIsCreate == true)
{
ACE_OS::memset(m_pData, 0, nSize);
PRINTF("[CMemWrite::Init]first share memory memset(%d).\n", nSize);
}
这里就是初始化的地方,MM则不用。
行了,在这里,你可以加载你的共享内存初始化的方法,这里可以连接数据库,读文件,访问IO,随你。
然后,比如我每隔一段时间,写一次,这里我故意用了sleep,就是为了让大家看到进程所锁住数据不让读访问的功能。当然,你可以用读写锁,进程读写锁也是支持的。
  1. bool CMemWrite::WriteMemory( bool blFirst /*= true*/ )
  2. {
  3.         if(blFirst == true)
  4.         {
  5.                 //如果是第一次运行,初始化动作
  6.                 m_pPeocess->acquire();
  7.                 if(NULL != m_pData)
  8.                 {
  9.                         for(int i = 0; i < 100; i++)
  10.                         {
  11.                                 _UserData* pUserData = (_UserData* )(m_pData + i*sizeof(_UserData));
  12.                                 pUserData->blIsUpdate = true;
  13.                                 pUserData->nData      = i;
  14.                                 pUserData->nCount     = 1;
  15. #ifdef WIN32                               
  16.                                 sprintf_s(pUserData->szData, 20, "freeeyes test");
  17. #else
  18.                                 sprintf(pUserData->szData, "freeeyes test");
  19. #endif
  20.                         }
  21.                 }
  22.                 else
  23.                 {
  24.                         PRINTF("[CMemRead::WriteMemory]m_pData is NULL.\n");
  25.                         return false;
  26.                 }
  27.                 ACE_Time_Value tvSleep(30, 0);
  28.                 ACE_OS::sleep(tvSleep);
  29.                 m_pPeocess->release();
  30.         }
  31.         else
  32.         {
  33.                 m_pPeocess->acquire();
  34.                 ACE_Time_Value tvSleep(10, 0);
  35.                 ACE_OS::sleep(tvSleep);
  36.                 //m_pData = (char *)m_pShareMemory->malloc();
  37.                 if(NULL != m_pData)
  38.                 {
  39.                         for(int i = 0; i < 100; i++)
  40.                         {
  41.                                 _UserData* pUserData = (_UserData* )(m_pData + i*sizeof(_UserData));
  42.                                 pUserData->blIsUpdate = true;
  43.                                 pUserData->nCount++;
  44.                         }
  45.                 }
  46.                 else
  47.                 {
  48.                         PRINTF("[CMemRead::WriteMemory]m_pData is NULL.\n");
  49.                         return false;
  50.                 }
  51.                 m_pPeocess->release();
  52.         }
  53.         return true;
  54. }
复制代码
这里就是每次write执行,我让UserData的nCount自动+1
好了,我们运行一下这个代码。
  1. #include "MemWrite.h"
  2. #define SHM_SIZE sizeof(_UserData)*100
  3. int main(int argc, char* argv[])
  4. {
  5.         CMemWrite objMemWrite;
  6.         //打开一个共享内存
  7.        
  8. #ifdef WIN32
  9.         key_t objKey = "1234";
  10. #else
  11.         key_t objKey = 1234;
  12. #endif
  13.         if(true == objMemWrite.OpenMem(objKey, SHM_SIZE))
  14.         {
  15.                 PRINTF("[Main]open share memory ok.\n");
  16.         }
  17.         bool blFirst = true;
  18.         while(true)
  19.         {
  20.                 PRINTF("[Main]Write begin.\n");
  21.                 objMemWrite.WriteMemory(blFirst);
  22.                 blFirst = false;
  23.                 PRINTF("[Main]Write end.\n");
  24.         }
  25.         return 0;
  26. }
复制代码
行了,维护进程写完了。下面来看看读取处理进程如何实现。
在这里,我的读取进程,每次扫描一下所有的UserData(),并写入文件。这样我们能直观的看到数据变化结果。
  1. int main(int argc, char* argv[])
  2. {
  3.         CMemRead objMemRead;
  4.         //打开一个共享内存
  5. #ifdef WIN32
  6.         key_t objKey = "1234";
  7. #else
  8.         key_t objKey = 1234;
  9. #endif       
  10.         if(true == objMemRead.OpenMem(objKey, SHM_SIZE))
  11.         {
  12.                 PRINTF("[Main]open share memory ok.\n");
  13.         }
  14.         while(true)
  15.         {
  16.                 PRINTF("[Main]Read begin.\n");
  17.                 objMemRead.ReadMemory();
  18.                 PRINTF("[Main]Read end.\n");
  19.                
  20.                 ACE_Time_Value tvSleep(1, 0);
  21.                 ACE_OS::sleep(tvSleep);
  22.         }
  23.         getchar();
  24.         return 0;
  25. }
复制代码
ReadMemory()方法便是我每次扫描数据的结果。
实现如下:
  1. bool CMemRead::ReadMemory()
  2. {
  3.         m_pPeocess->acquire();
  4.         //m_pData = (char *)m_pShareMemory->malloc();
  5.         if(NULL != m_pData)
  6.         {
  7.                 for(int i = 0; i < 100; i++)
  8.                 {
  9.                         _UserData* pUserData = (_UserData* )(m_pData + i*sizeof(_UserData));
  10.                         if(pUserData->blIsUpdate == true)
  11.                         {
  12.                                 //发现更新,写入文件
  13.                                 FILE* pFile = ACE_OS::fopen(ACE_TEXT("freeeyes.txt"), ACE_TEXT("ab+"));
  14.                                 if(NULL == pFile)
  15.                                 {
  16.                                         PRINTF("[CMemRead::ReadMemory]pFile is NULL(%d).\n", errno);
  17.                                         return false;
  18.                                 }
  19.                                 char szFileLog[200] = {'\0'};
  20. #ifdef WIN32
  21.                                 sprintf_s(szFileLog, 200, "nData=%d, szData=%s, nCount=%d.\n", pUserData->nData, pUserData->szData, pUserData->nCount);
  22. #else
  23.                                 sprintf(szFileLog, "nData=%d, szData=%s, nCount=%d.\n", pUserData->nData, pUserData->szData, pUserData->nCount);
  24. #endif
  25.                                 size_t szFile = ACE_OS::fwrite(szFileLog, sizeof(char), strlen(szFileLog), pFile);
  26.                                 if(szFile != strlen(szFileLog))
  27.                                 {
  28.                                         PRINTF("[CMemRead::ReadMemory]szFile is error.\n");
  29.                                         ACE_OS::fclose(pFile);
  30.                                         return false;
  31.                                 }
  32.                                 ACE_OS::fclose(pFile);
  33.                                 pUserData->blIsUpdate = false;
  34.                         }
  35.                 }
  36.         }
  37.         else
  38.         {
  39.                 PRINTF("[CMemRead::ReadMemory]m_pData is NULL.\n");
  40.                 return false;
  41.         }
  42.         m_pPeocess->release();
  43.         return true;
  44. }
复制代码
同样,这里我使用了进程锁。
好了,编译运行一下,你就看到了读写互斥的共享内存访问。
下一讲,我会讲一些共享内存的使用小技巧。
此代码在win7和linux下编译运行通过。





本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?用户注册

×
您需要登录后才可以回帖 登录 | 用户注册

本版积分规则

Archiver|手机版|小黑屋|ACE Developer ( 京ICP备06055248号 )

GMT+8, 2024-4-29 19:45 , Processed in 0.013197 second(s), 5 queries , Redis On.

Powered by Discuz! X3.5

© 2001-2023 Discuz! Team.

快速回复 返回顶部 返回列表