|
小弟是ACE初学者,看完两本教材和一些网上的帖子,自己写了个基于proactor框架的服务器,代码编译通过,如下:
//////////////////////////////////
server.h
// Service handler that performs asynchronous stream reads and writes on the
// socket handle it is given in open().
class Server : public ACE_Service_Handler
{
public:
  Server ();
  ~Server ();

  // Receives the socket handle for this handler and starts asynchronous I/O
  // on it.
  virtual void open (ACE_HANDLE handle,
                     ACE_Message_Block &message_block);

protected:
  // Invoked when an asynchronous write to the socket completes.
  virtual void handle_write_stream (const ACE_Asynch_Write_Stream::Result &result);

  // Invoked when an asynchronous read from the socket completes.
  virtual void handle_read_stream (const ACE_Asynch_Read_Stream::Result &result);

private:
  // Allocates a buffer and starts an asynchronous read into it.
  // Returns 0 on success, -1 on failure.
  int init_read_stream ();

  // Starts an asynchronous write of nBytes from mb.
  // Returns 0 on success, -1 on failure.
  int initWriteStream (ACE_Message_Block &mb, size_t nBytes);

  ACE_Asynch_Read_Stream rs_;   // read side of the connection
  ACE_Asynch_Write_Stream ws_;  // write side of the connection
};
////////////////////////////////////////////////////////////////////////////////
Server.cpp
// Pool of worker threads.  Each worker takes message blocks off this task's
// message queue and passes them to process_message() until it dequeues an
// MB_HANGUP block.
class LogicThread : public ACE_Task<ACE_MT_SYNCH>
{
public:
  enum { POOL_SIZE = 5 };

  LogicThread () {}

  // Starts POOL_SIZE worker threads running svc().
  // Returns the result of activate().
  int open ()
  {
    return this->activate (THR_NEW_LWP, POOL_SIZE);
  }

  // Queues one MB_HANGUP block per worker so that every svc() loop ends,
  // then waits for the workers.
  // Returns 0 on success, -1 if a hangup block could not be allocated or
  // queued (in which case the workers are not waited for).
  int close ()
  {
    for (int i = 0; i < POOL_SIZE; i++)
      {
        ACE_Message_Block *hangup = 0;
        ACE_NEW_RETURN (hangup,
                        ACE_Message_Block (0, ACE_Message_Block::MB_HANGUP),
                        -1);
        if (this->putq (hangup) == -1)
          {
            // The queue did not take the block, so it is still ours to free.
            hangup->release ();
            return -1;
          }
      }
    this->wait ();
    return 0;
  }

  // Worker loop: blocks in getq(), stops on MB_HANGUP, otherwise hands the
  // block to process_message().
  // Returns 0 after a hangup, -1 if getq() fails.
  virtual int svc (void)
  {
    for (;;)
      {
        ACE_Message_Block *mb = NULL;
        if (this->getq (mb) == -1)
          {
            ACE_ERROR_RETURN ((LM_ERROR, ACE_TEXT ("%p\n"), ACE_TEXT ("getq")), -1);
          }
        if (mb->msg_type () == ACE_Message_Block::MB_HANGUP)
          {
            mb->release ();
            break;
          }
        process_message (mb);  // message handling
      }
    return 0;
  }

private:
  // Handles one dequeued message.
  // NOTE(review): nothing in svc() releases mb after this call, so this
  // function (or whatever it hands mb to) has to release it -- confirm once
  // the body is filled in.
  void process_message (ACE_Message_Block *mb)
  {
    // Message parsing (omitted).
    // Send data to the client according to the parse result.
  }
};
// Nothing to set up here; the stream members are opened later in open().
Server::Server ()
{
}
// Closes the socket if this handler still holds a valid handle.
Server::~Server ()
{
  const ACE_HANDLE sock = this->handle ();
  if (sock != ACE_INVALID_HANDLE)
    {
      ACE_OS::closesocket (sock);
    }
}
// Stores the socket handle, opens the asynchronous read and write streams
// on this handler and starts the first read.
//
// h             - socket handle this handler will own.
// message_block - not used here.
//
// The handler deletes itself if any step fails, so the caller must not
// touch it after open() returns.
void
Server::open (ACE_HANDLE h, ACE_Message_Block &message_block)
{
  this->handle (h);
  if (this->rs_.open (*this) != 0 || this->ws_.open (*this) != 0)
    {
      delete this;
      return;
    }
  if (this->init_read_stream () != 0)
    {
      // No read is outstanding, so no completion will ever arrive to clean
      // this handler up; destroy it now (the destructor closes the socket).
      delete this;
      return;
    }
}
// Completion hook for an asynchronous read.
//
// On failure or a zero-byte read the buffer is released and the handler
// deletes itself.  Otherwise the received block is queued for the logic
// threads and the next read is started.
void
Server::handle_read_stream (const ACE_Asynch_Read_Stream::Result &result)
{
  ACE_Message_Block &mb = result.message_block ();
  if (!result.success () || result.bytes_transferred () == 0)
    {
      mb.release ();
      delete this;
      // Must stop here: both mb and this object are gone.
      return;
    }

  {
    // NOTE(review): this LogicThread is a fresh local object, not the pool
    // started in ACE_TMAIN, and it is destroyed at the end of this scope, so
    // no worker thread will ever dequeue the block.  The queue should belong
    // to a shared, activated LogicThread instance -- confirm and rewire.
    LogicThread pool;
    if (pool.putq (&mb) == -1)
      {
        // The queue did not take ownership; free the block ourselves.
        mb.release ();
      }
  }

  // Start the next read so the connection keeps receiving data.
  if (this->init_read_stream () != 0)
    {
      delete this;
    }
}
int Server::init_read_stream(void)
{
ACE_Message_Block *mb=0;
ACE_NEW_RETURN(mb,ACE_Message_Block(1024),-1);
if(rs_.read(*mb,mb->space())==-1)
{
ACE_ERROR_RETURN ((LM_ERROR,
"%p\n",
"ACE_Asynch_Read_Stream::read"),
-1);
}
int Server:: initWriteStream(ACE_Message_Block &mb ,size_t nBytes)
{
if (this->ws_.write (mb , nBytes ) == -1)
{
mb.release ();
ACE_ERROR_RETURN((LM_ERROR, "%p\n", "ACE_Asynch_Write_File::write"), -1);
return -1;
}
return 0;
}
return 0;
}
// Completion hook for an asynchronous write: logs the completion and
// releases the message block that was written.
void
Server::handle_write_stream (const ACE_Asynch_Write_Stream::Result &result)
{
  ACE_Message_Block &written = result.message_block ();
  ACE_DEBUG ((LM_DEBUG, ACE_TEXT("Write completely\n")));
  // Release the message.
  written.release ();
}
class NetThread : public ACE_Task_Base
{
public:
int open()
{
return this->activate();
}
int close()
{
ACE_Proactor::instance()->proactor_end_event_loop();
this->wait();
return 0;
}
virtual int svc()
{
ACE_INET_Addr listen_addr(3000);
ACE_Asynch_Acceptor<Server> aio_acceptor;
if (0 != aio_acceptor.open (listen_addr, 0, 0, 5, 1, 0, 0))
ACE_ERROR_RETURN ((LM_ERROR, ACE_TEXT ("%p\n"), ACE_TEXT ("acceptor open")), 1);
ACE_Proactor::instance ()->proactor_run_event_loop ();
ACE_DEBUG ((LM_DEBUG, ACE_TEXT("Thread ended\n")));
return 0;
}
};
int ACE_TMAIN (int, ACE_TCHAR *[])
{
NetThread net_thread;
LogicThread logic_thread;
net_thread.open();
logic_thread.open();
int x;
std::cin >> x;
logic_thread.close();
net_thread.close();
return 0;
}
线程NetThread和线程池LogicThread实现了网络IO与逻辑处理的分离:在handle_read_stream()中,IO线程把接收到的数据包通过putq()方法放入逻辑线程的消息队列,逻辑线程取出消息并进行处理。请问各位高人,我的这种架构思想没错吧?逻辑线程收到消息以后,在process_message()函数中进行处理,并根据不同的结果做出不同的响应。但是有个问题把我难倒了:如果要向客户端发送响应,该怎样调用initWriteStream()来发送呢?我该怎么写 :'( 谢谢赐教!其他地方有什么不对的,也请提醒我一下!:) |
|