找回密码
 用户注册

QQ登录

只需一步,快速开始

查看: 4915|回复: 0

运用ACE_Thread_Manager类创建线程

[复制链接]
发表于 2012-2-29 00:24:47 | 显示全部楼层 |阅读模式
     创建线程需要要解决两个问题,一是调用线程函数,二是提供一个途径让线程能够访问到外部传递过来的参数。下面的代码演示了基本的用法:
  1. #include
  2. <stdexcept>
  3. #include "ace/ACE.h"
  4. #include
  5. "ace/Log_Msg.h"
  6. #include "ace/Thread_Manager.h"
  7. #include
  8. <map>
  9. #include <string>
  10. #include <iostream>
  11. using
  12. namespace std;
  13. class ThreadArg {
  14. public:
  15.     ThreadArg()
  16. {
  17.     }
  18. private:
  19.     string arg0;
  20. public:
  21.     void
  22. setArg0(string value) {
  23.         arg0 = value;
  24.     }
  25.     string
  26. getArg0() const {
  27.         return arg0;
  28.     }
  29. };
  30. class MyThread
  31. {
  32. public:
  33.     static void* run_svc(void* arg) {
  34.         
  35. ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%t) MyThread running/n")));
  36.         
  37. ThreadArg* pArg = static_cast<ThreadArg*> (arg);
  38.         cout <<
  39. pArg->getArg0() << endl;
  40.     }
  41. };
  42. int ACE_TMAIN(int,
  43. ACE_TCHAR *[]) {
  44.     ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%t) Main Thread
  45. running/n")));
  46.     ThreadArg threadArg;
  47.    
  48. threadArg.setArg0("ok");
  49.     ACE_thread_t threadID;
  50.     if
  51. (ACE_Thread_Manager::instance()->spawn(MyThread::run_svc,
  52. static_cast<void*>(&threadArg),
  53.             THR_DETACHED |
  54. THR_SCOPE_SYSTEM, &threadID) == -1) {
  55.         throw
  56. std::runtime_error("can't create a new thread in SnapShotReqWiater::run
  57. method");
  58.     }
  59.     ACE_Thread_Manager::instance()->wait();
  60.    
  61. return 0;
  62. }
复制代码
     使用ACE_Thread_Manager::spawn方法创建线程,第一个参数是线程的函数,第二个是一个对象指针,里面存放了参数,其他的参数请参考文档
http://www.dre.vanderbilt.edu/Doxygen/5.7.5/html/ace/a00676.html#a36262a470e556182f5d69c4de7cfeaa1
    wait方法等待线程运行完毕后才会返回。

运用ACE_Task_Base类创建线程    前面一种方法不够面向对象,线程需要成为一个对象,并且参数可以通过设置属性的形式自然的进行。下面的例子来自于<<ACE Programmers Guide>>,略作修改:
  1. #include "ace/Task.h"
  2. #include
  3. <string>
  4. #include <iostream>
  5. using namespace std;
  6. class
  7. MyThread : public ACE_Task_Base {
  8. public:
  9.     virtual int svc(void)
  10. {
  11.         ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%t) Handler Thread
  12. running/n")));
  13.         cout<<arg0<<endl;
  14.         
  15. ACE_OS::sleep(4);
  16.         return 0;
  17.     }
  18.     void setArg0(string
  19. const& arg0) {
  20.         this->arg0 = arg0;
  21.     }
  22.     string
  23. getArg0() const {
  24.         return arg0;
  25.     }
  26. private:
  27.     string
  28. arg0;
  29. };
  30. int ACE_TMAIN(int, ACE_TCHAR *[]) {
  31.    
  32. ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%t) Main Thread running/n")));
  33.    
  34. MyThread thread;
  35.     string arg="ok";
  36.     thread.setArg0(arg);
  37.     int
  38. result = thread.activate();
  39.     ACE_ASSERT(result == 0);
  40.    
  41. thread.wait();
  42.     return 0;
  43. }
复制代码

    MyThread类可以添加一些属性,svc成员函数可以内部访问这些属性。activate内部的实现代码实际上用的还是ACE_Thread_Manager::spawn_n方法。现在我们很清楚,通过继承ACE_Task_Base类来创建线程是更方便和优雅的。

ACE_Task
   线程之间常常需要通信,ACE有一种机制叫做消息队列。要获得这种能力,我们只需要使用ACE_Task_Base的子类ACT_Task即可。上面的例 子代码进行了一些修改,MyThread类从ACE_Task<>模板类派生,并且内部有一个循环,每隔四秒钟就会检查一下有没有消息,如果 消息类型是ACE_Message_Block::MB_STOP,则结束线程。
  1. #include "ace/Task_T.h"
  2. #include
  3. <string>
  4. #include <iostream>
  5. using namespace std;
  6. class
  7. MyThread : public ACE_Task<ACE_MT_SYNCH> {
  8. public:
  9.     virtual
  10. int svc(void) {
  11.         ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%t) Handler Thread
  12. running/n")));
  13.         cout << arg0 << endl;
  14.         for (;;)
  15. {
  16.             ACE_OS::sleep(4);
  17.             ACE_Message_Block *
  18. pMsg;
  19.             ACE_Time_Value time(0, 100);
  20.             if (getq(pMsg,
  21. &time) != -1) {
  22.                 if (pMsg->msg_type() ==
  23. ACE_Message_Block::MB_STOP) {
  24.                     
  25. pMsg->release();
  26.                     
  27. msg_queue_->close();
  28.                     
  29. ACE_Thread::exit(0);
  30.                 }
  31.             }
  32.         
  33. }
  34.         return 0;
  35.     }
  36.     void setArg0(string const& arg0)
  37. {
  38.         this->arg0 = arg0;
  39.     }
  40.     string getArg0() const
  41. {
  42.         return arg0;
  43.     }
  44. private:
  45.     string
  46. arg0;
  47. };
  48. int ACE_TMAIN(int, ACE_TCHAR *[]) {
  49.    
  50. ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%t) Main Thread running/n")));
  51.    
  52. MyThread thread;
  53.     string arg="ok";
  54.     thread.setArg0(arg);
  55.     int
  56. result = thread.activate();
  57.     ACE_ASSERT(result == 0);
  58.    
  59. ACE_Message_Block* pMessage=new ACE_Message_Block(1);
  60.    
  61. pMessage->msg_type(ACE_Message_Block::MB_STOP);
  62.    
  63. thread.putq(pMessage);
  64.    
  65.     thread.wait();
  66.     return
  67. 0;
  68. }
复制代码
注意:
1)putq和getq的第二个参数都是代表超时的时间,缺省为一直等待。
2)ACE_Message_Block作为消息,一般在发送者处用new创建,在接受者处调用release()方法销毁
3)msg_queue_->close();是关闭消息队列,使得发送者无法再发送消息。
4)msg_queue_->is_full()可以查看消息队列是否已经满
5) msg_queue_->high_water_mark()返回消息队列可以容纳的最后一个消息的索引,默认从0开始计数。我的机器上是默认最大索引是16384,也就是可以存放16385个消息。
6) msg_queue_->high_water_mark(size_t size)可以让我们重新设置消息队列的最大索引。

    关于ACE_Task的消息队列和Windows消息队列的比较,可以看这篇文章:http://blog.csdn.net/imjj/archive/2006/08/19/1097248.aspx
最后给一个比较复杂的消息队列的应用,利用ACE_OutputCDR类将对象序列化到ACE_Message_Block中,然后加入消息队列,另一个线程取出后,通过ACE_InputCDR类反序列化回对象。
  1. /*
  2. * File:   main.cpp
  3. * Author: chenshu
  4. *
  5. * Created on December 1,
  6. 2009, 7:21 PM
  7. */
  8. #include "ace/Task_T.h"
  9. #include
  10. <string>
  11. #include <iostream>
  12. using namespace std;
  13. #include
  14. "ace/CDR_Stream.h"
  15. class FileMessage {
  16. public:
  17.     static const
  18. ACE_CDR::ULong createdFile=0;
  19. public:
  20.     ACE_CString getFolderPath()
  21. const {
  22.         return folderPath_;
  23.     }
  24.     void
  25. setFolderPath(ACE_CString const& path) {
  26.         folderPath_ =
  27. path;
  28.     }
  29.     ACE_CString getFileName() const {
  30.         return
  31. fileName_;
  32.     }
  33.     void setFileName(ACE_CString const& name)
  34. {
  35.         fileName_ = name;
  36.     }
  37.     ACE_CDR::ULong
  38. getFileMessageType() const{
  39.         return messageType_;
  40.     }
  41.    
  42. void setFileMessageType(ACE_CDR::ULong type) {
  43.         messageType_ =
  44. type;
  45.     }
  46.     ACE_CDR::ULong getSize() const {
  47.         return
  48. sizeof(ACE_CDR::ULong)+fileName_.length()+1+
  49.                
  50. sizeof(ACE_CDR::ULong)+
  51.                
  52. sizeof(ACE_CDR::ULong)+folderPath_.length()+1;
  53.     }
  54. private:
  55.    
  56. ACE_CString fileName_;
  57.     ACE_CDR::ULong messageType_;
  58.     ACE_CString
  59. folderPath_;
  60. };
  61. int operator<<(ACE_OutputCDR & cdr,
  62. FileMessage const& message) {
  63.    
  64. cdr<<message.getFileName();
  65.    
  66. cdr<<message.getFileMessageType();
  67.    
  68. cdr<<message.getFolderPath();
  69.     return
  70. cdr.good_bit();
  71. }
  72. int operator>>(ACE_InputCDR & cdr,
  73. FileMessage & message) {
  74.     ACE_CString name;
  75.    
  76. cdr>>name;
  77.     message.setFileName(name);
  78.     ACE_CDR::ULong
  79. type;
  80.     cdr >> type;
  81.     message.setFileMessageType(type);
  82.    
  83. ACE_CString folderPath;
  84.     cdr>>folderPath;
  85.    
  86. message.setFolderPath(folderPath);
  87.     return
  88. cdr.good_bit();
  89. }
  90. class MyThread : public ACE_Task<ACE_MT_SYNCH>
  91. {
  92. public:
  93.     virtual int svc(void) {
  94.         ACE_DEBUG((LM_DEBUG,
  95. ACE_TEXT("(%t) Handler Thread running/n")));
  96.         cout << arg0
  97. << endl;
  98.         cout << "high_water_mark:" <<
  99. msg_queue_->high_water_mark() << endl;
  100.         for (;;)
  101. {
  102.             ACE_OS::sleep(4);
  103.             ACE_Message_Block *
  104. pMsg;
  105.             ACE_Time_Value time(0, 100);
  106.             if (getq(pMsg,
  107. &time) != -1) {
  108.                 if (pMsg->msg_type() ==
  109. ACE_Message_Block::MB_STOP) {
  110.                     
  111. pMsg->release();
  112.                     
  113. msg_queue_->close();
  114.                     
  115. ACE_Thread::exit(0);
  116.                 } else if(ACE_Message_Block::MB_DATA)
  117. {
  118.                     ACE_InputCDR inputCDR(pMsg);
  119.                     
  120. FileMessage message;
  121.                     
  122. inputCDR>>message;
  123.                 }
  124.             }
  125.         
  126. }
  127.         return 0;
  128.     }
  129.     void setArg0(string const& arg0)
  130. {
  131.         this->arg0 = arg0;
  132.     }
  133.     string getArg0() const
  134. {
  135.         return arg0;
  136.     }
  137. private:
  138.     string
  139. arg0;
  140. };
  141. int ACE_TMAIN(int, ACE_TCHAR *[]) {
  142.    
  143. ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%t) Main Thread running/n")));
  144.    
  145. MyThread thread;
  146.     string arg = "ok";
  147.     thread.setArg0(arg);
  148.    
  149. int result = thread.activate();
  150.     ACE_ASSERT(result == 0);
  151.    
  152. FileMessage message;
  153.    
  154. message.setFileMessageType(FileMessage::createdFile);
  155.    
  156. message.setFileName("chenshu");
  157.    
  158. message.setFolderPath("/home/chenshu");
  159.     ACE_OutputCDR
  160. outputCDR(message.getSize()+ACE_CDR::MAX_ALIGNMENT);
  161.    
  162. outputCDR<<message;
  163.     ACE_Message_Block*
  164. pMessage=const_cast<ACE_Message_Block*>(outputCDR.begin());
  165.    
  166. thread.putq(pMessage);
  167.     thread.wait();
  168.     return 0;
  169. }
复制代码

http://blog.csdn.net/wallwind/article/details/7220083

您需要登录后才可以回帖 登录 | 用户注册

本版积分规则

Archiver|手机版|小黑屋|ACE Developer ( 京ICP备06055248号 )

GMT+8, 2024-12-22 16:28 , Processed in 0.017067 second(s), 6 queries , Redis On.

Powered by Discuz! X3.5

© 2001-2023 Discuz! Team.

快速回复 返回顶部 返回列表