Proactor模式下,分别从ACE_Service_Handler派生出2个类来做服务器处理和客户端处理类。
建立一个连接后服务器端接收了36次数据后,再也收接收不到数据,跟踪进去,客户端那边的数据handle_write_stream()这个函数都调用了,但是服务器端的handle_read_stream()没有进去。
客户端也存在这样的问题,但是接收到的数据次数多些。
每次重新启动程序后,能接收到的数据次数是固定的。
有人知道这个是什么问题引起的不?大虾指点下吧。谢谢了。
环境是VC6.0+ACE5.3,MFC Dialog工程。
下面把服务器/客护端处理类贴出来:
// ProactorServer.h: interface for the CProactorServer class.
//
//////////////////////////////////////////////////////////////////////
#ifndef _PROACTORSERVER_H__
#define _PROACTORSERVER_H__
#if _MSC_VER > 1000
#pragma once
#endif // _MSC_VER > 1000
#include "ace/Message_Queue.h"
#include "ace/Asynch_IO.h"
#include "ace/Proactor.h"
#include "ace/Map_Manager.h"
#define WM_READDATA_S WM_USER + 100
class CProactorServer : public ACE_Service_Handler
{
public:
CProactorServer();
virtual ~CProactorServer();
int WriteDate(char *szSendBuf, int nSendCnt);
void Release();
virtual void open (ACE_HANDLE h, ACE_Message_Block&);
virtual void handle_read_stream(const ACE_Asynch_Read_Stream::Result &result);
virtual void handle_write_stream(const ACE_Asynch_Write_Stream::Result &result);
static CProactorServer *GetObj(){return _cur;}
private:
static CProactorServer *_cur;
ACE_Asynch_Read_Stream reader_;
ACE_Asynch_Write_Stream writer_;
ACE_Message_Block *msgblock_;
};
#endif // !defined(_PROACTORSERVER_H__)
// ProactorServer.cpp: implementation of the CProactorServer class.
//
//////////////////////////////////////////////////////////////////////
#include "stdafx.h"
#include "ProactorServer.h"
//#include "ace/OS_NS_sys_socket.h"
//////////////////////////////////////////////////////////////////////
// Construction/Destruction
//////////////////////////////////////////////////////////////////////
CProactorServer *CProactorServer::_cur;
CProactorServer::CProactorServer()
{
msgblock_ = NULL;
}
CProactorServer::~CProactorServer()
{
Release();
}
void CProactorServer::Release()
{
if (this->handle () != ACE_INVALID_HANDLE)
{
this->reader_.cancel();
this->writer_.cancel();
ACE_OS::shutdown(this->handle_, ACE_SHUTDOWN_WRITE);
ACE_OS::closesocket (this->handle ());
this->handle (ACE_INVALID_HANDLE);
if ( NULL != msgblock_ )
msgblock_->release();
}
}
void CProactorServer::open (ACE_HANDLE h, ACE_Message_Block&)
{
_cur = this;
this->handle (h);
if (this->reader_.open (*this, h) != 0 )
{
ACE_ERROR ((LM_ERROR, ACE_TEXT ("%p\n"),
ACE_TEXT ("CProactorServer open reader_")));
delete this;
return;
}
if (this->writer_.open (*this, h) != 0 )
{
delete this;
ACE_ERROR ((LM_ERROR, ACE_TEXT ("%p\n"),
ACE_TEXT ("CProactorServer open write_")));
return;
}
msgblock_ = new ACE_Message_Block(1024);
if (this->reader_.read (*msgblock_, msgblock_->space ()) != 0)
{
ACE_TRACE("Begin read fail\n");
return;
}
ACE_TRACE("Begin open OK \n");
}
void CProactorServer::handle_read_stream(const ACE_Asynch_Read_Stream::Result &result)
{
ACE_Message_Block &mb = result.message_block ();
if (!result.success () || result.bytes_transferred () == 0)
{
this->Release();
ACE_TRACE("Client Close. \r\n");
return;
}
mb.copy("");
ACE_Message_Block *nmb = mb.clone();
AfxGetMainWnd()->PostMessage(WM_READDATA_S, (WPARAM)this, (LPARAM)nmb);
if (this->reader_.read (*msgblock_, msgblock_->space ()) != 0)
{
ACE_TRACE("read data failed!");
}
}
void CProactorServer::handle_write_stream(const ACE_Asynch_Write_Stream::Result &result)
{
ACE_Message_Block &mb = result.message_block ();
mb.release();
}
int CProactorServer::WriteDate(char *szSendBuf, int nSendCnt)
{
ACE_Message_Block *smb = new ACE_Message_Block(nSendCnt+1);
smb->copy(szSendBuf);
smb->wr_ptr(nSendCnt);
int nResult = this->writer_.write(*smb, smb->length());
if ( nResult != 0)
{
ACE_TRACE("Write data failed!");
}
return nResult;
}
// ProactorClient.h: interface for the CProactorClient class.
//
//////////////////////////////////////////////////////////////////////
#ifndef _PROACTORCLIENT_H__
#define _PROACTORCLIENT_H__
#if _MSC_VER > 1000
#pragma once
#endif // _MSC_VER > 1000
#include "ace/Message_Queue.h"
#include "ace/Asynch_IO.h"
#include "ace/Proactor.h"
#define WM_READDATA_C WM_USER + 101
class CProactorClient : public ACE_Service_Handler
{
public:
CProactorClient();
virtual ~CProactorClient();
int WriteDate(char *szSendBuf, int nSendCnt);
void Release();
virtual void open (ACE_HANDLE h, ACE_Message_Block&);
virtual void handle_read_stream(const ACE_Asynch_Read_Stream::Result &result);
virtual void handle_write_stream(const ACE_Asynch_Write_Stream::Result &result);
static CProactorClient *GetObj(){return cur_;}
private:
static CProactorClient *cur_;
ACE_Asynch_Read_Stream reader_;
ACE_Asynch_Write_Stream writer_;
ACE_Message_Block *msgblock_;
};
#endif // !defined(_PROACTORCLIENT_H__)
// ProactorServer.cpp: implementation of the CProactorClient class.
//
//////////////////////////////////////////////////////////////////////
#include "stdafx.h"
#include "ProactorClient.h"
//#include "ace/OS_NS_sys_socket.h"
//////////////////////////////////////////////////////////////////////
// Construction/Destruction
//////////////////////////////////////////////////////////////////////
CProactorClient *CProactorClient::cur_ = NULL;;
CProactorClient::CProactorClient()
{
// msgblock_ = new ACE_Message_Block(buffer,1024);
}
CProactorClient::~CProactorClient()
{
Release();
}
void CProactorClient::Release()
{
if (this->handle () != ACE_INVALID_HANDLE)
{
this->reader_.cancel();
this->writer_.cancel();
ACE_OS::shutdown(this->handle_, ACE_SHUTDOWN_WRITE);
ACE_OS::closesocket (this->handle ());
this->handle (ACE_INVALID_HANDLE);
if ( NULL != msgblock_ )
{
delete msgblock_;
msgblock_ = NULL;
}
}
}
void CProactorClient::open (ACE_HANDLE h, ACE_Message_Block&)
{
cur_ = this;
this->handle (h);
if (this->reader_.open (*this, h) != 0 )
{
ACE_ERROR ((LM_ERROR, ACE_TEXT ("%p\n"),
ACE_TEXT ("CProactorClient open reader_")));
delete this;
return;
}
if (this->writer_.open (*this) != 0 )
{
delete this;
ACE_ERROR ((LM_ERROR, ACE_TEXT ("%p\n"),
ACE_TEXT ("CProactorClient open write_")));
return;
}
msgblock_ = new ACE_Message_Block(1024);
if (this->reader_.read (*msgblock_, msgblock_->space ()) != 0)
{
ACE_TRACE("Begin read fail\n");
return;
}
ACE_TRACE("Begin open OK \n");
}
void CProactorClient::handle_read_stream(const ACE_Asynch_Read_Stream::Result &result)
{
ACE_Message_Block &mb = result.message_block ();
if (!result.success () || result.bytes_transferred () == 0)
{
this->Release();
ACE_TRACE("Client Close. \r\n");
return;
}
mb.copy("");
ACE_Message_Block *nmb = mb.clone();
AfxGetMainWnd()->PostMessage(WM_READDATA_C, (WPARAM)this, (LPARAM)nmb);
if (this->reader_.read (*msgblock_, msgblock_->space ()) != 0)
{
ACE_TRACE("read data failed!");
}
}
void CProactorClient::handle_write_stream(const ACE_Asynch_Write_Stream::Result &result)
{
ACE_Message_Block &mb = result.message_block ();
mb.release();
}
int CProactorClient::WriteDate(char *szSendBuf, int nSendCnt)
{
ACE_Message_Block *smb = new ACE_Message_Block( nSendCnt+1);
smb->copy(szSendBuf);
smb->wr_ptr(nSendCnt);
int nResult = this->writer_.write(*smb, smb->length());
if ( nResult != 0)
{
ACE_TRACE("Write data failed!");
}
return nResult;
}
// 对话框对象就把主要操作函数贴下:
// 连接操作对象:
ACE_Asynch_Acceptor<CProactorServer> m_curAcceptor;
ACE_Asynch_Connector<CProactorClient> m_curConnector;
// 开始监听
void CDlgMainProactor::OnBtStartlisten()
{
UpdateData(TRUE);
// if ( m_curAcceptor.open (ACE_INET_Addr (m_nListenPort),//read_size,1)==-1)
// 0, 0/*SOMAXCONN, 1, 0, 0, 1, 5*/) == -1)
if ( m_curAcceptor.open (ACE_INET_Addr (m_nListenPort),1024,1,
SOMAXCONN, 1, 0, 0, 1, 5) == -1)
{
AfxMessageBox("Listen on port %d failed!", m_nListenPort);
}
}
// 开始连接
void CDlgMainProactor::OnBtConnect()
{
UpdateData(TRUE);
CString strLog;
ACE_INET_Addr addr(m_nConnectPort, (char*)(LPCSTR)m_strSvrIPAddr);
m_curConnector.open();
if (m_curConnector.connect(addr) == -1)
{
strLog.Format("连接服务器%s:%d失败。\r\n", m_strSvrIPAddr, m_nConnectPort);
}
else
{
strLog.Format("连接服务器%s:%d成功。\r\n", m_strSvrIPAddr, m_nConnectPort);
}
m_strClientLog += strLog;
UpdateData(FALSE);
}
// 服务器端发送数据
void CDlgMainProactor::OnBtSvrsend()
{
UpdateData(TRUE);
CProactorServer *curSvr = CProactorServer::GetObj();
curSvr->WriteDate((char*)(LPCSTR)m_strSvrSend, m_strSvrSend.GetLength());
}
// 客户端发送数据
void CDlgMainProactor::OnBtClisend()
{
CProactorClient *cur = CProactorClient::GetObj();
if ( NULL != cur )
{
int nLen = m_strClientSendMsg.GetLength();
cur->WriteDate(m_strClientSendMsg.GetBuffer(nLen), nLen);
}
} |