freeeyes 发表于 2011-1-24 17:08:39

Reactor和Proactor模式下带有自动重连机制的客户端实现

最近终于有点时间,趁着重写服务器重连部分,重新写了一个支持链路重连机制的客户端模块。先说Reactor的,Reactor自动重连比较简单,因为是同步的。
为了实现以上功能,首先要添加一个管理类(CClientReConnectManager),管理所有已经连接和还没有连接的对象。然后添加一个单元类(CReactorClientInfo),这个单元类负责管理指定链接对象的信息。

CClientReConnectManager包含了以下方法
bool Init(ACE_Reactor* pReactor);
        bool Connect(int nServerID, const char* pIP, int nPort, IClientMessage* pClientMessage);
        bool Close(int nServerID);                                                                  //关闭连接
        bool ConnectErrorClose(int nServerID);                                                      //由内部错误引起的失败,由ProConnectClient调用
        bool SendData(int nServerID, ACE_Message_Block* pmblk);                                     //发送数据
        bool SetHandler(int nServerID, CConnectClient* pConnectClient);                           //将指定的CProConnectClient*绑定给nServerID
        IClientMessage* GetClientMessage(int nServerID);                                          //获得ClientMessage对象
        bool StartConnectTask(int nIntervalTime = CONNECT_LIMIT_RETRY);                           //设置自动重连的定时器
        void CancelConnectTask();                                                                   //关闭重连定时器
        void Close();

        virtual int handle_timeout (const ACE_Time_Value ¤t_time, const void *act = 0);       //定时器执行

Init是初始化管理类,需要指定一个Reactor反应器,反应器可以自己创建一个,也可以用默认的。Connect()函数是创建一个新的链接,需要ServerID(这个你可以自己去定义一下,只要能区分出各个链接不同就可以了,不可重复),IP和端口,这是必备的,呵呵,就不多说了,关键是IClientMessage这个对象,这是我定义的一个消息处理类,你可以在外面继承这个类,这个类提供了两种方法
class IClientMessage
{
public:
        virtual bool RecvData(CClientParse* pClientParse) = 0;    //接收数据的函数
        virtual bool ConnectError(int nError)             = 0;    //当出错的时候,调用此接口返回错误信息
};

RecvData()是接受完成数据包后,会自动调用这个接口,由继承的类去实现内部的数据处理,同样,当数据链接出错的话,系统会回调ConnectError()方法并告诉继承的类是什么错误导致的失败。
每当用户调用发送接口的时候,我会先检查链接是否健康,如果健康,则正常发送,如果不健康或者已经断开,就会自动重连。为了保持数据链接的最大稳定性,我添加了一个定时器,会定时检测所有注册的链接是否正常,如果不正常的话会自动重连。
定时器自动重连的方法是
int CClientReConnectManager::handle_timeout(const ACE_Time_Value &tv, const void *arg)
{
        ACE_Guard<ACE_Recursive_Thread_Mutex> guard(m_ThreadWritrLock);
        mapReactorConnectInfo::iterator b = m_mapConnectInfo.begin();
        mapReactorConnectInfo::iterator e = m_mapConnectInfo.end();

        for(b; b!= e; b++)
        {
                CReactorClientInfo* pClientInfo = (CReactorClientInfo* )b->second;
                if(NULL == pClientInfo->GetConnectClient())
                {
                        //如果连接不存在,则重新建立连接
                        pClientInfo->Run();
                }
        }
        return 0;
}


好了,管理类差不多就是这样了。至于ConnectHander的实现,Proactor和Reactor是不同的,于是我实现了两个工程。代码分别在里面加了注释,其实并不复杂。倒是Proactor的客户端链接,这个应该注意一下,Connect成功了并不代表链接就已经建立,只是代表连接已经准备好建立,至于是否成功,需要在ProConnectHander的Open方法下获得,异步就是这点需要注意一下。
呵呵,上传一下测试过的代码,以下代码在VS2005下编译通过,测试通过,proceXP测试内存稳定。(我的开发机器上ACE的版本是5.7.4,如果用更高版本的ACE,比如6.0.0编译会报错,提示#include "ace/os.h"不存在,你只要把这句话替换成,#include "ace/OS_main.h"即可)
这些代码里有调用例子,有兴趣的朋友可以看看。


res100 发表于 2011-1-24 21:31:22

不错, 谢谢分享,同时期待里的ACE开源服务器 v0.72版本

过路蚂蚁 发表于 2011-2-19 10:31:42

不错,学习了

goldwave 发表于 2011-3-4 09:21:51

谢谢楼主,学习用了

laocai 发表于 2011-3-5 08:32:14

非常好,谢谢楼主。也刚好需要这样的东东:)
有个细节习惯顺便说一说
上面的循环for(b; b!= e; b++),最好是养成++b的习惯,因为在有些情况下"++b"会比"b++"的效率高,当然在这里是无关紧要了。
页: [1]
查看完整版本: Reactor和Proactor模式下带有自动重连机制的客户端实现