|
对于共享内存,我想大家都不陌生,windows和linux下的共享内存有些许差异,但总体而言实现的结构是差不多的。
以前我写过一些关于共享内存的运用和研究。最近结合ACE重新实现了一个更为简便的,纵然ACE的例子有,但是实用性实际并不是很全,更多的是展示API功能而已。
就实际工作来说,共享内存确实实现了跨进程的访问。但是,这样也带来了一些管理的问题,比如,一个程序正在改写共享内存的时候,另一个程序正在访问,那么会不会出问题?还有就是如何保证数据完整性?如果使用锁的话,会不会大幅降低系统的性能?
结合这些实际需求,我一步步来解释如何使用ACE来帮我们解决以上的问题。
比如,我收到了这么一个需求,我有10000量汽车,每时每刻都在给我传定位经纬度数据。我需要根据这些经纬度,去计算这些车是否进入或者驶出了某些用户指定的区域,这些区域包括圆形,矩形,多边形和线路。因为数据上传量非常大,几乎每辆车都有自己的区域设置(一辆车几十个都是有可能的)。那么,光一次从数据库加载到内存里,就要消耗十几分钟。如果我的计算判定程度崩溃了,那么重启的代价是,十几分钟内的所有边界判定都会出问题,这样在实际使用中是不允许的。
那么,怎么办?
使用共享内存。
这里,我需要两个进程,一个是后台维护共享内存的进程,一个是实际计算使用的共享内存进程(可能是若干个)。
我的维护进程作用是,第一次共享内存不存在的时候,从数据库把数据区域加载到共享内存,诚然,这个过程很缓慢,需要实际分钟甚至更多。我们不妨把目光放远一些,100万辆车,加载个把小时都是可能的。
那么,在机器启动的时候(我的实际生产环境是Linux和AIX),我这个进程就会随之启动,第一次加载共享内存。此后,维护进程就只对更新的数据做加载而已。方法有两个,定期加载,比如1小时一次,或者实时加载,用户点击确认的时候,发给维护进程一个消息,让它去加载那些被修改的区域内存。而在实际使用中,在大量用户操作的情况下,定时加载比实时加载更具备实用性。你不可能让i的机器无限制的访问DB。那么别的事情可以不用干了。
而实际计算的进程们,他们负责处理比较数据。他们是不和数据库打交道的。而是直接计算结果,如果有符合用户条件的结果,把数据按照一定格式写成文本文件,交给另外一个读取进程,读取文本文件并同步到数据库,去做。就可以了。
那么,问题来了,因为实际生产环境中,我们的客户是以公司形式存在的。一个公司下往往有几十辆车。一套系统往往有几千个公司,尤其是运输公司,修改进出区域非常频繁,比如快递,进货运等。那么我每次加载的时候,就面临一个问题,加载的时候车辆判定必须还在进行,因为车辆数据处理的非常多。万一正在判定的数据更新了,导致判定结果出现问题,或者程序出现访问错误,那就不行了。
那么,我就得使用进程间的锁来解决问题。
以前的写法是,使用文件作为锁的标记。实际就是文件锁。
最近重新翻读了ACE的代码,把这部分代码用ACE重新实现了一遍,发现代码简洁了不少。
下面来看看,我的代码是如何做的。(以上部分涉及到一些不能公开的逻辑,所以这里只展示例子)
为了兼容windows和linux,我在windows测试环境下,使用FileMapping,在linux下使用SV。
好了,看看,ACE如何解决这样的问题。- #pragma once
- #include "ace/OS_main.h"
- #include "ace/OS_NS_stdio.h"
- #ifdef WIN32
- #include "ace/Shared_Memory_MM.h"
- #else
- #include "ace/Shared_Memory_SV.h"
- #endif
- #include "ace/Log_Msg.h"
- #include "ace/OS_NS_unistd.h"
- #include "ace/Process_Semaphore.h"
- #include "ace/OS_NS_string.h"
- #ifdef win32
- #define PRINTF printf_s
- #else
- #define PRINTF printf
- #endif
- struct _UserData
- {
- char szData[20];
- int nData; //当前index
- int nCount; //写入次数
- bool blIsUpdate; //是否已经更新
- _UserData()
- {
- szData[0] = '\0';
- nData = 0;
- nCount = 0;
- blIsUpdate = false;
- }
- };
- class CMemWrite
- {
- public:
- CMemWrite(void);
- ~CMemWrite(void);
- bool OpenMem(key_t nkey, int nSize);
- bool WriteMemory(bool blFirst = true);
- void Close();
- private:
- ACE_Process_Semaphore* m_pPeocess;
- char* m_pData;
- #ifdef WIN32
- ACE_Shared_Memory_MM* m_pShareMemory;
- #else
- ACE_Shared_Memory_SV* m_pShareMemory;
- #endif
- };
复制代码 先说writer, 也就是我的维护进程。
#ifdef WIN32
#include "ace/Shared_Memory_MM.h"
#else
#include "ace/Shared_Memory_SV.h"
#endif
这里是根据windows还是linux自己选择共享内存的模式,SV的启动和MM是不同的,这里一会再说。
比如,我这里的数据结构是_UserData,你可以把这个想象成为车辆的数据信息。
#define SHM_SIZE sizeof(_UserData)*100
我定义一个共享内存的size,比如说就100个_UserData大小吧。
ACE_Process_Semaphore* m_pPeocess;
这个ACE_Process_Semaphore就是全局进程锁,当我修改数据的时候,我需要锁上它。- bool CMemWrite::OpenMem( key_t nkey, int nSize )
- {
- bool blIsCreate = false;
- #ifdef WIN32
- if(m_pShareMemory == NULL)
- {
- m_pShareMemory = new ACE_Shared_Memory_MM(nkey);
- }
- #else
- if(m_pShareMemory == NULL)
- {
- m_pShareMemory = new ACE_Shared_Memory_SV();
- int nError = m_pShareMemory->open(nkey, nSize, ACE_Shared_Memory_SV::ACE_OPEN);
- if(nError != 0)
- {
- //创建共享内存
- nError = m_pShareMemory->open(nkey, nSize, ACE_Shared_Memory_SV::ACE_CREATE);
- if(nError != 0)
- {
- PRINTF("[CMemWrite::Init]create share memory fail(%d).\n", errno);
- }
- blIsCreate = true;
- }
- }
- #endif
- m_pData = (char *)m_pShareMemory->malloc();
- if(NULL == m_pData)
- {
- PRINTF("[CMemWrite::OpenMem]open share memory fail.\n");
- return false;
- }
-
- if(blIsCreate == true)
- {
- ACE_OS::memset(m_pData, 0, nSize);
- PRINTF("[CMemWrite::Init]first share memory memset(%d).\n", nSize);
- }
-
- return true;
- }
复制代码 这里就是打开共享内存的方法了。
SV和MM,不同,MM第一次初始化文件的时候,实际都是0,而SV第一次初始化的时候,里面并不总是干净的内存,所以如果确认共享内存是第一次开启,我们需要初始化它。
if(blIsCreate == true)
{
ACE_OS::memset(m_pData, 0, nSize);
PRINTF("[CMemWrite::Init]first share memory memset(%d).\n", nSize);
}
这里就是初始化的地方,MM则不用。
行了,在这里,你可以加载你的共享内存初始化的方法,这里可以连接数据库,读文件,访问IO,随你。
然后,比如我每隔一段时间,写一次,这里我故意用了sleep,就是为了让大家看到进程所锁住数据不让读访问的功能。当然,你可以用读写锁,进程读写锁也是支持的。- bool CMemWrite::WriteMemory( bool blFirst /*= true*/ )
- {
- if(blFirst == true)
- {
- //如果是第一次运行,初始化动作
- m_pPeocess->acquire();
- if(NULL != m_pData)
- {
- for(int i = 0; i < 100; i++)
- {
- _UserData* pUserData = (_UserData* )(m_pData + i*sizeof(_UserData));
- pUserData->blIsUpdate = true;
- pUserData->nData = i;
- pUserData->nCount = 1;
- #ifdef WIN32
- sprintf_s(pUserData->szData, 20, "freeeyes test");
- #else
- sprintf(pUserData->szData, "freeeyes test");
- #endif
- }
- }
- else
- {
- PRINTF("[CMemRead::WriteMemory]m_pData is NULL.\n");
- return false;
- }
- ACE_Time_Value tvSleep(30, 0);
- ACE_OS::sleep(tvSleep);
- m_pPeocess->release();
- }
- else
- {
- m_pPeocess->acquire();
- ACE_Time_Value tvSleep(10, 0);
- ACE_OS::sleep(tvSleep);
- //m_pData = (char *)m_pShareMemory->malloc();
- if(NULL != m_pData)
- {
- for(int i = 0; i < 100; i++)
- {
- _UserData* pUserData = (_UserData* )(m_pData + i*sizeof(_UserData));
- pUserData->blIsUpdate = true;
- pUserData->nCount++;
- }
- }
- else
- {
- PRINTF("[CMemRead::WriteMemory]m_pData is NULL.\n");
- return false;
- }
- m_pPeocess->release();
- }
- return true;
- }
复制代码 这里就是每次write执行,我让UserData的nCount自动+1
好了,我们运行一下这个代码。- #include "MemWrite.h"
- #define SHM_SIZE sizeof(_UserData)*100
- int main(int argc, char* argv[])
- {
- CMemWrite objMemWrite;
- //打开一个共享内存
-
- #ifdef WIN32
- key_t objKey = "1234";
- #else
- key_t objKey = 1234;
- #endif
- if(true == objMemWrite.OpenMem(objKey, SHM_SIZE))
- {
- PRINTF("[Main]open share memory ok.\n");
- }
- bool blFirst = true;
- while(true)
- {
- PRINTF("[Main]Write begin.\n");
- objMemWrite.WriteMemory(blFirst);
- blFirst = false;
- PRINTF("[Main]Write end.\n");
- }
- return 0;
- }
复制代码 行了,维护进程写完了。下面来看看读取处理进程如何实现。
在这里,我的读取进程,每次扫描一下所有的UserData(),并写入文件。这样我们能直观的看到数据变化结果。- int main(int argc, char* argv[])
- {
- CMemRead objMemRead;
- //打开一个共享内存
- #ifdef WIN32
- key_t objKey = "1234";
- #else
- key_t objKey = 1234;
- #endif
- if(true == objMemRead.OpenMem(objKey, SHM_SIZE))
- {
- PRINTF("[Main]open share memory ok.\n");
- }
- while(true)
- {
- PRINTF("[Main]Read begin.\n");
- objMemRead.ReadMemory();
- PRINTF("[Main]Read end.\n");
-
- ACE_Time_Value tvSleep(1, 0);
- ACE_OS::sleep(tvSleep);
- }
- getchar();
- return 0;
- }
复制代码 ReadMemory()方法便是我每次扫描数据的结果。
实现如下:- bool CMemRead::ReadMemory()
- {
- m_pPeocess->acquire();
- //m_pData = (char *)m_pShareMemory->malloc();
- if(NULL != m_pData)
- {
- for(int i = 0; i < 100; i++)
- {
- _UserData* pUserData = (_UserData* )(m_pData + i*sizeof(_UserData));
- if(pUserData->blIsUpdate == true)
- {
- //发现更新,写入文件
- FILE* pFile = ACE_OS::fopen(ACE_TEXT("freeeyes.txt"), ACE_TEXT("ab+"));
- if(NULL == pFile)
- {
- PRINTF("[CMemRead::ReadMemory]pFile is NULL(%d).\n", errno);
- return false;
- }
- char szFileLog[200] = {'\0'};
- #ifdef WIN32
- sprintf_s(szFileLog, 200, "nData=%d, szData=%s, nCount=%d.\n", pUserData->nData, pUserData->szData, pUserData->nCount);
- #else
- sprintf(szFileLog, "nData=%d, szData=%s, nCount=%d.\n", pUserData->nData, pUserData->szData, pUserData->nCount);
- #endif
- size_t szFile = ACE_OS::fwrite(szFileLog, sizeof(char), strlen(szFileLog), pFile);
- if(szFile != strlen(szFileLog))
- {
- PRINTF("[CMemRead::ReadMemory]szFile is error.\n");
- ACE_OS::fclose(pFile);
- return false;
- }
- ACE_OS::fclose(pFile);
- pUserData->blIsUpdate = false;
- }
- }
- }
- else
- {
- PRINTF("[CMemRead::ReadMemory]m_pData is NULL.\n");
- return false;
- }
- m_pPeocess->release();
- return true;
- }
复制代码 同样,这里我使用了进程锁。
好了,编译运行一下,你就看到了读写互斥的共享内存访问。
下一讲,我会讲一些共享内存的使用小技巧。
此代码在win7和linux下编译运行通过。
|
本帖子中包含更多资源
您需要 登录 才可以下载或查看,没有账号?用户注册
×
|