找回密码
 用户注册

QQ登录

只需一步,快速开始

查看: 5056|回复: 3

ACE中使用完成端口-Proactor框架

[复制链接]
发表于 2008-5-11 17:17:11 | 显示全部楼层 |阅读模式
作者:阿彪
地址:阿彪的Blog

使用ACE中的Proactor的话,会要比我们使用我们直接写的要来得简单。
在说Proactor之前我们需要了解Windows里的完成端口的工作原理。
      完成端口是WinNT内核里的一个框架。我们可以为我们的一些异步的操作
新建一个完成端口,然后这个完成端口会有几个工作线程来处理。我们
可以将socket,或是一个文件读写,或是一个串口的收发数据的句柄,
梆定到这个完成端口之上,当一个读或是写的事件完成之后,完成端口
机制将会自动将一个完成消息放到完成队列中,完成端口的工作线程池
将会被触发调用。回调的时候我们梆定时将一些基础的信息也梆在其中,
当工作线程也会通过一种叫做完成项的指针返回给你。就是说,你可能
梆定了多个socket或是文件都是没有问题的。按微软人写的文档里说的
可以面对百万个这样的异步对象。
      这里我就不再使用WinAPI写完成端口了。
      现在是使用ACE框架来写一个。
      使用他来做一个完成端口步骤也是一样的。
开始的时候需要一个完成端口,还有完成端口的工作线程池。在ACE框架
里提供了一种叫ACE_Task的线程池模块类
和一般的线程类一样,它的工作时调用的函数是
virtual int svc (void);
只是如何使用呢。无非是开启线程与关闭线程两个操作。
在此类中定义一个ACE_Thread_Semaphore sem_;变量
然后开户n个线程的过程就是这样的:
int Proactor_Task::star(int nMax)
{
...
this->activate (THR_NEW_LWP, nMax);
for (;nMax>0;nMax--)
{
  sem_.acquire();
}
return 0;
}
一个是创建,二个是一个一个的触发。让这一些线程都工作.
当然工作线程都要释放自己:
int Proactor_Task::svc()
{
ACE_DEBUG((LM_INFO,ACE_TEXT("svc函数调用!\n")));
sem_.release(1);
...
return 0;
}
好了。这个线程池开始工作了。接下来,我们要做将完成端口对象给创建出来:
在这个线程池里定义一个完成端口对象指针:
ACE_Proactor * proactor_;
创建的过程是这样的。
//是在Win32下,就使用这个Proactor的实现
ACE_WIN32_Proactor *proactor_impl = new ACE_WIN32_Proactor();   //新建proactor的实现
proactor_=new ACE_Proactor(proactor_impl,1);     //与proactor关联
ACE_Proactor::instance (this->proactor_, 1);     //将新建出来的proactor保存在静态框架里
如何删除呢。
ACE_Proactor::end_event_loop();
this->wait();
之后来写线程池里的函数
ACE_Proactor::run_event_loop();
只要写一句就OK了。
这就完成了一个完成端口对象的创建过程。我们只要做一下封装就OK了。
给它一个工作线程的大小。之后它就会自动的新建一个完成端口在ACE_Proactor::instance里。
接下来我们要做Acceptor与recv。
实计上ACE里已经为我们写好了。它们就是:
ACE_Asynch_Acceptor类ACE_Service_Handler类
class Accepte : public ACE_Asynch_Acceptor<Receive>
class Receive : public ACE_Service_Handler
这样一继承,工作就已经完成了。
如果我们想得到这一些网络事件的话,可以做一些继承就OK了。
他们内部调用的过程是这样的:
当有一个新的用户连接上来之后。
Accepte会有一个函数回调。
virtual HANDLER *make_handler (void);
这个函数里,我们必需写一个new Receive对象。
new完成之后Receive的open函数将会回调
open函数调用的时候,此接收对象的socket句柄就得到了。我们就在这个时候需要将一个
读、写的流梆定在其中。还有就是做一步异步的接收数据。这个如果你写过重叠方式的话就会
比较的了解,这也叫做异步的I/O的投递。等这个recv完成之后就会回调。
好了。这就算完成了。现在把代码贴出来。
我不知道如何做一个下载点。不好意思只有大家自己复制下来
我使用的VC6.0,ACE5.4.1的编程环境
  1. // Accepte.h: interface for the Accepte class.
  2. //
  3. //////////////////////////////////////////////////////////////////////
  4. #if !defined(AFX_ACCEPTE_H__DCEC809D_E5D2_48D1_A8A7_C9FD3C4D7C15__INCLUDED_)
  5. #define AFX_ACCEPTE_H__DCEC809D_E5D2_48D1_A8A7_C9FD3C4D7C15__INCLUDED_
  6. #if _MSC_VER > 1000
  7. #pragma once
  8. #endif // _MSC_VER > 1000
  9. #include <ace/Asynch_Acceptor.h>
  10. #include "Receive.h"
  11. class Accepte : public ACE_Asynch_Acceptor<Receive>
  12. {
  13. public:
  14. Receive* make_handler (void);
  15. Accepte();
  16. virtual ~Accepte();
  17. };
  18. #endif // !defined(AFX_ACCEPTE_H__DCEC809D_E5D2_48D1_A8A7_C9FD3C4D7C15__INCLUDED_)
  19. // Accepte.cpp: implementation of the Accepte class.
  20. //
  21. //////////////////////////////////////////////////////////////////////
  22. #include "Accepte.h"
  23. //////////////////////////////////////////////////////////////////////
  24. // Construction/Destruction
  25. //////////////////////////////////////////////////////////////////////
  26. Accepte::Accepte()
  27. {
  28. }
  29. Accepte::~Accepte()
  30. {
  31. }
  32. Receive* Accepte::make_handler(void)
  33. {
  34. return new Receive();
  35. }
  36. // Proactor_Task.h: interface for the Proactor_Task class.
  37. //
  38. //////////////////////////////////////////////////////////////////////
  39. #if !defined(AFX_PROACTOR_TASK_H__12F37C95_9872_4923_89A2_5A59AE7AC1FD__INCLUDED_)
  40. #define AFX_PROACTOR_TASK_H__12F37C95_9872_4923_89A2_5A59AE7AC1FD__INCLUDED_
  41. #if _MSC_VER > 1000
  42. #pragma once
  43. #endif // _MSC_VER > 1000
  44. #include "ace\Task_T.h"
  45. #include "ace\Thread_Semaphore.h"
  46. #include "ace\Proactor.h"
  47. #include "ace\WIN32_Proactor.h"
  48. class Proactor_Task : public ACE_Task<ACE_MT_SYNCH>
  49. {
  50. public:
  51. Proactor_Task();
  52. virtual ~Proactor_Task();
  53. int star(int nMax);
  54. int stop();
  55. virtual int svc (void);
  56. int create_proactor();
  57. int release_proactor();
  58. ACE_Thread_Semaphore sem_;
  59. ACE_Proactor * proactor_;
  60. };
  61. #endif // !defined(AFX_PROACTOR_TASK_H__12F37C95_9872_4923_89A2_5A59AE7AC1FD__INCLUDED_)
  62. // Proactor_Task.cpp: implementation of the Proactor_Task class.
  63. //
  64. //////////////////////////////////////////////////////////////////////
  65. #include "Proactor_Task.h"
  66. //////////////////////////////////////////////////////////////////////
  67. // Construction/Destruction
  68. //////////////////////////////////////////////////////////////////////
  69. Proactor_Task::Proactor_Task()
  70. {
  71. }
  72. Proactor_Task::~Proactor_Task()
  73. {
  74. }
  75. int Proactor_Task::star(int nMax)
  76. {
  77. create_proactor();
  78. this->activate (THR_NEW_LWP, nMax);
  79. for (;nMax>0;nMax--)
  80. {
  81.   sem_.acquire();
  82. }
  83. return 0;
  84. }
  85. int Proactor_Task::stop()
  86. {
  87. ACE_Proactor::end_event_loop();
  88. this->wait();
  89. return 0;
  90. }
  91. int Proactor_Task::release_proactor()
  92. {
  93. ACE_Proactor::close_singleton ();
  94. proactor_ = 0;
  95. return 0;
  96. }
  97. int Proactor_Task::create_proactor()
  98. {
  99. ACE_WIN32_Proactor *proactor_impl = 0;
  100. ACE_NEW_RETURN (proactor_impl,
  101.   ACE_WIN32_Proactor,
  102.   -1);
  103.   // always delete implementation  1 , not  !(proactor_impl == 0)
  104. ACE_NEW_RETURN (this->proactor_,
  105.   ACE_Proactor (proactor_impl, 1 ),
  106.   -1);
  107. // Set new singleton and delete it in close_singleton()
  108. ACE_Proactor::instance (this->proactor_, 1);
  109. return 0;
  110. }
  111. int Proactor_Task::svc()
  112. {
  113. ACE_DEBUG((LM_INFO,ACE_TEXT("svc函数调用!\n")));
  114. sem_.release(1);
  115. ACE_Proactor::run_event_loop();
  116. return 0;
  117. }
  118. // Receive.h: interface for the Receive class.
  119. //
  120. //////////////////////////////////////////////////////////////////////
  121. #if !defined(AFX_RECEIVE_H__0E7EF8C0_465F_4D9C_8A29_0C2A0F1EAFFE__INCLUDED_)
  122. #define AFX_RECEIVE_H__0E7EF8C0_465F_4D9C_8A29_0C2A0F1EAFFE__INCLUDED_
  123. #if _MSC_VER > 1000
  124. #pragma once
  125. #endif // _MSC_VER > 1000
  126. #include <ace/Asynch_io.h>
  127. #include <ace/Message_Block.h>
  128. #include <ace/Log_Msg.h>
  129. #include <ace/OS_Memory.h>
  130. class Receive : public ACE_Service_Handler
  131. {
  132. public:
  133. Receive();
  134. virtual ~Receive()
  135. {
  136.   if (this->handle() != ACE_INVALID_HANDLE )
  137.   {
  138.    closesocket(SOCKET(this->handle()));
  139.   }
  140. }
  141. virtual void open(ACE_HANDLE h,ACE_Message_Block& );
  142. virtual void handle_read_stream(const ACE_Asynch_Read_Stream::Result &result);
  143. virtual void handle_write_stream(const ACE_Asynch_Write_Stream::Result &result);
  144. private:
  145. ACE_Asynch_Write_Stream write_;
  146. ACE_Asynch_Read_Stream reader_;
  147. };
  148. #endif // !defined(AFX_RECEIVE_H__0E7EF8C0_465F_4D9C_8A29_0C2A0F1EAFFE__INCLUDED_)
  149. // Receive.cpp: implementation of the Receive class.
  150. //
  151. //////////////////////////////////////////////////////////////////////
  152. #include "Receive.h"
  153. //////////////////////////////////////////////////////////////////////
  154. // Construction/Destruction
  155. //////////////////////////////////////////////////////////////////////
  156. Receive::Receive()
  157. {
  158. }
  159. void Receive::open(ACE_HANDLE h,ACE_Message_Block& )
  160. {
  161. this->handle(h);
  162. if (this->write_.open(*this)!=0 ||
  163.   this->reader_.open(*this) != 0 )
  164. {
  165.   delete this;
  166.   return ;
  167. }
  168. ACE_Message_Block *mb;
  169. ACE_NEW_NORETURN(mb,ACE_Message_Block(1024));
  170. if ( this->reader_.read(*mb,mb->space()) != 0)
  171. {
  172.   ACE_ERROR((LM_ERROR,ACE_TEXT(" (%t) error information %p.")));
  173.   mb->release();
  174.   delete this;
  175.   return;
  176. }
  177. }
  178. void Receive::handle_read_stream (const ACE_Asynch_Read_Stream::Result &result)
  179. {
  180. ACE_Message_Block &mb = result.message_block();
  181. if ( !result.success() || result.bytes_transferred() == 0)
  182. {
  183.   mb.release();
  184.   delete this;
  185. }
  186. else
  187. {
  188.   ACE_Message_Block* new_mb;
  189.   ACE_NEW_NORETURN(new_mb,ACE_Message_Block(1024));
  190.   this->reader_.read(*new_mb,new_mb->space());
  191. }
  192. return ;
  193. }
  194. void Receive::handle_write_stream (const ACE_Asynch_Write_Stream::Result &result)
  195. {
  196. result.message_block().release();
  197. return ;
  198. }
  199. ////////////////////////////////////////////////////////////////////////////////////////////////////
  200. //main.cpp
  201. #ifdef _DEBUG
  202. #pragma comment(lib,"aced")
  203. #else
  204. #pragma comment(lib,"ace")
  205. #endif
  206. #include <ace\ace.h>
  207. #include "Accepte.h"
  208. #include "Proactor_Task.h"
  209. int ACE_TMAIN(int ,char*[])
  210. {
  211. Proactor_Task task;
  212. task.star(3);
  213. Accepte accepte;
  214. accepte.open(ACE_INET_Addr (2222), 0, 1,ACE_DEFAULT_BACKLOG,1,ACE_Proactor::instance());
  215. int nExit=0;
  216. while (nExit==0)
  217.   scanf("%d",&nExit);
  218. return 0;
  219. }
复制代码
 楼主| 发表于 2008-5-11 17:17:56 | 显示全部楼层
这个用在MFC的工程中会有问题,开始一下,CPU占用很高,开了一个Listen的端口。过几秒就关闭那个Socket了。
把Main函数放在一个类中了
  1. BOOL CServerMsg::StartNetWork(WORD wPort)
  2. {
  3. //m_network.Stop();
  4. ////这里应该修正异步事件Socket的启动函数,当启动线程失败应该返回-1
  5. //m_wListenPort = m_network.Start(m_wListenPort,m_wListenPort?FALSE:TRUE);
  6. //if(m_wListenPort!=0xFFFF)
  7. // return TRUE;
  8. if (wPort > 0)
  9. {
  10.   m_wListenPort = wPort;
  11. }
  12. //ACE_Proactor * pAP = ACE_Proactor::instance();
  13. m_task.star(3);
  14. m_accepte.open(ACE_INET_Addr (m_wListenPort), 0, 1,
  15.   ACE_DEFAULT_BACKLOG,1, ACE_Proactor::instance());
  16. int nExit=0;
  17. while (nExit==0)
  18. {
  19.   Sleep(100);
  20.   //scanf("%d",&nExit);
  21. }
  22. return TRUE;
  23. }
复制代码
然后在对话框的初始化中调用这个函数。
其它的类一样的处理
 楼主| 发表于 2008-5-11 17:18:19 | 显示全部楼层
问题解决了,这个例子是正确可以用了。
我查宏发现 ACE_DEFAULT_BACKLOG为5,可是调试时不知道为什么这个变成了一个很大的数字几百万的随机数.
use 5 directly,
m_accepte.open(ACE_INET_Addr (m_wListenPort), 0, 1,
  5 ,1, ACE_Proactor::instance()); //ACE_DEFAULT_BACKLOG
 楼主| 发表于 2008-5-11 17:18:35 | 显示全部楼层
这个函数时有问题的
void Receive::handle_read_stream (const ACE_Asynch_Read_Stream::Result &result)
{
ACE_Message_Block &mb = result.message_block();
if ( !result.success() || result.bytes_transferred() == 0)
{
  mb.release();
  delete this;
}
else
{
  ACE_Message_Block* new_mb;
  ACE_NEW_NORETURN(new_mb,ACE_Message_Block(1024));
  this->reader_.read(*new_mb,new_mb->space());
}
这个已经是接受完成后 的消息处理,为什么还要从Stream中去Read呢,直接处理 Result的mb 就行啊?
void Receive::handle_read_stream (const ACE_Asynch_Read_Stream::Result &result)
{
size_t nMsgLen = result.bytes_transferred() ;
ACE_Message_Block &mb = result.message_block();
if ( !result.success() || nMsgLen == 0)
{
  mb.release();
  delete this;
}
else
{
//ACE_Message_Block* new_mb;
  //ACE_NEW_NORETURN(new_mb,ACE_Message_Block(nMsgLen + 1));
  //this->reader_.read(*new_mb, nMsgLen);
  //消息头分析处理发到给处理中处理
  RecvMsgHander(&mb);//自定义消息处理函数,直接处理接收完成的数据
}
mb.release();
return ;
}
您需要登录后才可以回帖 登录 | 用户注册

本版积分规则

Archiver|手机版|小黑屋|ACE Developer ( 京ICP备06055248号 )

GMT+8, 2024-5-4 13:55 , Processed in 0.013656 second(s), 5 queries , Redis On.

Powered by Discuz! X3.5

© 2001-2023 Discuz! Team.

快速回复 返回顶部 返回列表