找回密码
 用户注册

QQ登录

只需一步,快速开始

查看: 5638|回复: 4

Proactor实现写操作问题

[复制链接]
发表于 2008-6-25 13:24:19 | 显示全部楼层 |阅读模式
#include "ace/Message_Queue.h"
#include "ace/Asynch_IO.h"
#include "ace/OS.h"
#include "ace/Proactor.h"
#include "ace/Asynch_Acceptor.h"
#include "ace/SOCK_Stream.h"
#include "ace/INET_Addr.h"
#include "ace/Task.h"

class CProactor : public ACE_Service_Handler,        public   ACE_Task<ACE_NULL_SYNCH>
{
public:
        CProactor();
        virtual        ~CProactor();

        //打开连接.
        virtual void open( ACE_HANDLE h, ACE_Message_Block& );

        //异步读完成后会调用此函数
        virtual void handle_read_stream( const ACE_Asynch_Read_Stream::Result& result );
        //异步写完成后会调用此函数
        virtual void handle_write_stream( const ACE_Asynch_Write_Stream::Result& result );

        bool        Send( void* pBuf, int nLen );

        int                put( ACE_Message_Block *mb, ACE_Time_Value *timeout = 0 );

        void        start_write(ACE_Message_Block* mblk = 0 );

        //获得对端IP地址.
        virtual void addresses (const ACE_INET_Addr &remote_address, const ACE_INET_Addr &local_address )
        {
                ACE_DEBUG( ( LM_DEBUG, ">>>>>IP: %s, 端口:%d\n",
                        remote_address.get_host_addr(), remote_address.get_port_number() ) );

                ACE_OS::memcpy( &m_Addr, &remote_address, sizeof(ACE_INET_Addr) );
        }

        //整理字符串.
        void        rtrim( char* str );

public:
        //根据消息类型分派消息
        int                                MsgHandle( void* pBuf );

private:
        ACE_Asynch_Read_Stream                                        reader_;                //从socket中异步读取数据的流
        ACE_Asynch_Write_Stream                                        writer_;                //向socket中异步写入数据的流

        ACE_Atomic_Op<ACE_Thread_Mutex, bool>        m_bValid;                                //连接是否失效,如果失效,则读写都不可能,实例即将被销毁
        ACE_Atomic_Op<ACE_Thread_Mutex, bool>        m_bWritePending;                        //是否有写操作正在异步进行,如果有,则不能马上销毁实例

        ACE_Message_Block                                                m_data;                        //接收的数据
        ACE_INET_Addr                                                m_Addr;                                //临时保存客户端ip地址.

        char                                                        buffer[ MAX_MSG_SIZE ];        //接收消息使用.
};

-------------------------------------------------------------------------------
Proactor.cpp

/*
        收发消息都在这个类里操作.
*/
#include "Proactor.h"

CProactor::CProactor()
{
        m_data.size( MAX_MSG_SIZE );
        m_bValid = false;
        m_bWritePending = false;
}

CProactor::~CProactor()
{
        if ( this->handle() != ACE_INVALID_HANDLE )
        {
                //关闭连接.
                ACE_DEBUG( ( LM_DEBUG, "<<<<<IP: %s, 端口:%d\n",
                        m_Addr.get_host_addr(), m_Addr.get_port_number() ) );

                ACE_OS::closesocket( this->handle() );
        }
}

void        CProactor:open( ACE_HANDLE h, ACE_Message_Block& )
{
        this->handle_ = h;

        //初始化异步读写流
        if ( -1 == reader_.open( *this, this->handle_ ) )
        {
                delete this;
                return;
        }
        if ( -1 == writer_.open( *this, this->handle_ ) )
        {
                delete this;
                return;
        }

        m_bValid = true;

        //接收包
        ACE_Message_Block *mb = new ACE_Message_Block( buffer, 1024 );
        if ( 0 != reader_.read( *mb, mb->space() ) )
        {
                delete this;
                return;
        }

        return;
}

void        CProactor::handle_read_stream( const ACE_Asynch_Read_Stream::Result &result )
{
        ACE_Message_Block &mb = result.message_block();
        if ( !result.success() || result.bytes_transferred() == 0 )
        {
                mb.release();
                delete this;
                return;
        }

        //连接关闭
        if ( 0 == result.bytes_transferred() )
        {
                m_bValid = false;
                reader_.cancel();
                writer_.cancel();
                if ( !m_bWritePending.value() ) //如果没有正在进行的写操作,则销毁实例
                {
                        delete this;
                }
                return;
        }

        //处理消息
        MsgHandle( (void*)(mb.base() ) );

        mb.release();

        ACE_Message_Block *nmb = new ACE_Message_Block( buffer, 1024 );

        if ( this->reader_.read( *nmb, nmb->space() ) != 0 )

        return;
}

void        CProactor::handle_write_stream( const ACE_Asynch_Write_Stream::Result &result )
{
        if ( !m_bValid.value() )
        {
                //连接不再有效,销毁实例   
                result.message_block().release();   
                delete this;   
                return;
        }

        //发送失败   
        if ( result.error() )   
        {   
                delete this;
                return;   
        }   

        if (result.message_block().length())   
        {
                //没有写完应该写的字节数,要继续往下写   
                start_write(&result.message_block());   
        }   
        else//释放写内存   
        {
                ACE_DEBUG( ( LM_DEBUG, "数据发送完毕。\n" ) );
                result.message_block().release();   
        }

        start_write(0);   
}

int                CProactor::put( ACE_Message_Block *mb, ACE_Time_Value *timeout )
{   
        if ( !m_bWritePending.value() )   
        {   
                start_write(mb);   
                return   0;   
        }   
        return   putq(mb,   timeout);   
}   

void        CProactor::start_write(ACE_Message_Block* mblk )
{   
        if ( mblk == 0 )
        {
                ACE_Time_Value   nonblock(0);
                getq( mblk, &nonblock );
        }

        if ( mblk != 0 )
        {
                m_bWritePending = true;
                size_t nLen = mblk->length();
                int nnn = writer_.write( *mblk, nLen );
                //ACE_DEBUG( ( LM_DEBUG, "write result: %d\n", nnn ) );

                if ( nnn == -1 )
                {
                        ungetq(mblk);
                }
        }   
        else
        {
                m_bWritePending = false;
        }
}   

bool        CProactor::Send( void* pBuf, int nLen )
{
        if ( !m_bValid.value() )//连接已经失效,不可以再发送
        {
                return false;
        }

        ACE_Message_Block* mb = new ACE_Message_Block( nLen );
        if ( 0 == mb )
        {
                delete this;
                return false;
        }

        ACE_OS::memcpy( mb->wr_ptr(), pBuf, nLen );
        mb->wr_ptr( nLen );

        if ( -1 == put(mb) )
        {
                delete this;
                return false;
        }

        return true;
}   

/***********************************************************************/
//        处理消息
//        从客户端发过来的消息必须先经过该函数来处理.
/***********************************************************************/
int CProactor::MsgHandle( void* pBuf )
{
            //在这里实现将消息发送给另一个连接客户端.这样的话,显示pSend的m_bValid.value()值是false,
            //强制将这个值设置为true的话写就会出错...
            CProactor*   pSend;
            ACE_NEW_RETURN( pSend, CProactor(), -1 );
            pSend->handle(this->handle_);
            pSend->Send( "123\n", 4 );

            return 100;
}


如果想同时给  多个连接的客户端发送消息  或者 收到一个客户端的消息后将消息发送给另一个客户端    需要怎么实现啊?
发表于 2008-6-25 13:39:52 | 显示全部楼层
你可能还没理解PROACTOR的框架。
如果需要多个客户端同时相互操作,需要你自己保存客户端的地址信息、对象信息。一般是做个MAP映射即可。
在需要一个客户端信息发到另外的客户端的时候,只需要按图索骥,找到对象指针,进行发送。
 楼主| 发表于 2008-6-25 13:56:31 | 显示全部楼层
刚刚试过了...我将连接的客户端的的信息放到 LIST 了...

能问一下...上面这样的框架大概可以支持处理多少客户端的消息处理吗?
发表于 2008-6-25 14:35:52 | 显示全部楼层
没有标准 - 看系统的资源了。内存、CPU处理能力、网络带宽。。。
 楼主| 发表于 2008-6-25 14:49:28 | 显示全部楼层
哦...谢谢...麻烦您了...
您需要登录后才可以回帖 登录 | 用户注册

本版积分规则

Archiver|手机版|小黑屋|ACE Developer ( 京ICP备06055248号 )

GMT+8, 2024-12-22 21:01 , Processed in 0.030983 second(s), 6 queries , Redis On.

Powered by Discuz! X3.5

© 2001-2023 Discuz! Team.

快速回复 返回顶部 返回列表