redf0x 发表于 2009-7-17 21:35:13

使用ACE Proactor的一个问题

斑竹好,大家好,学习ACE中,使用proactor的程序在使用中偶尔会在ace的这段代码里面出错
ACE_INLINE void
ACE_Message_Block::wr_ptr (size_t n)
{
ACE_TRACE ("ACE_Message_Block::wr_ptr");
this->wr_ptr_ += n;
}
也就是说this已经是一个非法指针了,仍旧在访问wr_ptr,检查来检查去也不知道是哪里的问题,希望大家帮忙,先谢谢了

程序只有一个线程,流程是这样的,一个异步的acceptor一直处于监听网络连接状态,当有新的连接进来时,创建一个ClientNetworkHandler对象和
一个Client对象,在Client对象的构造函数里面启动一个定时器,当这个client过了10秒钟还没有收到512个字节的数据的话,就将这个Client和ClientNetworkHandler
删除,ClientNetworkHandler用于从网络上接收数据,接收到数据之后放到Client对象的缓冲区中,由Client对象处理接收到的数据,
每从网络上接收到4个字节的数据,则通过ClientNetworkHandler回应10K的数据。
ClientNetworkHandler从ACE_Service_Handler和ACE_Task<ACE_MT_SYNCH>上派生,当socket不可写时,则先通过putq将数据放在
ClientNetworkHandler的缓冲区(也就是ACE_Task<ACE_MT_SYNCH>的message_queue)中,在下一次socket可写时,再继续发送
如果网络出错,则将Client和ClientNetworkHandler的对象删除
在ClientNetworkHandler的析构函数里面关闭socket句柄和释放message_queue中的message_block,下面是ClientNetworkHandler与Client的代码

Class Client;

class ClientNetworkHandler : public ACE_Service_Handler, public ACE_Task<ACE_MT_SYNCH>
{
public:
    static const int    client_message_head_size = 8;
    static const int    max_http_request_size = 10240;

private:
    Client*    m_client;                                          //the client of this handler
    rw::net::NetAddress m_remote;                  //the client address of this handler
    rw::net::NetAddress m_local;                  //the server address of this hanlder

    ACE_Asynch_Read_Stream    m_reader;                //reader read from network
    ACE_Asynch_Write_Stream    m_writer;                //writer write to network

    ACE_Message_Block      *m_readBlock;                //temp data read from network
   
    bool                m_canWrite;                                    //if the socket can write now, if we've pending a write operation on this client, we assume that the socket cannot
                                                                                    //be write, else the socket can be write

public:
    ClientNetworkHandler(Client* client = NULL);
    virtual ~ClientNetworkHandler();

public:
    void dispatchReceived(ACE_Message_Block* data);
    void write(ACE_Message_Block* block = NULL);

public:
    virtual void open(ACE_HANDLE new_handle, ACE_Message_Block& message_block);
    virtual void handle_read_stream(const ACE_Asynch_Read_Stream::Result& result);
    virtual void handle_write_stream(const ACE_Asynch_Write_Stream::Result& result);
    virtual void addresses(const ACE_INET_Addr& remote_address, const ACE_INET_Addr& local_address);

public:
    void    drop();
    void    send(const ACE_Message_Block* data);

    const rw::net::NetAddress& remote() const{ return m_remote; }
};

ClientNetworkHandler::ClientNetworkHandler(Client* client)
: m_readBlock(NULL), m_canWrite(true)
{
    m_client = client;
}

ClientNetworkHandler::~ClientNetworkHandler()
{
    //关闭socket句柄
    if(handle() != ACE_INVALID_HANDLE){
      ACE_OS::closesocket(handle());
    }
   
    //释放从网络上读取数据的message_block
    if(m_readBlock)    m_readBlock->release();
    m_readBlock = NULL;

//将这个对象里面存储的还没有发送出去的message_block释放掉
    ACE_Message_Block* block = NULL;
    ACE_Time_Value non_block(0, 0);
    this->getq(block, &non_block);
    while(block != NULL){
      block->release();
      block = NULL;
      this->getq(block, &non_block);
    }

    m_client = NULL;
    m_canWrite = false;
   
    //释放读stream
    m_reader.cancel();
    //释放写stream
    m_writer.cancel();
}


//将从网络上读取的数据交给对应的client处理
void
ClientNetworkHandler::dispatchReceived(ACE_Message_Block *data)
{
    if(m_client)    m_client->handleReceived(data);
}

//往网络上写数据
void
ClientNetworkHandler::write(ACE_Message_Block *block)
{
    //如果写的是null,则从本对象的发送缓存里面取message_block进行发送
    if(block == NULL){
      ACE_Time_Value    non_block(0);
      this->getq(block, &non_block);
    }
    if(block){
      m_canWrite = false;
      if(m_writer.write(*block, block->length()) == -1){
            //将发送出去的block从本对象的发送缓存中移除
            ungetq(block);
      }
    }
}

//初始化
void
ClientNetworkHandler::open(ACE_HANDLE new_handle, ACE_Message_Block &message_block)
{
    if(new_handle != ACE_INVALID_HANDLE)    handle(new_handle);
    if(m_reader.open(*this) < 0){
      rf_log(RW_TEXT("open reader failed: ") << errno, RFLogger::LL_fatal);
      drop();
      return;
    }
    if(m_writer.open(*this) < 0){
      rf_log(RW_TEXT("open writer failed: ") << errno, RFLogger::LL_fatal);
      drop();
      return;   
    }
    ACE_NEW_NORETURN(m_readBlock, ACE_Message_Block(max_http_request_size));
    if(m_readBlock == NULL){
      rf_log(RW_TEXT("new message block failed: ") << errno, RFLogger::LL_fatal);
      drop();
      return;
    }
    m_reader.read(*m_readBlock, max_http_request_size);
}

//处理从网络上读取的数据
void
ClientNetworkHandler::handle_read_stream(const ACE_Asynch_Read_Stream::Result& result)
{
    if(result.handle() != handle()){
      return ;
    }
    if(!result.success() ||
      (result.bytes_to_read() != 0 && result.bytes_transferred() == 0)){
      //如果读取失败,则关闭这个对象
            rf_log(RW_TEXT("socket closed by client:") << charsetAlign(m_remote.toString()).c_str(), RFLogger::LL_debug);
            drop();
            return;
    } else {
      //如果读取到了数据则将数据交给相应的client对象处理
      dispatchReceived(m_readBlock);
      m_readBlock->reset();
      m_reader.read(*m_readBlock, max_http_request_size);
    }
}

//处理网络的写事件
void
ClientNetworkHandler::handle_write_stream(const ACE_Asynch_Write_Stream::Result &result)
{
    ACE_Message_Block &temp_block = result.message_block();
    if(!result.success()){
      //如果写失败了,则关闭这个对象
      drop();
      return;
    } else {
      if(handle() != result.handle()){
      //如果穿回来了错误的句柄,则关闭这个对象
            m_canWrite = false;
            drop();
            return;
      }
      m_canWrite = true;
      if (temp_block.length () == 0) {
          //如果返回的message_block已经完全发送了,则将这个message_block释放
            temp_block.release();
            //同时从缓冲区中取出新的message_block进行发送
            write();
      } else {
         //如果返回的message_block还没有发送完成,则继续发送这个message_block剩下的数据
            write(&temp_block);
      }
    }
}

void
ClientNetworkHandler::addresses(const ACE_INET_Addr &remote_address, const ACE_INET_Addr &local_address)
{
    m_remote.family(remote_address.get_type());
    m_remote.ip(ACE_HTONL(remote_address.get_ip_address()));
    m_remote.port(remote_address.get_port_number());
    m_local.family(local_address.get_type());
    m_local.ip(ACE_HTONL(local_address.get_ip_address()));
    m_local.port(local_address.get_port_number());

    //cout << "remote address: " << m_remote.toString() << " local address: " << m_local.toString() << endl;
}

//关闭这个网络数据处理对象
void
ClientNetworkHandler::drop()
{
    //如果这个网络对象有相应的client,则告诉这个client这个网络对象已经关闭
    if(m_client)    m_client->handleHandlerClosed();
    else{
    //否则把自己给删除了
      rf_log(RW_TEXT("no client for this handler: ") << charsetAlign(m_remote.toString()).c_str(), RFLogger::LL_fatal);
      delete this;
    }
}

void
ClientNetworkHandler::send(const ACE_Message_Block* data)
{
    //如果可以直接往网络上发送数据,则将data发送给网络
    if(m_canWrite)    write(const_cast<ACE_Message_Block*>(data));
    //如果不行,则将这个数据先放到缓冲区中,等待下次调用handle_write_stream时再进行发送
    else            this->putq(const_cast<ACE_Message_Block*>(data));
}

class Client : public ACE_Handler
{
public:
    static const int    client_id_size = 20;            //id size of client
    static const int    max_initial_timeout = 10;    //the maximum time we wait for the client to set it's initial imformation, 10 seconds

protected:
    ClientNetworkHandler*    m_networkHandler;            //handler of this client

    char                              m_buffer;                  //接收客户数据的缓冲区(客户发送的最多数据不会超过1024个字节)
    int                                    m_currentBufferLength;    //当期缓冲区数据总长度
    int                                    m_currentParseOffset;      //当前解析到的数据偏移

public:
    Client();
    virtual ~Client();

public:
    virtual void handle_time_out(const ACE_Time_Value& tv, const void* art = 0);

    void    handleReceived(ACE_Message_Block* msg_block);
    void    handleHandlerClosed();

    void dropSelf();

public:
    int      processInputBuffer();

    void    sendData(const ACE_Message_Block* data);
};

Client::Client()
: m_networkHandler(NULL), m_currentBufferLength(0),m_currentParseOffset(0)
{
   ACE_Proactor::instance()->schedule_timer(*this, (void*)1, ACE_Time_Value(10));
}

Client::~Client()
{
}

void
Client::handle_time_out(const ACE_Time_Value &tv, const void *art)
{
    int timer = (int)art;
    switch(timer)
    {
    case 1:
      {
            if(m_currentBufferLength < 512){
                dropSelf();
            }
      }
      break;
    }
}

void
Client::handleReceived(ACE_Message_Block* msg_block)
{
    if(msg_block->length() + m_currentBufferLength > 1024){
      //如果数据大于所规定的最大数据,则关闭这个客户
      dropSelf();
      return;
    }
   
    //将数据复制到客户的缓冲区中
    memcpy(m_buffer + m_currentBufferLength, msg_block->rd_ptr(), msg_block->length());
    m_currentBufferLength += msg_block->length();

//处理缓冲区中的数据
    processInputBuffer();
}

void
Client::handleHandlerClosed()
{
    //如果客户的网络处理对象关了,则把这个客户也关了
    dropSelf();
}

void
Client::dropSelf()
{
    //delete这个客户的网络处理对象
    if(m_networkHandler)    delete m_networkHandler;
    m_networkHandler = NULL;
   
    //delete这个客户
    delete this;
}

int
Client::processInputBuffer()
{
    while(true){
      if(m_currentParseOffset + 4 > m_currentBufferLength)    break;//如果当前解析数据的便宜已经大于当前缓冲区长度
      m_currentParseOffset += 4;    //解析偏移4个字节
      
      //客户没发送4个字节的数据上来,回应10K的数据回去
      ACE_Message_Block* ret_data;
      ACE_NEW_NORETURN(ret_data, ACE_Message_Block(10240));
      send(ret_data);
    }

    return 0;
}

void
Client::sendData(const ACE_Message_Block* data)
{
    m_networkHandler->send(data);
}

redf0x 发表于 2009-7-17 23:28:36

怎么这么多人路过不回复啊。。。
大家帮帮忙啊

redf0x 发表于 2009-7-18 10:08:27

路过的人很多,还是没人回答,是太简单了么?还是大家都过周末去了啊?
继续求助

winston 发表于 2009-7-18 14:33:01

代码问题很多的。
下面的代码,完全没有考虑到任何同步的问题。
Proactor架构难点就在于线程、异步、对象相互结合,稍有理解不当,就会出现错误。
void Client::dropSelf()
{
    //delete这个客户的网络处理对象
    if(m_networkHandler)    delete m_networkHandler;//删除时候,此对象正在注册接受信息,怎么能如此删除?
    m_networkHandler = NULL;
   
    //delete这个客户
    delete this;
}

winston 发表于 2009-7-18 14:33:35

你可以把所有的delete this先屏蔽,看看结果是不是不同。

redf0x 发表于 2009-7-18 17:26:20

谢谢管理员的回答,由于刚刚接触ACE没多久,看了它所介绍的那两本书(n1/n2),看了它所介绍的关于reactor与proactor的代码,参照它的一个示例所写的一个程序进行测试,没有考虑太多其它的东西,结果运行一段时间后就会出现上面的问题

这个错误比较很难重现,不过管理员说得很有道理,这里delete自身的话可能会有问题
dropSelf有两种情况下可能会被调用
1.网络错误,也就是对方关闭了这个socket的连接,这个时候应该不会出现你说的这个问题
2.超时时仍旧没有接收到足够多的数据,这时可能出现你说的这个问题

如果不这样删除的话,想实现超时时把这个socket关闭并删除这个对象应该如何实现呢?
或者有没有办法把相应的socket从proactor里面解注册?难道在析构函数中的closesocket和取消reader、writer不能将这个socket的事件从proactor中解注册掉吗?

还有就是管理员说代码问题很多,还有其它哪些问题呢?大家可否多加指点?先谢谢了

[ 本帖最后由 redf0x 于 2009-7-18 17:46 编辑 ]

winston 发表于 2009-7-18 17:57:14

这个涉及到对Proactor的理解。
看完下面的帖子:
http://www.acejoy.com/bbs/viewthread.php?tid=1126
http://www.acejoy.com/bbs/viewthread.php?tid=1178&extra=page%3D1
http://www.acejoy.com/bbs/viewthread.php?tid=690&extra=page%3D1

redf0x 发表于 2009-7-18 18:19:50

谢谢管理员的回答,昨晚发贴时,已经把这三个帖子看过了,没有仔细理解,刚才又仔细看了一下
我的理解是这样的:
   在主动调用closesocket时,ClientNetworkHandler投递了几个操作在proactor里面,ace就一定还会回调回调几次该ClientNetworkHandler相应的handle_read_stream或者相应的handle_write_stream,调用完这些相应的函数之后,proactor就会把ClientNetworkHandler从里面解注册,不知是否这样?这个时候调用的handle_read_stream或者handle_write_stream中的result的应该是失败吧?也就是result的success()函数返回假?
   那么在我的实例里面的话超时析构之后有可能handle_read_stream和handle_write_stream都还会被回调?虽然我的是单线程?

redf0x 发表于 2009-7-18 18:23:27

但是如果是这样的话,为什么出错是在ACE的这段代码里面呢?
ACE_INLINE void
ACE_Message_Block::wr_ptr (size_t n)
{
ACE_TRACE ("ACE_Message_Block::wr_ptr");
this->wr_ptr_ += n;
}
是因为即使我close了ClientNetworkHandler的这个socket之后proactor还在试图从ClientNetworkHandler的buffer里面来读取数据进行发送吗?

redf0x 发表于 2009-7-18 20:05:15

我刚才测试了一下,在open的时候用reader去read网络事件,超时时关闭这个socket,发现ClientNetworkHandler的handler_read_stream并没有被调用,这是为什么?
页: [1] 2
查看完整版本: 使用ACE Proactor的一个问题