cfgky 发表于 2012-5-29 13:27:22

求助:半同步/半异步线程池的使用

在下面的线程池中,我们在哪里添加自己的函数,让其能够使用线程池中的线程
class Manager: public ACE_Task<ACE_MT_SYNCH>, private IManager
{
public:
    enum {POOL_SIZE = 5, MAX_TIMEOUT = 5};

    Manager ()
      : shutdown_(0), workers_lock_(), workers_cond_(workers_lock_)
    {
      ACE_TRACE (ACE_TEXT ("Manager::Manager"));
    }

   
    int svc (void)
    {
      ACE_TRACE (ACE_TEXT ("Manager::svc"));

      ACE_DEBUG ((LM_INFO, ACE_TEXT ("(%t) Manager started ")));

      // Create pool.
      create_worker_pool ();

      while (!done ())
      {
            ACE_Message_Block *mb = 0;
            ACE_Time_Value tv ((long)MAX_TIMEOUT);
            tv += ACE_OS::time (0);

            // Get a message request.
            if (this->getq (mb, &tv) < 0)
            {
                shut_down ();
                break;
            }

            // Choose a worker.
            Worker *worker = 0;
            
            {   
                ACE_GUARD_RETURN (ACE_Thread_Mutex,
                  worker_mon, this->workers_lock_, -1);

               
                while (this->workers_.is_empty ())
                  workers_cond_.wait ();

               
                this->workers_.dequeue_head (worker);
            }   

            // Ask the worker to do the job.
            // 将请求消息放入到worker的消息队列中
            worker->putq (mb);
      }

      return 0;
    }

    int shut_down (void);

    ACE_thread_t thread_id (Worker *worker);

   
    virtual int return_to_work (Worker *worker)
    {
      ACE_GUARD_RETURN (ACE_Thread_Mutex,
            worker_mon, this->workers_lock_, -1);
      ACE_DEBUG ((LM_DEBUG,
            ACE_TEXT ("(%t) Worker %d returning to work. "),
            worker->thr_mgr ()->thr_self ()));
      // 将worker放入到线程池队列
      this->workers_.enqueue_tail (worker);
      // 触发条件变量,通知manager
      this->workers_cond_.signal ();

      return 0;
    }

private:
    // 创建worker线程池
    int create_worker_pool (void)
    {
      ACE_GUARD_RETURN (ACE_Thread_Mutex,
            worker_mon,
            this->workers_lock_,
            -1);
      for (int i = 0; i < POOL_SIZE; i++)
      {
            Worker *worker;
            // 创建worker
            ACE_NEW_RETURN (worker, Worker (this), -1);
            // 放入线程池队列
            this->workers_.enqueue_tail (worker);
            // 激活线程,调用该函数后,worker线程被创建,由于worker
            // 是ACE_Task的子类,线程激活后,从svc函数开始执行
            worker->activate ();
      }

      return 0;
    }

    int done (void);

private:
    int shutdown_;
   
    ACE_Thread_Mutex workers_lock_;
    ACE_Condition<ACE_Thread_Mutex> workers_cond_;
   
    ACE_Unbounded_Queue<Worker* > workers_;
};
class Worker : public ACE_Task<ACE_MT_SYNCH>
{
public:
    Worker (IManager *manager) : manager_(manager) { }

    virtual int svc (void)
    {
      thread_id_ = ACE_Thread::self ();
      while (1)
      {
            ACE_Message_Block *mb = 0;
            if (this->getq (mb) == -1)
                ACE_ERROR_BREAK
                ((LM_ERROR, ACE_TEXT ("%p "), ACE_TEXT ("getq")));
            // 如果是MB_HANGUP消息,就结束线程
            if (mb->msg_type () == ACE_Message_Block::MB_HANGUP)
            {
                ACE_DEBUG ((LM_INFO,
                  ACE_TEXT ("(%t) Shutting down ")));
                mb->release ();
                break;
            }
            // Process the message.
            process_message (mb);
            // Return to work.
            // 这里会将自己放到线程池中,并通过workers_cond_来通知manager
            this->manager_->return_to_work (this);
      }

      return 0;
    }
    // Listing 2

    ACE_thread_t thread_id (void)
    {
      return thread_id_;
    }

private:
    void process_message (ACE_Message_Block *mb)
    {
      ACE_TRACE (ACE_TEXT ("Worker::process_message"));
      int msgId;
      ACE_OS::memcpy (&msgId, mb->rd_ptr (), sizeof(int));
      mb->release ();

      ACE_DEBUG ((LM_DEBUG,
            ACE_TEXT ("(%t) Started processing message %d "),
            msgId));
      ACE_OS::sleep (3);
      ACE_DEBUG ((LM_DEBUG,
            ACE_TEXT ("(%t) Finished processing message %d "),
            msgId));
    }

    IManager *manager_;
    ACE_thread_t thread_id_;
};
int ACE_TMAIN (int, ACE_TCHAR *[])
{
    Manager tp;
    tp.activate ();

    // Wait for a moment every time you send a message.
    ACE_Time_Value tv;
    tv.msec (100);

    ACE_Message_Block *mb;
    for (int i = 0; i < 30; i++)
    {
      ACE_NEW_RETURN
            (mb, ACE_Message_Block(sizeof(int)), -1);

      ACE_OS::memcpy (mb->wr_ptr (), &i, sizeof(int));

      ACE_OS::sleep (tv);

      // Add a new work item.
      // 这里将请求消息首先发到了manager线程,由manager线程负责分发
      tp.putq (mb);
    }

    // 主线程等待子线程结束
    ACE_Thread_Manager::instance ()->wait ();
    return 0;
}



比如添加一个简单的socket的server
include <iostream>
#include <string>
#include <ace/ACE.h>
#include <ace/INET_Addr.h>
#include <ace/SOCK_Connector.h>
#include <ace/SOCK_Stream.h>
int main( int argc, char* argv[] )
{
ACE::init();//初始化ACE库,在windows下一定要
std::string str = "hello world";
//设置服务器地址
//第一个参数是端口,第二个是ip地址,也可以是域名。
//可以先定义一个地址对象,再用ACE_INET_Addr的set函数来设定。
//地址的配置很多,具体的参照文档
ACE_INET_Addr peer_addr( 5050, "127.0.0.1" );
ACE_SOCK_Stream peer_stream;//定义一个通讯队形
ACE_SOCK_Connector peer_connector;//定义一个主动连接对象
peer_connector.connect( peer_stream, peer_addr );//发起一个连接
peer_stream.send( str.c_str(), str.length() );//发送数据到服务器
str.erase();
str.resize( sizeof( "hello world" ) );
peer_stream.recv( (void*)str.c_str(), str.length() );//接收来自服务器的信息
std::cout << "from server message : " << str << std::endl;
    ACE::fini();
return 0;
}

应该怎么添加

winston 发表于 2012-5-29 16:01:33

不知道你要问的问题是什么?没搞明白。

cfgky 发表于 2012-5-29 16:28:54

我新人,初次使用线程池,比较白痴一点。其实我想问的就是这个线程池我们该如何使用。假若要用这个线程运行下面的那个程序,该如何做

cfgky 发表于 2012-5-29 16:29:43

winston 发表于 2012-5-29 16:01 static/image/common/back.gif
不知道你要问的问题是什么?没搞明白。

我新人,初次使用线程池,比较白痴一点。其实我想问的就是这个线程池我们该如何使用。假若要用这个线程运行下面的那个程序,该如何做

winston 发表于 2012-5-29 21:38:22

想象一下“正交”概念。就是线程归线程,处理归处理。
http://www.acejoy.com/forum.php?mod=viewthread&tid=701

有个简单的处理模式,监听对象获得连接通知,构建新的处理器对象进行处理,这些对象的处理和收、发,可以放到线程里面去。

检索一下本站,有不少解释和范本。

此间足迹 发表于 2012-6-7 15:49:59

把你每个连接要处理的逻辑放到work的 process_message (mb)里面,这个池子在没有数据的时候会关闭。当有消息到来,甩给work去处理消息

顺便问一下,TP_reactor是领导者/追随者的模式?
页: [1]
查看完整版本: 求助:半同步/半异步线程池的使用