找回密码
 用户注册

QQ登录

只需一步,快速开始

查看: 5762|回复: 9

ACE Reactor框架使用实例-大量代码

[复制链接]
发表于 2008-1-17 18:48:52 | 显示全部楼层 |阅读模式
,上传的是已经投入使用的代码,所以有部分应用业务包含其中,我作了部分删除和注释.
本想全部删除,仅留框架,但我觉得与书上的教学例子无异了,所以保留了部分.有不清楚的大家可以交流.
本人水平有限,接触ACE时间不长,代码水平不高,如有问题或考虑不全之处还请大家指出
服务端:
功能:保存所有客户端信息和在线状态,统一分配端口.对掉线的客户端信息通知到进程控制模块
ServerService.h
  1. #ifndef _SERVERSERVICE_H_
  2. #define _SERVERSERVICE_H_
  3. #include <map>
  4. #include <string>
  5. #include <sstream>
  6. #include <fstream>
  7. #include "ace/TP_Reactor.h"
  8. #include "ace/SOCK_Dgram.h"
  9. #include "ace/Task.h"
  10. namespace YB
  11. {
  12. const static ACE_UINT32 iLocalHostPort = 8001;
  13. typedef unsigned  char BYTE;
  14. static ACE_Thread_Mutex slock;
  15. typedef struct STAPPID_
  16. {
  17. std::string  sIp;               // 客户端IP
  18. int    iPort;               // 客户端连接到服务端的端口
  19. int    ClientAcceptPort;            // 服务端分配的客户端监听端口
  20. BYTE   byAppid;              // 模块标识
  21. BYTE         byGroup;              // 组号
  22. int          iTime;               // 时计器,维护客户端在线状态
  23. bool   bOnline;              // 在线状态
  24. }STAppid;
  25. /*
  26. 服务端UDP数据收发
  27. */
  28. class CMain;
  29. class CServerService: public ACE_Event_Handler
  30. {
  31. public:
  32. // 构造、析构函数
  33. CServerService();
  34. ~CServerService();
  35. virtual ACE_HANDLE get_handle(void) const;         // 继承基类
  36. virtual int  handle_input(ACE_HANDLE fd = ACE_INVALID_HANDLE);    // 有网络数据到达
  37. virtual int  handle_close(ACE_HANDLE handle, ACE_Reactor_Mask close_mask); // 关闭网络监听
  38. bool Open(CMain* cm);
  39. void SendToAll(char* buf, int nSize);         // 消息群发
  40. int  DeleteAppid(BYTE byappid, std::string sip);       // 删除指定客户端信息
  41. bool ReadServerIP();              // 读配置IP
  42. private:
  43. void UpdateClientAllSatte(STAppid stAppid, ACE_INET_Addr taddr);   // 更新客户端状态
  44. void UpdateState(YB::BYTE byappid);          // 客户端在线
  45. void MsgData(YB::BYTE byappid);           // 消息报文
  46. void ChackProtocol(const char* buf, const int nSize);     // 解析报文
  47. void ApplyConn(const char *buf, const int nSize);      // 应答客户申请连接
  48. void AllotPort(unsigned short &uiPort);         // 检查分配客户端端口是否重复,并分配新的
  49. BYTE checkFrame(unsigned char* uc_buf, unsigned short uc_length);  // 帧校验
  50. void fixFrame(unsigned char* uc_buf, unsigned char& uc_Length,   // 组帧
  51.       unsigned char flag, unsigned char dest_addr, unsigned char src_addr);
  52.     void    CheckAppid(BYTE byappid,std::string sIp);
  53. public:
  54. std::map<BYTE, STAppid> mpInfo;            // 注册客户端信息表
  55. private:
  56. ACE_INET_Addr addr;
  57. ACE_SOCK_Dgram udp;
  58. std::string  sServerIP;
  59. unsigned short usiPort;             // 分配客户端端口值
  60. STAppid   stAppid;
  61. int    iPortCount;
  62. CMain*   cmn;
  63. };
  64. // 定时器类,监视在线客户端,
  65. class CTaskTimer : public ACE_Task_Base
  66. {
  67. public:
  68. // 构造、析构函数
  69. CTaskTimer(){timeid = 0;};
  70. virtual ~CTaskTimer(){};
  71. virtual int  handle_close(ACE_HANDLE handle, ACE_Reactor_Mask close_mask); // 关闭网络监听
  72. public:
  73. bool Open(CServerService *udp);
  74. int  handle_timeout(const ACE_Time_Value &current_time, const void *act = 0);// 定时器,清理掉线客户端
  75. private:
  76. CServerService *sudp;
  77. long   timeid;
  78. };
  79. // 主调类,负责启动定时器和网络监听
  80. class CMain : public ACE_Task_Base
  81. {
  82. public:
  83. // 构造、析构函数
  84. CMain(){};
  85. ~CMain(){};
  86. public:
  87. bool Open();
  88. int  Close();
  89. private:
  90. CServerService *serudp;
  91. CTaskTimer  *taskTimer;
  92. };
  93. }
  94. #endif  // end of _SERVERSERVICE_H_
复制代码
 楼主| 发表于 2008-1-17 18:49:16 | 显示全部楼层
  1. ServerService.cpp
  2. #include "./ServerService.h"
  3. using namespace YB;
  4. bool CMain::Open()
  5. {
  6. ACE_NEW_RETURN(serudp, CServerService, false);
  7. ACE_NEW_RETURN(taskTimer, CTaskTimer, false);
  8. serudp->reactor(ACE_Reactor::instance());
  9. if (!serudp->Open(this)) return false;
  10. taskTimer->reactor(ACE_Reactor::instance());
  11. if (!taskTimer->Open(serudp)) return false;
  12. ACE_OS::sleep(ACE_Time_Value(0, 10000));   // 等待10毫秒   
  13. ACE_Reactor::instance()->run_reactor_event_loop(); // 启动线程
  14. return true;
  15. }
  16. int CMain::Close()
  17. {
  18. taskTimer->handle_close(ACE_INVALID_HANDLE, 0);
  19. serudp->handle_close(ACE_INVALID_HANDLE, 0);
  20. if (ACE_Reactor::instance()->reactor_event_loop_done() != 1)
  21. {
  22.   ACE_Reactor::instance()->end_reactor_event_loop();
  23. }
  24. return 0;
  25. }  
  26. ///////////////////////////////////////////
  27. CServerService::CServerService()
  28. {
  29. usiPort = 20000;   // 初使分配客户端端口值
  30. iPortCount = 0;
  31. }
  32. CServerService::~CServerService()
  33. {
  34. }
  35. bool CServerService::ReadServerIP()
  36. {
  37. std::ifstream fle("ServerIp.txt", std::ios::in);
  38. if (!fle) return false;
  39. std::ostringstream seamServerIP;
  40. seamServerIP<<fle.rdbuf();
  41. sServerIP = seamServerIP.str();
  42. fle.close();
  43. return true;
  44. }
  45. bool CServerService::Open(CMain* cm)
  46. {
  47. if (!ReadServerIP()) return false;
  48. this->addr.set(iLocalHostPort, sServerIP.c_str());//, ACE_LOCALHOST
  49. this->udp.open(addr);
  50. this->reactor()->register_handler(this, ACE_Event_Handler::READ_MASK);
  51. cmn = cm;
  52. return true;
  53. }
  54. ACE_HANDLE CServerService::get_handle() const
  55. {
  56. return this->udp.get_handle();
  57. }
  58. int  CServerService::handle_input(ACE_HANDLE)
  59. {
  60. ACE_INET_Addr taddr;
  61. char   buf[255] = {0};
  62. int    isize = 0;
  63. isize = this->udp.recv(buf, 255, taddr);
  64. stAppid.iPort = taddr.get_port_number();
  65. stAppid.sIp = taddr.get_host_addr();
  66. if (isize > 0 && isize < 255) ChackProtocol(buf, isize);
  67. return 0;
  68. }
  69. int  CServerService::handle_close(ACE_HANDLE handle, ACE_Reactor_Mask close_mask)
  70. {
  71. if (this->udp.get_handle() != ACE_INVALID_HANDLE)
  72. {
  73.   ACE_Reactor_Mask m = ACE_Event_Handler::READ_MASK | ACE_Event_Handler::DONT_CALL;
  74.   this->reactor()->remove_handler(this, m);
  75.   this->udp.close();  
  76. }
  77. delete this;
  78. return 0;
  79. }
  80. unsigned char CServerService::checkFrame(unsigned char* uc_buf, unsigned short uc_length)            
  81. {
  82. //检查报文
  83. return 1;
  84. }
  85. void CServerService::fixFrame( unsigned char* uc_buf,
  86.         unsigned char& uc_Length,
  87.         unsigned char flag,
  88.         unsigned char dest_addr,
  89.         unsigned char src_addr)
  90. {
  91. //组装报文
  92. return ;
  93. }
  94. void CServerService::ChackProtocol(const char* buf, const int nSize)
  95. {
  96. YB::BYTE *p = (YB::BYTE*)buf;
  97. if (checkFrame(p, nSize)) return;
  98. switch (*(p + 11))
  99. {
  100.   case 0x00:  // 心跳
  101.   {
  102.    
  103.    UpdateState(*(p + 6));
  104.    if (*(p + 2) == 0x02) MsgData(*(p + 6));
  105.    break;
  106.    
  107.   }   
  108.   case 0x01:  // 我要处理的类型
  109.   {
  110.    switch (*(p + 15))
  111.    {
  112.     case 0x00:  // 正常退出,离线状态
  113.     {
  114.      DeleteAppid(*(p + 23), stAppid.sIp);
  115.      break;
  116.     }
  117.     case 0x02: // 申请连接
  118.     {
  119.      ApplyConn(buf, nSize);
  120.      break;
  121.     }
  122.     default:
  123.      break;
  124.    }
  125.    break;
  126.   }
  127.   case 0x02: // 退出
  128.   {
  129.    if (*(p + 15) == 0x04 && *(p + 6) == 0x01) cmn->Close();
  130.    break;
  131.   }
  132.   default:
  133.    break;  
  134. }
  135. }
  136. void CServerService::ApplyConn(const char *buf, const int nSize)
  137. {
  138. ACE_INET_Addr taddr;
  139. YB::BYTE isize = 0x0C;
  140. char  puf[255] = {0};
  141. char *p = (char*)buf;  
  142. AllotPort(usiPort);
  143. stAppid.ClientAcceptPort = usiPort;
  144. stAppid.byAppid = *(p + 6);
  145. stAppid.byGroup = *(p + 16);
  146. CheckAppid( stAppid.byAppid,stAppid.sIp.c_str() );
  147. taddr.set(usiPort, stAppid.sIp.c_str());
  148. ACE_UINT32 ip = taddr.get_ip_address();
  149. u_short  iprt = taddr.get_port_number();
  150. /*组帧
  151. strcpy(puf, "\x01\x01\x01\x0C\x03");
  152. puf[5] = stAppid.byGroup;
  153. memcpy(puf + 6, &ip, sizeof(ACE_UINT32));
  154. memcpy(puf + 6 + sizeof(ACE_UINT32), &iprt, sizeof(u_short));
  155. fixFrame((unsigned char*)puf, isize, 0x01, stAppid.byAppid, 0x04);
  156. taddr.set(stAppid.iPort, stAppid.sIp.c_str());
  157. */
  158. this->udp.send(puf, isize, taddr);
  159. /*
  160. // 向其他客户端更新信息
  161. isize = 0x0D;
  162. memset(puf, 0x00, 255);
  163. strcpy(puf, "\x01\x01\x01\x0D\x01");
  164. puf[5] = stAppid.byGroup;
  165. memcpy(puf + 6, &ip, sizeof(ACE_UINT32));
  166. memcpy(puf + 6 + sizeof(ACE_UINT32), &iprt, sizeof(u_short));
  167. memcpy(puf + 6 + sizeof(ACE_UINT32) + sizeof(u_short), &stAppid.byAppid, sizeof(YB::BYTE));
  168. fixFrame((unsigned char*)puf, isize, 0x01, 0x00, 0x04);
  169. */
  170. SendToAll(puf, isize);
  171. // 向新增加客户更新信息
  172. UpdateClientAllSatte(stAppid, taddr);
  173. // 增加新的客户端到链表
  174. stAppid.iTime = 1;
  175. stAppid.bOnline = true;
  176. slock.acquire();
  177. mpInfo.insert(std::make_pair(stAppid.byAppid, stAppid));
  178. slock.release();
  179. }
  180. void CServerService::CheckAppid(YB::BYTE byappid,std::string sIp)
  181. {
  182. std::map<YB::BYTE, STAppid>::iterator mpIter;
  183. for (mpIter = mpInfo.begin(); mpIter != mpInfo.end(); mpIter++)   
  184. {
  185.   if (byappid == mpIter->second.byAppid &&
  186.    sIp == mpIter->second.sIp )
  187.   {
  188.    DeleteAppid(byappid,sIp);
  189.    ACE_OS::sleep(1);
  190.   }   
  191. }
  192. }
  193. void CServerService::AllotPort(unsigned short &uiPort)
  194. {
  195. if (uiPort > 65500)
  196. {
  197.   uiPort = 20000;
  198.   iPortCount++;
  199. }
  200. if (iPortCount < 1)
  201. {
  202.   uiPort++; //增加分配的端口号
  203. }
  204. else
  205. {
  206.   std::map<YB::BYTE, STAppid>::iterator mpIter;
  207.   for (mpIter = mpInfo.begin(); mpIter != mpInfo.end(); mpIter++)   
  208.   {
  209.    if (uiPort == mpIter->second.ClientAcceptPort)
  210.    {
  211.     uiPort++;
  212.     mpIter = mpInfo.begin();
  213.    }   
  214.   }
  215. }
  216. }
  217. int CServerService::DeleteAppid(YB::BYTE byappid, std::string sip)
  218. {
  219. std::map<YB::BYTE, STAppid>::iterator mpIter;
  220. YB::BYTE isize = 0x0D;
  221. bool  b_isfind = false;
  222. char  puf[255] = {0};
  223. slock.acquire();
  224. for (mpIter = mpInfo.find(byappid); mpIter != mpInfo.end(); mpIter++)   
  225. {
  226.   if (mpIter->first != byappid) break;
  227.   if (mpIter->second.sIp != sip) continue;
  228.   
  229.   ACE_INET_Addr taddr(mpIter->second.ClientAcceptPort, sip.c_str());
  230.   ACE_UINT32  ip = taddr.get_ip_address();
  231.   u_short   iprt = taddr.get_port_number();
  232.   /*组帧
  233.   memset(puf, 0, 255);
  234.   isize = 0x0D;
  235.   strcpy(puf, "\x01\x01\x01\x0D");
  236.   puf[5] = mpIter->second.byGroup;
  237.   memcpy(puf + 6, &ip, sizeof(ACE_UINT32));
  238.   memcpy(puf + 6 + sizeof(ACE_UINT32), &iprt, sizeof(u_short));
  239.   memcpy(puf + 6 + sizeof(ACE_UINT32) + sizeof(u_short), &mpIter->second.byAppid, sizeof(YB::BYTE));
  240.   fixFrame((unsigned char*)puf, isize, 0x01, 0x00, 0x04);
  241. */
  242.   mpInfo.erase(mpIter);
  243.   b_isfind = true;
  244.   break;
  245. }
  246. slock.release();
  247. // 广播到各客户端
  248. if (b_isfind) SendToAll(puf, isize);
  249. return 0;
  250. }
  251. void CServerService::SendToAll(char* buf, int nSize)
  252. {
  253. std::map<YB::BYTE, STAppid>::iterator mpIter;
  254. ACE_INET_Addr taddr;
  255. for (mpIter = mpInfo.begin(); mpIter != mpInfo.end(); mpIter++)   
  256. {
  257.   taddr.set(mpIter->second.iPort, mpIter->second.sIp.c_str());
  258.   int bychar = (unsigned char)buf[10] - mpIter->second.byAppid;
  259.   buf[10] = mpIter->second.byAppid;
  260.   buf[nSize - 2] -= bychar;
  261.   this->udp.send(buf, nSize, taddr);
  262. }
  263. }
  264. void CServerService::UpdateState(YB::BYTE byappid)  // 客户端在线
  265. {
  266. std::map<YB::BYTE, STAppid>:: iterator mpIter;
  267. for (mpIter = mpInfo.find(byappid); mpIter != mpInfo.end(); mpIter++)
  268. {
  269.   if (mpIter->first != byappid) break;
  270.   if (stAppid.sIp != mpIter->second.sIp) continue;
  271.   mpIter->second.iTime = 1;
  272.   break;
  273. }
  274. }
  275. void CServerService::MsgData(YB::BYTE byappid)
  276. {
  277. std::map<YB::BYTE, STAppid>::iterator mpIter;
  278. for (mpIter = mpInfo.find(byappid); mpIter != mpInfo.end(); mpIter++)  
  279. {
  280.   if (mpIter->first != byappid) break;
  281.   if (mpIter->second.sIp != stAppid.sIp) continue;
  282.   ACE_INET_Addr taddr(stAppid.iPort, stAppid.sIp.c_str());
  283.   char   puf[255] = {0};
  284.   unsigned char iszie = 0x04;
  285.   strcpy(puf, "\xf0\x01\x01\x04");
  286.   fixFrame((unsigned char*)puf, iszie, 0x01, byappid, 0x04);
  287.   this->udp.send(puf, iszie, taddr);
  288.   break;
  289. }
  290. }
  291. void CServerService::UpdateClientAllSatte(STAppid stAppid, ACE_INET_Addr taddr)
  292. {
  293. std::map<YB::BYTE, STAppid>::iterator mpIter;
  294. ACE_INET_Addr taddr1;
  295. unsigned char isize = 0x0D;
  296. ACE_UINT32  ip = 0;
  297. u_short   iprt = 0;
  298. char   puf[255] = {0};
  299.   
  300. ACE_Time_Value t(0, 100000);
  301. ACE_OS::sleep(t);
  302. for (mpIter = mpInfo.begin(); mpIter != mpInfo.end(); mpIter++)  
  303. {
  304.   taddr1.set(mpIter->second.ClientAcceptPort, mpIter->second.sIp.c_str());  
  305.   ip = taddr1.get_ip_address();
  306.   iprt = taddr1.get_port_number();
  307.   /*
  308.   memset(puf, 0, 255);
  309.   isize = 0x0D;
  310.   strcpy(puf, "\x01\x01\x01\x0D\x01");
  311.   puf[5] = mpIter->second.byGroup;
  312.   memcpy(puf + 6, &ip, sizeof(ACE_UINT32));
  313.   memcpy(puf + 6 + sizeof(ACE_UINT32), &iprt, sizeof(u_short));
  314.   memcpy(puf + 6 + sizeof(ACE_UINT32) + sizeof(u_short), &mpIter->second.byAppid, sizeof(YB::BYTE));
  315.   fixFrame((unsigned char*)puf, isize, 0x01, stAppid.byAppid, 0x04);
  316.   */
  317.   this->udp.send(puf, isize, taddr);
  318.   t.set(0, 10000);
  319.   ACE_OS::sleep(t);
  320. }
  321. }
  322. //////////////////////////////////////////////
  323. ///*
  324. bool  CTaskTimer::Open(CServerService *udp)
  325. {
  326. sudp = udp;
  327. ACE_Time_Value idlay(1);
  328. ACE_Time_Value ival(60);
  329. timeid = this->reactor()->schedule_timer(this, 0, idlay, ival);
  330. return true;
  331. }
  332. int CTaskTimer::handle_close(ACE_HANDLE handle, ACE_Reactor_Mask close_mask)
  333. {
  334. if (timeid) this->reactor()->cancel_timer(this);
  335. delete this;
  336. return 0;
  337. }
  338. int CTaskTimer::handle_timeout(const ACE_Time_Value &current_time, const void *act)
  339. {
  340. std::map<YB::BYTE, STAppid>::iterator mpIter;
  341. for (mpIter = sudp->mpInfo.begin(); mpIter != sudp->mpInfo.end(); mpIter++)   
  342. {
  343.   mpIter->second.bOnline = mpIter->second.iTime ? true : false;
  344.   mpIter->second.iTime = 0;
  345. }
  346. // 删除掉线客户端
  347. for (mpIter = sudp->mpInfo.begin(); mpIter != sudp->mpInfo.end(); mpIter++)   
  348. {
  349.   if (!mpIter->second.bOnline)
  350.   {
  351.    sudp->DeleteAppid(mpIter->second.byAppid, mpIter->second.sIp);
  352.    mpIter = sudp->mpInfo.begin();
  353.    if (mpIter == sudp->mpInfo.end()) break;
  354.   }
  355. }
  356. return 0;
  357. }
复制代码
 楼主| 发表于 2008-1-17 18:51:04 | 显示全部楼层
  1. main.cpp
  2. #include "./ServerService.h"
  3. int ACE_TMAIN(int argc, char* argv[])
  4. {
  5. YB::CMain cmain;
  6. if (!cmain.Open()) cmain.Close();
  7. return 0;
  8. }
复制代码
客户端:DLL
功能:根据模块号与其他客户进行连接.数据转发
DLL.cpp
  1. #include "./NetWork.h"
  2. #include "./os_fun.h"
  3. class _mydllexport CBaseInNetwork
  4. {
  5. public:
  6. CBaseInNetwork();
  7. virtual ~CBaseInNetwork();
  8. virtual bool InitNetwork(unsigned  char byAppid, int igroup ,int itype);
  9. virtual void CloseNetwork();
  10. virtual int SendData(const unsigned char* buf, const int nSize, unsigned  char byAppid );
  11. virtual int GetDataLength();
  12. virtual int GetData(unsigned char* buf);
  13. private:
  14. CNetWork *pnet;
  15. };
  16. CBaseInNetwork::CBaseInNetwork()
  17. {
  18. }
  19. CBaseInNetwork::~CBaseInNetwork()
  20. {
  21. }
  22. bool CBaseInNetwork::InitNetwork( unsigned  char byAppid,  int igroup ,int itype)
  23. {
  24. ACE::init();
  25. bool bRetVal = false;
  26. pnet =  new CNetWork();
  27. bRetVal = pnet->Open( byAppid, igroup , itype );
  28. return bRetVal;
  29. }
  30. int CBaseInNetwork::SendData(const unsigned char* buf, const int nSize, unsigned  char byAppid )
  31. {
  32. return pnet->SendData( buf, nSize, byAppid );
  33. }
  34. int CBaseInNetwork::GetDataLength()
  35. {
  36. return pnet->GetDataLength();
  37. }
  38. int CBaseInNetwork::GetData(unsigned char* buf)
  39. {
  40. return pnet->GetData( buf );
  41. }
  42. void CBaseInNetwork::CloseNetwork( )
  43. {
  44. pnet->Close();
  45. delete pnet;
  46. pnet = NULL;
  47. ACE::fini();
  48. }
复制代码
os_fun.h
  1. #ifndef _OS_FUN_H_
  2. #define _OS_FUN_H_
  3. #ifdef WIN32
  4. #include "./win32_fun.hpp"
  5. #endif
  6. #ifdef linux
  7. #include "./linux_fun.hpp"
  8. #endif
  9. #endif // end of _OS_FUN_H_
复制代码
win32_fun.hpp
  1. #ifndef __WIN32_FUN_H
  2. #define __WIN32_FUN_H
  3. #include <windows.h>
  4. #define _mydllexport extern "C" _declspec(dllexport)
  5. BOOL WINAPI DllMain(HANDLE hModule,
  6.      DWORD  ul_reason_for_call,
  7.      LPVOID lpReserved
  8.      )
  9. {
  10. switch (ul_reason_for_call)
  11. {
  12. case DLL_PROCESS_ATTACH:
  13. case DLL_THREAD_ATTACH:
  14. case DLL_THREAD_DETACH:
  15. case DLL_PROCESS_DETACH:
  16.   break;
  17. }
  18.     return TRUE;
  19. }
  20. #endif // end of __WIN32_FUN_H
复制代码
linux_fun.hpp
  1. #ifndef __LINUX_FUN_H
  2. #define __LINUX_FUN_H
  3. #define _mydllexport  
  4. #endif // end of __LINUX_FUN_H
复制代码
BaseNetWork.h
  1. #ifndef _BASENETWORK_H_
  2. #define _BASENETWORK_H_
  3. namespace YB
  4. {
  5. class CBaseNetWork
  6. {
  7. protected:
  8. CBaseNetWork(void){};
  9. public:
  10. virtual ~CBaseNetWork(void){};
  11. virtual bool Open(unsigned  char byAppid, int igroup = 0, int itype = 0) = 0; //strAppid自己
  12. virtual int SendData(const unsigned char* buf, const int nSize, unsigned  char byAppid = 0xff) = 0;
  13. virtual int GetData(unsigned char* buf) = 0;
  14. virtual int GetDataLength() = 0;
  15. virtual void Close() = 0;
  16. };
  17. }
  18. #endif // end of _BASENETWORK_H_
复制代码
 楼主| 发表于 2008-1-17 18:52:03 | 显示全部楼层
  1. #ifndef _NETWORK_H_
  2. #define _NETWORK_H_
  3. #include <map>
  4. #include <list>
  5. #include <string>
  6. #include <fstream>
  7. #include <sstream>
  8. #include "ace/TP_Reactor.h"
  9. #include "ace/SOCK_Dgram.h"
  10. #include "ace/SOCK_Acceptor.h"
  11. #include "ace/SOCK_Stream.h"
  12. #include "ace/SOCK_Connector.h"
  13. #include "ace/Task.h"
  14. const static ACE_UINT32 iServerPort = 8001;
  15. typedef unsigned  char Byte;
  16. static ACE_Thread_Mutex mlocka;
  17. static ACE_Thread_Mutex mlock_mp;
  18. const static Byte ServerAppid = 0x04;
  19. class CNetWork;
  20. /*
  21. 与服务端建立连接,收发信息
  22. */
  23. class CServerUdpcs : public ACE_Event_Handler
  24. {
  25. public:
  26. // 构造、析构函数
  27. CServerUdpcs(){};
  28. ~CServerUdpcs(){};
  29. virtual ACE_HANDLE get_handle()const;
  30. virtual int  handle_input(ACE_HANDLE fd = ACE_INVALID_HANDLE);    // 网络读事件
  31. virtual int  handle_close (ACE_HANDLE handle, ACE_Reactor_Mask close_mask); // 关闭网络事件
  32. public:
  33. int  Open_Event();          // 注册事件
  34. bool Open(Byte byappid, Byte bygroup);
  35. int  SendData(const char* buf, const int nSize);
  36. int  GetData(char* buf, const int nSize);
  37. void SetParentHander(CNetWork *p);      // 设置主类指针,引用数据连表
  38. public:
  39. int  iClientStateSign;         // 客户状态变化标志;1有变化,0变化
  40. private:
  41. void ChackProtocol(const char* buf, const int nSize); // 解析报文
  42. void UpdateMapInfo(const char* buf, const int nSize); // 更新客户端状态
  43. void ReadDataToList(const char* buf, const int nSize); // 保存数据到连表
  44. void SaveMapinfo(const char* buf);      // 保存服务器发送的其他客户端信息
  45. void SaveLocalhost(const char* buf);      // 保存本机IP和服务器分配的端口
  46. int  GetSourcePort(const char* buf);      // 得到端口
  47. std::string GetSourceIp(const char* buf);     // 得到IP地址
  48. private:
  49. ACE_INET_Addr praddr;
  50. ACE_SOCK_Dgram udp; //UDP协议流
  51. CNetWork  *net;
  52. };
  53. /*
  54. 客户端建立监听服务
  55. */
  56. class CClientAcceptor : public ACE_Event_Handler
  57. {
  58. public:
  59. // 构造、析构函数
  60. CClientAcceptor(){};
  61. ~CClientAcceptor(){};
  62. virtual ACE_HANDLE get_handle()const;
  63. virtual int  handle_input(ACE_HANDLE fd = ACE_INVALID_HANDLE);    // 接受客户端连接
  64. virtual int  handle_close(ACE_HANDLE handle, ACE_Reactor_Mask close_mask); // 关闭网络事件
  65. public:
  66. int  open(void* avg = 0);
  67. void SetParentHander(CNetWork *p);
  68. private:
  69. ACE_SOCK_Acceptor acp;
  70. CNetWork*   net;
  71. };
  72. class CClientService;
  73. // 保存在线客户端连接
  74. typedef struct STAPPIDCS
  75. {
  76. std::string  sIp;
  77. int    iPort;
  78. Byte   byAppid;
  79. Byte         byGroup; // 组号
  80. CClientService *pcs; // 客户端连接
  81. }STAppidcs;
  82. // 保存其他客户发送到本站的数据
  83. typedef struct STLISTDATA
  84. {
  85. std::string  sAppid;
  86. int    iLength;
  87. Byte   *byData;
  88. }STListData;
  89. /*
  90. 点对点数据收发
  91. */
  92. class CClientService : public ACE_Event_Handler
  93. {
  94. public:
  95. // 构造、析构函数
  96. CClientService(void){};
  97. ~CClientService(){};
  98. // 继承基类
  99. virtual ACE_HANDLE get_handle()const;
  100. virtual int  handle_input(ACE_HANDLE fd = ACE_INVALID_HANDLE);   // 接受数据,保存到连表
  101. virtual int  handle_close(ACE_HANDLE handle, ACE_Reactor_Mask mask); // 退出连接,删除资源
  102. public:
  103. int  Open();
  104. void SetParentHander(CNetWork *p);
  105. int  connect(ACE_INET_Addr addr);         // 连接到其他客户端
  106. int  SendData(const char* buf, const int nSize);      // 发送数据到其他客户端
  107. ACE_SOCK_Stream &peer(){return sockeam;}
  108. private:
  109. void AddAppid(STAppidcs STAppidcs);         // 保存客户端连接
  110. void DeleteAppid(CClientService *cs);        // 删除客户端连接
  111. void ReadDataToList(const char* buf, const int nSize);    // 保存数据到连表
  112. void ChackProtocol(const char* buf, const int nSize);    // 解析报文
  113. private:
  114. ACE_SOCK_Connector con;           
  115. ACE_SOCK_Stream  sockeam;
  116. STAppidcs   stAppidcs;           // 保存最近一个客户端连接信息
  117. CNetWork   *net;
  118. };
  119. class CTaskTimer;
  120. /*
  121. 用户服务类,数据保存
  122. */
  123. class CNetWork:public ACE_Task_Base
  124. {
  125. public:
  126. // 构造、析构函数
  127. CNetWork(void);
  128. virtual ~CNetWork(void);
  129. public:
  130. bool Open(Byte byappid, int igroup = 0, int itype = 0);    // 服务器地址;strAppid自己
  131. int  SendData(const Byte* buf, const int nSize, Byte byappid = 0xff);
  132. int  GetDataLength();            //返回数据长度
  133. int  GetData(Byte* buf);            //返回数据
  134. void Close();
  135. bool ReadServerIP();             //取系统IP
  136. int  svc(void);              // 线程回调函数
  137. public:
  138. std::map<Byte, STAppidcs> mpInfo;          // 客户端信息连表
  139.   std::list<STListData>  lstData;         // 数据连表
  140. int  iprtype;              // 是否主控模块标识
  141. Byte byAppid;              // 本机APPID
  142. int  iport;               // 服务端分配PORT,
  143. std::string sip;              // 本机IP
  144. Byte byGroup;              // 组号
  145. STAppidcs  stAppidcs;            // 保存最近一个服务端返回其他客户信息CServerUdpcs.SaveMapinfo使用
  146. CServerUdpcs *pServerUdp;
  147. CClientAcceptor *pCAcptor;
  148. CClientService  *pCService;
  149. CTaskTimer  *ptimer;
  150. std::string  sServerIP;
  151. std::string  sLocalIP;
  152. bool   b_run;
  153. private:
  154. int GroupSend(const char* buf, const int nSize);      // 组群发
  155. int SingleSend(const char* buf, const int nSize, Byte byappid);   // 单发
  156. };
  157. // 定时器类
  158. class CTaskTimer : public ACE_Task_Base
  159. {
  160. public:
  161. // 构造、析构函数
  162. CTaskTimer(){timeid = 0;};
  163. virtual ~CTaskTimer(){};
  164. virtual int  handle_close(ACE_HANDLE handle, ACE_Reactor_Mask close_mask); // 关闭网络监听
  165. public:
  166. bool Open(CNetWork *p);
  167. int  handle_timeout(const ACE_Time_Value &current_time, const void *act = 0);// 定时器,发送心跳报文
  168. private:
  169. CNetWork  *pnet;
  170. long   timeid;
  171. };
  172. #endif // end of _NETWORK_H_
复制代码
NetWork.cpp
  1. #include "./NetWork.h"
  2. unsigned char checkFrame(unsigned char* uc_buf, unsigned short uc_length)            
  3. {
  4. return 0;
  5. }
  6. void fixFrame(unsigned char* uc_buf,
  7.     unsigned char& uc_Length,
  8.     unsigned char flag,
  9.     unsigned char dest_addr,
  10.     unsigned char src_addr)
  11. {
  12. ;
  13. }
  14. ///////////////////////////////////////////
  15. ///*
  16. CNetWork::CNetWork()
  17. {
  18. }
  19. CNetWork::~CNetWork()
  20. {
  21. }
  22. bool CNetWork::Open(Byte byappid, int igroup, int itype)
  23. {
  24. b_run = false;
  25. iprtype = itype;
  26. byAppid = byappid;
  27. byGroup = (Byte)igroup;
  28. ACE_NEW_RETURN(pServerUdp, CServerUdpcs, false);
  29. ACE_NEW_RETURN(pCAcptor, CClientAcceptor, false);
  30. ACE_NEW_RETURN(ptimer, CTaskTimer, false);
  31. if (!ReadServerIP()) return false;
  32. // 向服务端申请连接,登陆到服务端
  33. pServerUdp->SetParentHander(this);
  34. if (!pServerUdp->Open(byAppid, byGroup)) return false;
  35. // 自监听
  36. pCAcptor->SetParentHander(this);
  37. // 开启线程
  38. activate();
  39. b_run = true;
  40. return true;
  41. }
  42. int CNetWork::svc(void)
  43. {
  44. ACE_Reactor rt;
  45. this->reactor(&rt);
  46. this->pServerUdp->reactor(&rt);
  47. this->pCAcptor->reactor(&rt);
  48. this->ptimer->reactor(&rt);
  49. this->pServerUdp->Open_Event();
  50. this->pCAcptor->open();
  51. this->ptimer->Open(this);
  52. rt.run_reactor_event_loop();
  53. ACE_OS::sleep(ACE_Time_Value(1));
  54. return 0;
  55. }
  56. int CNetWork::SendData(const Byte* buf, const int nSize, Byte byappid)
  57. {
  58. int isize = 0;
  59. if (byappid == 0xff)      // 组发
  60. {
  61.   isize = GroupSend((char*)buf, nSize);
  62. }
  63. else if (byappid == ServerAppid)   // 心跳
  64. {
  65.   isize = pServerUdp->SendData((char*)buf, nSize);
  66. }
  67. else          // 单发点到点
  68. {
  69.   isize = SingleSend((char*)buf, nSize, byappid);
  70. }
  71. return isize;
  72. }
  73. int CNetWork::GetDataLength()
  74. {
  75. if (iprtype && pServerUdp->iClientStateSign) return (mpInfo.size() + 17);
  76. if (lstData.empty()) return 0;
  77. std::list<STListData>::iterator lstIter;
  78. lstIter = lstData.begin();
  79. return lstIter->iLength;
  80. }
  81. int CNetWork::GetData(Byte* buf)
  82. {
  83. if (iprtype && pServerUdp->iClientStateSign)
  84. {
  85.   std::map<Byte, STAppidcs>::iterator mpIter;
  86.   unsigned char isize = 0;
  87.   
  88.   pServerUdp->iClientStateSign = 0;
  89.   strcpy((char*)buf, "\x03\x01\x01");
  90.   buf[3] = mpInfo.size() + 4;
  91.   isize = 4;
  92.   for (mpIter = mpInfo.begin(); mpIter != mpInfo.end(); mpIter++)
  93.   {
  94.    if (mpIter->second.sIp == sip) buf[isize++] = mpIter->second.byAppid;
  95.   }
  96.   fixFrame(buf, isize, 0x01, byAppid, ServerAppid);
  97.   return isize;
  98. }
  99. if(lstData.empty()) return 0;
  100. std::list<STListData>::iterator lstIter;
  101. int ilen = 0;
  102. // 返回用户数据
  103. mlocka.acquire();
  104. lstIter = lstData.begin();
  105. ilen = lstIter->iLength;
  106. memcpy(buf, lstIter->byData, ilen);
  107. delete []lstIter->byData;
  108. lstData.erase(lstIter);
  109. mlocka.release();
  110. return ilen;
  111. }
  112. void CNetWork::Close()
  113. {
  114. std::map<Byte, STAppidcs>::iterator mpIter;
  115. Byte isize = 0x0D;
  116. char  buf[255] = {0};
  117. ACE_INET_Addr taddr(iport, sip.c_str());
  118. ACE_UINT32  ip = taddr.get_ip_address();
  119. u_short   iprt = taddr.get_port_number();
  120. /*
  121. strcpy(buf, "\x01\x01\x01\x0D");
  122. buf[5] = byGroup;
  123. memcpy(buf + 6, &ip, sizeof(ACE_UINT32));
  124. memcpy(buf + 6 + sizeof(ACE_UINT32), &iprt, sizeof(u_short));
  125. memcpy(buf + 6 + sizeof(ACE_UINT32) + sizeof(u_short), &byAppid, sizeof(Byte));
  126. fixFrame((unsigned char*)buf, isize, 0x01, ServerAppid, byAppid);
  127. */
  128. isize = pServerUdp->SendData(buf, isize);   
  129. // 关闭监听事件
  130. ptimer->handle_close(ACE_INVALID_HANDLE, 0);
  131. pCAcptor->handle_close(ACE_INVALID_HANDLE, 0);
  132. pServerUdp->handle_close(ACE_INVALID_HANDLE, 0);
  133. // 关闭网络事件
  134. for (mpIter = mpInfo.begin(); mpIter != mpInfo.end(); mpIter++)
  135. {
  136.   if (mpIter->second.pcs != NULL)
  137.   {   
  138.    mpIter->second.pcs->handle_close(ACE_INVALID_HANDLE, 0);
  139.    mpIter->second.pcs = NULL;
  140.   }
  141. }
  142. // 清除连表
  143. mlock_mp.acquire();
  144. mpInfo.clear();
  145. mlock_mp.release();
  146. if (b_run && this->reactor()->reactor_event_loop_done() != 1) this->reactor()->end_reactor_event_loop();
  147. }
  148. int CNetWork::GroupSend(const char* buf, const int nSize)
  149. {
  150. std::map<Byte, STAppidcs>::iterator mpIter;
  151. int isize = 0;
  152. for (mpIter = mpInfo.begin(); mpIter!= mpInfo.end(); mpIter++)
  153. {
  154.   if (mpIter->second.byGroup == byGroup)
  155.   {
  156.    isize = SingleSend(buf, nSize, mpIter->second.byAppid);
  157.   }
  158. }
  159. return isize;
  160. }
  161. int CNetWork::SingleSend(const char* buf, const int nSize, Byte byappid)
  162. {
  163. std::map<Byte, STAppidcs>::iterator mpIter;
  164. Byte isize = 0;
  165. for (mpIter = mpInfo.find(byappid); mpIter != mpInfo.end(); mpIter++)
  166. {
  167.   if (mpIter->first != byappid) break;   
  168.   if (mpIter->second.pcs != NULL)    // 已有连接
  169.   {
  170.    CClientService* p = mpIter->second.pcs;
  171.    isize = p->SendData(buf, nSize);
  172.   }
  173.   else          // 无连接,新建连接到对端
  174.   {
  175.    CClientService* cs;
  176.    ACE_INET_Addr addr(mpIter->second.iPort, mpIter->second.sIp.c_str());
  177.    ACE_NEW_RETURN(cs, CClientService, 0);
  178.    cs->SetParentHander(this);
  179.    cs->reactor(this->reactor());
  180.    if (cs->connect(addr) != -1)
  181.    {
  182.     char  puf[255] = {0};
  183.    
  184.     cs->Open();
  185.     mpIter->second.pcs = cs;   // 将连接保存到连表
  186.          
  187.     strcpy(puf, "\x01\x01\x01\x06\x02");
  188.     puf[5] = byGroup;
  189.     isize = 0x06;
  190.     fixFrame((unsigned char*)puf, isize, 0x01, byappid, byAppid);
  191.     isize = mpIter->second.pcs->SendData(puf, isize);
  192.    
  193.     // 等待
  194.     ACE_Time_Value t(1);
  195.     ACE_OS::sleep(t);     
  196.     isize = mpIter->second.pcs->SendData(buf, nSize);
  197.    }
  198.    else
  199.    {
  200.     delete cs;
  201.    }
  202.   }   
  203. }
  204. return isize;
  205. }
  206. bool CNetWork::ReadServerIP()
  207. {
  208. char sip1[20] = {0};
  209. char sip2[20] = {0};
  210. FILE* fp = NULL;
  211. if ((fp = fopen("./ServerIp_c.txt", "r")) == NULL) return false;
  212. fscanf(fp, "%s %s", sip1, sip2);
  213. fclose(fp);
  214. sServerIP = sip1;
  215. sLocalIP = sip2;
  216. return true;
  217. }
  218. /////////////////////////////////////////////////////////////////
  219. bool CServerUdpcs::Open(Byte byappid, Byte bygroup)
  220. {
  221. ACE_INET_Addr taddr;
  222. char   puf[255] = {0};
  223. Byte  isize = 0x06;
  224. ACE_Time_Value t(3);
  225. ACE_INET_Addr taddr_local(net->sLocalIP.c_str());
  226. udp.open(taddr_local);
  227. taddr.set(iServerPort, net->sServerIP.c_str());
  228. /*
  229. strcpy(puf, "\x01\x01\x01\x06\x02");
  230. puf[5] = bygroup;
  231. fixFrame((unsigned char*)puf, isize, 0x01, ServerAppid, byappid);
  232. */
  233. this->udp.send(puf, isize, taddr);
  234. // 必须阻塞等待服务端返回分配的端口,否则不能建立监听
  235. memset(puf, 0x00, 255);
  236. ACE_OS::sleep(ACE_Time_Value(0, 10000));
  237. isize = this->udp.recv(puf, 255, this->praddr, 0, &t);
  238. if (isize > 0 && isize < 255)
  239. {
  240.   ChackProtocol(puf, isize);
  241.   if (net->iport < 20000) //分配的端口都是>20000的,
  242.   {
  243.    udp.close();
  244.    return false;
  245.   }
  246.   return true;
  247. }
  248. udp.close();
  249. return false;
  250. }
  251. int CServerUdpcs::Open_Event()
  252. {
  253. return (this->reactor()->register_handler(this, ACE_Event_Handler::READ_MASK));
  254. }
  255. ACE_HANDLE CServerUdpcs::get_handle()const
  256. {
  257. return this->udp.get_handle();
  258. }
  259. int CServerUdpcs::handle_input(ACE_HANDLE fd)
  260. {
  261. char buf[255] = {0};
  262. int isize = this->udp.recv(buf, 255, this->praddr);
  263. if (isize > 0 && isize < 255) ChackProtocol(buf, isize);
  264. return 0;
  265. }
  266. int CServerUdpcs::handle_close(ACE_HANDLE handle, ACE_Reactor_Mask close_mask)
  267. {
  268. if (this->udp.get_handle() != ACE_INVALID_HANDLE)
  269. {
  270.   ACE_Reactor_Mask m = ACE_Event_Handler::READ_MASK | ACE_Event_Handler::DONT_CALL;
  271.   this->reactor()->remove_handler(this, m);
  272.   this->udp.close();
  273. }
  274. delete this;
  275. return 0;
  276. }
  277. void CServerUdpcs::ChackProtocol(const char* buf, const int nSize)
  278. {
  279. Byte *p = (Byte*)buf;
  280. if (checkFrame(p, nSize)) return;
  281. switch (*(p + 11))
  282. {
  283.   case 0xF0:     // 消息应答
  284.   {
  285.    if (*(p + 6) == ServerAppid) ReadDataToList(buf, nSize);
  286.    break;
  287.   }
  288.   case 0x01:     // my
  289.   {
  290.    switch (*(p + 15))
  291.    {
  292.     case 0x03:  // 返回本机IP和端口号
  293.      SaveLocalhost(buf);
  294.      break;
  295.     case 0x01:  // 保存在线客户端信息(新增客户端)
  296.      SaveMapinfo(buf);
  297.      break;
  298.     case 0x00:  // 更新在线客户端信息(客户端掉线)
  299.      UpdateMapInfo(buf, nSize);
  300.      break;
  301.     default:
  302.      break;
  303.    }
  304.    break;
  305.   }
  306.   default:
  307.    break;
  308. }
  309. }
  310. void CServerUdpcs::SetParentHander(CNetWork *p)
  311. {
  312. net = p;
  313. }
  314. int CServerUdpcs::SendData(const char* buf, const int nSize)
  315. {
  316. return this->udp.send(buf, nSize, this->praddr);
  317. }
  318. int CServerUdpcs::GetData(char* buf, const int nSize)
  319. {
  320. return this->udp.recv(buf, nSize, this->praddr);
  321. }
  322. void CServerUdpcs::SaveLocalhost(const char* buf)
  323. {
  324. net->sip = GetSourceIp(buf);  
  325. net->iport = GetSourcePort(buf);
  326. }
  327. void CServerUdpcs::UpdateMapInfo(const char* buf, const int nSize)
  328. {
  329. std::map<Byte, STAppidcs>::iterator mpIter;
  330. Byte byappid = (Byte)buf[23];
  331. std::string sip = GetSourceIp(buf);
  332. mlock_mp.acquire();
  333. for (mpIter = net->mpInfo.find(byappid); mpIter != net->mpInfo.end(); mpIter++)   
  334. {
  335.   if (mpIter->first != byappid) break;
  336.   if (mpIter->second.sIp != sip) continue;
  337.   if (mpIter->second.pcs != NULL)
  338.   {
  339.    mpIter->second.pcs->handle_close(ACE_INVALID_HANDLE, 0);
  340.    mpIter->second.pcs = NULL;
  341.   }
  342.   net->mpInfo.erase(mpIter);
  343.   break;
  344. }
  345. mlock_mp.release();
  346. iClientStateSign = 1;
  347. }
  348. void CServerUdpcs::ReadDataToList(const char* buf, const int nSize)
  349. {
  350. STListData stlData;
  351. stlData.iLength = nSize;
  352. stlData.byData = new Byte[nSize];
  353. memcpy(stlData.byData, buf, nSize);
  354. //将数据保存到连表
  355. mlocka.acquire();
  356. net->lstData.push_front(stlData);
  357. mlocka.release();
  358. }
  359. void CServerUdpcs::SaveMapinfo(const char* buf)
  360. {
  361. std::map<Byte, STAppidcs>::iterator mpIter;
  362. Byte *p = (Byte*)buf;
  363. STAppidcs stAppidcs;
  364. bool  b_Insert = false;
  365. /*
  366. stAppidcs.byAppid = *(p + 23);
  367. stAppidcs.byGroup = *(p + 16);
  368. stAppidcs.sIp = GetSourceIp(buf);
  369. stAppidcs.iPort = GetSourcePort(buf);
  370. stAppidcs.pcs = NULL;
  371. */
  372. mlock_mp.acquire();
  373. for ((mpIter = net->mpInfo.find(stAppidcs.byAppid)); mpIter != net->mpInfo.end(); mpIter++)
  374. {
  375.   if (mpIter->first != stAppidcs.byAppid) break;
  376.   if (mpIter->second.sIp != stAppidcs.sIp) continue;
  377.   b_Insert = true;
  378.   break;
  379. }
  380. if (!b_Insert) net->mpInfo.insert(std::make_pair(stAppidcs.byAppid, stAppidcs));
  381. mlock_mp.release();
  382. iClientStateSign = 1;
  383. }
  384. std::string CServerUdpcs::GetSourceIp(const char* buf)
  385. {
  386. ACE_INET_Addr taddr;
  387. int    iIp = 0;
  388. memcpy(&iIp, buf + 17, 4);
  389. taddr.set(1000, iIp);
  390. std::string sip = taddr.get_host_addr();
  391. return sip;
  392. }
  393. int CServerUdpcs::GetSourcePort(const char* buf)
  394. {
  395. int iport = 0;
  396. memcpy(&iport, buf + 21, 2);
  397. return iport;
  398. }
  399. //////////////////////////////////////////////////////
  400. int CClientAcceptor::open(void * avg)
  401. {  
  402. ACE_INET_Addr addr(net->iport, net->sip.c_str());
  403. this->acp.open(addr, 5);
  404. return this->reactor()->register_handler(this, ACE_Event_Handler::ACCEPT_MASK);
  405. }
  406. ACE_HANDLE CClientAcceptor::get_handle()const
  407. {
  408. return this->acp.get_handle();
  409. }
  410. int CClientAcceptor::handle_input(ACE_HANDLE fd)
  411. {
  412. CClientService* cs = NULL;
  413. cs = new CClientService();
  414. cs->SetParentHander(net);
  415. if (this->acp.accept(cs->peer()) == -1)
  416. {
  417.   delete cs;
  418.   return 0;
  419. }
  420. cs->reactor(this->reactor());
  421. if (cs->Open() == -1) cs->handle_close(ACE_INVALID_HANDLE, 0);
  422. return 0;
  423. }
  424. int CClientAcceptor::handle_close(ACE_HANDLE handle, ACE_Reactor_Mask close_mask)
  425. {
  426. if (this->acp.get_handle() != ACE_INVALID_HANDLE)
  427. {
  428.   ACE_Reactor_Mask m = ACE_Event_Handler::ACCEPT_MASK | ACE_Event_Handler::DONT_CALL;
  429.   this->reactor()->remove_handler(this, m);
  430.   this->acp.close();
  431. }
  432. delete this;
  433. return 0;
  434. }
  435. void CClientAcceptor::SetParentHander(CNetWork *p)
  436. {
  437. net = p;
  438. }
  439. ///////////////////////////////////////////////////
  440. int CClientService::Open()
  441. {  
  442. ACE_INET_Addr peeraddr;
  443. this->sockeam.get_remote_addr(peeraddr);
  444. stAppidcs.iPort = peeraddr.get_port_number();
  445. stAppidcs.sIp =  peeraddr.get_host_addr();
  446. stAppidcs.pcs = this;
  447. return this->reactor()->register_handler(this, ACE_Event_Handler::READ_MASK);
  448. }
  449. ACE_HANDLE CClientService::get_handle()const
  450. {
  451. return this->sockeam.get_handle();
  452. }
  453. int  CClientService::handle_input(ACE_HANDLE fd)
  454. {
  455. char buf[1024] = {0};
  456. int isize = 0;
  457. isize = this->sockeam.recv(buf, 1024);
  458. if (isize <= 0) return -1;
  459. if (isize > 0 && isize < 1024) ChackProtocol(buf, isize);
  460. return 0;
  461. }
  462. int  CClientService::handle_close(ACE_HANDLE handle, ACE_Reactor_Mask mask)
  463. {
  464. // 如果对方断开连接,会触发此事件,则会被调用两次(已屏蔽系统自动调用此函数)
  465. if (mask == ACE_Event_Handler::WRITE_MASK) return 0;
  466. DeleteAppid(this); // 删除外部指针
  467. mask = ACE_Event_Handler::ALL_EVENTS_MASK | ACE_Event_Handler::DONT_CALL;
  468. this->reactor()->remove_handler(this, mask);
  469. this->sockeam.close();
  470. delete this;
  471. return 0;
  472. }
  473. void CClientService::SetParentHander(CNetWork *p)
  474. {
  475. net = p;
  476. }
  477. int CClientService::SendData(const char* buf, const int nSize)
  478. {
  479. int  isize = 0;   
  480. isize = this->sockeam.send_n(buf, nSize);
  481. if (isize <= 0) this->handle_close(0, 0);
  482. return isize;
  483. }
  484. int CClientService::connect(ACE_INET_Addr addr)
  485. {
  486. ACE_Time_Value itimeout(1);
  487. return this->con.connect(sockeam, addr, &itimeout);
  488. }
  489. void CClientService::ChackProtocol(const char* buf, const int nSize)
  490. {
  491. Byte *p = (Byte*)buf;
  492. if ((*(p + 11) == 0x01) && (*(p + 15) == 0x02))
  493. {
  494.   if (!checkFrame(p, nSize))
  495.   {  
  496.    stAppidcs.byAppid = *(p + 6);  // 请求连接的客户端APPID
  497.    stAppidcs.byGroup = *(p + 16);
  498.    AddAppid(stAppidcs);    // 增加客户连接
  499.    return;
  500.   }
  501. }
  502. ReadDataToList(buf, nSize);     // 保存用户数据
  503. }
  504. void CClientService::AddAppid(STAppidcs stAppidcs)
  505. {
  506. std::map<Byte, STAppidcs>::iterator mpIter;
  507. for (mpIter = net->mpInfo.find(stAppidcs.byAppid); mpIter != net->mpInfo.end(); mpIter++)   
  508. {
  509.   if (mpIter->first != stAppidcs.byAppid) break;
  510.   if (mpIter->second.sIp != stAppidcs.sIp) continue;
  511.   mpIter->second.pcs = this;
  512.   break;
  513. }
  514. }
  515. void CClientService::DeleteAppid(CClientService* cs)
  516. {
  517. std::map<Byte, STAppidcs>::iterator mpIter;
  518. for (mpIter = net->mpInfo.begin(); mpIter != net->mpInfo.end(); mpIter++)   
  519. {
  520.   if (mpIter->second.pcs == cs)
  521.   {
  522.    mpIter->second.pcs = NULL;
  523.    break;
  524.   }
  525. }
  526. }
  527. void CClientService::ReadDataToList(const char* buf, const int nSize)
  528. {
  529. STListData stlData;
  530. stlData.iLength = nSize;
  531. stlData.byData = new Byte[nSize];
  532. memcpy(stlData.byData, buf, nSize);
  533. // 将数据保存到连表
  534. if (net->lstData.size() > 500)
  535. {
  536.   std::list<STListData>::iterator lstIter;
  537.   mlocka.acquire();
  538.   lstIter = net->lstData.begin();
  539.   delete []lstIter->byData;
  540.   net->lstData.erase(lstIter);
  541.   mlocka.release();
  542. }
  543. mlocka.acquire();
  544. net->lstData.push_back(stlData);
  545. mlocka.release();
  546. }
  547. //////////////////////////////////////////////
  548. ///*
  549. bool  CTaskTimer::Open(CNetWork *p)
  550. {
  551. pnet = p;
  552. ACE_Time_Value idlay(1);
  553. ACE_Time_Value ival(40);
  554. timeid = this->reactor()->schedule_timer(this, 0, idlay, ival);
  555. return true;
  556. }
  557. int CTaskTimer::handle_close(ACE_HANDLE handle, ACE_Reactor_Mask close_mask)
  558. {  
  559. if (timeid) this->reactor()->cancel_timer(this);
  560. delete this;
  561. return 0;
  562. }
  563. int CTaskTimer::handle_timeout(const ACE_Time_Value& current_time, const void* act)
  564. {
  565. unsigned char buf[255] = {0};
  566. unsigned char isize = 4;
  567. /*
  568. memcpy(buf, "\x00\x01\x01\x04", 4);
  569. fixFrame(buf, isize, (pnet->iprtype ? 0x02 : 0x01), ServerAppid, pnet->byAppid);
  570. */
  571. pnet->SendData(buf, isize, ServerAppid);
  572. return 0;
  573. }
复制代码
发表于 2008-1-20 05:55:44 | 显示全部楼层
拜读中,学习中,多谢!
发表于 2008-1-27 15:32:01 | 显示全部楼层
我刚刚看玩reactor相关的内容,自己也试着写了一个小例子,但是没有关于handle_output的内容,我问一个问题,我在例子中是这样写的,从客户端连接过来的请求,svc_handle中的handle_input接受到发过来的数据然后再把需要的数据发回去,我觉得这样不对。发数据回去应该就是handle_output的事情了吧,如果是这样的话,改怎么写,何时注册write mask等。请教一下,谢谢
 楼主| 发表于 2008-1-27 15:35:08 | 显示全部楼层
如果你在handle_input中的写操作出现阻塞错误,表示无法写入了,就注册write_mask,然后在handle_output里面把剩下的内容发出。
发表于 2008-1-27 16:56:25 | 显示全部楼层
void handle_output (void)

{

// Asynchronously send the file

// to the client.

if (client_->send (file_.data (),

file_.size ())

== SOCKET_ERROR

&& errno == EWOULDBLOCK)

// Register with reactor...

else

// Close down and release resources.

handle_close ();

}

恩,我刚看到以上代码了,还有接受时候如果没有接受完数据的操作方式,开始我还很困惑为什么在
handle_input里面要再注册自己
// This is called by the Reactor when

// we can read from the client handle.

void handle_input (void)

{

int result = 0;

 

// Non-blocking read from the network

// connection.

do

result = request_.recv (client_->handle ());

while (result != SOCKET_ERROR && request_.state_ == INCOMPLETE);

 

// No more progress possible,

// blocking will occur

if (request_.state_ == INCOMPLETE && errno == EWOULDBLOCK)

reactor_->register_handler

(client_->handle (),

this,

Reactor::READ_MASK);

else

// We now have the entire request

parse_request ();

}


我想写这样一个功能,支持客户端多线程的文件下载的一个服务端,每次请求的数据应该在500kB以内,请问有什么特别值得注意的地方吗,我进行网络开发和使用ACE都刚开始学,谢谢
 楼主| 发表于 2008-1-27 17:25:41 | 显示全部楼层
需要自己定制协议,处理500K的需求。
不建议你刚开始就用ACE来处理所有的问题,ACE封装的太好了,反而让你不知道后面的API在干什么。建议你先学习基础的网络编程理论、概念,再辅助学习ACE,效果会比较好。
发表于 2008-2-1 17:24:40 | 显示全部楼层
谢谢了.代码还算看得懂,就是配置经常出错,用minGW+ACE
您需要登录后才可以回帖 登录 | 用户注册

本版积分规则

Archiver|手机版|小黑屋|ACE Developer ( 京ICP备06055248号 )

GMT+8, 2024-11-25 04:01 , Processed in 0.020919 second(s), 6 queries , Redis On.

Powered by Discuz! X3.5

© 2001-2023 Discuz! Team.

快速回复 返回顶部 返回列表