找回密码
 用户注册

QQ登录

只需一步,快速开始

查看: 6696|回复: 2

ACE Streams框架介绍

[复制链接]
发表于 2008-9-10 23:04:02 | 显示全部楼层 |阅读模式
   ACE Streams框架介绍 作者:菩提一、概述                     

ACE Streams框架是ACE提供的“流”概念解决方案,实现了Pipes and Filters(管道与过滤器)模式。对于由一组有序步骤组成的过程而言,这个框架是一种极好的建模方式。过程中的每个步骤都被实现成ACE_Task的派生类。当每个步骤完成时,数据会通过ACE_Task对象的消息队列,交给下一个步骤继续进行处理。如果数据能进行并行处理,则各个步骤可以是多线程的,从而增加吞吐量。
ACE Streams框架允许你把一组模块灵活地装配进一个流中,并且可以对其进行动态配置:信息将在这个流中移动。每个模块都有机会操纵流中的数据,并可以在把数据传给下一个模块之前对其进行修改、移动或增加内容。数据在流中双向流动,,流中的每个模块都有readerwriter任务――每个数据方向上有一个。
流的工作方式包括“顺流”(down-stream)和“溯流”(up-stream)两种。当顺流任务向下游移动时,你可以认为它们是在移出你的主应用。溯流任务则正好相反。
ACE Stream模式要求将流动于流中的数据装入ACE_Message_Block ,这样就能够使用ACE所设计的ACE_Message_BlockACE_Message_Queue这样的高效设施。

二、工作方式
使用ACE_Stream处理流式事件重点使用三个类:ACE_Task的子类用于处理各个分离的业务,ACE_Module将这些处理方法包装成模块,置入ACE_Stream(或其子类)中。可以认为:ACE_Stream代表了整个地“流”,而ACE_Task则是流的各个段落, ACE_Module则用于做接合工作。因此,一个程序中会有一个ACE_Stream、多个ACE_Module以及与之等量的ACE_Task(或者它们的子类),而每个ACE_Task可以派生多个进程来处理它任内的事务,只要它需要而且愿意这么做。一看到多线程就免不了要考虑令人头痛的并发控制,但是不用担心,ACE的信号机制已经完全承接了这些痛苦,我们要做的,仅仅是选择一下用哪一种方式而已。
一般都使用如下的一些模板类
typedef ACE_Stream<ACE_MT_SYNCH> MT_Stream;
typedef ACE_Module<ACE_MT_SYNCH> MT_Module;
typedef ACE_Task<ACE_MT_SYNCH> MT_Task;
为了方便说明,下面统一用Stream来表述ACE_Stream或者是自定义的派生自它的类。与此类似,ModuleTask也具有同样的意义。
工作流程如下:
1、  创建一系列的Task实例,开启多个线程等待处理事务。
2、  将这些Task“装”入Module
3、  将这些Module依序压入Stream
4、  把要处理的数据以ACE_Message_Block的形式置入队列。刚才派生出来的多线程会依次处理它们
5、  工作完成,移出各个模块,关闭各个线程。


三、示例
下面,我们通过一个示例来看一下ACE Streams的使用方法。这个示例选自《The ACE Programmer’s Guidep312.为了简化,我把操作进行了改写。
示例的总体思路是:把一个已经接受了连接的ACE_SOCK_Stream传给流,流中的模块依次接收数据、解析数据、获取连接对端方的地址名、进行应答,并把解析好的数据返还给上层。我们把前面一部分的任务配置成顺流任务,而把后面的返还过程配置成溯流任务,由此实现一个完整的双向流。

示例中,CommandStream维护整个的流,这个流由四个Module构成:RecordMessageModule, PlayMessageModule, RetrieveCallerIDModule, AnswerCallModule,每个Module维护两个Task:一个顺流方向的writerTask*DownstreamTask,和一个溯流方向的readerTask(*UpstreamTask).
四、实现
Command
这个类是我们返还给上层的数据结构的载体。之所以将它设计成一个类,是为了利用ACE_Message_Block的自动析构功能。在更健壮地应用中,我们可以进一步创建它的派生对象,而非像这里这样:将成员变量设置成为public属性
class Command : public ACE_Data_Block  
{
public:
       enum {
              PASS      = 1,
                     SUCCESS      = 0,
                     FAILURE = -1
       };

       enum {
              UNKNOWN                        = -1,
              ANSWER_CALL                  = 10,
              RETRIEVE_CALLER_ID      = 8,
              PLAY_MESSAGE             = 4,
              RECORD_MESSAGE           = 2
       }commands;

       int flags_;
       int command_;

       char *extra_data_;

       int numeric_result_;
       char name[MAXHOSTNAMELEN];
       char cmd[16];
       char param[16];

       Command()
       {
              extra_data_ = NULL;
              memset(name,0,MAXHOSTNAMELEN);
              memset(cmd,0,16);
              memset(param,0,16);
       }

};
CommandTask
CommandTask派生自ACE_Task<ACE_MT_SYNCH>,构成示例中全部Task类的基类。用于实现各个任务的默认处理方式。
声明
class CommandTask : public ACE_Task<ACE_MT_SYNCH>  
{
public:
       typedef ACE_Task<ACE_MT_SYNCH> inherited;

       virtual int open(void * = 0);
       int put(ACE_Message_Block *message,ACE_Time_Value *timeout);
       virtual int svc(void);
       virtual int close(u_long flags);

protected:
       int command_;
       CommandTask(int command);
       virtual int process(Command *message);
};
工作方式:可以实现为两种工作方式:
如果本任务接收到数据之后直接处理数据,只需在int put()里面添加代码即可,因为这个函数将会在传递数据给本任务时被调用。
如果我们需要使用主动对象模式的子线程来处理数据,则可以在int open()方法里面开启多线程,子线程将运行int svc()里面的代码。int put()方法将数据交给已经被ACE置入的ACE_Message_Queue,之后各个子线程即可从其中取中数据进行处理。在关闭时,要注意协调与各个子线程的退出。
实现:我们演示后一种工作方式
CommandTask::CommandTask(int command)
:inherited(),command_(command)
{
      
}
int CommandTask::open(void *)
{
       return this->activate();
}
int CommandTask::put(ACE_Message_Block *message, ACE_Time_Value *timeout)
{
       return this->putq(message,timeout);
      
}
int CommandTask::svc()
{
       ACE_Message_Block *message;
      
       for(;;){
              if( -1 == this->getq(message) )
                     return -1;
              
              if(message->msg_type() == ACE_Message_Block::MB_HANGUP ){
                     ACE_DEBUG((LM_DEBUG,ACE_TEXT("(%P|%t)%s thread close\n"),this->module()->name()));

                     this->putq(message->duplicate());
                     message->release();
                     return 0;
              }
              
              Command *command = (Command*)message->data_block();
              if( command->command_ != this->command_ )
                     this->put_next(message->duplicate());
              else {
                     int result = this->process(command);
                     
          //处理失败?丢弃
                     if( result == Command::FAILURE )
                            command->numeric_result_ = -1;
           //处理后需要下一个模块任务处理?
                     else if( result == Command::PASS){
                            this->put_next(message->duplicate());
                     }
          //处理完成?
                     else {
              //在顺流方向?转向溯流方向
                            if(this->is_writer())
                                  this->sibling()->putq(message->duplicate());
                            else
                                   this->put_next(message->duplicate());
                     }
              }
              message->release();
       }
       return 0;
}
int CommandTask::close(u_long flags)
发表于 2008-9-11 12:23:04 | 显示全部楼层
请问,这个框架应用广泛吗?使用时都该注意些什么问题啊?
发表于 2008-9-11 14:55:19 | 显示全部楼层
看书,书上介绍的很细致。
主要适合于协议分层的场合。
您需要登录后才可以回帖 登录 | 用户注册

本版积分规则

Archiver|手机版|小黑屋|ACE Developer ( 京ICP备06055248号 )

GMT+8, 2024-4-30 11:28 , Processed in 0.014664 second(s), 6 queries , Redis On.

Powered by Discuz! X3.5

© 2001-2023 Discuz! Team.

快速回复 返回顶部 返回列表