找回密码
 用户注册

QQ登录

只需一步,快速开始

查看: 5876|回复: 5

求助:半同步/半异步线程池的使用

[复制链接]
发表于 2012-5-29 13:27:22 | 显示全部楼层 |阅读模式
在下面的线程池中,我们在哪里添加自己的函数,让其能够使用线程池中的线程
class Manager: public ACE_Task<ACE_MT_SYNCH>, private IManager
{
public:
    enum {POOL_SIZE = 5, MAX_TIMEOUT = 5};

    Manager ()
        : shutdown_(0), workers_lock_(), workers_cond_(workers_lock_)
    {
        ACE_TRACE (ACE_TEXT ("Manager::Manager"));
    }

   
    int svc (void)
    {
        ACE_TRACE (ACE_TEXT ("Manager::svc"));

        ACE_DEBUG ((LM_INFO, ACE_TEXT ("(%t) Manager started ")));

        // Create pool.
        create_worker_pool ();

        while (!done ())
        {
            ACE_Message_Block *mb = 0;
            ACE_Time_Value tv ((long)MAX_TIMEOUT);
            tv += ACE_OS::time (0);

            // Get a message request.
            if (this->getq (mb, &tv) < 0)
            {
                shut_down ();
                break;
            }

            // Choose a worker.
            Worker *worker = 0;
            
            {   
                ACE_GUARD_RETURN (ACE_Thread_Mutex,
                    worker_mon, this->workers_lock_, -1);

               
                while (this->workers_.is_empty ())
                    workers_cond_.wait ();

               
                this->workers_.dequeue_head (worker);
            }   

            // Ask the worker to do the job.
            // 将请求消息放入到worker的消息队列中
            worker->putq (mb);
        }

        return 0;
    }

    int shut_down (void);

    ACE_thread_t thread_id (Worker *worker);

   
    virtual int return_to_work (Worker *worker)
    {
        ACE_GUARD_RETURN (ACE_Thread_Mutex,
            worker_mon, this->workers_lock_, -1);
        ACE_DEBUG ((LM_DEBUG,
            ACE_TEXT ("(%t) Worker %d returning to work. "),
            worker->thr_mgr ()->thr_self ()));
        // 将worker放入到线程池队列
        this->workers_.enqueue_tail (worker);
        // 触发条件变量,通知manager
        this->workers_cond_.signal ();

        return 0;
    }

private:
    // 创建worker线程池
    int create_worker_pool (void)
    {
        ACE_GUARD_RETURN (ACE_Thread_Mutex,
            worker_mon,
            this->workers_lock_,
            -1);
        for (int i = 0; i < POOL_SIZE; i++)
        {
            Worker *worker;
            // 创建worker
            ACE_NEW_RETURN (worker, Worker (this), -1);
            // 放入线程池队列
            this->workers_.enqueue_tail (worker);
            // 激活线程,调用该函数后,worker线程被创建,由于worker
            // 是ACE_Task的子类,线程激活后,从svc函数开始执行
            worker->activate ();
        }

        return 0;
    }

    int done (void);

private:
    int shutdown_;
   
    ACE_Thread_Mutex workers_lock_;
    ACE_Condition<ACE_Thread_Mutex> workers_cond_;
   
    ACE_Unbounded_Queue<Worker* > workers_;
};
class Worker : public ACE_Task<ACE_MT_SYNCH>
{
public:
    Worker (IManager *manager) : manager_(manager) { }

    virtual int svc (void)
    {
        thread_id_ = ACE_Thread::self ();
        while (1)
        {
            ACE_Message_Block *mb = 0;
            if (this->getq (mb) == -1)
                ACE_ERROR_BREAK
                ((LM_ERROR, ACE_TEXT ("%p "), ACE_TEXT ("getq")));
            // 如果是MB_HANGUP消息,就结束线程
            if (mb->msg_type () == ACE_Message_Block::MB_HANGUP)
            {
                ACE_DEBUG ((LM_INFO,
                    ACE_TEXT ("(%t) Shutting down ")));
                mb->release ();
                break;
            }
            // Process the message.
            process_message (mb);
            // Return to work.
            // 这里会将自己放到线程池中,并通过workers_cond_来通知manager
            this->manager_->return_to_work (this);
        }

        return 0;
    }
    // Listing 2

    ACE_thread_t thread_id (void)
    {
        return thread_id_;
    }

private:
    void process_message (ACE_Message_Block *mb)
    {
        ACE_TRACE (ACE_TEXT ("Worker::process_message"));
        int msgId;
        ACE_OS::memcpy (&msgId, mb->rd_ptr (), sizeof(int));
        mb->release ();

        ACE_DEBUG ((LM_DEBUG,
            ACE_TEXT ("(%t) Started processing message %d "),
            msgId));
        ACE_OS::sleep (3);
        ACE_DEBUG ((LM_DEBUG,
            ACE_TEXT ("(%t) Finished processing message %d "),
            msgId));
    }

    IManager *manager_;
    ACE_thread_t thread_id_;
};
int ACE_TMAIN (int, ACE_TCHAR *[])
{
    Manager tp;
    tp.activate ();

    // Wait for a moment every time you send a message.
    ACE_Time_Value tv;
    tv.msec (100);

    ACE_Message_Block *mb;
    for (int i = 0; i < 30; i++)
    {
        ACE_NEW_RETURN
            (mb, ACE_Message_Block(sizeof(int)), -1);

        ACE_OS::memcpy (mb->wr_ptr (), &i, sizeof(int));

        ACE_OS::sleep (tv);

        // Add a new work item.
        // 这里将请求消息首先发到了manager线程,由manager线程负责分发
        tp.putq (mb);
    }

    // 主线程等待子线程结束
    ACE_Thread_Manager::instance ()->wait ();
    return 0;
}



比如添加一个简单的socket的server
include <iostream>
#include <string>
#include <ace/ACE.h>
#include <ace/INET_Addr.h>
#include <ace/SOCK_Connector.h>
#include <ace/SOCK_Stream.h>
int main( int argc, char* argv[] )
{
ACE::init();//初始化ACE库,在windows下一定要
std::string str = "hello world";
//设置服务器地址
//第一个参数是端口,第二个是ip地址,也可以是域名。
//可以先定义一个地址对象,再用ACE_INET_Addr的set函数来设定。
//地址的配置很多,具体的参照文档
ACE_INET_Addr peer_addr( 5050, "127.0.0.1" );
ACE_SOCK_Stream peer_stream;//定义一个通讯队形
ACE_SOCK_Connector peer_connector;//定义一个主动连接对象
peer_connector.connect( peer_stream, peer_addr );//发起一个连接
peer_stream.send( str.c_str(), str.length() );//发送数据到服务器
str.erase();
str.resize( sizeof( "hello world" ) );
peer_stream.recv( (void*)str.c_str(), str.length() );//接收来自服务器的信息
std::cout << "from server message : " << str << std::endl;
    ACE::fini();
return 0;
}

应该怎么添加
发表于 2012-5-29 16:01:33 | 显示全部楼层
不知道你要问的问题是什么?没搞明白。
 楼主| 发表于 2012-5-29 16:28:54 | 显示全部楼层
我新人,初次使用线程池,比较白痴一点。其实我想问的就是这个线程池我们该如何使用。假若要用这个线程运行下面的那个程序,该如何做
 楼主| 发表于 2012-5-29 16:29:43 | 显示全部楼层
winston 发表于 2012-5-29 16:01
不知道你要问的问题是什么?没搞明白。

我新人,初次使用线程池,比较白痴一点。其实我想问的就是这个线程池我们该如何使用。假若要用这个线程运行下面的那个程序,该如何做
发表于 2012-5-29 21:38:22 | 显示全部楼层
想象一下“正交”概念。就是线程归线程,处理归处理。
http://www.acejoy.com/forum.php?mod=viewthread&tid=701

有个简单的处理模式,监听对象获得连接通知,构建新的处理器对象进行处理,这些对象的处理和收、发,可以放到线程里面去。

检索一下本站,有不少解释和范本。
发表于 2012-6-7 15:49:59 | 显示全部楼层
把你每个连接要处理的逻辑放到work的 process_message (mb)里面,这个池子在没有数据的时候会关闭。当有消息到来,甩给work去处理消息

顺便问一下,TP_reactor是领导者/追随者的模式?
您需要登录后才可以回帖 登录 | 用户注册

本版积分规则

Archiver|手机版|小黑屋|ACE Developer ( 京ICP备06055248号 )

GMT+8, 2024-12-22 17:34 , Processed in 0.020436 second(s), 6 queries , Redis On.

Powered by Discuz! X3.5

© 2001-2023 Discuz! Team.

快速回复 返回顶部 返回列表