|
#include "ace/Message_Queue.h"
#include "ace/Asynch_IO.h"
#include "ace/OS.h"
#include "ace/Proactor.h"
#include "ace/Asynch_Acceptor.h"
#include "ace/SOCK_Stream.h"
#include "ace/INET_Addr.h"
#include "ace/Task.h"
class CProactor : public ACE_Service_Handler, public ACE_Task<ACE_NULL_SYNCH>
{
public:
CProactor();
virtual ~CProactor();
//打开连接.
virtual void open( ACE_HANDLE h, ACE_Message_Block& );
//异步读完成后会调用此函数
virtual void handle_read_stream( const ACE_Asynch_Read_Stream::Result& result );
//异步写完成后会调用此函数
virtual void handle_write_stream( const ACE_Asynch_Write_Stream::Result& result );
bool Send( void* pBuf, int nLen );
int put( ACE_Message_Block *mb, ACE_Time_Value *timeout = 0 );
void start_write(ACE_Message_Block* mblk = 0 );
//获得对端IP地址.
virtual void addresses (const ACE_INET_Addr &remote_address, const ACE_INET_Addr &local_address )
{
ACE_DEBUG( ( LM_DEBUG, ">>>>>IP: %s, 端口:%d\n",
remote_address.get_host_addr(), remote_address.get_port_number() ) );
ACE_OS::memcpy( &m_Addr, &remote_address, sizeof(ACE_INET_Addr) );
}
//整理字符串.
void rtrim( char* str );
public:
//根据消息类型分派消息
int MsgHandle( void* pBuf );
private:
ACE_Asynch_Read_Stream reader_; //从socket中异步读取数据的流
ACE_Asynch_Write_Stream writer_; //向socket中异步写入数据的流
ACE_Atomic_Op<ACE_Thread_Mutex, bool> m_bValid; //连接是否失效,如果失效,则读写都不可能,实例即将被销毁
ACE_Atomic_Op<ACE_Thread_Mutex, bool> m_bWritePending; //是否有写操作正在异步进行,如果有,则不能马上销毁实例
ACE_Message_Block m_data; //接收的数据
ACE_INET_Addr m_Addr; //临时保存客户端ip地址.
char buffer[ MAX_MSG_SIZE ]; //接收消息使用.
};
-------------------------------------------------------------------------------
Proactor.cpp
/*
收发消息都在这个类里操作.
*/
#include "Proactor.h"
CProactor::CProactor()
{
m_data.size( MAX_MSG_SIZE );
m_bValid = false;
m_bWritePending = false;
}
CProactor::~CProactor()
{
if ( this->handle() != ACE_INVALID_HANDLE )
{
//关闭连接.
ACE_DEBUG( ( LM_DEBUG, "<<<<<IP: %s, 端口:%d\n",
m_Addr.get_host_addr(), m_Addr.get_port_number() ) );
ACE_OS::closesocket( this->handle() );
}
}
void CProactor:open( ACE_HANDLE h, ACE_Message_Block& )
{
this->handle_ = h;
//初始化异步读写流
if ( -1 == reader_.open( *this, this->handle_ ) )
{
delete this;
return;
}
if ( -1 == writer_.open( *this, this->handle_ ) )
{
delete this;
return;
}
m_bValid = true;
//接收包
ACE_Message_Block *mb = new ACE_Message_Block( buffer, 1024 );
if ( 0 != reader_.read( *mb, mb->space() ) )
{
delete this;
return;
}
return;
}
void CProactor::handle_read_stream( const ACE_Asynch_Read_Stream::Result &result )
{
ACE_Message_Block &mb = result.message_block();
if ( !result.success() || result.bytes_transferred() == 0 )
{
mb.release();
delete this;
return;
}
//连接关闭
if ( 0 == result.bytes_transferred() )
{
m_bValid = false;
reader_.cancel();
writer_.cancel();
if ( !m_bWritePending.value() ) //如果没有正在进行的写操作,则销毁实例
{
delete this;
}
return;
}
//处理消息
MsgHandle( (void*)(mb.base() ) );
mb.release();
ACE_Message_Block *nmb = new ACE_Message_Block( buffer, 1024 );
if ( this->reader_.read( *nmb, nmb->space() ) != 0 )
return;
}
void CProactor::handle_write_stream( const ACE_Asynch_Write_Stream::Result &result )
{
if ( !m_bValid.value() )
{
//连接不再有效,销毁实例
result.message_block().release();
delete this;
return;
}
//发送失败
if ( result.error() )
{
delete this;
return;
}
if (result.message_block().length())
{
//没有写完应该写的字节数,要继续往下写
start_write(&result.message_block());
}
else//释放写内存
{
ACE_DEBUG( ( LM_DEBUG, "数据发送完毕。\n" ) );
result.message_block().release();
}
start_write(0);
}
int CProactor::put( ACE_Message_Block *mb, ACE_Time_Value *timeout )
{
if ( !m_bWritePending.value() )
{
start_write(mb);
return 0;
}
return putq(mb, timeout);
}
void CProactor::start_write(ACE_Message_Block* mblk )
{
if ( mblk == 0 )
{
ACE_Time_Value nonblock(0);
getq( mblk, &nonblock );
}
if ( mblk != 0 )
{
m_bWritePending = true;
size_t nLen = mblk->length();
int nnn = writer_.write( *mblk, nLen );
//ACE_DEBUG( ( LM_DEBUG, "write result: %d\n", nnn ) );
if ( nnn == -1 )
{
ungetq(mblk);
}
}
else
{
m_bWritePending = false;
}
}
bool CProactor::Send( void* pBuf, int nLen )
{
if ( !m_bValid.value() )//连接已经失效,不可以再发送
{
return false;
}
ACE_Message_Block* mb = new ACE_Message_Block( nLen );
if ( 0 == mb )
{
delete this;
return false;
}
ACE_OS::memcpy( mb->wr_ptr(), pBuf, nLen );
mb->wr_ptr( nLen );
if ( -1 == put(mb) )
{
delete this;
return false;
}
return true;
}
/***********************************************************************/
// 处理消息
// 从客户端发过来的消息必须先经过该函数来处理.
/***********************************************************************/
int CProactor::MsgHandle( void* pBuf )
{
//在这里实现将消息发送给另一个连接客户端.这样的话,显示pSend的m_bValid.value()值是false,
//强制将这个值设置为true的话写就会出错...
CProactor* pSend;
ACE_NEW_RETURN( pSend, CProactor(), -1 );
pSend->handle(this->handle_);
pSend->Send( "123\n", 4 );
return 100;
}
如果想同时给 多个连接的客户端发送消息 或者 收到一个客户端的消息后将消息发送给另一个客户端 需要怎么实现啊? |
|