找回密码
 用户注册

QQ登录

只需一步,快速开始

查看: 6944|回复: 13

Proactor 模式下数据发送与接收的问题。

[复制链接]
发表于 2008-7-7 17:22:25 | 显示全部楼层 |阅读模式
Proactor模式下,分别从ACE_Service_Handler派生出2个类来做服务器处理和客户端处理类。
建立一个连接后服务器端接收了36次数据后,再也收接收不到数据,跟踪进去,客户端那边的数据handle_write_stream()这个函数都调用了,但是服务器端的handle_read_stream()没有进去。

客户端也存在这样的问题,但是接收到的数据次数多些。

每次重新启动程序后,能接收到的数据次数是固定的。

有人知道这个是什么问题引起的不?大虾指点下吧。谢谢了。
环境是VC6.0+ACE5.3,MFC Dialog工程。

下面把服务器/客护端处理类贴出来:

// ProactorServer.h: interface for the CProactorServer class.
//
//////////////////////////////////////////////////////////////////////

#ifndef _PROACTORSERVER_H__
#define _PROACTORSERVER_H__

#if _MSC_VER > 1000
#pragma once
#endif // _MSC_VER > 1000
#include  "ace/Message_Queue.h"
#include "ace/Asynch_IO.h"
#include "ace/Proactor.h"
#include "ace/Map_Manager.h"

#define WM_READDATA_S    WM_USER + 100

class CProactorServer  : public ACE_Service_Handler
{
public:
    CProactorServer();
    virtual ~CProactorServer();

     int WriteDate(char *szSendBuf, int nSendCnt);
     void Release();

     virtual void open (ACE_HANDLE h, ACE_Message_Block&);
     virtual void handle_read_stream(const ACE_Asynch_Read_Stream::Result &result);
     virtual void handle_write_stream(const ACE_Asynch_Write_Stream::Result &result);

    static CProactorServer *GetObj(){return _cur;}
private:   
    static CProactorServer *_cur;
     ACE_Asynch_Read_Stream  reader_;
     ACE_Asynch_Write_Stream writer_;
     ACE_Message_Block        *msgblock_;
};

#endif // !defined(_PROACTORSERVER_H__)


// ProactorServer.cpp: implementation of the CProactorServer class.
//
//////////////////////////////////////////////////////////////////////

#include "stdafx.h"
#include "ProactorServer.h"
//#include "ace/OS_NS_sys_socket.h"

//////////////////////////////////////////////////////////////////////
// Construction/Destruction
//////////////////////////////////////////////////////////////////////

CProactorServer *CProactorServer::_cur;
CProactorServer::CProactorServer()
{
    msgblock_ = NULL;
}

CProactorServer::~CProactorServer()
{
    Release();
}

void CProactorServer::Release()
{
    if (this->handle () != ACE_INVALID_HANDLE)
    {
        this->reader_.cancel();
        this->writer_.cancel();
        ACE_OS::shutdown(this->handle_, ACE_SHUTDOWN_WRITE);
        ACE_OS::closesocket (this->handle ());
        this->handle (ACE_INVALID_HANDLE);
        if ( NULL != msgblock_ )
            msgblock_->release();
    }   
}

void CProactorServer::open (ACE_HANDLE h, ACE_Message_Block&)
{
     _cur = this;
     this->handle (h);
      if (this->reader_.open (*this, h) != 0 )
      {
          ACE_ERROR ((LM_ERROR, ACE_TEXT ("%p\n"),
              ACE_TEXT ("CProactorServer open reader_")));
         
          delete this;
          return;
      }
     if (this->writer_.open (*this, h) != 0 )
     {
         delete this;
         ACE_ERROR ((LM_ERROR, ACE_TEXT ("%p\n"),
             ACE_TEXT ("CProactorServer open write_")));
         return;
     }
     
     msgblock_ = new ACE_Message_Block(1024);
     if (this->reader_.read (*msgblock_, msgblock_->space ()) != 0)
      {
         ACE_TRACE("Begin read fail\n");
          return;
      }
     
     ACE_TRACE("Begin open OK \n");
}

void CProactorServer::handle_read_stream(const ACE_Asynch_Read_Stream::Result &result)
{
     ACE_Message_Block &mb = result.message_block ();
     if (!result.success () || result.bytes_transferred () == 0)
     {
         this->Release();
         ACE_TRACE("Client Close. \r\n");
         return;
     }
     
     mb.copy("");      
      ACE_Message_Block *nmb = mb.clone();

      AfxGetMainWnd()->PostMessage(WM_READDATA_S, (WPARAM)this, (LPARAM)nmb);

      if (this->reader_.read (*msgblock_, msgblock_->space ()) != 0)
     {
         ACE_TRACE("read data failed!");
     }
}

void CProactorServer::handle_write_stream(const ACE_Asynch_Write_Stream::Result &result)
{
     ACE_Message_Block &mb = result.message_block ();
     mb.release();
}

int CProactorServer::WriteDate(char *szSendBuf, int nSendCnt)
{
     ACE_Message_Block *smb = new ACE_Message_Block(nSendCnt+1);
     smb->copy(szSendBuf);
     smb->wr_ptr(nSendCnt);
     int nResult = this->writer_.write(*smb, smb->length());
     if ( nResult != 0)
     {
         ACE_TRACE("Write data failed!");
     }

     return nResult;
}

// ProactorClient.h: interface for the CProactorClient class.
//
//////////////////////////////////////////////////////////////////////

#ifndef _PROACTORCLIENT_H__
#define _PROACTORCLIENT_H__

#if _MSC_VER > 1000
#pragma once
#endif // _MSC_VER > 1000
#include  "ace/Message_Queue.h"
#include "ace/Asynch_IO.h"
#include "ace/Proactor.h"

#define WM_READDATA_C    WM_USER + 101

class CProactorClient  : public ACE_Service_Handler
{
public:
    CProactorClient();
    virtual ~CProactorClient();

    int WriteDate(char *szSendBuf, int nSendCnt);
    void Release();

    virtual void open (ACE_HANDLE h, ACE_Message_Block&);
    virtual void handle_read_stream(const ACE_Asynch_Read_Stream::Result &result);
    virtual void handle_write_stream(const ACE_Asynch_Write_Stream::Result &result);

    static CProactorClient *GetObj(){return cur_;}
private:
    static CProactorClient *cur_;
   
    ACE_Asynch_Read_Stream  reader_;
    ACE_Asynch_Write_Stream writer_;

    ACE_Message_Block        *msgblock_;
};

#endif // !defined(_PROACTORCLIENT_H__)

// ProactorServer.cpp: implementation of the CProactorClient class.
//
//////////////////////////////////////////////////////////////////////

#include "stdafx.h"
#include "ProactorClient.h"
//#include "ace/OS_NS_sys_socket.h"

//////////////////////////////////////////////////////////////////////
// Construction/Destruction
//////////////////////////////////////////////////////////////////////
CProactorClient *CProactorClient::cur_ = NULL;;
CProactorClient::CProactorClient()
{
//    msgblock_ = new ACE_Message_Block(buffer,1024);
}

CProactorClient::~CProactorClient()
{
    Release();
}

void CProactorClient::Release()
{
    if (this->handle () != ACE_INVALID_HANDLE)
    {
        this->reader_.cancel();
        this->writer_.cancel();
        ACE_OS::shutdown(this->handle_, ACE_SHUTDOWN_WRITE);
        ACE_OS::closesocket (this->handle ());
        this->handle (ACE_INVALID_HANDLE);   
        if ( NULL != msgblock_ )
        {
            delete msgblock_;
            msgblock_ = NULL;
        }
    }   
}

void CProactorClient::open (ACE_HANDLE h, ACE_Message_Block&)
{
    cur_ = this;
    this->handle (h);
     if (this->reader_.open (*this, h) != 0 )
     {
         ACE_ERROR ((LM_ERROR, ACE_TEXT ("%p\n"),
             ACE_TEXT ("CProactorClient open reader_")));
        
         delete this;
         return;
     }
      if (this->writer_.open (*this) != 0 )
      {
          delete this;
          ACE_ERROR ((LM_ERROR, ACE_TEXT ("%p\n"),
              ACE_TEXT ("CProactorClient open write_")));
          return;
      }
   
    msgblock_ = new ACE_Message_Block(1024);
     if (this->reader_.read (*msgblock_, msgblock_->space ()) != 0)
     {
        ACE_TRACE("Begin read fail\n");
         return;
     }
   
    ACE_TRACE("Begin open OK \n");
}

void CProactorClient::handle_read_stream(const ACE_Asynch_Read_Stream::Result &result)
{
    ACE_Message_Block &mb = result.message_block ();
    if (!result.success () || result.bytes_transferred () == 0)
    {
        this->Release();
        ACE_TRACE("Client Close. \r\n");        
        return;
    }
   
    mb.copy("");        
    ACE_Message_Block *nmb = mb.clone();
    AfxGetMainWnd()->PostMessage(WM_READDATA_C, (WPARAM)this, (LPARAM)nmb);

     if (this->reader_.read (*msgblock_, msgblock_->space ()) != 0)
    {
        ACE_TRACE("read data failed!");
    }
}

void CProactorClient::handle_write_stream(const ACE_Asynch_Write_Stream::Result &result)
{
    ACE_Message_Block &mb = result.message_block ();
    mb.release();
}

int CProactorClient::WriteDate(char *szSendBuf, int nSendCnt)
{
    ACE_Message_Block *smb = new ACE_Message_Block( nSendCnt+1);
    smb->copy(szSendBuf);
    smb->wr_ptr(nSendCnt);
    int nResult = this->writer_.write(*smb, smb->length());
    if ( nResult != 0)
    {
        ACE_TRACE("Write data failed!");
    }

    return nResult;
}

// 对话框对象就把主要操作函数贴下:
// 连接操作对象:

ACE_Asynch_Acceptor<CProactorServer> m_curAcceptor;
ACE_Asynch_Connector<CProactorClient> m_curConnector;

// 开始监听
void CDlgMainProactor::OnBtStartlisten()
{
     UpdateData(TRUE);
//      if ( m_curAcceptor.open (ACE_INET_Addr (m_nListenPort),//read_size,1)==-1)
//             0, 0/*SOMAXCONN, 1, 0, 0, 1, 5*/) == -1)
     if ( m_curAcceptor.open (ACE_INET_Addr (m_nListenPort),1024,1,
    SOMAXCONN, 1, 0, 0, 1, 5) == -1)
     {
         AfxMessageBox("Listen on port %d failed!", m_nListenPort);
     }
}


// 开始连接
void CDlgMainProactor::OnBtConnect()
{
    UpdateData(TRUE);
    CString strLog;
    ACE_INET_Addr addr(m_nConnectPort, (char*)(LPCSTR)m_strSvrIPAddr);
        
    m_curConnector.open();
    if (m_curConnector.connect(addr) == -1)
    {
        
        strLog.Format("连接服务器%s:%d失败。\r\n", m_strSvrIPAddr, m_nConnectPort);
    }
    else
    {
        strLog.Format("连接服务器%s:%d成功。\r\n", m_strSvrIPAddr, m_nConnectPort);
    }

    m_strClientLog += strLog;
    UpdateData(FALSE);
}

// 服务器端发送数据
void CDlgMainProactor::OnBtSvrsend()
{
    UpdateData(TRUE);
    CProactorServer *curSvr = CProactorServer::GetObj();
    curSvr->WriteDate((char*)(LPCSTR)m_strSvrSend, m_strSvrSend.GetLength());
}

// 客户端发送数据

void CDlgMainProactor::OnBtClisend()
{
     CProactorClient *cur = CProactorClient::GetObj();
     if ( NULL != cur )
     {
         int nLen = m_strClientSendMsg.GetLength();
         cur->WriteDate(m_strClientSendMsg.GetBuffer(nLen), nLen);
     }
}
发表于 2008-7-7 21:35:38 | 显示全部楼层
void CProactorServer::handle_read_stream(const ACE_Asynch_Read_Stream::Result &result)
void CProactorServer::handle_write_stream(const ACE_Asynch_Write_Stream::Result &result)
我自己判断,这两个函数有点问题,你没有处理收发不完整的情况。
另外,判断收不到的事情,可以用网络探测软件来追踪。如Ethereal等。
 楼主| 发表于 2008-7-8 10:36:43 | 显示全部楼层
利用天网防火墙监控网络传输的数据,发现发送与接收的数据相等。

就算是handle_read_stream()没有调用的时候网络中的数据也已经完成了。

应该是代码中哪里处理不正确,导致ACE库没有响应handle_read_stream(),但是这个问题我该如何去考虑呢?

还有就是能否提供收发不完整的情况的处理例子?
 楼主| 发表于 2008-7-8 11:04:04 | 显示全部楼层
问题原因发现了:
msgblock_->reset();
if (this->reader_.read (*msgblock_, msgblock_->space ()) != 0)

这样修改就可以了,消息块使用完了。
这个问题解决了,详细操作了一会,又发现一个新的问题:
就是第一次按发送按钮的时候的数据没有接收到。。。

再仔细看了看,连接建立后,Server类的open()函数还没被调用,只有当client类调用一次write操作后,Server类的open()函数才会被调用,为什么会这样子的呢?
发表于 2008-7-8 11:15:57 | 显示全部楼层
连接建立没有那么快,需要等待连接完成。
 楼主| 发表于 2008-7-8 17:45:45 | 显示全部楼层
感觉不是连接没有建立完成。

connect()后,CProactorClient类的open()马上就执行了。CProactorServer类里面的open()怎么等就是没执行,
只有CProactorClient对象执行一次write操作后,CProactorServer里面的open()才调用。是不是这write操作后才会产生一个CProactorServer对象。。。。。。

从ACE_Asynch_Acceptor派生了一个类,重载了handle_accept()函数,发现connect()时,并没有执行handle_accept(),只有当CProactorClient对象进行了一个write操作后,才会调用到handle_accept()。

为什么会这样呢?

看ACE代码,看得晕头转向。。。
发表于 2008-7-8 19:39:56 | 显示全部楼层
不懂ACE到一定程度,看ACE代码没用,要看卷1、卷2和ACE程序员指南。
你的代码有问题。CProactorServer正常情况下,在建立连接后,会产生一个对象,调用open方法。
另外,用不着自己改写handle_accept(),那是系统自己做的。
直接用异步接收器调用即可,会自动生成CProactorServer。
 楼主| 发表于 2008-7-9 00:32:29 | 显示全部楼层
楼上的,拜托说点有价值的东东好不。您第一个回答对我帮助很大,使我少走了不少弯路,但上面的回答就有点。。。。。。

如果一个熟悉Proactor的应该很容易就能发现问题在那里了。

PS :用不着自己改写handle_accept(), handle_accept()还虚函数来的,如果应用有需要,为什么不重载?
再PS: 我这里重载handle_accept()是为了确认connect()的时候有没有掉用该函数,直接跟踪到ACE代码里面去调试不是很方便。

代码有问题是肯定的了,问题就是这个问题出在哪里呢?
 楼主| 发表于 2008-7-9 09:33:49 | 显示全部楼层
感谢CCTV,感谢acejoy,感谢winston。 @_@

问题慢慢查了。

希望acejoy.com能越办越好,能吸引些ACE高手来露下脸过来支持下。
如果大家都是为了问问题而来这里的话就没意思了。到最后问题都没人回答。
发表于 2008-7-9 09:57:46 | 显示全部楼层
写了一半,浏览器死锁了。
看情况,你还不理解Proactor - 没读那几本书?,Proactor是一整套体系,随便贴点代码,就想找到隐藏的问题,这个是很困难的,不然只能是不断猜测。最好有工程文件。
ACEJOY是个公益论坛,大家回答问题,纯粹是出于对ACE系统的热爱和发扬助人为乐的精神,没有义务一定要解决别人的问题。主要还是靠自己的,这里只能提供辅助的帮助。
您需要登录后才可以回帖 登录 | 用户注册

本版积分规则

Archiver|手机版|小黑屋|ACE Developer ( 京ICP备06055248号 )

GMT+8, 2024-12-23 13:21 , Processed in 0.026618 second(s), 5 queries , Redis On.

Powered by Discuz! X3.5

© 2001-2023 Discuz! Team.

快速回复 返回顶部 返回列表