Proactor模式下,分别从ACE_Service_Handler派生出2个类来做服务器处理和客户端处理类。 
建立一个连接后,服务器端接收了36次数据,之后就再也接收不到数据了。跟踪进去发现,客户端那边的handle_write_stream()每次都被调用了,但是服务器端的handle_read_stream()没有再进入。 
 
客户端也存在这样的问题,但是接收到的数据次数多些。 
 
每次重新启动程序后,能接收到的数据次数是固定的。 
 
有人知道这个是什么问题引起的不?大虾指点下吧。谢谢了。 
环境是VC6.0+ACE5.3,MFC Dialog工程。 
 
下面把服务器/客户端处理类贴出来: 
 
// ProactorServer.h: interface for the CProactorServer class. 
// 
////////////////////////////////////////////////////////////////////// 
 
#ifndef _PROACTORSERVER_H__ 
#define _PROACTORSERVER_H__ 
 
#if _MSC_VER > 1000 
#pragma once 
#endif // _MSC_VER > 1000 
#include  "ace/Message_Queue.h" 
#include "ace/Asynch_IO.h" 
#include "ace/Proactor.h" 
#include "ace/Map_Manager.h"  
 
// Window message that CProactorServer::handle_read_stream posts to the main
// window for every chunk of received data. Parenthesized so the macro expands
// as a single value inside larger expressions.
#define WM_READDATA_S    (WM_USER + 100)
 
class CProactorServer  : public ACE_Service_Handler 
{ 
public: 
    CProactorServer(); 
    virtual ~CProactorServer(); 
 
     int WriteDate(char *szSendBuf, int nSendCnt); 
     void Release(); 
  
     virtual void open (ACE_HANDLE h, ACE_Message_Block&); 
     virtual void handle_read_stream(const ACE_Asynch_Read_Stream::Result &result); 
     virtual void handle_write_stream(const ACE_Asynch_Write_Stream::Result &result); 
 
    static CProactorServer *GetObj(){return _cur;} 
 private:     
    static CProactorServer *_cur; 
     ACE_Asynch_Read_Stream  reader_; 
     ACE_Asynch_Write_Stream writer_; 
     ACE_Message_Block        *msgblock_; 
}; 
 
#endif // !defined(_PROACTORSERVER_H__) 
 
 
// ProactorServer.cpp: implementation of the CProactorServer class. 
// 
////////////////////////////////////////////////////////////////////// 
 
#include "stdafx.h" 
#include "ProactorServer.h" 
//#include "ace/OS_NS_sys_socket.h" 
 
////////////////////////////////////////////////////////////////////// 
// Construction/Destruction 
////////////////////////////////////////////////////////////////////// 
 
// Handler recorded by open(); starts out as NULL until a connection is opened.
CProactorServer *CProactorServer::_cur = NULL;
// Creates a handler without a read buffer; the buffer is allocated in open().
CProactorServer::CProactorServer()
    : msgblock_(NULL)
{
}
 
// Destroying the handler tears the connection down through Release().
CProactorServer::~CProactorServer()
{
    this->Release();
}
 
void CProactorServer::Release() 
{ 
    if (this->handle () != ACE_INVALID_HANDLE) 
    { 
        this->reader_.cancel(); 
        this->writer_.cancel(); 
        ACE_OS::shutdown(this->handle_, ACE_SHUTDOWN_WRITE); 
        ACE_OS::closesocket (this->handle ()); 
        this->handle (ACE_INVALID_HANDLE); 
        if ( NULL != msgblock_ ) 
            msgblock_->release(); 
    }     
} 
  
 void CProactorServer::open (ACE_HANDLE h, ACE_Message_Block&) 
 { 
     _cur = this; 
     this->handle (h); 
      if (this->reader_.open (*this, h) != 0 ) 
      { 
          ACE_ERROR ((LM_ERROR, ACE_TEXT ("%p\n"), 
              ACE_TEXT ("CProactorServer open reader_"))); 
          
          delete this; 
          return; 
      } 
     if (this->writer_.open (*this, h) != 0 ) 
     { 
         delete this; 
         ACE_ERROR ((LM_ERROR, ACE_TEXT ("%p\n"), 
             ACE_TEXT ("CProactorServer open write_"))); 
         return; 
     } 
      
     msgblock_ = new ACE_Message_Block(1024); 
     if (this->reader_.read (*msgblock_, msgblock_->space ()) != 0) 
      { 
         ACE_TRACE("Begin read fail\n"); 
          return; 
      } 
      
     ACE_TRACE("Begin open OK \n"); 
 } 
  
 void CProactorServer::handle_read_stream(const ACE_Asynch_Read_Stream::Result &result) 
 { 
     ACE_Message_Block &mb = result.message_block (); 
     if (!result.success () || result.bytes_transferred () == 0) 
     { 
         this->Release(); 
         ACE_TRACE("Client Close. \r\n"); 
         return; 
     } 
      
     mb.copy("");       
      ACE_Message_Block *nmb = mb.clone(); 
  
      AfxGetMainWnd()->PostMessage(WM_READDATA_S, (WPARAM)this, (LPARAM)nmb); 
  
      if (this->reader_.read (*msgblock_, msgblock_->space ()) != 0) 
     { 
         ACE_TRACE("read data failed!"); 
     } 
 } 
  
 // Completion handler for an asynchronous write started by WriteDate().
 //
 // When the write succeeded but only part of the block was sent, the rest is
 // written again; ownership of the block then moves on to the next completion.
 // In every other case (everything sent, failure, or the re-issue failed) the
 // block is released here.
 void CProactorServer::handle_write_stream(const ACE_Asynch_Write_Stream::Result &result)
 {
     ACE_Message_Block &mb = result.message_block ();

     if (!result.success ())
     {
         ACE_ERROR ((LM_ERROR, ACE_TEXT ("Write data failed!\n")));
     }
     else if (result.bytes_transferred () > 0 && mb.length () > 0)
     {
         // Short write: the read pointer was advanced past the bytes already
         // sent, so the block now describes exactly the unsent tail.
         if (this->writer_.write (mb, mb.length ()) == 0)
             return;
         ACE_ERROR ((LM_ERROR, ACE_TEXT ("%p\n"), ACE_TEXT ("Write data failed!")));
     }

     mb.release();
 }
  
 int CProactorServer::WriteDate(char *szSendBuf, int nSendCnt) 
 { 
     ACE_Message_Block *smb = new ACE_Message_Block(nSendCnt+1); 
     smb->copy(szSendBuf); 
     smb->wr_ptr(nSendCnt); 
     int nResult = this->writer_.write(*smb, smb->length()); 
     if ( nResult != 0) 
     { 
         ACE_TRACE("Write data failed!"); 
     } 
  
     return nResult; 
 } 
 
// ProactorClient.h: interface for the CProactorClient class. 
// 
////////////////////////////////////////////////////////////////////// 
 
#ifndef _PROACTORCLIENT_H__ 
#define _PROACTORCLIENT_H__ 
 
#if _MSC_VER > 1000 
#pragma once 
#endif // _MSC_VER > 1000 
#include  "ace/Message_Queue.h" 
#include "ace/Asynch_IO.h" 
#include "ace/Proactor.h" 
 
// Window message that CProactorClient::handle_read_stream posts to the main
// window for every chunk of received data. Parenthesized so the macro expands
// as a single value inside larger expressions.
#define WM_READDATA_C    (WM_USER + 101)
 
class CProactorClient  : public ACE_Service_Handler 
{ 
public: 
    CProactorClient(); 
    virtual ~CProactorClient(); 
 
    int WriteDate(char *szSendBuf, int nSendCnt); 
    void Release(); 
 
    virtual void open (ACE_HANDLE h, ACE_Message_Block&); 
    virtual void handle_read_stream(const ACE_Asynch_Read_Stream::Result &result); 
    virtual void handle_write_stream(const ACE_Asynch_Write_Stream::Result &result); 
 
    static CProactorClient *GetObj(){return cur_;} 
private: 
    static CProactorClient *cur_; 
     
    ACE_Asynch_Read_Stream  reader_; 
    ACE_Asynch_Write_Stream writer_; 
 
    ACE_Message_Block        *msgblock_; 
}; 
 
#endif // !defined(_PROACTORCLIENT_H__) 
 
// ProactorClient.cpp: implementation of the CProactorClient class. 
// 
////////////////////////////////////////////////////////////////////// 
 
#include "stdafx.h" 
#include "ProactorClient.h" 
//#include "ace/OS_NS_sys_socket.h" 
 
////////////////////////////////////////////////////////////////////// 
// Construction/Destruction 
////////////////////////////////////////////////////////////////////// 
// Handler recorded by open(); NULL until a connection is opened.
CProactorClient *CProactorClient::cur_ = NULL;
// Creates a handler without a read buffer; the buffer is allocated in open().
// msgblock_ must start out as NULL: Release() tests it against NULL before
// freeing it, and open() can delete the handler before the buffer exists.
CProactorClient::CProactorClient()
    : msgblock_(NULL)
{
}
 
// Destroying the handler tears the connection down through Release().
CProactorClient::~CProactorClient()
{
    this->Release();
}
 
void CProactorClient::Release() 
{ 
    if (this->handle () != ACE_INVALID_HANDLE) 
    { 
        this->reader_.cancel(); 
        this->writer_.cancel(); 
        ACE_OS::shutdown(this->handle_, ACE_SHUTDOWN_WRITE); 
        ACE_OS::closesocket (this->handle ()); 
        this->handle (ACE_INVALID_HANDLE);     
        if ( NULL != msgblock_ ) 
        { 
            delete msgblock_; 
            msgblock_ = NULL; 
        } 
    }     
} 
 
void CProactorClient::open (ACE_HANDLE h, ACE_Message_Block&) 
{ 
    cur_ = this; 
    this->handle (h); 
     if (this->reader_.open (*this, h) != 0 ) 
     { 
         ACE_ERROR ((LM_ERROR, ACE_TEXT ("%p\n"), 
             ACE_TEXT ("CProactorClient open reader_"))); 
         
         delete this; 
         return; 
     } 
      if (this->writer_.open (*this) != 0 ) 
      { 
          delete this; 
          ACE_ERROR ((LM_ERROR, ACE_TEXT ("%p\n"), 
              ACE_TEXT ("CProactorClient open write_"))); 
          return; 
      } 
     
    msgblock_ = new ACE_Message_Block(1024); 
     if (this->reader_.read (*msgblock_, msgblock_->space ()) != 0) 
     { 
        ACE_TRACE("Begin read fail\n"); 
         return; 
     } 
     
    ACE_TRACE("Begin open OK \n"); 
} 
 
// Completion handler for an asynchronous read.
//
// On failure or end-of-stream (0 bytes) the connection is released.
// Otherwise the received bytes are copied, followed by a terminating NUL,
// into a freshly allocated message block that is posted to the main window
// as the LPARAM of WM_READDATA_C (WPARAM is this handler).
// NOTE(review): the window procedure is presumably responsible for calling
// release() on that block - confirm against the WM_READDATA_C handler.
//
// The read buffer is reset before the next read is issued. Without the reset
// the write pointer only ever moves forward, space() reaches 0 after 1024
// bytes in total, and no further read can complete.
void CProactorClient::handle_read_stream(const ACE_Asynch_Read_Stream::Result &result)
{
    ACE_Message_Block &mb = result.message_block ();
    if (!result.success () || result.bytes_transferred () == 0)
    {
        this->Release();
        ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("Client Close. \r\n")));
        return;
    }

    // Hand the UI its own exactly-sized copy (payload + NUL) so the read
    // buffer can be reused immediately.
    const size_t nLen = mb.length ();
    ACE_Message_Block *nmb = new ACE_Message_Block (nLen + 1);
    bool bPosted = false;
    if (nmb->copy (mb.rd_ptr (), nLen) == 0 && nmb->copy ("") == 0)
    {
        CWnd *pMainWnd = AfxGetMainWnd();
        if (pMainWnd != NULL
            && pMainWnd->PostMessage(WM_READDATA_C,
                                     reinterpret_cast<WPARAM>(this),
                                     reinterpret_cast<LPARAM>(nmb)))
        {
            bPosted = true;
        }
    }
    if (!bPosted)
    {
        // Nobody took ownership of the copy, so free it here.
        ACE_ERROR ((LM_ERROR, ACE_TEXT ("post read data failed!\n")));
        nmb->release ();
    }

    // Rewind read/write pointers so the whole buffer is available again.
    mb.reset ();

    if (this->reader_.read (*msgblock_, msgblock_->space ()) != 0)
    {
        ACE_ERROR ((LM_ERROR, ACE_TEXT ("%p\n"), ACE_TEXT ("read data failed!")));
    }
}
 
// Completion handler for an asynchronous write started by WriteDate().
//
// When the write succeeded but only part of the block was sent, the rest is
// written again; ownership of the block then moves on to the next completion.
// In every other case (everything sent, failure, or the re-issue failed) the
// block is released here.
void CProactorClient::handle_write_stream(const ACE_Asynch_Write_Stream::Result &result)
{
    ACE_Message_Block &mb = result.message_block ();

    if (!result.success ())
    {
        ACE_ERROR ((LM_ERROR, ACE_TEXT ("Write data failed!\n")));
    }
    else if (result.bytes_transferred () > 0 && mb.length () > 0)
    {
        // Short write: the read pointer was advanced past the bytes already
        // sent, so the block now describes exactly the unsent tail.
        if (this->writer_.write (mb, mb.length ()) == 0)
            return;
        ACE_ERROR ((LM_ERROR, ACE_TEXT ("%p\n"), ACE_TEXT ("Write data failed!")));
    }

    mb.release();
}
 
int CProactorClient::WriteDate(char *szSendBuf, int nSendCnt) 
{ 
    ACE_Message_Block *smb = new ACE_Message_Block( nSendCnt+1); 
    smb->copy(szSendBuf); 
    smb->wr_ptr(nSendCnt); 
    int nResult = this->writer_.write(*smb, smb->length()); 
    if ( nResult != 0) 
    { 
        ACE_TRACE("Write data failed!"); 
    } 
 
    return nResult; 
} 
 
// For the dialog class only the main operations are shown below.
// Connection objects:

// Accepts incoming connections and creates a CProactorServer for each one.
ACE_Asynch_Acceptor<CProactorServer> m_curAcceptor; 
// Initiates outgoing connections and creates a CProactorClient for each one.
ACE_Asynch_Connector<CProactorClient> m_curConnector; 
 
// 开始监听 
void CDlgMainProactor::OnBtStartlisten()  
{ 
     UpdateData(TRUE); 
//      if ( m_curAcceptor.open (ACE_INET_Addr (m_nListenPort),//read_size,1)==-1) 
 //             0, 0/*SOMAXCONN, 1, 0, 0, 1, 5*/) == -1) 
     if ( m_curAcceptor.open (ACE_INET_Addr (m_nListenPort),1024,1, 
    SOMAXCONN, 1, 0, 0, 1, 5) == -1) 
     { 
         AfxMessageBox("Listen on port %d failed!", m_nListenPort); 
     } 
} 
 
 
// 开始连接 
void CDlgMainProactor::OnBtConnect()  
{ 
    UpdateData(TRUE); 
    CString strLog; 
    ACE_INET_Addr addr(m_nConnectPort, (char*)(LPCSTR)m_strSvrIPAddr); 
         
    m_curConnector.open(); 
    if (m_curConnector.connect(addr) == -1) 
    { 
         
        strLog.Format("连接服务器%s:%d失败。\r\n", m_strSvrIPAddr, m_nConnectPort); 
    } 
    else 
    { 
        strLog.Format("连接服务器%s:%d成功。\r\n", m_strSvrIPAddr, m_nConnectPort); 
    } 
 
    m_strClientLog += strLog; 
    UpdateData(FALSE); 
} 
 
// 服务器端发送数据 
void CDlgMainProactor::OnBtSvrsend()  
{ 
    UpdateData(TRUE); 
    CProactorServer *curSvr = CProactorServer::GetObj(); 
    curSvr->WriteDate((char*)(LPCSTR)m_strSvrSend, m_strSvrSend.GetLength()); 
} 
 
// 客户端发送数据 
 
void CDlgMainProactor::OnBtClisend()  
{ 
     CProactorClient *cur = CProactorClient::GetObj(); 
     if ( NULL != cur ) 
     { 
         int nLen = m_strClientSendMsg.GetLength(); 
         cur->WriteDate(m_strClientSendMsg.GetBuffer(nLen), nLen); 
     } 
} |