关于ACE_Task内部消息队列问题
ACE_Task对象中集成了消息队列,对于任务间通讯提供了很容易实现的框架,但我有一点不明白的地方,如下段生产者-消费者模型代码:class Producer : public ACE_Task<ACE_MT_SYNCH>
{
public:
int svc(void)
{
ACE_Message_Block *mb = 0;
// 初始化mb,此处省略
consumer_.putq(mb);
ACE_OS::sleep(1);
}
private:
Consumer *consumer_;
}
class Consumer : public ACE_Task<ACE_MT_SYNCH>
{
public:
int svc(void)
{
ACE_Message_Block *mb = 0;
getq(mb);
ACE_OS::sleep(60);// 处理消息,此处处理消息花费时间长
mb->release();
}
}
Producer产生一条数据后,立即发送给Consumer,但是对于Consumer来说,处理消息速度比Producer产生消息速度慢,所以Consumer需要将消息缓存在消息队列当中(意思是:Producer发送给Consumer 60条消息的时间里,Consumer 只能处理1条,剩下的59条需要缓存在队列里),但是不知为何在ACE_Task中消息无法缓存!不知是不是我的用法不正确,请坛子的各位DX赐教。 消息队列默认的水位尺寸只有16k,你推入的消息有多大? 这段测试的mb=“this is a test”,远远小与16k,现在的问题是Consumer队列中的消息为什么没有缓存? 全部代码如下:
#include "ace/OS.h"
#include "ace/Task.h"
#include "ace/Message_Block.h"
//The Consumer Task.
class Consumer : public ACE_Task<ACE_MT_SYNCH>
{
public:
int open(void*)
{
ACE_DEBUG((LM_DEBUG, "(%t) Producer task opened \n"));
this->activate();
return 0;
}
//The Service Processing routine
int svc(void)
{
int ret = 0;
//Get ready to receive message from Producer
ACE_Message_Block *mb = 0;
while (1)
{
ret = this->getq(mb);
if (ret < 0)
ACE_DEBUG((LM_DEBUG, "(%t)error.\n"));
ProcessTask(mb);
mb->release();
}
return 0;
}
void ProcessTask(ACE_Message_Block *mb)
{
char *pBuf = mb->rd_ptr();
ACE_DEBUG((LM_DEBUG, "(%t)Got message: '%s' from remote task\n",pBuf));
ACE_OS::sleep(10);
}
int close(u_long)
{
ACE_DEBUG((LM_DEBUG,"Consumer closes down \n"));
return 0;
}
};
class Producer : public ACE_Task<ACE_MT_SYNCH>
{
public:
Producer(Consumer * consumer) : consumer_(consumer), count_(0)
{
}
int open(void*)
{
ACE_DEBUG((LM_DEBUG, "(%t) Producer task opened \n"));
this->activate();
return 0;
}
//The Service Processing routine
int svc(void)
{
while(count_ < 21)
{
char data = {0};
ACE_OS::sprintf(data, "data form Producer, NO=%d.", data_);
ACE_NEW_RETURN(mb_, ACE_Message_Block(32, ACE_Message_Block::MB_DATA, 0, data), -1);
mb_->wr_ptr(32);
//Send message to consumer
ACE_DEBUG((LM_DEBUG, "(%t)Sending message: %d to remote task\n", data_));
consumer_->putq(mb_);
//Go to sleep for a sec.
ACE_OS::sleep(1);
count_++;
}
return 0;
}
int close(u_long)
{
ACE_DEBUG((LM_DEBUG, "Producer closes down \n"));
return 0;
}
private:
int count_;
Consumer *consumer_;
ACE_Message_Block * mb_;
};
int main(int argc, char * argv[])
{
Consumer *consumer = new Consumer;
Producer *producer = new Producer(consumer);
consumer->open(0);
producer->open(0);
//Wait for all the tasks to exit.
ACE_Thread_Manager::instance()->wait();
if (consumer)
{
delete consumer;
consumer = NULL;
}
if (producer)
{
delete producer;
producer = NULL;
}
return 0;
}
问题找到了,原来还是内存问题,发送到消息队列中的数据,应该事先动态分配内存空间
Producer中svc() char data = {0},data为局部变量每次分配的内存空间都是相同的,所以运行例子看到Consumer除了第一次收到的正常数据外,其余的数据都是相同的,其实不是缓存区没有缓存,而是缓存的数据都是指向同一个地址空间的数据;若data该成动态分配,即:char *data = new char则运行正常。
看来还是对ACE消息队列的原理理解不深啊,学习ing... 但是不知为何在ACE_Task中消息无法缓存!????
How do u know?
Can you acctually run this code? it looks buggy.
e.g.
consumer_->putq(mb_->clone());//data is in stack.without copy you would end up with core dump. 现在发现问题,不是“无法缓存”引起的。
PS: My en is so so poor, please use Chinese, thank u! 可以认为几乎大部分C++的问题,都和内存管理相关。 学习了
我也碰到这个问题
呵呵
页:
[1]