freeeyes 发表于 2013-8-28 15:40:20

利用ACE实现共享内存的管理(一)

对于共享内存,我想大家都不陌生,windows和linux下的共享内存有些许差异,但总体而言实现的结构是差不多的。
以前我写过一些关于共享内存的运用和研究。最近结合ACE重新实现了一个更为简便的,纵然ACE的例子有,但是实用性实际并不是很全,更多的是展示API功能而已。
就实际工作来说,共享内存确实实现了跨进程的访问。但是,这样也带来了一些管理的问题,比如,一个程序正在改写共享内存的时候,另一个程序正在访问,那么会不会出问题?还有就是如何保证数据完整性?如果使用锁的话,会不会大幅降低系统的性能?
结合这些实际需求,我一步步来解释如何使用ACE来帮我们解决以上的问题。
比如,我收到了这么一个需求,我有10000量汽车,每时每刻都在给我传定位经纬度数据。我需要根据这些经纬度,去计算这些车是否进入或者驶出了某些用户指定的区域,这些区域包括圆形,矩形,多边形和线路。因为数据上传量非常大,几乎每辆车都有自己的区域设置(一辆车几十个都是有可能的)。那么,光一次从数据库加载到内存里,就要消耗十几分钟。如果我的计算判定程度崩溃了,那么重启的代价是,十几分钟内的所有边界判定都会出问题,这样在实际使用中是不允许的。
那么,怎么办?
使用共享内存。
这里,我需要两个进程,一个是后台维护共享内存的进程,一个是实际计算使用的共享内存进程(可能是若干个)。
我的维护进程作用是,第一次共享内存不存在的时候,从数据库把数据区域加载到共享内存,诚然,这个过程很缓慢,需要实际分钟甚至更多。我们不妨把目光放远一些,100万辆车,加载个把小时都是可能的。
那么,在机器启动的时候(我的实际生产环境是Linux和AIX),我这个进程就会随之启动,第一次加载共享内存。此后,维护进程就只对更新的数据做加载而已。方法有两个,定期加载,比如1小时一次,或者实时加载,用户点击确认的时候,发给维护进程一个消息,让它去加载那些被修改的区域内存。而在实际使用中,在大量用户操作的情况下,定时加载比实时加载更具备实用性。你不可能让i的机器无限制的访问DB。那么别的事情可以不用干了。
而实际计算的进程们,他们负责处理比较数据。他们是不和数据库打交道的。而是直接计算结果,如果有符合用户条件的结果,把数据按照一定格式写成文本文件,交给另外一个读取进程,读取文本文件并同步到数据库,去做。就可以了。
那么,问题来了,因为实际生产环境中,我们的客户是以公司形式存在的。一个公司下往往有几十辆车。一套系统往往有几千个公司,尤其是运输公司,修改进出区域非常频繁,比如快递,进货运等。那么我每次加载的时候,就面临一个问题,加载的时候车辆判定必须还在进行,因为车辆数据处理的非常多。万一正在判定的数据更新了,导致判定结果出现问题,或者程序出现访问错误,那就不行了。
那么,我就得使用进程间的锁来解决问题。
以前的写法是,使用文件作为锁的标记。实际就是文件锁。
最近重新翻读了ACE的代码,把这部分代码用ACE重新实现了一遍,发现代码简洁了不少。
下面来看看,我的代码是如何做的。(以上部分涉及到一些不能公开的逻辑,所以这里只展示例子)
为了兼容windows和linux,我在windows测试环境下,使用FileMapping,在linux下使用SV。
好了,看看,ACE如何解决这样的问题。#pragma once

#include "ace/OS_main.h"
#include "ace/OS_NS_stdio.h"
#ifdef WIN32
#include "ace/Shared_Memory_MM.h"
#else
#include "ace/Shared_Memory_SV.h"
#endif

#include "ace/Log_Msg.h"
#include "ace/OS_NS_unistd.h"
#include "ace/Process_Semaphore.h"
#include "ace/OS_NS_string.h"

#ifdef win32
#define PRINTF printf_s
#else
#define PRINTF printf
#endif

struct _UserData
{
        char szData;
        intnData;      //当前index
        intnCount;       //写入次数
        bool blIsUpdate;   //是否已经更新

        _UserData()
        {
                szData= '\0';
                nData      = 0;
                nCount   = 0;
                blIsUpdate = false;
        }
};

class CMemWrite
{
public:
        CMemWrite(void);
        ~CMemWrite(void);

        bool OpenMem(key_t nkey, int nSize);
        bool WriteMemory(bool blFirst = true);

        void Close();

private:
        ACE_Process_Semaphore* m_pPeocess;
        char* m_pData;
#ifdef WIN32       
        ACE_Shared_Memory_MM* m_pShareMemory;
#else
ACE_Shared_Memory_SV* m_pShareMemory;
#endif       
};先说writer, 也就是我的维护进程。
#ifdef WIN32
#include "ace/Shared_Memory_MM.h"
#else
#include "ace/Shared_Memory_SV.h"
#endif
这里是根据windows还是linux自己选择共享内存的模式,SV的启动和MM是不同的,这里一会再说。
比如,我这里的数据结构是_UserData,你可以把这个想象成为车辆的数据信息。
#define SHM_SIZE sizeof(_UserData)*100
我定义一个共享内存的size,比如说就100个_UserData大小吧。
ACE_Process_Semaphore* m_pPeocess;
这个ACE_Process_Semaphore就是全局进程锁,当我修改数据的时候,我需要锁上它。bool CMemWrite::OpenMem( key_t nkey, int nSize )
{
        bool blIsCreate = false;
#ifdef WIN32
        if(m_pShareMemory == NULL)
        {
                m_pShareMemory = new ACE_Shared_Memory_MM(nkey);
        }
#else
        if(m_pShareMemory == NULL)
        {
                m_pShareMemory = new ACE_Shared_Memory_SV();
                int nError = m_pShareMemory->open(nkey, nSize, ACE_Shared_Memory_SV::ACE_OPEN);
                if(nError != 0)
                {
                        //创建共享内存
                        nError = m_pShareMemory->open(nkey, nSize, ACE_Shared_Memory_SV::ACE_CREATE);
                        if(nError != 0)
                        {
                                PRINTF("create share memory fail(%d).\n", errno);
                        }
                        blIsCreate = true;
                }                       
        }       
#endif
        m_pData = (char *)m_pShareMemory->malloc();
        if(NULL == m_pData)
        {
                PRINTF("open share memory fail.\n");
                return false;
        }
       
        if(blIsCreate == true)
        {
                ACE_OS::memset(m_pData, 0, nSize);
                PRINTF("first share memory memset(%d).\n", nSize);
        }       
       

        return true;
}这里就是打开共享内存的方法了。
SV和MM,不同,MM第一次初始化文件的时候,实际都是0,而SV第一次初始化的时候,里面并不总是干净的内存,所以如果确认共享内存是第一次开启,我们需要初始化它。
if(blIsCreate == true)
{
ACE_OS::memset(m_pData, 0, nSize);
PRINTF("first share memory memset(%d).\n", nSize);
}
这里就是初始化的地方,MM则不用。
行了,在这里,你可以加载你的共享内存初始化的方法,这里可以连接数据库,读文件,访问IO,随你。
然后,比如我每隔一段时间,写一次,这里我故意用了sleep,就是为了让大家看到进程所锁住数据不让读访问的功能。当然,你可以用读写锁,进程读写锁也是支持的。bool CMemWrite::WriteMemory( bool blFirst /*= true*/ )
{
        if(blFirst == true)
        {
                //如果是第一次运行,初始化动作
                m_pPeocess->acquire();

                if(NULL != m_pData)
                {
                        for(int i = 0; i < 100; i++)
                        {
                                _UserData* pUserData = (_UserData* )(m_pData + i*sizeof(_UserData));
                                pUserData->blIsUpdate = true;
                                pUserData->nData      = i;
                                pUserData->nCount   = 1;
#ifdef WIN32                               
                                sprintf_s(pUserData->szData, 20, "freeeyes test");
#else
                                sprintf(pUserData->szData, "freeeyes test");
#endif
                        }
                }
                else
                {
                        PRINTF("m_pData is NULL.\n");
                        return false;
                }

                ACE_Time_Value tvSleep(30, 0);
                ACE_OS::sleep(tvSleep);

                m_pPeocess->release();
        }
        else
        {
                m_pPeocess->acquire();

                ACE_Time_Value tvSleep(10, 0);
                ACE_OS::sleep(tvSleep);

                //m_pData = (char *)m_pShareMemory->malloc();
                if(NULL != m_pData)
                {
                        for(int i = 0; i < 100; i++)
                        {
                                _UserData* pUserData = (_UserData* )(m_pData + i*sizeof(_UserData));
                                pUserData->blIsUpdate = true;
                                pUserData->nCount++;
                        }
                }
                else
                {
                        PRINTF("m_pData is NULL.\n");
                        return false;
                }
                m_pPeocess->release();
        }

        return true;
}这里就是每次write执行,我让UserData的nCount自动+1
好了,我们运行一下这个代码。#include "MemWrite.h"

#define SHM_SIZE sizeof(_UserData)*100

int main(int argc, char* argv[])
{
        CMemWrite objMemWrite;

        //打开一个共享内存
       
#ifdef WIN32
        key_t objKey = "1234";
#else
        key_t objKey = 1234;
#endif

        if(true == objMemWrite.OpenMem(objKey, SHM_SIZE))
        {
                PRINTF("open share memory ok.\n");
        }

        bool blFirst = true;

        while(true)
        {
                PRINTF("Write begin.\n");
                objMemWrite.WriteMemory(blFirst);
                blFirst = false;
                PRINTF("Write end.\n");
        }

        return 0;
}行了,维护进程写完了。下面来看看读取处理进程如何实现。
在这里,我的读取进程,每次扫描一下所有的UserData(),并写入文件。这样我们能直观的看到数据变化结果。int main(int argc, char* argv[])
{
        CMemRead objMemRead;

        //打开一个共享内存
#ifdef WIN32
        key_t objKey = "1234";
#else
        key_t objKey = 1234;
#endif       
        if(true == objMemRead.OpenMem(objKey, SHM_SIZE))
        {
                PRINTF("open share memory ok.\n");
        }

        while(true)
        {
                PRINTF("Read begin.\n");
                objMemRead.ReadMemory();
                PRINTF("Read end.\n");
               
                ACE_Time_Value tvSleep(1, 0);
                ACE_OS::sleep(tvSleep);
        }

        getchar();
        return 0;
}
ReadMemory()方法便是我每次扫描数据的结果。
实现如下:bool CMemRead::ReadMemory()
{
        m_pPeocess->acquire();

        //m_pData = (char *)m_pShareMemory->malloc();
        if(NULL != m_pData)
        {
                for(int i = 0; i < 100; i++)
                {
                        _UserData* pUserData = (_UserData* )(m_pData + i*sizeof(_UserData));
                        if(pUserData->blIsUpdate == true)
                        {
                                //发现更新,写入文件
                                FILE* pFile = ACE_OS::fopen(ACE_TEXT("freeeyes.txt"), ACE_TEXT("ab+"));
                                if(NULL == pFile)
                                {
                                        PRINTF("pFile is NULL(%d).\n", errno);
                                        return false;
                                }

                                char szFileLog = {'\0'};
#ifdef WIN32
                                sprintf_s(szFileLog, 200, "nData=%d, szData=%s, nCount=%d.\n", pUserData->nData, pUserData->szData, pUserData->nCount);
#else
                                sprintf(szFileLog, "nData=%d, szData=%s, nCount=%d.\n", pUserData->nData, pUserData->szData, pUserData->nCount);
#endif
                                size_t szFile = ACE_OS::fwrite(szFileLog, sizeof(char), strlen(szFileLog), pFile);
                                if(szFile != strlen(szFileLog))
                                {
                                        PRINTF("szFile is error.\n");
                                        ACE_OS::fclose(pFile);
                                        return false;
                                }

                                ACE_OS::fclose(pFile);
                                pUserData->blIsUpdate = false;
                        }
                }
        }
        else
        {
                PRINTF("m_pData is NULL.\n");
                return false;
        }
        m_pPeocess->release();

        return true;
}同样,这里我使用了进程锁。
好了,编译运行一下,你就看到了读写互斥的共享内存访问。
下一讲,我会讲一些共享内存的使用小技巧。
此代码在win7和linux下编译运行通过。





页: [1]
查看完整版本: 利用ACE实现共享内存的管理(一)