• 2006-04-26

    ACE Streams框架介绍

    版权声明:转载时请以超链接形式标明文章原始出处和作者信息及本声明
    http://wolfkiller.blogbus.com/logs/2360081.html

    一、概述                     

     

    ACE Streams框架是ACE提供的“流”概念解决方案,实现了Pipes and Filters(管道与过滤器)模式。对于由一组有序步骤组成的过程而言,这个框架是一种极好的建模方式。过程中的每个步骤都被实现成ACE_Task的派生类。当每个步骤完成时,数据会通过ACE_Task对象的消息队列,交给下一个步骤继续进行处理。如果数据能进行并行处理,则各个步骤可以是多线程的,从而增加吞吐量。

    ACE Streams框架允许你把一组模块灵活地装配进一个流中,并且可以对其进行动态配置:信息将在这个流中移动。每个模块都有机会操纵流中的数据,并可以在把数据传给下一个模块之前对其进行修改、移动或增加内容。数据在流中双向流动,,流中的每个模块都有readerwriter任务――每个数据方向上有一个。

    流的工作方式包括“顺流”(down-stream)和“溯流”(up-stream)两种。当顺流任务向下游移动时,你可以认为它们是在移出你的主应用。溯流任务则正好相反。

    ACE Stream模式要求将流动于流中的数据装入ACE_Message_Block ,这样就能够使用ACE所设计的ACE_Message_BlockACE_Message_Queue这样的高效设施。

     

    二、工作方式

    使用ACE_Stream处理流式事件重点使用三个类:ACE_Task的子类用于处理各个分离的业务,ACE_Module将这些处理方法包装成模块,置入ACE_Stream(或其子类)中。可以认为:ACE_Stream代表了整个地“流”,而ACE_Task则是流的各个段落, ACE_Module则用于做接合工作。因此,一个程序中会有一个ACE_Stream、多个ACE_Module以及与之等量的ACE_Task(或者它们的子类),而每个ACE_Task可以派生多个进程来处理它任内的事务,只要它需要而且愿意这么做。一看到多线程就免不了要考虑令人头痛的并发控制,但是不用担心,ACE的信号机制已经完全承接了这些痛苦,我们要做的,仅仅是选择一下用哪一种方式而已。

    一般都使用如下的一些模板类

    typedef ACE_Stream<ACE_MT_SYNCH> MT_Stream;

    typedef ACE_Module<ACE_MT_SYNCH> MT_Module;

    typedef ACE_Task<ACE_MT_SYNCH> MT_Task;

    为了方便说明,下面统一用Stream来表述ACE_Stream或者是自定义的派生自它的类。与此类似,ModuleTask也具有同样的意义。

    工作流程如下:

    1、  创建一系列的Task实例,开启多个线程等待处理事务。

    2、  将这些Task“装”入Module

    3、  将这些Module依序压入Stream

    4、  把要处理的数据以ACE_Message_Block的形式置入队列。刚才派生出来的多线程会依次处理它们

    5、  工作完成,移出各个模块,关闭各个线程。

     

     

    三、示例

    下面,我们通过一个示例来看一下ACE Streams的使用方法。这个示例选自《The ACE Programmer’s Guidep312.为了简化,我把操作进行了改写。

    示例的总体思路是:把一个已经接受了连接的ACE_SOCK_Stream传给流,流中的模块依次接收数据、解析数据、获取连接对端方的地址名、进行应答,并把解析好的数据返还给上层。我们把前面一部分的任务配置成顺流任务,而把后面的返还过程配置成溯流任务,由此实现一个完整的双向流。

    示例中,CommandStream维护整个的流,这个流由四个Module构成:RecordMessageModule, PlayMessageModule, RetrieveCallerIDModule, AnswerCallModule,每个Module维护两个Task:一个顺流方向的writerTask*DownstreamTask,和一个溯流方向的readerTask(*UpstreamTask).

    四、实现

    Command

    这个类是我们返还给上层的数据结构的载体。之所以将它设计成一个类,是为了利用ACE_Message_Block的自动析构功能。在更健壮地应用中,我们可以进一步创建它的派生对象,而非像这里这样:将成员变量设置成为public属性

    class Command : public ACE_Data_Block 

    {

    public:

           enum {

                  PASS      = 1,

                         SUCCESS      = 0,

                         FAILURE = -1

           };

     

           enum {

                  UNKNOWN                        = -1,

                  ANSWER_CALL                  = 10,

                  RETRIEVE_CALLER_ID      = 8,

                  PLAY_MESSAGE             = 4,

                  RECORD_MESSAGE           = 2

           }commands;

     

           int flags_;

           int command_;

     

           char *extra_data_;

     

           int numeric_result_;

           char name[MAXHOSTNAMELEN];

           char cmd[16];

           char param[16];

     

           Command()

           {

                  extra_data_ = NULL;

                  memset(name,0,MAXHOSTNAMELEN);

                  memset(cmd,0,16);

                  memset(param,0,16);

           }

     

    };

    CommandTask

    CommandTask派生自ACE_Task<ACE_MT_SYNCH>,构成示例中全部Task类的基类。用于实现各个任务的默认处理方式。

    声明

    class CommandTask : public ACE_Task<ACE_MT_SYNCH> 

    {

    public:

           typedef ACE_Task<ACE_MT_SYNCH> inherited;

     

           virtual int open(void * = 0);

           int put(ACE_Message_Block *message,ACE_Time_Value *timeout);

           virtual int svc(void);

           virtual int close(u_long flags);

     

    protected:

           int command_;

           CommandTask(int command);

           virtual int process(Command *message);

    };

    工作方式:可以实现为两种工作方式:

    如果本任务接收到数据之后直接处理数据,只需在int put()里面添加代码即可,因为这个函数将会在传递数据给本任务时被调用。

    如果我们需要使用主动对象模式的子线程来处理数据,则可以在int open()方法里面开启多线程,子线程将运行int svc()里面的代码。int put()方法将数据交给已经被ACE置入的ACE_Message_Queue,之后各个子线程即可从其中取中数据进行处理。在关闭时,要注意协调与各个子线程的退出。

    实现:我们演示后一种工作方式

    CommandTask::CommandTask(int command)

    :inherited(),command_(command)

    {

          

    }

    int CommandTask::open(void *)

    {

           return this->activate();

    }

    int CommandTask::put(ACE_Message_Block *message, ACE_Time_Value *timeout)

    {

           return this->putq(message,timeout);

          

    }

    int CommandTask::svc()

    {

           ACE_Message_Block *message;

          

           for(;;){

                  if( -1 == this->getq(message) )

                         return -1;

                 

                  if(message->msg_type() == ACE_Message_Block::MB_HANGUP ){

                         ACE_DEBUG((LM_DEBUG,ACE_TEXT("(%P|%t)%s thread close\n"),this->module()->name()));

     

                         this->putq(message->duplicate());

                         message->release();

                         return 0;

                  }

                 

                  Command *command = (Command*)message->data_block();

                  if( command->command_ != this->command_ )

                         this->put_next(message->duplicate());

                  else {

                         int result = this->process(command);

                        

              //处理失败?丢弃

                         if( result == Command::FAILURE )

                                command->numeric_result_ = -1;

               //处理后需要下一个模块任务处理?

                         else if( result == Command::PASS){

                                this->put_next(message->duplicate());

                         }

              //处理完成?

                         else {

                  //在顺流方向?转向溯流方向

                                if(this->is_writer())

                                      this->sibling()->putq(message->duplicate());

                                else

                                       this->put_next(message->duplicate());

                         }

                  }

                  message->release();

           }

           return 0;

    }

    int CommandTask::close(u_long flags)


    收藏到:Del.icio.us




    引用地址: