minglong717 发表于 2008-6-25 13:24:19

Proactor实现写操作问题

#include "ace/Message_Queue.h"
#include "ace/Asynch_IO.h"
#include "ace/OS.h"
#include "ace/Proactor.h"
#include "ace/Asynch_Acceptor.h"
#include "ace/SOCK_Stream.h"
#include "ace/INET_Addr.h"
#include "ace/Task.h"

class CProactor : public ACE_Service_Handler,      public   ACE_Task<ACE_NULL_SYNCH>
{
public:
      CProactor();
      virtual      ~CProactor();

      //打开连接.
      virtual void open( ACE_HANDLE h, ACE_Message_Block& );

      //异步读完成后会调用此函数
      virtual void handle_read_stream( const ACE_Asynch_Read_Stream::Result& result );
      //异步写完成后会调用此函数
      virtual void handle_write_stream( const ACE_Asynch_Write_Stream::Result& result );

      bool      Send( void* pBuf, int nLen );

      int                put( ACE_Message_Block *mb, ACE_Time_Value *timeout = 0 );

      void      start_write(ACE_Message_Block* mblk = 0 );

      //获得对端IP地址.
      virtual void addresses (const ACE_INET_Addr &remote_address, const ACE_INET_Addr &local_address )
      {
                ACE_DEBUG( ( LM_DEBUG, ">>>>>IP: %s, 端口:%d\n",
                        remote_address.get_host_addr(), remote_address.get_port_number() ) );

                ACE_OS::memcpy( &m_Addr, &remote_address, sizeof(ACE_INET_Addr) );
      }

      //整理字符串.
      void      rtrim( char* str );

public:
      //根据消息类型分派消息
      int                              MsgHandle( void* pBuf );

private:
      ACE_Asynch_Read_Stream                                        reader_;                //从socket中异步读取数据的流
      ACE_Asynch_Write_Stream                                        writer_;                //向socket中异步写入数据的流

      ACE_Atomic_Op<ACE_Thread_Mutex, bool>      m_bValid;                              //连接是否失效,如果失效,则读写都不可能,实例即将被销毁
      ACE_Atomic_Op<ACE_Thread_Mutex, bool>      m_bWritePending;                        //是否有写操作正在异步进行,如果有,则不能马上销毁实例

      ACE_Message_Block                                                m_data;                        //接收的数据
      ACE_INET_Addr                                                m_Addr;                              //临时保存客户端ip地址.

      char                                                      buffer[ MAX_MSG_SIZE ];      //接收消息使用.
};

-------------------------------------------------------------------------------
Proactor.cpp

/*
      收发消息都在这个类里操作.
*/
#include "Proactor.h"

CProactor::CProactor()
{
      m_data.size( MAX_MSG_SIZE );
      m_bValid = false;
      m_bWritePending = false;
}

CProactor::~CProactor()
{
      if ( this->handle() != ACE_INVALID_HANDLE )
      {
                //关闭连接.
                ACE_DEBUG( ( LM_DEBUG, "<<<<<IP: %s, 端口:%d\n",
                        m_Addr.get_host_addr(), m_Addr.get_port_number() ) );

                ACE_OS::closesocket( this->handle() );
      }
}

void      CProactor:open( ACE_HANDLE h, ACE_Message_Block& )
{
      this->handle_ = h;

      //初始化异步读写流
      if ( -1 == reader_.open( *this, this->handle_ ) )
      {
                delete this;
                return;
      }
      if ( -1 == writer_.open( *this, this->handle_ ) )
      {
                delete this;
                return;
      }

      m_bValid = true;

      //接收包
      ACE_Message_Block *mb = new ACE_Message_Block( buffer, 1024 );
      if ( 0 != reader_.read( *mb, mb->space() ) )
      {
                delete this;
                return;
      }

      return;
}

void      CProactor::handle_read_stream( const ACE_Asynch_Read_Stream::Result &result )
{
      ACE_Message_Block &mb = result.message_block();
      if ( !result.success() || result.bytes_transferred() == 0 )
      {
                mb.release();
                delete this;
                return;
      }

      //连接关闭
      if ( 0 == result.bytes_transferred() )
      {
                m_bValid = false;
                reader_.cancel();
                writer_.cancel();
                if ( !m_bWritePending.value() ) //如果没有正在进行的写操作,则销毁实例
                {
                        delete this;
                }
                return;
      }

      //处理消息
      MsgHandle( (void*)(mb.base() ) );

      mb.release();

      ACE_Message_Block *nmb = new ACE_Message_Block( buffer, 1024 );

      if ( this->reader_.read( *nmb, nmb->space() ) != 0 )

      return;
}

void      CProactor::handle_write_stream( const ACE_Asynch_Write_Stream::Result &result )
{
      if ( !m_bValid.value() )
      {
                //连接不再有效,销毁实例   
                result.message_block().release();   
                delete this;   
                return;
      }

      //发送失败   
      if ( result.error() )   
      {   
                delete this;
                return;   
      }   

      if (result.message_block().length())   
      {
                //没有写完应该写的字节数,要继续往下写   
                start_write(&result.message_block());   
      }   
      else//释放写内存   
      {
                ACE_DEBUG( ( LM_DEBUG, "数据发送完毕。\n" ) );
                result.message_block().release();   
      }

      start_write(0);   
}

int                CProactor::put( ACE_Message_Block *mb, ACE_Time_Value *timeout )
{   
      if ( !m_bWritePending.value() )   
      {   
                start_write(mb);   
                return   0;   
      }   
      return   putq(mb,   timeout);   
}   

void      CProactor::start_write(ACE_Message_Block* mblk )
{   
      if ( mblk == 0 )
      {
                ACE_Time_Value   nonblock(0);
                getq( mblk, &nonblock );
      }

      if ( mblk != 0 )
      {
                m_bWritePending = true;
                size_t nLen = mblk->length();
                int nnn = writer_.write( *mblk, nLen );
                //ACE_DEBUG( ( LM_DEBUG, "write result: %d\n", nnn ) );

                if ( nnn == -1 )
                {
                        ungetq(mblk);
                }
      }   
      else
      {
                m_bWritePending = false;
      }
}   

bool      CProactor::Send( void* pBuf, int nLen )
{
      if ( !m_bValid.value() )//连接已经失效,不可以再发送
      {
                return false;
      }

      ACE_Message_Block* mb = new ACE_Message_Block( nLen );
      if ( 0 == mb )
      {
                delete this;
                return false;
      }

      ACE_OS::memcpy( mb->wr_ptr(), pBuf, nLen );
      mb->wr_ptr( nLen );

      if ( -1 == put(mb) )
      {
                delete this;
                return false;
      }

      return true;
}   

/***********************************************************************/
//      处理消息
//      从客户端发过来的消息必须先经过该函数来处理.
/***********************************************************************/
int CProactor::MsgHandle( void* pBuf )
{
            //在这里实现将消息发送给另一个连接客户端.这样的话,显示pSend的m_bValid.value()值是false,
            //强制将这个值设置为true的话写就会出错...
            CProactor*   pSend;
            ACE_NEW_RETURN( pSend, CProactor(), -1 );
            pSend->handle(this->handle_);
            pSend->Send( "123\n", 4 );
            return 100;
}


如果想同时给多个连接的客户端发送消息或者 收到一个客户端的消息后将消息发送给另一个客户端    需要怎么实现啊?

winston 发表于 2008-6-25 13:39:52

你可能还没理解PROACTOR的框架。
如果需要多个客户端同时相互操作,需要你自己保存客户端的地址信息、对象信息。一般是做个MAP映射即可。
在需要一个客户端信息发到另外的客户端的时候,只需要按图索骥,找到对象指针,进行发送。

minglong717 发表于 2008-6-25 13:56:31

刚刚试过了...我将连接的客户端的的信息放到 LIST 了...

能问一下...上面这样的框架大概可以支持处理多少客户端的消息处理吗?

winston 发表于 2008-6-25 14:35:52

没有标准 - 看系统的资源了。内存、CPU处理能力、网络带宽。。。

minglong717 发表于 2008-6-25 14:49:28

哦...谢谢...麻烦您了...
页: [1]
查看完整版本: Proactor实现写操作问题