|
ACE Streams框架介绍 作者:菩提一、概述
ACE Streams框架是ACE提供的“流”概念解决方案,实现了Pipes and Filters(管道与过滤器)模式。对于由一组有序步骤组成的过程而言,这个框架是一种极好的建模方式。过程中的每个步骤都被实现成ACE_Task的派生类。当每个步骤完成时,数据会通过ACE_Task对象的消息队列,交给下一个步骤继续进行处理。如果数据能进行并行处理,则各个步骤可以是多线程的,从而增加吞吐量。
ACE Streams框架允许你把一组模块灵活地装配进一个流中,并且可以对其进行动态配置:信息将在这个流中移动。每个模块都有机会操纵流中的数据,并可以在把数据传给下一个模块之前对其进行修改、移动或增加内容。数据在流中双向流动,,流中的每个模块都有reader和writer任务――每个数据方向上有一个。
流的工作方式包括“顺流”(down-stream)和“溯流”(up-stream)两种。当顺流任务向下游移动时,你可以认为它们是在移出你的主应用。溯流任务则正好相反。
ACE Stream模式要求将流动于流中的数据装入ACE_Message_Block ,这样就能够使用ACE所设计的ACE_Message_Block和ACE_Message_Queue这样的高效设施。
二、工作方式
使用ACE_Stream处理流式事件重点使用三个类:ACE_Task的子类用于处理各个分离的业务,ACE_Module将这些处理方法包装成模块,置入ACE_Stream(或其子类)中。可以认为:ACE_Stream代表了整个地“流”,而ACE_Task则是流的各个段落, ACE_Module则用于做接合工作。因此,一个程序中会有一个ACE_Stream、多个ACE_Module以及与之等量的ACE_Task(或者它们的子类),而每个ACE_Task可以派生多个进程来处理它任内的事务,只要它需要而且愿意这么做。一看到多线程就免不了要考虑令人头痛的并发控制,但是不用担心,ACE的信号机制已经完全承接了这些痛苦,我们要做的,仅仅是选择一下用哪一种方式而已。
一般都使用如下的一些模板类
typedef ACE_Stream<ACE_MT_SYNCH> MT_Stream;
typedef ACE_Module<ACE_MT_SYNCH> MT_Module;
typedef ACE_Task<ACE_MT_SYNCH> MT_Task;
为了方便说明,下面统一用Stream来表述ACE_Stream或者是自定义的派生自它的类。与此类似,Module、Task也具有同样的意义。
工作流程如下:
1、 创建一系列的Task实例,开启多个线程等待处理事务。
2、 将这些Task“装”入Module
3、 将这些Module依序压入Stream
4、 把要处理的数据以ACE_Message_Block的形式置入队列。刚才派生出来的多线程会依次处理它们
5、 工作完成,移出各个模块,关闭各个线程。
三、示例
下面,我们通过一个示例来看一下ACE Streams的使用方法。这个示例选自《The ACE Programmer’s Guide》p312.为了简化,我把操作进行了改写。
示例的总体思路是:把一个已经接受了连接的ACE_SOCK_Stream传给流,流中的模块依次接收数据、解析数据、获取连接对端方的地址名、进行应答,并把解析好的数据返还给上层。我们把前面一部分的任务配置成顺流任务,而把后面的返还过程配置成溯流任务,由此实现一个完整的双向流。
示例中,CommandStream维护整个的流,这个流由四个Module构成:RecordMessageModule, PlayMessageModule, RetrieveCallerIDModule, AnswerCallModule,每个Module维护两个Task:一个顺流方向的writer端Task(*DownstreamTask),和一个溯流方向的reader端Task(*UpstreamTask).
四、实现
Command
这个类是我们返还给上层的数据结构的载体。之所以将它设计成一个类,是为了利用ACE_Message_Block的自动析构功能。在更健壮地应用中,我们可以进一步创建它的派生对象,而非像这里这样:将成员变量设置成为public属性
class Command : public ACE_Data_Block
{
public:
enum {
PASS = 1,
SUCCESS = 0,
FAILURE = -1
};
enum {
UNKNOWN = -1,
ANSWER_CALL = 10,
RETRIEVE_CALLER_ID = 8,
PLAY_MESSAGE = 4,
RECORD_MESSAGE = 2
}commands;
int flags_;
int command_;
char *extra_data_;
int numeric_result_;
char name[MAXHOSTNAMELEN];
char cmd[16];
char param[16];
Command()
{
extra_data_ = NULL;
memset(name,0,MAXHOSTNAMELEN);
memset(cmd,0,16);
memset(param,0,16);
}
};
CommandTask族
CommandTask派生自ACE_Task<ACE_MT_SYNCH>,构成示例中全部Task类的基类。用于实现各个任务的默认处理方式。
声明
class CommandTask : public ACE_Task<ACE_MT_SYNCH>
{
public:
typedef ACE_Task<ACE_MT_SYNCH> inherited;
virtual int open(void * = 0);
int put(ACE_Message_Block *message,ACE_Time_Value *timeout);
virtual int svc(void);
virtual int close(u_long flags);
protected:
int command_;
CommandTask(int command);
virtual int process(Command *message);
};
工作方式:可以实现为两种工作方式:
如果本任务接收到数据之后直接处理数据,只需在int put()里面添加代码即可,因为这个函数将会在传递数据给本任务时被调用。
如果我们需要使用主动对象模式的子线程来处理数据,则可以在int open()方法里面开启多线程,子线程将运行int svc()里面的代码。int put()方法将数据交给已经被ACE置入的ACE_Message_Queue,之后各个子线程即可从其中取中数据进行处理。在关闭时,要注意协调与各个子线程的退出。
实现:我们演示后一种工作方式
CommandTask::CommandTask(int command)
:inherited(),command_(command)
{
}
int CommandTask::open(void *)
{
return this->activate();
}
int CommandTask::put(ACE_Message_Block *message, ACE_Time_Value *timeout)
{
return this->putq(message,timeout);
}
int CommandTask::svc()
{
ACE_Message_Block *message;
for(;;){
if( -1 == this->getq(message) )
return -1;
if(message->msg_type() == ACE_Message_Block::MB_HANGUP ){
ACE_DEBUG((LM_DEBUG,ACE_TEXT("(%P|%t)%s thread close\n"),this->module()->name()));
this->putq(message->duplicate());
message->release();
return 0;
}
Command *command = (Command*)message->data_block();
if( command->command_ != this->command_ )
this->put_next(message->duplicate());
else {
int result = this->process(command);
//处理失败?丢弃
if( result == Command::FAILURE )
command->numeric_result_ = -1;
//处理后需要下一个模块任务处理?
else if( result == Command::PASS){
this->put_next(message->duplicate());
}
//处理完成?
else {
//在顺流方向?转向溯流方向
if(this->is_writer())
this->sibling()->putq(message->duplicate());
else
this->put_next(message->duplicate());
}
}
message->release();
}
return 0;
}
int CommandTask::close(u_long flags) |
|