运用ACE_Thread_Manager类创建线程
创建线程需要要解决两个问题,一是调用线程函数,二是提供一个途径让线程能够访问到外部传递过来的参数。下面的代码演示了基本的用法:#include
<stdexcept>
#include "ace/ACE.h"
#include
"ace/Log_Msg.h"
#include "ace/Thread_Manager.h"
#include
<map>
#include <string>
#include <iostream>
using
namespace std;
class ThreadArg {
public:
ThreadArg()
{
}
private:
string arg0;
public:
void
setArg0(string value) {
arg0 = value;
}
string
getArg0() const {
return arg0;
}
};
class MyThread
{
public:
static void* run_svc(void* arg) {
ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%t) MyThread running/n")));
ThreadArg* pArg = static_cast<ThreadArg*> (arg);
cout <<
pArg->getArg0() << endl;
}
};
int ACE_TMAIN(int,
ACE_TCHAR *[]) {
ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%t) Main Thread
running/n")));
ThreadArg threadArg;
threadArg.setArg0("ok");
ACE_thread_t threadID;
if
(ACE_Thread_Manager::instance()->spawn(MyThread::run_svc,
static_cast<void*>(&threadArg),
THR_DETACHED |
THR_SCOPE_SYSTEM, &threadID) == -1) {
throw
std::runtime_error("can't create a new thread in SnapShotReqWiater::run
method");
}
ACE_Thread_Manager::instance()->wait();
return 0;
}
使用ACE_Thread_Manager::spawn方法创建线程,第一个参数是线程的函数,第二个是一个对象指针,里面存放了参数,其他的参数请参考文档
http://www.dre.vanderbilt.edu/Doxygen/5.7.5/html/ace/a00676.html#a36262a470e556182f5d69c4de7cfeaa1
wait方法等待线程运行完毕后才会返回。
运用ACE_Task_Base类创建线程 前面一种方法不够面向对象,线程需要成为一个对象,并且参数可以通过设置属性的形式自然的进行。下面的例子来自于<<ACE Programmers Guide>>,略作修改:
#include "ace/Task.h"
#include
<string>
#include <iostream>
using namespace std;
class
MyThread : public ACE_Task_Base {
public:
virtual int svc(void)
{
ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%t) Handler Thread
running/n")));
cout<<arg0<<endl;
ACE_OS::sleep(4);
return 0;
}
void setArg0(string
const& arg0) {
this->arg0 = arg0;
}
string
getArg0() const {
return arg0;
}
private:
string
arg0;
};
int ACE_TMAIN(int, ACE_TCHAR *[]) {
ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%t) Main Thread running/n")));
MyThread thread;
string arg="ok";
thread.setArg0(arg);
int
result = thread.activate();
ACE_ASSERT(result == 0);
thread.wait();
return 0;
}
MyThread类可以添加一些属性,svc成员函数可以内部访问这些属性。activate内部的实现代码实际上用的还是ACE_Thread_Manager::spawn_n方法。现在我们很清楚,通过继承ACE_Task_Base类来创建线程是更方便和优雅的。
ACE_Task
线程之间常常需要通信,ACE有一种机制叫做消息队列。要获得这种能力,我们只需要使用ACE_Task_Base的子类ACT_Task即可。上面的例 子代码进行了一些修改,MyThread类从ACE_Task<>模板类派生,并且内部有一个循环,每隔四秒钟就会检查一下有没有消息,如果 消息类型是ACE_Message_Block::MB_STOP,则结束线程。
#include "ace/Task_T.h"
#include
<string>
#include <iostream>
using namespace std;
class
MyThread : public ACE_Task<ACE_MT_SYNCH> {
public:
virtual
int svc(void) {
ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%t) Handler Thread
running/n")));
cout << arg0 << endl;
for (;;)
{
ACE_OS::sleep(4);
ACE_Message_Block *
pMsg;
ACE_Time_Value time(0, 100);
if (getq(pMsg,
&time) != -1) {
if (pMsg->msg_type() ==
ACE_Message_Block::MB_STOP) {
pMsg->release();
msg_queue_->close();
ACE_Thread::exit(0);
}
}
}
return 0;
}
void setArg0(string const& arg0)
{
this->arg0 = arg0;
}
string getArg0() const
{
return arg0;
}
private:
string
arg0;
};
int ACE_TMAIN(int, ACE_TCHAR *[]) {
ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%t) Main Thread running/n")));
MyThread thread;
string arg="ok";
thread.setArg0(arg);
int
result = thread.activate();
ACE_ASSERT(result == 0);
ACE_Message_Block* pMessage=new ACE_Message_Block(1);
pMessage->msg_type(ACE_Message_Block::MB_STOP);
thread.putq(pMessage);
thread.wait();
return
0;
}
注意:
1)putq和getq的第二个参数都是代表超时的时间,缺省为一直等待。
2)ACE_Message_Block作为消息,一般在发送者处用new创建,在接受者处调用release()方法销毁
3)msg_queue_->close();是关闭消息队列,使得发送者无法再发送消息。
4)msg_queue_->is_full()可以查看消息队列是否已经满
5) msg_queue_->high_water_mark()返回消息队列可以容纳的最后一个消息的索引,默认从0开始计数。我的机器上是默认最大索引是16384,也就是可以存放16385个消息。
6) msg_queue_->high_water_mark(size_t size)可以让我们重新设置消息队列的最大索引。
关于ACE_Task的消息队列和Windows消息队列的比较,可以看这篇文章:http://blog.csdn.net/imjj/archive/2006/08/19/1097248.aspx
最后给一个比较复杂的消息队列的应用,利用ACE_OutputCDR类将对象序列化到ACE_Message_Block中,然后加入消息队列,另一个线程取出后,通过ACE_InputCDR类反序列化回对象。/*
* File: main.cpp
* Author: chenshu
*
* Created on December 1,
2009, 7:21 PM
*/
#include "ace/Task_T.h"
#include
<string>
#include <iostream>
using namespace std;
#include
"ace/CDR_Stream.h"
class FileMessage {
public:
static const
ACE_CDR::ULong createdFile=0;
public:
ACE_CString getFolderPath()
const {
return folderPath_;
}
void
setFolderPath(ACE_CString const& path) {
folderPath_ =
path;
}
ACE_CString getFileName() const {
return
fileName_;
}
void setFileName(ACE_CString const& name)
{
fileName_ = name;
}
ACE_CDR::ULong
getFileMessageType() const{
return messageType_;
}
void setFileMessageType(ACE_CDR::ULong type) {
messageType_ =
type;
}
ACE_CDR::ULong getSize() const {
return
sizeof(ACE_CDR::ULong)+fileName_.length()+1+
sizeof(ACE_CDR::ULong)+
sizeof(ACE_CDR::ULong)+folderPath_.length()+1;
}
private:
ACE_CString fileName_;
ACE_CDR::ULong messageType_;
ACE_CString
folderPath_;
};
int operator<<(ACE_OutputCDR & cdr,
FileMessage const& message) {
cdr<<message.getFileName();
cdr<<message.getFileMessageType();
cdr<<message.getFolderPath();
return
cdr.good_bit();
}
int operator>>(ACE_InputCDR & cdr,
FileMessage & message) {
ACE_CString name;
cdr>>name;
message.setFileName(name);
ACE_CDR::ULong
type;
cdr >> type;
message.setFileMessageType(type);
ACE_CString folderPath;
cdr>>folderPath;
message.setFolderPath(folderPath);
return
cdr.good_bit();
}
class MyThread : public ACE_Task<ACE_MT_SYNCH>
{
public:
virtual int svc(void) {
ACE_DEBUG((LM_DEBUG,
ACE_TEXT("(%t) Handler Thread running/n")));
cout << arg0
<< endl;
cout << "high_water_mark:" <<
msg_queue_->high_water_mark() << endl;
for (;;)
{
ACE_OS::sleep(4);
ACE_Message_Block *
pMsg;
ACE_Time_Value time(0, 100);
if (getq(pMsg,
&time) != -1) {
if (pMsg->msg_type() ==
ACE_Message_Block::MB_STOP) {
pMsg->release();
msg_queue_->close();
ACE_Thread::exit(0);
} else if(ACE_Message_Block::MB_DATA)
{
ACE_InputCDR inputCDR(pMsg);
FileMessage message;
inputCDR>>message;
}
}
}
return 0;
}
void setArg0(string const& arg0)
{
this->arg0 = arg0;
}
string getArg0() const
{
return arg0;
}
private:
string
arg0;
};
int ACE_TMAIN(int, ACE_TCHAR *[]) {
ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%t) Main Thread running/n")));
MyThread thread;
string arg = "ok";
thread.setArg0(arg);
int result = thread.activate();
ACE_ASSERT(result == 0);
FileMessage message;
message.setFileMessageType(FileMessage::createdFile);
message.setFileName("chenshu");
message.setFolderPath("/home/chenshu");
ACE_OutputCDR
outputCDR(message.getSize()+ACE_CDR::MAX_ALIGNMENT);
outputCDR<<message;
ACE_Message_Block*
pMessage=const_cast<ACE_Message_Block*>(outputCDR.begin());
thread.putq(pMessage);
thread.wait();
return 0;
}
http://blog.csdn.net/wallwind/article/details/7220083
页:
[1]