mallon 发表于 2008-6-17 16:07:41

又一个关于ACE_TASK内部消息队列的问题,呵呵

说“又一个”是因为发帖之前恰好看到下面有个类似的帖子,不过打开看了以后发现和我碰到的问题还是有区别的,呵呵


问题是这样子的,我学习了《中篇:ACE程序员教程》中的ACE_TASK那个生产者消费者的例子,按照书上的例子原封不动运行没有任何问题,但是我突然想试验一下如果生产者发送消息比消费者接收消息的速度快,会有什么现象,于是我修改了一下代码,在消费者每接受处理完一条消息后睡2秒(生产者是睡1秒),结果生产者只能发送4条消息而消费者只能接受1条,后面就一直阻塞了。不管我如何调整时间,只要生产者速度快于消费者,都有这个现象。

在这个例子中消息队列的消息指针并没有像http://www.acejoy.com/bbs/viewthread.php?tid=173中描述的那样分配内存,而是直接把指针当成一个整数使用,所以应该没有内存分配的问题吧?

还请老鸟指教一二。

mallon 发表于 2008-6-17 16:09:05

这是代码,红色是我修改的部分

#define ACE_AS_STATIC_LIBS
#include <ace/ACE.h>
#include <ace/OS.h>
#include <ace/Task.h>

//The Consumer Task.
class Consumer: public ACE_Task<ACE_MT_SYNCH>
{
public:
    int open(void*)
    {
      ACE_DEBUG((LM_DEBUG, "(%t) Producer task opened \n"));

      //Activate the Task
      activate(THR_NEW_LWP, 1);

      return 0;
    }

    //The Service Processing routine
    int svc(void)
    {
      //Get ready to receive message from Producer
      ACE_Message_Block * mb =0;
      do
      {
            mb=0;

            //Get message from underlying queue
            getq(mb);
            ACE_DEBUG((LM_DEBUG,
                     "(%t)Got message: %d from remote task\n",*mb->rd_ptr()));
          ACE_OS::sleep(2);
      }
      while (*mb->rd_ptr()<10);

      return 0;
    }

    int close(u_long)
    {
      ACE_DEBUG((LM_DEBUG,"Consumer closes down \n"));
      return 0;
    }
};

class Producer: public ACE_Task<ACE_MT_SYNCH>
{
public:
    Producer(Consumer * consumer): consumer_(consumer), data_(0)
    {
      mb_=new ACE_Message_Block((char*)&data_,sizeof(data_));
    }

    int open(void*)
    {
      ACE_DEBUG((LM_DEBUG, "(%t) Producer task opened \n"));

      //Activate the Task
      activate(THR_NEW_LWP,1);
      return 0;
    }

    //The Service Processing routine
    int svc(void)
    {
      while (data_<11)
      {
            //Send message to consumer
            ACE_DEBUG((LM_DEBUG,
                     "(%t)Sending message: %d to remote task\n",data_));
            consumer_->putq(mb_);

            //Go to sleep for a sec.
            ACE_OS::sleep(1);
            data_++;
      }

      return 0;
    }

    int close(u_long)
    {
      ACE_DEBUG((LM_DEBUG,"Producer closes down \n"));
      return 0;
    }

private:
    char data_;
    Consumer * consumer_;
    ACE_Message_Block * mb_;
};

int ACE_TMAIN (int, ACE_TCHAR *[])
{
    // 初始化
    ACE::init();

    Consumer *consumer = new Consumer;
    Producer *producer = new Producer(consumer);

    producer->open(0);
    consumer->open(0);

    //Wait for all the tasks to exit.
    ACE_Thread_Manager::instance()->wait();

    // 终止
    ACE::fini();
    return 0;
}

winston 发表于 2008-6-17 21:05:33

需要跟踪一下,初步判断可能和队列的水位设置有关系。

mallon 发表于 2008-6-18 10:40:50

显示查了一下,高水位是16XXX,应该足够了啊?郁闷,不解。。。

jonathanliu2004 发表于 2008-6-18 10:47:37

可以不要消费者,然后不听向里面送msg,看看能有多少。然后跟踪一下,看看错误处在哪里。

jonathanliu2004 发表于 2008-6-18 11:06:58

代码错误。

生产者不应该总是使用一个msg,否则他放到联表上后,如果没有消费掉,那么它的next指针就没有空的情况。这样造成队列遍例进入死循环中。

也就是这里:
while (seq_tail->next () != 0)
    {
      seq_tail->next ()->prev (seq_tail);
      seq_tail = seq_tail->next ();
      ++this->cur_count_;
      seq_tail->total_size_and_length (this->cur_bytes_,
                                       this->cur_length_);
    }

如此改写就不会有问题:
while (data_<11)
      {
            mb_=new ACE_Message_Block((char*)&data_,sizeof(data_));

            //Send message to consumer
            ACE_DEBUG((LM_DEBUG,
                     "(%t)Sending message: %d to remote task\n",data_));
            consumer_->putq(mb_);

            //Go to sleep for a sec.
            ACE_OS::sleep(1);
            data_++;
      }

mallon 发表于 2008-6-18 11:34:57

谢谢!我试试看:) :)

mallon 发表于 2008-6-18 16:11:23

原帖由 jonathanliu2004 于 2008-6-18 11:06 发表 http://www.acejoy.com/bbs/images/common/back.gif
代码错误。

生产者不应该总是使用一个msg,否则他放到联表上后,如果没有消费掉,那么它的next指针就没有空的情况。这样造成队列遍例进入死循环中。

也就是这里:
while (seq_tail->next () != 0)
    {
      seq_tail- ...

试了一下,这样修改还是有问题,消息块并没有真正把data_复制进去,而仅仅保存了指针,所以消费者不能把从0到10的所有制都读出来,看了N久资料,终于改好了!

#define ACE_AS_STATIC_LIBS
#include <ace/ACE.h>
#include <ace/OS.h>
#include <ace/Task.h>

class Consumer: public ACE_Task<ACE_MT_SYNCH>
{
public:
    int open(void*)
    {
      ACE_DEBUG((LM_DEBUG, "(%t) Producer task opened \n"));

      activate(THR_NEW_LWP, 1);

      return 0;
    }

    int svc(void)
    {
      ACE_Message_Block *mb;
      do
      {
            mb = 0;

            getq(mb);
            ACE_DEBUG((LM_DEBUG,
                     "(%t)Got message: %d\n", *mb->rd_ptr()));
            ACE_OS::sleep(1);
      }
      while (*mb->rd_ptr() < 9);

      return 0;
    }

    int close(u_long)
    {
      ACE_DEBUG((LM_DEBUG,"Consumer closes down \n"));
      return 0;
    }
};

class Producer: public ACE_Task<ACE_MT_SYNCH>
{
public:
    Producer(Consumer * consumer): consumer_(consumer)
    {
    }

    int open(void*)
    {
      ACE_DEBUG((LM_DEBUG, "(%t) Producer task opened \n"));

      activate(THR_NEW_LWP,1);
      return 0;
    }

    int svc(void)
    {
      for (char data_ = 0; data_<10; data_++)
      {
            // 这样子构造的消息块才能被自动释放!
            ACE_Message_Block *mb_ = new ACE_Message_Block(sizeof(data_));
            mb_->copy(&data_, sizeof(data_));

            ACE_DEBUG((LM_DEBUG,
                     "(%t)Sending message: %d\n",data_));
            consumer_->putq(mb_);
      }

      return 0;
    }

    int close(u_long)
    {
      ACE_DEBUG((LM_DEBUG,"Producer closes down \n"));
      return 0;
    }

private:
    Consumer * consumer_;
};

int ACE_TMAIN (int, ACE_TCHAR *[])
{
    ACE::init();

    Consumer *consumer = new Consumer;
    Producer *producer = new Producer(consumer);

    producer->open(0);
    consumer->open(0);

    ACE_Thread_Manager::instance()->wait();

    ACE::fini();
    return 0;
}

jonathanliu2004 发表于 2008-6-18 19:16:24

嗯,我当时仅是验证为什么产生死循环的问题。

关于后面所说的没有copy上,应该不会的。只是你要考虑到所有的msg都是共享该data_,而data_是不断的变化的,所以你才感觉没有copy。

最后你做的是copy了数据,所以是正确的。

也就是说要正确理解ACE_Message_Block(char * , int )这个构造函数的含义。
页: [1]
查看完整版本: 又一个关于ACE_TASK内部消息队列的问题,呵呵