找回密码
 用户注册

QQ登录

只需一步,快速开始

查看: 6218|回复: 8

关于ACE_Task内部消息队列问题

[复制链接]
发表于 2007-12-19 22:00:30 | 显示全部楼层 |阅读模式
ACE_Task对象中集成了消息队列,对于任务间通讯提供了很容易实现的框架,但我有一点不明白的地方,如下段生产者-消费者模型代码:
  1. class Producer : public ACE_Task<ACE_MT_SYNCH>
  2. {
  3. public:
  4.     int svc(void)
  5.     {
  6.       ACE_Message_Block *mb = 0;
  7.       // 初始化mb,此处省略
  8.       consumer_.putq(mb);
  9.       ACE_OS::sleep(1);  
  10.     }
  11. private:
  12.         Consumer *consumer_;
  13. }
  14. class Consumer : public ACE_Task<ACE_MT_SYNCH>
  15. {
  16. public:
  17.     int svc(void)
  18.     {
  19.       ACE_Message_Block *mb = 0;
  20.       getq(mb);
  21.   
  22.       ACE_OS::sleep(60);  // 处理消息,此处处理消息花费时间长
  23.       mb->release();
  24.     }
  25. }
复制代码
Producer产生一条数据后,立即发送给Consumer,但是对于Consumer来说,处理消息速度比Producer产生消息速度慢,所以Consumer需要将消息缓存在消息队列当中(意思是:Producer发送给Consumer 60条消息的时间里,Consumer 只能处理1条,剩下的59条需要缓存在队列里),但是不知为何在ACE_Task中消息无法缓存!不知是不是我的用法不正确,请坛子的各位DX赐教。
 楼主| 发表于 2007-12-19 22:00:39 | 显示全部楼层
消息队列默认的水位尺寸只有16k,你推入的消息有多大?
 楼主| 发表于 2007-12-19 22:00:45 | 显示全部楼层
这段测试的mb=“this is a test”,远远小与16k,现在的问题是Consumer队列中的消息为什么没有缓存?
 楼主| 发表于 2007-12-19 22:01:06 | 显示全部楼层
全部代码如下:
  1. #include "ace/OS.h"
  2. #include "ace/Task.h"
  3. #include "ace/Message_Block.h"
  4. //The Consumer Task.
  5. class Consumer : public ACE_Task<ACE_MT_SYNCH>
  6. {
  7. public:
  8. int open(void*)
  9. {
  10.   ACE_DEBUG((LM_DEBUG, "(%t) Producer task opened \n"));  
  11.   this->activate();
  12.   return 0;
  13. }
  14. //The Service Processing routine
  15. int svc(void)
  16. {
  17.   int ret = 0;
  18.   //Get ready to receive message from Producer
  19.   ACE_Message_Block *mb = 0;  
  20.   while (1)
  21.   {   
  22.    ret = this->getq(mb);
  23.    if (ret < 0)
  24.     ACE_DEBUG((LM_DEBUG, "(%t)error.\n"));
  25.    ProcessTask(mb);
  26.    mb->release();
  27.   }
  28.   return 0;
  29. }
  30. void ProcessTask(ACE_Message_Block *mb)
  31. {
  32.   char *pBuf = mb->rd_ptr();
  33.   ACE_DEBUG((LM_DEBUG, "(%t)Got message: '%s' from remote task\n",pBuf));
  34.   ACE_OS::sleep(10);
  35. }
  36. int close(u_long)
  37. {
  38.   ACE_DEBUG((LM_DEBUG,"Consumer closes down \n"));
  39.   return 0;
  40. }
  41. };
  42. class Producer : public ACE_Task<ACE_MT_SYNCH>
  43. {
  44. public:
  45. Producer(Consumer * consumer) : consumer_(consumer), count_(0)
  46. {  
  47. }
  48. int open(void*)
  49. {
  50.   ACE_DEBUG((LM_DEBUG, "(%t) Producer task opened \n"));
  51.   
  52.   this->activate();
  53.   return 0;
  54. }
  55. //The Service Processing routine
  56. int svc(void)
  57. {
  58.   while(count_ < 21)
  59.   {
  60.    char data[32] = {0};
  61.    ACE_OS::sprintf(data, "data form Producer, NO=%d.", data_);   
  62.    ACE_NEW_RETURN(mb_, ACE_Message_Block(32, ACE_Message_Block::MB_DATA, 0, data), -1);
  63.    mb_->wr_ptr(32);
  64.    //Send message to consumer
  65.    ACE_DEBUG((LM_DEBUG, "(%t)Sending message: %d to remote task\n", data_));
  66.    consumer_->putq(mb_);   
  67.    //Go to sleep for a sec.
  68.    ACE_OS::sleep(1);
  69.    count_++;
  70.   }
  71.   return 0;
  72. }
  73. int close(u_long)
  74. {  
  75.   ACE_DEBUG((LM_DEBUG, "Producer closes down \n"));  
  76.   return 0;
  77. }
  78. private:
  79. int   count_;
  80. Consumer *consumer_;
  81. ACE_Message_Block * mb_;
  82. };
  83. int main(int argc, char * argv[])
  84. {
  85. Consumer *consumer = new Consumer;
  86. Producer *producer = new Producer(consumer);
  87. consumer->open(0);
  88. producer->open(0);
  89. //Wait for all the tasks to exit.
  90. ACE_Thread_Manager::instance()->wait();
  91. if (consumer)
  92. {
  93.   delete consumer;
  94.   consumer = NULL;
  95. }
  96. if (producer)
  97. {
  98.   delete producer;
  99.   producer = NULL;
  100. }
  101. return 0;
  102. }
复制代码
 楼主| 发表于 2007-12-19 22:01:16 | 显示全部楼层
问题找到了,原来还是内存问题,发送到消息队列中的数据,应该事先动态分配内存空间

Producer中svc() char data[32] = {0},data为局部变量每次分配的内存空间都是相同的,所以运行例子看到Consumer除了第一次收到的正常数据外,其余的数据都是相同的,其实不是缓存区没有缓存,而是缓存的数据都是指向同一个地址空间的数据;若data该成动态分配,即:char *data = new char[32]则运行正常。

看来还是对ACE消息队列的原理理解不深啊,学习ing...
 楼主| 发表于 2007-12-19 22:01:32 | 显示全部楼层
但是不知为何在ACE_Task中消息无法缓存!????
How do u know?
Can you acctually run this code? it looks buggy.
e.g.
consumer_->putq(mb_->clone());//data is in stack.without copy you would end up with core dump.
 楼主| 发表于 2007-12-19 22:01:37 | 显示全部楼层
现在发现问题,不是“无法缓存”引起的。

PS: My en is so so poor, please use Chinese, thank u!
 楼主| 发表于 2007-12-19 22:01:44 | 显示全部楼层
可以认为几乎大部分C++的问题,都和内存管理相关。
发表于 2008-6-17 11:19:09 | 显示全部楼层
学习了
我也碰到这个问题
呵呵
您需要登录后才可以回帖 登录 | 用户注册

本版积分规则

Archiver|手机版|小黑屋|ACE Developer ( 京ICP备06055248号 )

GMT+8, 2024-12-23 19:09 , Processed in 0.027787 second(s), 5 queries , Redis On.

Powered by Discuz! X3.5

© 2001-2023 Discuz! Team.

快速回复 返回顶部 返回列表