找回密码
 用户注册

QQ登录

只需一步,快速开始

查看: 6562|回复: 3

ACE_Task基本应用框架

[复制链接]
发表于 2008-9-10 23:01:57 | 显示全部楼层 |阅读模式
ACE的ACE_Task框架提供了一种基于消息的编程模式,可以Windows编程的消息循环进行类比。
   
ACE_Task
Windows 消息循环
说明
消息类型
ACE_Message_Block*
MSG
Windows消息用MSG结构表示,ACE_Task中因为不能预计各种应用中消息的类型,所以ACE_Message_Block基本上可以理解为是对一个指针的封装,这个指针指向实际的一块内存或是一个对象等等。在创建ACE_Message_Block时,可以指定是由ACE_Message_Block来管理内存(构造函数中指定一个 size_t类型的大小),还是由我们自己管理内存(构造函数中指定一个指针)。而一个ACE_Message_Block类型的指针,就是一个消息,我们通过传递它来进行逻辑的业务处理。
发送消息
ACE_Task::putq
SendMessage
事实上,到底用SendMessage还是PostMessage与ACE_Task::putq来进行类比,我很为难,PostMessage发送一个消息后立刻返回,这与通常的ACE_Task::putq行为非常类似,因为ACE_Task是运行在另外一个线程上,ACE_Task::putq只是完成将消息插入到消息队列的工作,理论上它应该立刻返回,但实际上,ACE_Task的消息队列有容量大小限制,这个限制由我们自己限定,当当前消息队列满时,ACE_Task::putq将阻塞一直到可以插入,这时候就比较类似与SendMessage,
取出消息
ACE_Task::getq
GetMessage
GetMessage和ACE_Task::getq在当前消息队列没有消息时都会阻塞到有消息可用为止,所以对于它们的使用比较类似,通常会写一个消息循环函数
BOOL bRet;MSG msg;while( (bRet = GetMessage( &msg, NULL, 0, 0 )) != 0){     if (bRet == -1) {               // handle the error and possibly exit    }       else    {               TranslateMessage(&msg);                DispatchMessage(&msg);         }}            
ACE_Message_Block * msg;while(getq(msg) != -1)     // int putq (ACE_Message_Block *, ACE_Time_Value *timeout = 0);{ // process msg here}              
消息处理函数
默认没有提供
WNDPROC
通过TranslateMessage和DispatchMessage,Windows将消息投递到相映窗口的WNDPROC上,ACE_Task没有提供类似与WNDPROC的回调函数,如果愿意,我们可以在ACE_Task上写出类似的结构,而通常,我们直接在消息循环中编写处理消息的代码


  尽管看起来ACE_Task提供的消息系统与WIndows的消息系统很象,但实际上,它们还是有比较大的区别,要搭架一个基于ACE_Task的消息系统,通常要做如下的步骤:
  • 编写一个派生自ACE_Task的类,指定它的同步模式
    ACE_Task的消息队列可以由多个处理线程共享使用,所以需要提供同步模式,例如 ACE_MT_SYNCH和ACE_NULL_SYNCH分别表示基于多线程的同步和不使用同步,这个参数是ACE_Task的一个模板参数。
    class My_Task : public ACE_Task<ACE_MT_SYNCH>{public: virtual int svc();}
  • 重载 ACE_Task的 svc 方法,编写消息循环相关的代码
    int My_Task::svc(){ ACE_Message_Block * msg; while(getq(msg) != -1) // int putq (ACE_Message_Block *, ACE_Time_Value *timeout = 0); {  // process msg here }} svc 方法相当与处理线程的入口方法。
  • 假设 My_Task是一个基于ACE_Task的类,创建一个唯一的My_Task实例,这个可以通过
    typedef ACE_Singleton<MyTask, SYNCH_METHOD> MYTASK;
    然后总是使用MYTASK::instance方法来获取一个My_Task的指针来完成。
  • 在适当位置(一般是程序开始的时候),让My_Task开始工作
    MYTASK::intance()->activate(
    THR_NEW_LWP | THR_JOINABLE |THR_INHERIT_SCHED , // 线程创建的属性
    n_threads = 1, // 线程的数目,即有多少处理线程
    ...)
  • 在有消息发生的时候发送消息 ACE_Message_Block * msg;// fill the msg...MYTASK::intance()->putq(msg);
  最后考虑一个使用ACE_Task的实例,在一个编写WEB服务器的项目中,类 Request_Handler负责处理HTTP请求,Request_Hanlder派生自ACE_Task,当有请求时,其他的代码将Http请求构造成一个ACE_Message_Block,并调用Request_Handler的putq方法将请求插入消息队列,Request_Handler配置为根据CPU的数目创建处理线程,Request_Handler的svc方法从队列中获取请求进行处理,然后将处理的结果构造成为一个ACE_Message_Block,插入到Response_Handler的消息队列,Response_Handler也派生自ACE_Task,但它只有一个处理线程,它仅仅将相应的数据写回给客户端。
 楼主| 发表于 2008-9-10 23:02:08 | 显示全部楼层
我的 P2P_Task_CDN_PrgmAgent task 类 svc 方法中不能访问到我的成员变量
  1. class P2P_Task_CDN_PrgmAgent : public P2P_Task_Base
  2. {
  3. private:
  4. std::string m_transactionip;
  5. unsigned int m_transactionport;
  6. std::map<std::string,unsigned int> m_esaddrport;
  7. ACE_Recursive_Thread_Mutex recursive_mutex_; // 锁
  8. public:
  9. P2P_Task_CDN_PrgmAgent(P2P_Application * pApplication,UInt32 taskid);
  10. virtual ~P2P_Task_CDN_PrgmAgent(void);
  11. virtual bool Initialize()
  12. {
  13. return true;
  14. }
  15. void get_all_interfaces();
  16. virtual int svc();
  17. bool SendFileToEs();
  18. bool ProcessMessage(P2P_Message_Base *pMsg,
  19. ACE_Message_Block *pCurrentMessageBlock);
  20. };
  21. int P2P_Task_CDN_PrgmAgent:pen()
  22. {
  23. m_transactionip = "127.0.0.1";
  24. m_transactionport = 8888;
  25. }
  26. int P2P_Task_CDN_PrgmAgent::svc()
  27. {
  28. while(1)
  29. {
  30. ACE_Message_Block * pMessage = 0;
  31. P2P_Message_Base * pMsg = 0;
  32. {
  33. ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, recursive_mutex_,0);
  34. this->getq(pMessage);
  35. assert(pMessage);
  36. pMsg = P2P_Message_Base:eserialize(pMessage->base());
  37. pMessage->release();
  38. }
  39. assert(pMsg);
  40. char addr[20];
  41. memset(addr,0,sizeof(addr));
  42. ACE_OS::inet_ntop(AF_INET,&pMsg->uiParameter1,addr,sizeof(addr));
  43. ACE_DEBUG((LM_DEBUG,"drive ES:%p%s",addr));
  44. //怪异的是下一行,总是不能得到 m_transactionip 的值,每次都是 ""
  45. int ret = TransFile("c:\\twz.mp4",addr,
  46. pMsg->uiParameter2,m_transactionip,m_transactionport);
  47. }
  48. return 0;
  49. }
复制代码
发表于 2010-12-29 17:15:20 | 显示全部楼层
本帖最后由 maddreamw88 于 2010-12-29 17:21 编辑

回复 1# peakzhang


    好!学习了,前阵子有项目windows移植到linux 还特意实现了个postmessage 和 sendmessage, 早知道ace_task封装了此功能,就不用自己写了
发表于 2011-1-17 20:32:21 | 显示全部楼层
ACE的这个东西还是很好用的
您需要登录后才可以回帖 登录 | 用户注册

本版积分规则

Archiver|手机版|小黑屋|ACE Developer ( 京ICP备06055248号 )

GMT+8, 2024-5-6 00:48 , Processed in 0.019362 second(s), 6 queries , Redis On.

Powered by Discuz! X3.5

© 2001-2023 Discuz! Team.

快速回复 返回顶部 返回列表