在下面的线程池中,我们在哪里添加自己的函数,让其能够使用线程池中的线程
class Manager: public ACE_Task<ACE_MT_SYNCH>, private IManager
{
public:
enum {POOL_SIZE = 5, MAX_TIMEOUT = 5};
Manager ()
: shutdown_(0), workers_lock_(), workers_cond_(workers_lock_)
{
ACE_TRACE (ACE_TEXT ("Manager::Manager"));
}
int svc (void)
{
ACE_TRACE (ACE_TEXT ("Manager::svc"));
ACE_DEBUG ((LM_INFO, ACE_TEXT ("(%t) Manager started ")));
// Create pool.
create_worker_pool ();
while (!done ())
{
ACE_Message_Block *mb = 0;
ACE_Time_Value tv ((long)MAX_TIMEOUT);
tv += ACE_OS::time (0);
// Get a message request.
if (this->getq (mb, &tv) < 0)
{
shut_down ();
break;
}
// Choose a worker.
Worker *worker = 0;
{
ACE_GUARD_RETURN (ACE_Thread_Mutex,
worker_mon, this->workers_lock_, -1);
while (this->workers_.is_empty ())
workers_cond_.wait ();
this->workers_.dequeue_head (worker);
}
// Ask the worker to do the job.
// 将请求消息放入到worker的消息队列中
worker->putq (mb);
}
return 0;
}
int shut_down (void);
ACE_thread_t thread_id (Worker *worker);
virtual int return_to_work (Worker *worker)
{
ACE_GUARD_RETURN (ACE_Thread_Mutex,
worker_mon, this->workers_lock_, -1);
ACE_DEBUG ((LM_DEBUG,
ACE_TEXT ("(%t) Worker %d returning to work. "),
worker->thr_mgr ()->thr_self ()));
// 将worker放入到线程池队列
this->workers_.enqueue_tail (worker);
// 触发条件变量,通知manager
this->workers_cond_.signal ();
return 0;
}
private:
// 创建worker线程池
int create_worker_pool (void)
{
ACE_GUARD_RETURN (ACE_Thread_Mutex,
worker_mon,
this->workers_lock_,
-1);
for (int i = 0; i < POOL_SIZE; i++)
{
Worker *worker;
// 创建worker
ACE_NEW_RETURN (worker, Worker (this), -1);
// 放入线程池队列
this->workers_.enqueue_tail (worker);
// 激活线程,调用该函数后,worker线程被创建,由于worker
// 是ACE_Task的子类,线程激活后,从svc函数开始执行
worker->activate ();
}
return 0;
}
int done (void);
private:
int shutdown_;
ACE_Thread_Mutex workers_lock_;
ACE_Condition<ACE_Thread_Mutex> workers_cond_;
ACE_Unbounded_Queue<Worker* > workers_;
};
class Worker : public ACE_Task<ACE_MT_SYNCH>
{
public:
Worker (IManager *manager) : manager_(manager) { }
virtual int svc (void)
{
thread_id_ = ACE_Thread::self ();
while (1)
{
ACE_Message_Block *mb = 0;
if (this->getq (mb) == -1)
ACE_ERROR_BREAK
((LM_ERROR, ACE_TEXT ("%p "), ACE_TEXT ("getq")));
// 如果是MB_HANGUP消息,就结束线程
if (mb->msg_type () == ACE_Message_Block::MB_HANGUP)
{
ACE_DEBUG ((LM_INFO,
ACE_TEXT ("(%t) Shutting down ")));
mb->release ();
break;
}
// Process the message.
process_message (mb);
// Return to work.
// 这里会将自己放到线程池中,并通过workers_cond_来通知manager
this->manager_->return_to_work (this);
}
return 0;
}
// Listing 2
ACE_thread_t thread_id (void)
{
return thread_id_;
}
private:
void process_message (ACE_Message_Block *mb)
{
ACE_TRACE (ACE_TEXT ("Worker::process_message"));
int msgId;
ACE_OS::memcpy (&msgId, mb->rd_ptr (), sizeof(int));
mb->release ();
ACE_DEBUG ((LM_DEBUG,
ACE_TEXT ("(%t) Started processing message %d "),
msgId));
ACE_OS::sleep (3);
ACE_DEBUG ((LM_DEBUG,
ACE_TEXT ("(%t) Finished processing message %d "),
msgId));
}
IManager *manager_;
ACE_thread_t thread_id_;
};
int ACE_TMAIN (int, ACE_TCHAR *[])
{
Manager tp;
tp.activate ();
// Wait for a moment every time you send a message.
ACE_Time_Value tv;
tv.msec (100);
ACE_Message_Block *mb;
for (int i = 0; i < 30; i++)
{
ACE_NEW_RETURN
(mb, ACE_Message_Block(sizeof(int)), -1);
ACE_OS::memcpy (mb->wr_ptr (), &i, sizeof(int));
ACE_OS::sleep (tv);
// Add a new work item.
// 这里将请求消息首先发到了manager线程,由manager线程负责分发
tp.putq (mb);
}
// 主线程等待子线程结束
ACE_Thread_Manager::instance ()->wait ();
return 0;
}
比如添加一个简单的socket的server
include <iostream>
#include <string>
#include <ace/ACE.h>
#include <ace/INET_Addr.h>
#include <ace/SOCK_Connector.h>
#include <ace/SOCK_Stream.h>
int main( int argc, char* argv[] )
{
ACE::init();//初始化ACE库,在windows下一定要
std::string str = "hello world";
//设置服务器地址
//第一个参数是端口,第二个是ip地址,也可以是域名。
//可以先定义一个地址对象,再用ACE_INET_Addr的set函数来设定。
//地址的配置很多,具体的参照文档
ACE_INET_Addr peer_addr( 5050, "127.0.0.1" );
ACE_SOCK_Stream peer_stream;//定义一个通讯队形
ACE_SOCK_Connector peer_connector;//定义一个主动连接对象
peer_connector.connect( peer_stream, peer_addr );//发起一个连接
peer_stream.send( str.c_str(), str.length() );//发送数据到服务器
str.erase();
str.resize( sizeof( "hello world" ) );
peer_stream.recv( (void*)str.c_str(), str.length() );//接收来自服务器的信息
std::cout << "from server message : " << str << std::endl;
ACE::fini();
return 0;
}
应该怎么添加 |