找回密码
 用户注册

QQ登录

只需一步,快速开始

查看: 3595|回复: 0

多级缓冲体系的构建(基于PSS)

[复制链接]
发表于 2013-11-12 11:49:43 | 显示全部楼层 |阅读模式
由于业务需要,最近开始实现多级缓冲的数据体系。
因为是多台服务器的集群,所以,一开始考虑的是使用memcached。但是实际上,测试结果不是很尽如人意。这本身并不是memcached的问题,而是业务场景的问题。
我的业务是:
每时每刻都有成千上万的GPS定位数据到达服务器,用于分析,比如,比较是否进出各种区域(圆形,矩形和多边形),是否超速,是否按照路线行驶,疲劳驾驶时间,累计驾驶时间,休息提醒,油量累计,里程限制等等,这一个个的逻辑都需要处理,并且在某些条件触发的时候,及时通知下行的GPS终端做出事件响应。
如果一开始只有几十辆车,这个没什么问题。但是如果上到几十万辆,数据的处理速度就会凸显出问题。每秒到达的定位信息都在1万条以上,要对海量的数据对比分析,是一件很繁重而且复杂的工作。这里还涉及到了用户动态添加和删除指定的规则。服务器如何达到更快的相应程度?
前一段时间写了一篇二级缓冲的文章,就是源于解决此问题。
那么让我们以这个实际案例来看看,我们需要达到什么:
(1)达到数据加载最快,用户设置的新的规则会在最快的时间刷入各个服务器。
(2)必须能够准确反映各个事件的处理情况,数据加载时间尽可能的短暂。
(3)按照号段(手机号段)负载均衡,分压前端集群的数据到达处理压力。
(4)24*7的支持,必须做到在业务进程崩溃的情况下,最快速的恢复服务。
(5)达到每台服务器处理每秒至少1万条数据。
那么综合以上的问题,我会想:
(1)首先数据的加载必须分级别,什么是最重要的事件需要优先处理?
(2)分布式的服务器体系,按照某一个号段(手机号段)进行分割,不同的数据号段命中不同的业务服务器。
(3)采用什么方式能在进程宕机的时候,最快速的恢复服务?
我先看一下我的一个数据块,也就是一个手机号对应的数据规则有多大?
一个手机号,也就是一个终端的规则事件,需要75K的内存大小,看看还是不小的,主要这么大的内存规则是线路关键点,区域个数等等。一辆车最多可以设置50条线路,1000个区域。
这部分不对于我是重点,我一开始考虑memcached的实现方案,因为我考虑我的服务器都是内网服务器,我可以上千兆网卡,考虑到如果我需要达成每秒10000条数据的处理,那么,我至少需要750M每秒的带宽,但是,要命的是,我不可能让memcached吃满我的网络IO,毕竟,我还需要很多其他数据流入流出的IO带宽,而且,就算后面用memcached集群,在高密度命中的模式下,memcached的最大问题是IO,而不是命中。
需求分析的关键之一,就是要有能力把握实际业务场景,效率来源于实际的场景,而不是某些简单的测试。
经过仔细的分析,我觉得memcached方案并不能满足实际我的业务场景需求。
其实仔细想一下,我的业务也很简单,因为我的数据来源是固定的,那么我可以提前加载好数据来源,在本地共享内存中加载。找不到的,再去IO去找。
那么,共享内存本身没什么问题。之所以使用它,是我考虑一个watch的组件(具体可以参看我之前的一篇关于二级缓存的文章)。我需要定时去获得在IO(这里可能是数据库,也可能是文件,或者其他第三方的IO接口)的数据变化,同时,以最小的同步代价更新到现有的共享内存中去,第二个,watch提供的最重要的功能就是IO定向。实际业务逻辑进程只和共享内存打交道。找不到的,再去向watch要,watch拿到数据后放入共享内存并通知业务进程去读取。
这样做的好处是,完全剥离了数据源和逻辑之间的关系。
逻辑只负责和共享内存打交道,这一层封装好的话,给外面的接口,业务开发者都可以不用知道共享内存的存在,只当做一个单件来操作即可了。
这里还有一个要点,就是共享内存如果不够大,不够放入我所有的号段信息数据怎么办?要知道,我开发的平台,是给省级的车辆GPS管理服务的。我不可能让几百万的车辆数据全部流入服务器,这是不公平的,从数据的线性流量来看,在一段时间内车辆上线的情况都是稳定的,比如白天跑的车晚上一般不跑,跑夜班的车白天一般不跑,这是有规律的。那么也就是说,数据是有钝化的。在某一时刻,实时在线的车辆实际就那么十几万辆。我完全可以借助淘汰算法,达到共享内存的最优配置。
什么是淘汰算法?
如果我的服务器在启动的时候,watch先将最活跃,也就是最后使用过的车辆信息,加载到共享内存中,比如最后活跃的100000量车的事件数据。加载到共享内存,当有一个不在共享内存中的车到达的时候,我应该删除最后最不常用的车辆数据。加载当前最新的车辆数据。
这就是一个简单的LRU算法问题。
但是,在这样的交换过程中,什么是影响效能的主要杀手?
需求分析的关键之一,了解自己设计的架构在什么阶段会有瓶颈,并根据需求决定瓶颈是否是需要优先解决的。
毕竟,如果在命中不到我的共享内存的时候,我至少需要支付四次IO成本,顺序是,请求到达watch, watch到达第三方数据源并获得数据,数据源返回数据到watch,watch通知业务去获得。
IO的时间成本是最难预计的,因为你不知道你的IO会在多久返回。但是这部分成本又是必须的,考虑到综合情况,在服务器硬件正常的时候,IO流量是一个稳定的值,当这个值超过一定数量的时候才会引起时间波动,那么,掌握这个度就好了。控制watch的命中量。如果你的数据在一上线的时候大量的命中了watch,只能说明你的共享内存watch初始化策略是一个及其失败的作品。
忽略了IO成本,那么剩下的就是,如果watch成功替换了共享内存中的一个数据块,那么我如何以最小的代价通知业务逻辑进程更新它所在的索引?
我不可能让watch更新了一块共享内存的时候,业务机重新加载所有的共享内存节点,这样的消耗是我承受不起的。
那么我就需要一个与共享内存数据块和业务进程数据块链表的一个对应关系,当某一块数据发生改变,业务进程可以最快速的定位替换当前数据块的状态。
举个例子来说。
如果我在程序启动的时候,首先缓冲了100000个数据的共享内存大小。那么此时此刻,这100000个数据都不在命中状态。当业务进程启动,会有大量的数据请求流入,业务进程会根据这些请求,标记其中10000个车辆的数据已经正在使用。那么剩下的90000个数据实际是空闲的。当有一辆全新的车辆手机号到达的时候,我去这90000条数据里命中,如果命中了,则标记成使用中,如果没有命中,请求watch去数据源查找,并替换90000个数据中最后最不常用的数据。
这样,我就能实现最大程度的数据减压,尽量把90%的数据请求命中在本地缓冲中。就算随机出现一批车辆临时上线,我也能保证我的数据在最大可能的情况下减少访问IO的过程。
另一个问题是,如果数据源更新了,我如何通知业务共享内存更新?
这个工作也可以watch来做,因为我的实际情况是,用户是通过web来更新添加修改删除一些事件的。
我可以选择两种做法
(1)实时更新,前端页面发现用户修改了数据,盗用socket一个端口让watch去加载一条数据。
(2)定时更新,定时遍历一些数据的改变。
实际我取的是后者,毕竟,前端页面去写一个socket会比较耗时,不能增加前端工程师们的压力,先以达到目标为准。
需求分析的关键之一,什么是你现阶段可以舍弃的,什么是你必须保留的,必须清楚明确。
如果完全想做到完美,是不可能的,必须历经一次次的迭代升级,逐步优化。所以最怕的就是停而不做。只存在于空想,想的很美,和做的很好是两回事。
好了,以上面的计划,我开始实现一个可以满足上述需求的小例子,并测试性能是否能达到我的目标。
共享内存的基础调用不在这里讨论,有兴趣的话可以看我以前关于共享内存使用的帖子。
由于涉及到一些公司机密,所以实际逻辑判断我不在这里给出,只给出一个可以测试的需求用例(以用户名和密码验证为准)。
以下代码,在PSS作为一个测试插件提供,windows7和linux下测试通过。(详细代码可以从PSS的google code上获得)
首先,我要声明一个基类,这里明确了我要做的事情。
  1. //抽象Cache层的管理
  2. //抽象公共层
  3. //add by freeeyes
  4. #include "SMOption.h"
  5. class CCacheManager
  6. {
  7. public:
  8.         CCacheManager() {};
  9.         ~CCacheManager() {};
  10.         //***********************************************
  11.         //进程第一次打开缓冲
  12.         //u4CachedCount : 缓冲块的个数
  13.         //objMemorykey  :共享内存key
  14.         //u4CheckSize   :单块缓冲大小
  15.         //***********************************************
  16.         bool Init(uint32 u4CachedCount, key_t objMemorykey, uint32 u4CheckSize)
  17.         {
  18.                 bool blIsCreate = true;
  19.                 bool blIsOpen   = true;
  20.                 Set_Cache_Count(u4CachedCount);
  21.                 blIsOpen = m_SMOption.Init(objMemorykey, u4CheckSize, (uint16)Get_Cache_Count(), blIsCreate);
  22.                 if(false == blIsOpen)
  23.                 {
  24.                         return false;
  25.                 }
  26.                 if(blIsCreate == true)
  27.                 {
  28.                         //共享内存第一次创建,需要从文件里面重建共享内存
  29.                         Read_All_Init_DataResoure();
  30.                 }
  31.                 else
  32.                 {
  33.                         //共享内存已经存在,遍历获得列表
  34.                         Read_All_From_CacheMemory();
  35.                 }
  36.                 return true;
  37.         };
  38.         //***********************************************
  39.         //关闭缓冲管理器,需要继承子类去实现之
  40.         //***********************************************        
  41.         virtual void Close() {};
  42.         //***********************************************
  43.         //定时同步数据源和缓冲中的数据,需要继承子类去实现之
  44.         //***********************************************        
  45.         virtual void Sync_DataReaource_To_Memory() {};
  46. protected:
  47.         //***********************************************
  48.         //当缓冲不存在,第一次初始化缓冲,需要继承子类去实现之
  49.         //***********************************************        
  50.         virtual bool Read_All_Init_DataResoure() { return true; };
  51.         //***********************************************
  52.         //当缓冲存在,从缓冲中还原对应关系,需要继承子类去实现之
  53.         //***********************************************        
  54.         virtual bool Read_All_From_CacheMemory() { return true; };
  55.         //***********************************************
  56.         //开始同步的一些初始化动作
  57.         //***********************************************
  58.         virtual void Begin_Sync_DataReaource_To_Memory() {};
  59.         //***********************************************
  60.         //同步完成后的一些动作
  61.         //***********************************************
  62.         virtual void End_Sync_DataReaource_To_Memory() {};
  63.         //***********************************************
  64.         //设置缓冲个数
  65.         //***********************************************
  66.         void Set_Cache_Count(uint32 u4CacheCount)
  67.         {
  68.                 m_u4Count = u4CacheCount;
  69.         };
  70.         //***********************************************
  71.         //得到当前缓冲个数
  72.         //***********************************************
  73.         uint32 Get_Cache_Count()
  74.         {
  75.                 return m_u4Count;
  76.         };
  77.         //***********************************************
  78.         //得到指定位置的缓冲数据
  79.         //***********************************************
  80.         _CacheBlock* Get_CacheBlock_By_Index(uint32 u4Index)
  81.         {
  82.                 return (_CacheBlock* )m_SMOption.GetBuffer(u4Index);
  83.         };
  84.         //***********************************************
  85.         //设置缓冲区全部加载成功标记位
  86.         //***********************************************
  87.         void Set_Memory_Init_Success()                       
  88.         {
  89.                 m_SMOption.SetMemoryState(READERINITSTATED);
  90.         };
  91. private:
  92.         CSMOption    m_SMOption;
  93.         uint32       m_u4Count;
  94. };
复制代码
这里列出了我需要做的几个关键步骤。
对于watch,它需要同步数据事件,数据源加载事件,第一次启动数据加载事件。
这部分的事件需要自己去实现处理,根据你的数据规则。
还有一些小东西是可以封装的,比如lru淘汰算法模块,这部分完全可以作为通用化剥离出来。
  1. //用于LRU的算法类
  2. //共享内存数据淘汰规则
  3. template<class K>
  4. class CCachedLRUList
  5. {
  6. public:
  7.         CCachedLRUList()
  8.         {
  9.                 m_u4CheckIndex     = 0;
  10.                 m_u4MaxCachedCount = 0;
  11.         };
  12.         ~CCachedLRUList()
  13.         {
  14.         };
  15.         void Set_Lru_Max_Count(uint32 u4MaxCachedCount)
  16.         {
  17.                 m_u4MaxCachedCount = u4MaxCachedCount;
  18.         }
  19.         //检测是否需要开始LRU,如果需要则返回最后要删除的
  20.         bool Check_Cached_Lru(K& lrukey)
  21.         {
  22.                 if(m_mapKey.size() < m_u4MaxCachedCount)
  23.                 {
  24.                         return false;
  25.                 }
  26.                 else
  27.                 {
  28.                         //找到Lru应该弃掉的key
  29.                         lrukey = (K&)m_mapTimeStamp.begin()->second;
  30.                         return true;
  31.                 }
  32.         };
  33.         //从队列里删除指定的key
  34.         bool Delete_Cached_Lru(K lrukey)
  35.         {
  36.                 mapKey::iterator f = m_mapKey.find(lrukey);
  37.                 if(f == m_mapKey.end())
  38.                 {
  39.                         return false;
  40.                 }
  41.                 uint32 u4Index = (uint32)f->second;
  42.                 m_mapKey.erase(f);
  43.                 mapTimeStamp::iterator ft = m_mapTimeStamp.find(u4Index);
  44.                 if(ft == m_mapTimeStamp.end())
  45.                 {
  46.                         return false;
  47.                 }
  48.                 m_mapTimeStamp.erase(ft);
  49.                 //删除Index和key之间的对应关系
  50.                 mapKey2Index::iterator fi = m_mapKey2Index.find(lrukey);
  51.                 if(fi != m_mapKey2Index.end())
  52.                 {
  53.                         uint32 u4CachedIndex = (uint32)fi->second;
  54.                         //删除对应关系
  55.                         m_mapKey2Index.erase(fi);
  56.                         //删除另一个map
  57.                         mapIndex2Key::iterator fii = m_mapIndex2Key.find(u4CachedIndex);
  58.                         if(fii != m_mapIndex2Key.end())
  59.                         {
  60.                                 m_mapIndex2Key.erase(fii);
  61.                         }
  62.                 }
  63.                 return true;
  64.         };
  65.         //添加一个key,如果已经存在则提升到队列最前面去
  66.         EM_LRUReturn Add_Cached_Lru(K lrukey, uint32 u4CachedIndex)
  67.         {
  68.                 mapKey::iterator f = m_mapKey.find(lrukey);
  69.                 if(f == m_mapKey.end())
  70.                 {
  71.                         //如果是新key,判断是否需要进行LRU检测
  72.                         if(m_mapKey.size() > m_u4MaxCachedCount)
  73.                         {
  74.                                 return LRU_NEED_CHECK;
  75.                         }
  76.                         //添加新的key
  77.                         uint32 u4Index = m_u4CheckIndex++;
  78.                         m_mapKey.insert(mapKey::value_type(lrukey, u4Index));
  79.                         m_mapTimeStamp.insert(mapTimeStamp::value_type(u4Index, lrukey));
  80.                         m_mapKey2Index.insert(mapKey2Index::value_type(lrukey, u4CachedIndex));
  81.                         m_mapIndex2Key.insert(mapIndex2Key::value_type(u4CachedIndex, lrukey));
  82.                         return LRU_UNNEED_CHECK;
  83.                 }
  84.                 else
  85.                 {
  86.                         //如果key已经存在,则更新key的时间戳,也就是u4Index;
  87.                         uint32& u4CurrIndex = (uint32&)f->second;
  88.                         //删除旧的key,添加新的key
  89.                         mapTimeStamp::iterator ft = m_mapTimeStamp.find(u4CurrIndex);
  90.                         m_mapTimeStamp.erase(ft);
  91.                         u4CurrIndex = m_u4CheckIndex++;
  92.                         m_mapTimeStamp.insert(mapTimeStamp::value_type(u4CurrIndex, lrukey));
  93.                         return LRU_UNNEED_CHECK;
  94.                 }
  95.         }
  96.         //根据实际情况更新Lru的对应index和key的列表
  97.         bool Reload_Cached_IndexList(K lrukey, K& lruBeforekey, uint32 u4CachedIndex)
  98.         {
  99.                 //寻找之前的Index对应的key并修改之
  100.                 mapIndex2Key::iterator fii = m_mapIndex2Key.find(u4CachedIndex);
  101.                 if(fii == m_mapIndex2Key.end())
  102.                 {
  103.                         return false;
  104.                 }
  105.                 lruBeforekey = (K)fii->second;
  106.                 mapKey2Index::iterator fi = m_mapKey2Index.find(lruBeforekey);
  107.                 if(fi != m_mapKey2Index.end())
  108.                 {
  109.                         m_mapKey2Index.erase(fi);
  110.                         //添加新的key对应关系
  111.                         m_mapKey2Index.insert(mapKey2Index::value_type(lrukey, u4CachedIndex));
  112.                 }
  113.                 m_mapIndex2Key[u4CachedIndex] = lrukey;
  114.                 //测试代码
  115.                 //DisPlay_Index2Key();
  116.                 //DisPlay_Key2Index();
  117.                 return true;
  118.         }
  119.         //获得指定位置的Index对应信息
  120.         bool Get_Cached_KeyByIndex(uint32 u4CachedIndex, K& lrukey)
  121.         {
  122.                 mapIndex2Key::iterator fii = m_mapIndex2Key.find(u4CachedIndex);
  123.                 if(fii == m_mapIndex2Key.end())
  124.                 {
  125.                         return false;
  126.                 }
  127.                 else
  128.                 {
  129.                         lrukey = (K )fii->second;
  130.                         return true;
  131.                 }
  132.         }
  133. private:
  134.         //用于测试显示数据映射内容
  135.         void DisPlay_Index2Key()
  136.         {
  137.                 OUR_DEBUG((LM_INFO, "[DisPlay_Index2Key]*****Begin DisPlay*****\n"));
  138.                 for(mapIndex2Key::iterator b = m_mapIndex2Key.begin(); b != m_mapIndex2Key.end(); b++)
  139.                 {
  140.                         OUR_DEBUG((LM_INFO, "[DisPlay_Index2Key]key=%s.\n", ((string)(b->second)).c_str()));
  141.                 }
  142.                 OUR_DEBUG((LM_INFO, "[DisPlay_Index2Key]*****End DisPlay*****\n"));
  143.         }
  144.         //用于测试显示数据映射内容
  145.         void DisPlay_Key2Index()
  146.         {
  147.                 OUR_DEBUG((LM_INFO, "[DisPlay_Key2Index]*****Begin DisPlay*****\n"));
  148.                 for(mapKey2Index::iterator b = m_mapKey2Index.begin(); b != m_mapKey2Index.end(); b++)
  149.                 {
  150.                         OUR_DEBUG((LM_INFO, "[DisPlay_Key2Index]key=%s.\n", ((string)(b->first)).c_str()));
  151.                 }
  152.                 OUR_DEBUG((LM_INFO, "[DisPlay_Key2Index]*****End DisPlay*****\n"));
  153.         }
  154. private:
  155.         typedef map<K, uint32> mapKey;       //key与Version对应关系
  156.         typedef map<uint32, K> mapTimeStamp; //Version和key的对应关系
  157.         typedef map<K, uint32> mapKey2Index; //记录Index和key的关系
  158.         typedef map<uint32, K> mapIndex2Key; //记录key和Index的关系
  159.         mapKey       m_mapKey;
  160.         mapTimeStamp m_mapTimeStamp;
  161.         mapKey2Index m_mapKey2Index;
  162.         mapIndex2Key m_mapIndex2Key;
  163.         uint32       m_u4CheckIndex;
  164.         uint32       m_u4MaxCachedCount;
  165. };
  166. #endif
复制代码
对于共享内存的数据块,我需要里面有一些共享内存标记的参数,所以,所有涉及共享内存的数据对象,都要继承_CacheBlock类
  1. //用于缓冲当前状态的数据结构
  2. struct _CacheBlock
  3. {
  4. private:
  5.         EM_CheckState       m_emState;           //当前维护状态,对应EM_CheckState
  6.         EM_CACHED_USE_STATE m_emIsUsed;          //在线标记,对应EM_CACHED_USE_STATE状态
  7.         bool                m_blDelete;          //删除标记为,Watch进程维护这个标记,false为正在使用,true为该数据已经删除
  8.         uint32              m_u4CacheIndex;      //当前数据块的块ID
  9. public:
  10.         _CacheBlock()
  11.         {
  12.                 m_emState       = CHECKS_HIT;
  13.                 m_blDelete      = false;
  14.                 m_emIsUsed      = CACHEDUNUSED;
  15.                 m_u4CacheIndex  = 0;
  16.         }
  17.         //设置块ID
  18.         void SetCacheIndex(uint32 u4CacheIndex)
  19.         {
  20.                 m_u4CacheIndex = u4CacheIndex;
  21.         }
  22.         //获得块ID
  23.         uint32 GetCacheIndex()
  24.         {
  25.                 return m_u4CacheIndex;
  26.         }
  27.         
  28.         //被数据源命中
  29.         void SetHit()
  30.         {
  31.                 m_blDelete      = false;
  32.                 m_emState       = CHECKS_HIT;
  33.         }
  34.         //没有被数据源命中
  35.         void SetUnHit()
  36.         {
  37.                 m_blDelete     = true;
  38.                 m_emState      = CHECKS_UNHIT;
  39.         }
  40.         //设置删除状态
  41.         void SetDelete(bool blDelete)
  42.         {
  43.                 m_blDelete = blDelete;
  44.         }
  45.         //设置共享内存没有被逻辑进程使用
  46.         void SetUsed()
  47.         {
  48.                 m_emIsUsed = CACHEDUSED;
  49.         }
  50.         //设置共享内存没有被逻辑进程使用
  51.         void SetUnUsed()
  52.         {
  53.                 m_emIsUsed = CACHEDUNUSED;
  54.         }
  55.         //得到删除状态
  56.         bool GetDelete()
  57.         {
  58.                 return m_blDelete;
  59.         }
  60.         //得到是否命中状态
  61.         EM_CheckState GetCheckState()
  62.         {
  63.                 return m_emState;
  64.         }
  65.         //设置命中状态
  66.         void SetCheckState(EM_CheckState objCheckState)
  67.         {
  68.                 m_emState = objCheckState;
  69.         }
  70. };
复制代码
比如,我的数据对象是一个简单的用户名和密码验证的数据结构。
包含了用户名和密码
那么我的结构是:
  1. //这里如果是想用缓冲功能,必须继承_CacheBlock对象
  2. struct _UserValid : public _CacheBlock
  3. {
  4.         //缓冲的数据结构
  5.         char   m_szUserName[MAX_BUFF_50];   //用户名
  6.         char   m_szUserPass[MAX_BUFF_50];   //密码
  7.         uint32 m_u4LoginCount;              //登陆次数
  8. };
复制代码
watch每隔60秒更新一下缓冲中正在使用的数据。
  1. ACE_thread_t  threadId;
  2.         ACE_hthread_t threadHandle;
  3.         //初始化共享内存
  4.         App_UserValidManager::instance()->Init((uint32)MAX_LOGIN_VALID_COUNT, SHM_USERVALID_KEY, (uint32)sizeof(_UserValid));
  5.         //首先创建工作线程
  6.         ACE_Thread::spawn(
  7.                 (ACE_THR_FUNC)worker,        //线程执行函数
  8.                 NULL,                        //执行函数参数
  9.                 THR_JOINABLE | THR_NEW_LWP,
  10.                 &threadId,
  11.                 &threadHandle
  12.                 );
复制代码
worker的实际代码为:
  1. void* worker(void *arg)
  2. {
  3.         if(NULL != arg)
  4.         {
  5.                 OUR_DEBUG((LM_INFO, "[worker]have param.\n"));
  6.         }
  7.        
  8.         while(true)
  9.         {
  10.                 OUR_DEBUG((LM_INFO, "[Watch]Valid Begin.\n"));
  11.                 App_UserValidManager::instance()->Sync_DataReaource_To_Memory();
  12.                 OUR_DEBUG((LM_INFO, "[Watch]Valid End.\n"));
  13.                 App_UserValidManager::instance()->Display();
  14.                 ACE_Time_Value tvSleep(60, 0);
  15.                 ACE_OS::sleep(tvSleep);
  16.         }
  17.         return NULL;
  18. }
复制代码
我会开启一个socket,让逻辑进程可以通过这个接口获得它所需要的数据。具体代码太多了,就在这里贴一个大概吧。
  1. virtual int handle_input (ACE_HANDLE fd )
  2.         {
  3.                 if(fd == ACE_INVALID_HANDLE)
  4.                 {
  5.                         OUR_DEBUG((LM_ERROR, "[handle_input]fd is ACE_INVALID_HANDLE.\n"));
  6.                         return -1;
  7.                 }
  8.                
  9.                 ACE_Time_Value nowait(0, MAX_QUEUE_TIMEOUT);
  10.                 //处理接收逻辑
  11.                 //接收字节,先接收4字节包长度,然后接收用户名字符串
  12.                 char szPacketSize[4] = {'\0'};
  13.                 int nDataLen = this->peer().recv(szPacketSize, 4, MSG_NOSIGNAL, &nowait);
  14.                 if(nDataLen != 4 && nDataLen <= 0)
  15.                 {
  16.                         return -1;
  17.                 }
  18.                 int nPacketSize = 0;
  19.                 ACE_OS::memcpy(&nPacketSize, szPacketSize, 4);
  20.                 char* pBuff = new char[nPacketSize];
  21.                 ACE_OS::memset(pBuff, 0, nPacketSize);
  22.                 //因为是内网程序,目前不考虑分包和粘包,这里在大流量下可以优化的,先以完成功能为主。
  23.                 //组包规则,2字节用户名长度+用户名+4字节ConnectID
  24.                 nDataLen = this->peer().recv(pBuff, nPacketSize, MSG_NOSIGNAL, &nowait);
  25.                 if(nDataLen != nPacketSize && nDataLen <= 0)
  26.                 {
  27.                         SAFE_DELETE_ARRAY(pBuff);
  28.                         return -1;
  29.                 }
  30.                 //解析数据
  31.                 int nRecvPos      = 0;
  32.                 int nUserNameSize = 0;
  33.                 int nUserPassSize = 0;
  34.                 int nConnectID    = 0;
  35.                 ACE_OS::memcpy((char* )&nUserNameSize, (char* )&pBuff[nRecvPos], 2);
  36.                 nRecvPos += 2;
  37.                 char* pUserName = new char[nUserNameSize + 1];
  38.                 ACE_OS::memset(pUserName, 0, nUserNameSize + 1);
  39.                 ACE_OS::memcpy((char* )pUserName, (char* )&pBuff[nRecvPos], nUserNameSize);
  40.                 nRecvPos += nUserNameSize;
  41.                 ACE_OS::memcpy((char* )&nUserPassSize, (char* )&pBuff[nRecvPos], 2);
  42.                 nRecvPos += 2;
  43.                 char* pUserPass = new char[nUserPassSize + 1];
  44.                 ACE_OS::memset(pUserPass, 0, nUserPassSize + 1);
  45.                 ACE_OS::memcpy((char* )pUserPass, (char* )&pBuff[nRecvPos], nUserPassSize);
  46.                 nRecvPos += nUserPassSize;
  47.                 ACE_OS::memcpy((char* )&nConnectID, (char* )&pBuff[nRecvPos], 4);
  48.                 nRecvPos += 4;
  49.                 int nSendSize = 4 + 2 + nUserNameSize + 2 + nUserPassSize + 1 + 4 + 4;
  50.                 char* pSend = new char[nSendSize];
  51.                 int nSendPos = 0;
  52.                 //处理接收数据
  53.                 uint32 u4CacheIndex = 0;
  54.                 bool blState = App_UserValidManager::instance()->Load_From_DataResouce(pUserName, u4CacheIndex);
  55.                 if(blState == false)
  56.                 {
  57.                         //没有找到这个用户数据,组成返回包
  58.                         int nSendPacketSize = 2 + nUserNameSize + 2 + nUserPassSize + 1 + 4 + 4;
  59.                         ACE_OS::memcpy((char* )&pSend[nSendPos], (char* )&nSendPacketSize, 4);
  60.                         nSendPos += 4;
  61.                         ACE_OS::memcpy((char* )&pSend[nSendPos], (char* )&nUserNameSize, 2);
  62.                         nSendPos += 2;
  63.                         ACE_OS::memcpy((char* )&pSend[nSendPos], (char* )pUserName, nUserNameSize);
  64.                         nSendPos += nUserNameSize;
  65.                         ACE_OS::memcpy((char* )&pSend[nSendPos], (char* )&nUserPassSize, 2);
  66.                         nSendPos += 2;
  67.                         ACE_OS::memcpy((char* )&pSend[nSendPos], (char* )pUserPass, nUserPassSize);
  68.                         nSendPos += nUserPassSize;
  69.                         int nRet = 1;
  70.                         ACE_OS::memcpy((char* )&pSend[nSendPos], (char* )&nRet, 1);
  71.                         nSendPos += 1;
  72.                         ACE_OS::memcpy((char* )&pSend[nSendPos], (char* )&u4CacheIndex, 4);
  73.                         nSendPos += 4;
  74.                         ACE_OS::memcpy((char* )&pSend[nSendPos], (char* )&nConnectID, 4);
  75.                         nSendPos += 4;
  76.                 }
  77.                 else
  78.                 {
  79.                         //找到了这个用户数据,组成返回包
  80.                         int nSendPacketSize = 2 + nUserNameSize + 2 + nUserPassSize + 1 + 4 + 4;
  81.                         ACE_OS::memcpy((char* )&pSend[nSendPos], (char* )&nSendPacketSize, 4);
  82.                         nSendPos += 4;
  83.                         ACE_OS::memcpy((char* )&pSend[nSendPos], (char* )&nUserNameSize, 2);
  84.                         nSendPos += 2;
  85.                         ACE_OS::memcpy((char* )&pSend[nSendPos], (char* )pUserName, nUserNameSize);
  86.                         nSendPos += nUserNameSize;
  87.                         ACE_OS::memcpy((char* )&pSend[nSendPos], (char* )&nUserPassSize, 2);
  88.                         nSendPos += 2;
  89.                         ACE_OS::memcpy((char* )&pSend[nSendPos], (char* )pUserPass, nUserPassSize);
  90.                         nSendPos += nUserPassSize;
  91.                         int nRet = 0;
  92.                         ACE_OS::memcpy((char* )&pSend[nSendPos], (char* )&nRet, 1);
  93.                         nSendPos += 1;
  94.                         ACE_OS::memcpy((char* )&pSend[nSendPos], (char* )&u4CacheIndex, 4);
  95.                         nSendPos += 4;
  96.                         ACE_OS::memcpy((char* )&pSend[nSendPos], (char* )&nConnectID, 4);
  97.                         nSendPos += 4;
  98.                 }
  99.                 //发送返回数据
  100.                 this->peer().send(pSend, nSendSize, &nowait);
  101.                 SAFE_DELETE_ARRAY(pUserPass);
  102.                 SAFE_DELETE_ARRAY(pUserName);
  103.                 SAFE_DELETE_ARRAY(pSend);
  104.                 SAFE_DELETE_ARRAY(pBuff);
  105.                 return 0;
  106.         }
复制代码
PSS的插件作为业务逻辑进程,watch作为监控者,达成。
实际测试结果,在10个用户缓冲下,每秒到达10000个随机请求,其中10%是新的ID,这样的情况CPU在30%左右,数据IO稳定。watch每秒处理800个左右新的请求(剩下200个被缓冲LRU挡住,非常好)。实际效果Linux优于windows,达到了我的设计需求。

您需要登录后才可以回帖 登录 | 用户注册

本版积分规则

Archiver|手机版|小黑屋|ACE Developer ( 京ICP备06055248号 )

GMT+8, 2024-4-30 06:49 , Processed in 0.015361 second(s), 6 queries , Redis On.

Powered by Discuz! X3.5

© 2001-2023 Discuz! Team.

快速回复 返回顶部 返回列表