winston 发表于 2012-2-29 00:24:47

运用ACE_Thread_Manager类创建线程

   创建线程需要要解决两个问题,一是调用线程函数,二是提供一个途径让线程能够访问到外部传递过来的参数。下面的代码演示了基本的用法:
#include
<stdexcept>
#include "ace/ACE.h"
#include
"ace/Log_Msg.h"
#include "ace/Thread_Manager.h"
#include
<map>
#include <string>
#include <iostream>
using
namespace std;

class ThreadArg {
public:

    ThreadArg()
{
    }
private:
    string arg0;
public:

    void
setArg0(string value) {
      arg0 = value;
    }

    string
getArg0() const {
      return arg0;
    }
};

class MyThread
{
public:
    static void* run_svc(void* arg) {
      
ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%t) MyThread running/n")));
      
ThreadArg* pArg = static_cast<ThreadArg*> (arg);
      cout <<
pArg->getArg0() << endl;
    }
};

int ACE_TMAIN(int,
ACE_TCHAR *[]) {
    ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%t) Main Thread
running/n")));
    ThreadArg threadArg;
   
threadArg.setArg0("ok");
    ACE_thread_t threadID;
    if
(ACE_Thread_Manager::instance()->spawn(MyThread::run_svc,
static_cast<void*>(&threadArg),
            THR_DETACHED |
THR_SCOPE_SYSTEM, &threadID) == -1) {
      throw
std::runtime_error("can't create a new thread in SnapShotReqWiater::run
method");
    }
    ACE_Thread_Manager::instance()->wait();
   
return 0;
}

   使用ACE_Thread_Manager::spawn方法创建线程,第一个参数是线程的函数,第二个是一个对象指针,里面存放了参数,其他的参数请参考文档
http://www.dre.vanderbilt.edu/Doxygen/5.7.5/html/ace/a00676.html#a36262a470e556182f5d69c4de7cfeaa1
    wait方法等待线程运行完毕后才会返回。

运用ACE_Task_Base类创建线程    前面一种方法不够面向对象,线程需要成为一个对象,并且参数可以通过设置属性的形式自然的进行。下面的例子来自于<<ACE Programmers Guide>>,略作修改:
#include "ace/Task.h"
#include
<string>
#include <iostream>
using namespace std;

class
MyThread : public ACE_Task_Base {
public:

    virtual int svc(void)
{
      ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%t) Handler Thread
running/n")));
      cout<<arg0<<endl;
      
ACE_OS::sleep(4);
      return 0;
    }

    void setArg0(string
const& arg0) {
      this->arg0 = arg0;
    }

    string
getArg0() const {
      return arg0;
    }

private:
    string
arg0;
};

int ACE_TMAIN(int, ACE_TCHAR *[]) {
   
ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%t) Main Thread running/n")));

   
MyThread thread;
    string arg="ok";
    thread.setArg0(arg);
    int
result = thread.activate();
    ACE_ASSERT(result == 0);

   
thread.wait();
    return 0;
}

    MyThread类可以添加一些属性,svc成员函数可以内部访问这些属性。activate内部的实现代码实际上用的还是ACE_Thread_Manager::spawn_n方法。现在我们很清楚,通过继承ACE_Task_Base类来创建线程是更方便和优雅的。

ACE_Task
   线程之间常常需要通信,ACE有一种机制叫做消息队列。要获得这种能力,我们只需要使用ACE_Task_Base的子类ACT_Task即可。上面的例 子代码进行了一些修改,MyThread类从ACE_Task<>模板类派生,并且内部有一个循环,每隔四秒钟就会检查一下有没有消息,如果 消息类型是ACE_Message_Block::MB_STOP,则结束线程。
#include "ace/Task_T.h"
#include
<string>
#include <iostream>
using namespace std;

class
MyThread : public ACE_Task<ACE_MT_SYNCH> {
public:

    virtual
int svc(void) {
      ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%t) Handler Thread
running/n")));
      cout << arg0 << endl;
      for (;;)
{
            ACE_OS::sleep(4);
            ACE_Message_Block *
pMsg;
            ACE_Time_Value time(0, 100);
            if (getq(pMsg,
&time) != -1) {
                if (pMsg->msg_type() ==
ACE_Message_Block::MB_STOP) {
                  
pMsg->release();
                  
msg_queue_->close();
                  
ACE_Thread::exit(0);
                }
            }
      
}
      return 0;
    }

    void setArg0(string const& arg0)
{
      this->arg0 = arg0;
    }

    string getArg0() const
{
      return arg0;
    }

private:
    string
arg0;
};

int ACE_TMAIN(int, ACE_TCHAR *[]) {
   
ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%t) Main Thread running/n")));

   
MyThread thread;
    string arg="ok";
    thread.setArg0(arg);
    int
result = thread.activate();
    ACE_ASSERT(result == 0);

   
ACE_Message_Block* pMessage=new ACE_Message_Block(1);
   
pMessage->msg_type(ACE_Message_Block::MB_STOP);
   
thread.putq(pMessage);
   
    thread.wait();
    return
0;
}

注意:
1)putq和getq的第二个参数都是代表超时的时间,缺省为一直等待。
2)ACE_Message_Block作为消息,一般在发送者处用new创建,在接受者处调用release()方法销毁
3)msg_queue_->close();是关闭消息队列,使得发送者无法再发送消息。
4)msg_queue_->is_full()可以查看消息队列是否已经满
5) msg_queue_->high_water_mark()返回消息队列可以容纳的最后一个消息的索引,默认从0开始计数。我的机器上是默认最大索引是16384,也就是可以存放16385个消息。
6) msg_queue_->high_water_mark(size_t size)可以让我们重新设置消息队列的最大索引。

    关于ACE_Task的消息队列和Windows消息队列的比较,可以看这篇文章:http://blog.csdn.net/imjj/archive/2006/08/19/1097248.aspx
最后给一个比较复杂的消息队列的应用,利用ACE_OutputCDR类将对象序列化到ACE_Message_Block中,然后加入消息队列,另一个线程取出后,通过ACE_InputCDR类反序列化回对象。/*

* File:   main.cpp
* Author: chenshu
*
* Created on December 1,
2009, 7:21 PM
*/


#include "ace/Task_T.h"
#include
<string>
#include <iostream>
using namespace std;
#include
"ace/CDR_Stream.h"

class FileMessage {
public:
    static const
ACE_CDR::ULong createdFile=0;

public:
    ACE_CString getFolderPath()
const {
      return folderPath_;
    }

    void
setFolderPath(ACE_CString const& path) {
      folderPath_ =
path;
    }

    ACE_CString getFileName() const {
      return
fileName_;
    }

    void setFileName(ACE_CString const& name)
{
      fileName_ = name;
    }

    ACE_CDR::ULong
getFileMessageType() const{
      return messageType_;
    }

   
void setFileMessageType(ACE_CDR::ULong type) {
      messageType_ =
type;
    }

    ACE_CDR::ULong getSize() const {
      return
sizeof(ACE_CDR::ULong)+fileName_.length()+1+
               
sizeof(ACE_CDR::ULong)+
               
sizeof(ACE_CDR::ULong)+folderPath_.length()+1;
    }
private:
   
ACE_CString fileName_;
    ACE_CDR::ULong messageType_;
    ACE_CString
folderPath_;
};

int operator<<(ACE_OutputCDR & cdr,
FileMessage const& message) {
   
cdr<<message.getFileName();
   
cdr<<message.getFileMessageType();
   
cdr<<message.getFolderPath();
    return
cdr.good_bit();
}

int operator>>(ACE_InputCDR & cdr,
FileMessage & message) {
    ACE_CString name;
   
cdr>>name;
    message.setFileName(name);
    ACE_CDR::ULong
type;
    cdr >> type;
    message.setFileMessageType(type);
   
ACE_CString folderPath;
    cdr>>folderPath;
   
message.setFolderPath(folderPath);
    return
cdr.good_bit();
}

class MyThread : public ACE_Task<ACE_MT_SYNCH>
{
public:

    virtual int svc(void) {
      ACE_DEBUG((LM_DEBUG,
ACE_TEXT("(%t) Handler Thread running/n")));
      cout << arg0
<< endl;
      cout << "high_water_mark:" <<
msg_queue_->high_water_mark() << endl;
      for (;;)
{
            ACE_OS::sleep(4);
            ACE_Message_Block *
pMsg;
            ACE_Time_Value time(0, 100);
            if (getq(pMsg,
&time) != -1) {
                if (pMsg->msg_type() ==
ACE_Message_Block::MB_STOP) {
                  
pMsg->release();
                  
msg_queue_->close();
                  
ACE_Thread::exit(0);
                } else if(ACE_Message_Block::MB_DATA)
{
                  ACE_InputCDR inputCDR(pMsg);
                  
FileMessage message;
                  
inputCDR>>message;
                }
            }
      
}
      return 0;
    }

    void setArg0(string const& arg0)
{
      this->arg0 = arg0;
    }

    string getArg0() const
{
      return arg0;
    }

private:
    string
arg0;
};

int ACE_TMAIN(int, ACE_TCHAR *[]) {
   
ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%t) Main Thread running/n")));

   
MyThread thread;
    string arg = "ok";
    thread.setArg0(arg);
   
int result = thread.activate();
    ACE_ASSERT(result == 0);

   
FileMessage message;
   
message.setFileMessageType(FileMessage::createdFile);
   
message.setFileName("chenshu");
   
message.setFolderPath("/home/chenshu");
    ACE_OutputCDR
outputCDR(message.getSize()+ACE_CDR::MAX_ALIGNMENT);
   
outputCDR<<message;
    ACE_Message_Block*
pMessage=const_cast<ACE_Message_Block*>(outputCDR.begin());
   
thread.putq(pMessage);

    thread.wait();
    return 0;
}

http://blog.csdn.net/wallwind/article/details/7220083

页: [1]
查看完整版本: 运用ACE_Thread_Manager类创建线程