peakzhang 发表于 2007-12-19 22:00:30

关于ACE_Task内部消息队列问题

ACE_Task对象中集成了消息队列,对于任务间通讯提供了很容易实现的框架,但我有一点不明白的地方,如下段生产者-消费者模型代码:

class Producer : public ACE_Task<ACE_MT_SYNCH>
{
public:
    int svc(void)
    {
      ACE_Message_Block *mb = 0;
      // 初始化mb,此处省略
      consumer_.putq(mb);
      ACE_OS::sleep(1);
    }
private:
      Consumer *consumer_;
}

class Consumer : public ACE_Task<ACE_MT_SYNCH>
{
public:
    int svc(void)
    {
      ACE_Message_Block *mb = 0;
      getq(mb);

      ACE_OS::sleep(60);// 处理消息,此处处理消息花费时间长
      mb->release();
    }
}

Producer产生一条数据后,立即发送给Consumer,但是对于Consumer来说,处理消息速度比Producer产生消息速度慢,所以Consumer需要将消息缓存在消息队列当中(意思是:Producer发送给Consumer 60条消息的时间里,Consumer 只能处理1条,剩下的59条需要缓存在队列里),但是不知为何在ACE_Task中消息无法缓存!不知是不是我的用法不正确,请坛子的各位DX赐教。

peakzhang 发表于 2007-12-19 22:00:39

消息队列默认的水位尺寸只有16k,你推入的消息有多大?

peakzhang 发表于 2007-12-19 22:00:45

这段测试的mb=“this is a test”,远远小与16k,现在的问题是Consumer队列中的消息为什么没有缓存?

peakzhang 发表于 2007-12-19 22:01:06

全部代码如下:

#include "ace/OS.h"
#include "ace/Task.h"
#include "ace/Message_Block.h"

//The Consumer Task.
class Consumer : public ACE_Task<ACE_MT_SYNCH>
{
public:
int open(void*)
{
ACE_DEBUG((LM_DEBUG, "(%t) Producer task opened \n"));
this->activate();

return 0;
}

//The Service Processing routine
int svc(void)
{
int ret = 0;

//Get ready to receive message from Producer
ACE_Message_Block *mb = 0;
while (1)
{   
   ret = this->getq(mb);
   if (ret < 0)
    ACE_DEBUG((LM_DEBUG, "(%t)error.\n"));

   ProcessTask(mb);

   mb->release();
}

return 0;
}

void ProcessTask(ACE_Message_Block *mb)
{
char *pBuf = mb->rd_ptr();
ACE_DEBUG((LM_DEBUG, "(%t)Got message: '%s' from remote task\n",pBuf));

ACE_OS::sleep(10);
}

int close(u_long)
{
ACE_DEBUG((LM_DEBUG,"Consumer closes down \n"));
return 0;
}
};

class Producer : public ACE_Task<ACE_MT_SYNCH>
{
public:
Producer(Consumer * consumer) : consumer_(consumer), count_(0)
{
}

int open(void*)
{
ACE_DEBUG((LM_DEBUG, "(%t) Producer task opened \n"));

this->activate();
return 0;
}

//The Service Processing routine
int svc(void)
{
while(count_ < 21)
{
   char data = {0};
   ACE_OS::sprintf(data, "data form Producer, NO=%d.", data_);   

   ACE_NEW_RETURN(mb_, ACE_Message_Block(32, ACE_Message_Block::MB_DATA, 0, data), -1);
   mb_->wr_ptr(32);

   //Send message to consumer
   ACE_DEBUG((LM_DEBUG, "(%t)Sending message: %d to remote task\n", data_));
   consumer_->putq(mb_);   

   //Go to sleep for a sec.
   ACE_OS::sleep(1);
   count_++;
}

return 0;
}

int close(u_long)
{
ACE_DEBUG((LM_DEBUG, "Producer closes down \n"));
return 0;
}

private:
int   count_;
Consumer *consumer_;
ACE_Message_Block * mb_;
};


int main(int argc, char * argv[])
{
Consumer *consumer = new Consumer;
Producer *producer = new Producer(consumer);

consumer->open(0);
producer->open(0);

//Wait for all the tasks to exit.
ACE_Thread_Manager::instance()->wait();

if (consumer)
{
delete consumer;
consumer = NULL;
}

if (producer)
{
delete producer;
producer = NULL;
}

return 0;
}

peakzhang 发表于 2007-12-19 22:01:16

问题找到了,原来还是内存问题,发送到消息队列中的数据,应该事先动态分配内存空间

Producer中svc() char data = {0},data为局部变量每次分配的内存空间都是相同的,所以运行例子看到Consumer除了第一次收到的正常数据外,其余的数据都是相同的,其实不是缓存区没有缓存,而是缓存的数据都是指向同一个地址空间的数据;若data该成动态分配,即:char *data = new char则运行正常。

看来还是对ACE消息队列的原理理解不深啊,学习ing...

peakzhang 发表于 2007-12-19 22:01:32

但是不知为何在ACE_Task中消息无法缓存!????
How do u know?
Can you acctually run this code? it looks buggy.
e.g.
consumer_->putq(mb_->clone());//data is in stack.without copy you would end up with core dump.

peakzhang 发表于 2007-12-19 22:01:37

现在发现问题,不是“无法缓存”引起的。

PS: My en is so so poor, please use Chinese, thank u!

peakzhang 发表于 2007-12-19 22:01:44

可以认为几乎大部分C++的问题,都和内存管理相关。

renxianfu 发表于 2008-6-17 11:19:09

学习了
我也碰到这个问题
呵呵
页: [1]
查看完整版本: 关于ACE_Task内部消息队列问题