Proactor 模式下数据发送与接收的问题。
Proactor模式下,分别从ACE_Service_Handler派生出2个类来做服务器处理和客户端处理类。建立一个连接后服务器端接收了36次数据后,再也收接收不到数据,跟踪进去,客户端那边的数据handle_write_stream()这个函数都调用了,但是服务器端的handle_read_stream()没有进去。
客户端也存在这样的问题,但是接收到的数据次数多些。
每次重新启动程序后,能接收到的数据次数是固定的。
有人知道这个是什么问题引起的不?大虾指点下吧。谢谢了。
环境是VC6.0+ACE5.3,MFC Dialog工程。
下面把服务器/客护端处理类贴出来:
// ProactorServer.h: interface for the CProactorServer class.
//
//////////////////////////////////////////////////////////////////////
#ifndef _PROACTORSERVER_H__
#define _PROACTORSERVER_H__
#if _MSC_VER > 1000
#pragma once
#endif // _MSC_VER > 1000
#include"ace/Message_Queue.h"
#include "ace/Asynch_IO.h"
#include "ace/Proactor.h"
#include "ace/Map_Manager.h"
#define WM_READDATA_S WM_USER + 100
class CProactorServer: public ACE_Service_Handler
{
public:
CProactorServer();
virtual ~CProactorServer();
int WriteDate(char *szSendBuf, int nSendCnt);
void Release();
virtual void open (ACE_HANDLE h, ACE_Message_Block&);
virtual void handle_read_stream(const ACE_Asynch_Read_Stream::Result &result);
virtual void handle_write_stream(const ACE_Asynch_Write_Stream::Result &result);
static CProactorServer *GetObj(){return _cur;}
private:
static CProactorServer *_cur;
ACE_Asynch_Read_Streamreader_;
ACE_Asynch_Write_Stream writer_;
ACE_Message_Block *msgblock_;
};
#endif // !defined(_PROACTORSERVER_H__)
// ProactorServer.cpp: implementation of the CProactorServer class.
//
//////////////////////////////////////////////////////////////////////
#include "stdafx.h"
#include "ProactorServer.h"
//#include "ace/OS_NS_sys_socket.h"
//////////////////////////////////////////////////////////////////////
// Construction/Destruction
//////////////////////////////////////////////////////////////////////
CProactorServer *CProactorServer::_cur;
CProactorServer::CProactorServer()
{
msgblock_ = NULL;
}
CProactorServer::~CProactorServer()
{
Release();
}
void CProactorServer::Release()
{
if (this->handle () != ACE_INVALID_HANDLE)
{
this->reader_.cancel();
this->writer_.cancel();
ACE_OS::shutdown(this->handle_, ACE_SHUTDOWN_WRITE);
ACE_OS::closesocket (this->handle ());
this->handle (ACE_INVALID_HANDLE);
if ( NULL != msgblock_ )
msgblock_->release();
}
}
void CProactorServer::open (ACE_HANDLE h, ACE_Message_Block&)
{
_cur = this;
this->handle (h);
if (this->reader_.open (*this, h) != 0 )
{
ACE_ERROR ((LM_ERROR, ACE_TEXT ("%p\n"),
ACE_TEXT ("CProactorServer open reader_")));
delete this;
return;
}
if (this->writer_.open (*this, h) != 0 )
{
delete this;
ACE_ERROR ((LM_ERROR, ACE_TEXT ("%p\n"),
ACE_TEXT ("CProactorServer open write_")));
return;
}
msgblock_ = new ACE_Message_Block(1024);
if (this->reader_.read (*msgblock_, msgblock_->space ()) != 0)
{
ACE_TRACE("Begin read fail\n");
return;
}
ACE_TRACE("Begin open OK \n");
}
void CProactorServer::handle_read_stream(const ACE_Asynch_Read_Stream::Result &result)
{
ACE_Message_Block &mb = result.message_block ();
if (!result.success () || result.bytes_transferred () == 0)
{
this->Release();
ACE_TRACE("Client Close. \r\n");
return;
}
mb.copy("");
ACE_Message_Block *nmb = mb.clone();
AfxGetMainWnd()->PostMessage(WM_READDATA_S, (WPARAM)this, (LPARAM)nmb);
if (this->reader_.read (*msgblock_, msgblock_->space ()) != 0)
{
ACE_TRACE("read data failed!");
}
}
void CProactorServer::handle_write_stream(const ACE_Asynch_Write_Stream::Result &result)
{
ACE_Message_Block &mb = result.message_block ();
mb.release();
}
int CProactorServer::WriteDate(char *szSendBuf, int nSendCnt)
{
ACE_Message_Block *smb = new ACE_Message_Block(nSendCnt+1);
smb->copy(szSendBuf);
smb->wr_ptr(nSendCnt);
int nResult = this->writer_.write(*smb, smb->length());
if ( nResult != 0)
{
ACE_TRACE("Write data failed!");
}
return nResult;
}
// ProactorClient.h: interface for the CProactorClient class.
//
//////////////////////////////////////////////////////////////////////
#ifndef _PROACTORCLIENT_H__
#define _PROACTORCLIENT_H__
#if _MSC_VER > 1000
#pragma once
#endif // _MSC_VER > 1000
#include"ace/Message_Queue.h"
#include "ace/Asynch_IO.h"
#include "ace/Proactor.h"
#define WM_READDATA_C WM_USER + 101
class CProactorClient: public ACE_Service_Handler
{
public:
CProactorClient();
virtual ~CProactorClient();
int WriteDate(char *szSendBuf, int nSendCnt);
void Release();
virtual void open (ACE_HANDLE h, ACE_Message_Block&);
virtual void handle_read_stream(const ACE_Asynch_Read_Stream::Result &result);
virtual void handle_write_stream(const ACE_Asynch_Write_Stream::Result &result);
static CProactorClient *GetObj(){return cur_;}
private:
static CProactorClient *cur_;
ACE_Asynch_Read_Streamreader_;
ACE_Asynch_Write_Stream writer_;
ACE_Message_Block *msgblock_;
};
#endif // !defined(_PROACTORCLIENT_H__)
// ProactorServer.cpp: implementation of the CProactorClient class.
//
//////////////////////////////////////////////////////////////////////
#include "stdafx.h"
#include "ProactorClient.h"
//#include "ace/OS_NS_sys_socket.h"
//////////////////////////////////////////////////////////////////////
// Construction/Destruction
//////////////////////////////////////////////////////////////////////
CProactorClient *CProactorClient::cur_ = NULL;;
CProactorClient::CProactorClient()
{
// msgblock_ = new ACE_Message_Block(buffer,1024);
}
CProactorClient::~CProactorClient()
{
Release();
}
void CProactorClient::Release()
{
if (this->handle () != ACE_INVALID_HANDLE)
{
this->reader_.cancel();
this->writer_.cancel();
ACE_OS::shutdown(this->handle_, ACE_SHUTDOWN_WRITE);
ACE_OS::closesocket (this->handle ());
this->handle (ACE_INVALID_HANDLE);
if ( NULL != msgblock_ )
{
delete msgblock_;
msgblock_ = NULL;
}
}
}
void CProactorClient::open (ACE_HANDLE h, ACE_Message_Block&)
{
cur_ = this;
this->handle (h);
if (this->reader_.open (*this, h) != 0 )
{
ACE_ERROR ((LM_ERROR, ACE_TEXT ("%p\n"),
ACE_TEXT ("CProactorClient open reader_")));
delete this;
return;
}
if (this->writer_.open (*this) != 0 )
{
delete this;
ACE_ERROR ((LM_ERROR, ACE_TEXT ("%p\n"),
ACE_TEXT ("CProactorClient open write_")));
return;
}
msgblock_ = new ACE_Message_Block(1024);
if (this->reader_.read (*msgblock_, msgblock_->space ()) != 0)
{
ACE_TRACE("Begin read fail\n");
return;
}
ACE_TRACE("Begin open OK \n");
}
void CProactorClient::handle_read_stream(const ACE_Asynch_Read_Stream::Result &result)
{
ACE_Message_Block &mb = result.message_block ();
if (!result.success () || result.bytes_transferred () == 0)
{
this->Release();
ACE_TRACE("Client Close. \r\n");
return;
}
mb.copy("");
ACE_Message_Block *nmb = mb.clone();
AfxGetMainWnd()->PostMessage(WM_READDATA_C, (WPARAM)this, (LPARAM)nmb);
if (this->reader_.read (*msgblock_, msgblock_->space ()) != 0)
{
ACE_TRACE("read data failed!");
}
}
void CProactorClient::handle_write_stream(const ACE_Asynch_Write_Stream::Result &result)
{
ACE_Message_Block &mb = result.message_block ();
mb.release();
}
int CProactorClient::WriteDate(char *szSendBuf, int nSendCnt)
{
ACE_Message_Block *smb = new ACE_Message_Block( nSendCnt+1);
smb->copy(szSendBuf);
smb->wr_ptr(nSendCnt);
int nResult = this->writer_.write(*smb, smb->length());
if ( nResult != 0)
{
ACE_TRACE("Write data failed!");
}
return nResult;
}
// 对话框对象就把主要操作函数贴下:
// 连接操作对象:
ACE_Asynch_Acceptor<CProactorServer> m_curAcceptor;
ACE_Asynch_Connector<CProactorClient> m_curConnector;
// 开始监听
void CDlgMainProactor::OnBtStartlisten()
{
UpdateData(TRUE);
// if ( m_curAcceptor.open (ACE_INET_Addr (m_nListenPort),//read_size,1)==-1)
// 0, 0/*SOMAXCONN, 1, 0, 0, 1, 5*/) == -1)
if ( m_curAcceptor.open (ACE_INET_Addr (m_nListenPort),1024,1,
SOMAXCONN, 1, 0, 0, 1, 5) == -1)
{
AfxMessageBox("Listen on port %d failed!", m_nListenPort);
}
}
// 开始连接
void CDlgMainProactor::OnBtConnect()
{
UpdateData(TRUE);
CString strLog;
ACE_INET_Addr addr(m_nConnectPort, (char*)(LPCSTR)m_strSvrIPAddr);
m_curConnector.open();
if (m_curConnector.connect(addr) == -1)
{
strLog.Format("连接服务器%s:%d失败。\r\n", m_strSvrIPAddr, m_nConnectPort);
}
else
{
strLog.Format("连接服务器%s:%d成功。\r\n", m_strSvrIPAddr, m_nConnectPort);
}
m_strClientLog += strLog;
UpdateData(FALSE);
}
// 服务器端发送数据
void CDlgMainProactor::OnBtSvrsend()
{
UpdateData(TRUE);
CProactorServer *curSvr = CProactorServer::GetObj();
curSvr->WriteDate((char*)(LPCSTR)m_strSvrSend, m_strSvrSend.GetLength());
}
// 客户端发送数据
void CDlgMainProactor::OnBtClisend()
{
CProactorClient *cur = CProactorClient::GetObj();
if ( NULL != cur )
{
int nLen = m_strClientSendMsg.GetLength();
cur->WriteDate(m_strClientSendMsg.GetBuffer(nLen), nLen);
}
} void CProactorServer::handle_read_stream(const ACE_Asynch_Read_Stream::Result &result)
void CProactorServer::handle_write_stream(const ACE_Asynch_Write_Stream::Result &result)
我自己判断,这两个函数有点问题,你没有处理收发不完整的情况。
另外,判断收不到的事情,可以用网络探测软件来追踪。如Ethereal等。 利用天网防火墙监控网络传输的数据,发现发送与接收的数据相等。
就算是handle_read_stream()没有调用的时候网络中的数据也已经完成了。
应该是代码中哪里处理不正确,导致ACE库没有响应handle_read_stream(),但是这个问题我该如何去考虑呢?
还有就是能否提供收发不完整的情况的处理例子? 问题原因发现了:
msgblock_->reset();
if (this->reader_.read (*msgblock_, msgblock_->space ()) != 0)
这样修改就可以了,消息块使用完了。
这个问题解决了,详细操作了一会,又发现一个新的问题:
就是第一次按发送按钮的时候的数据没有接收到。。。
再仔细看了看,连接建立后,Server类的open()函数还没被调用,只有当client类调用一次write操作后,Server类的open()函数才会被调用,为什么会这样子的呢? 连接建立没有那么快,需要等待连接完成。 感觉不是连接没有建立完成。
connect()后,CProactorClient类的open()马上就执行了。CProactorServer类里面的open()怎么等就是没执行,
只有CProactorClient对象执行一次write操作后,CProactorServer里面的open()才调用。是不是这write操作后才会产生一个CProactorServer对象。。。。。。
从ACE_Asynch_Acceptor派生了一个类,重载了handle_accept()函数,发现connect()时,并没有执行handle_accept(),只有当CProactorClient对象进行了一个write操作后,才会调用到handle_accept()。
为什么会这样呢?
看ACE代码,看得晕头转向。。。 不懂ACE到一定程度,看ACE代码没用,要看卷1、卷2和ACE程序员指南。
你的代码有问题。CProactorServer正常情况下,在建立连接后,会产生一个对象,调用open方法。
另外,用不着自己改写handle_accept(),那是系统自己做的。
直接用异步接收器调用即可,会自动生成CProactorServer。 楼上的,拜托说点有价值的东东好不。您第一个回答对我帮助很大,使我少走了不少弯路,但上面的回答就有点。。。。。。
如果一个熟悉Proactor的应该很容易就能发现问题在那里了。
PS :用不着自己改写handle_accept(), handle_accept()还虚函数来的,如果应用有需要,为什么不重载?
再PS: 我这里重载handle_accept()是为了确认connect()的时候有没有掉用该函数,直接跟踪到ACE代码里面去调试不是很方便。
代码有问题是肯定的了,问题就是这个问题出在哪里呢? 感谢CCTV,感谢acejoy,感谢winston。 @_@
问题慢慢查了。
希望acejoy.com能越办越好,能吸引些ACE高手来露下脸过来支持下。
如果大家都是为了问问题而来这里的话就没意思了。到最后问题都没人回答。 写了一半,浏览器死锁了。
看情况,你还不理解Proactor - 没读那几本书?,Proactor是一整套体系,随便贴点代码,就想找到隐藏的问题,这个是很困难的,不然只能是不断猜测。最好有工程文件。
ACEJOY是个公益论坛,大家回答问题,纯粹是出于对ACE系统的热爱和发扬助人为乐的精神,没有义务一定要解决别人的问题。主要还是靠自己的,这里只能提供辅助的帮助。
页:
[1]
2