csfreebird 发表于 2008-9-28 21:08:42

关于文件传输的解码出错问题

拜读了<<关于ACE_InputCDR和ACE_OutputCDR读写数据时的经验[原创*必读!>>,我在想,我遇到的文件传输解码失败的问题是不是和这个有些关系?
我的麻烦在于文件传输分块传输,但是超过4K字节就会解码出错。我的FileBlock类代表文件传输块,包含文件名和offset等一些必要信息,FileBlock和operator<<、operator>>配合负责序列化成二进制流和反序列化。代码如下:
class FileBlock:public TransferedData
{
public:
ACE_CString fileName_;
ACE_CDR::Char* pData_;
ACE_CDR::ULong dataSize_;
ACE_CDR::ULong offset_;
ACE_CDR::ULong status_;//1存在,0不存在
static const ACE_CDR::ULong blockMaxSize=4*1024;
static const ACE_CDR::ULong exist=1;
static const ACE_CDR::ULong none=0;
FileBlock():TransferedData(TransferedData::fileBlock),pData_(NULL),dataSize_(-1),offset_(-1),status_(-1)
{
}
~FileBlock()
{
if(pData_!=NULL)
{
   delete[] pData_;
}
}
//建议ACE使用的缓冲区初始大小大小
ACE_CDR::ULong size() const
{
return blockMaxSize+64;
}
};

#include "FileBlock.h"

int operator<<(ACE_OutputCDR & cdr,FileBlock const& block)
{
cdr.write_string(block.fileName_);
cdr<<block.status_;
cdr<<block.dataSize_;
ACE_CDR::Boolean result=cdr.write_char_array(block.pData_,block.dataSize_);
cdr.write_ulong(block.offset_);
return cdr.good_bit();
}

int operator>>(ACE_InputCDR & cdr,FileBlock& block)
{
cdr.read_string(block.fileName_);
cdr>>block.status_;
cdr>>block.dataSize_;
block.pData_=new ACE_CDR::Char;
ACE_CDR::Boolean result=cdr.read_char_array(block.pData_,block.dataSize_);
cdr.read_ulong(block.offset_);
return cdr.good_bit();
}


我的问题是这样如果没有定义ACE_LACKS_CDR_ALIGNMENT宏,会不会一定出问题?
我的情况是
1)常量blockMaxSize=4*1024的值如果修改为8*1024或以上,本机解码一定会出错。
2)如果blockMaxSize=4*1024的话,局域网内测试5%左右的概率会出错,出错时,往往是接收端FileBlock的offset_变量获取不正确。
我也试过定义ACE_LACKS_CDR_ALIGNMENT宏重新编译ACE,但是我的工程运行会出错。
我烦恼的是为什么出错不是必然,而是有一定的概率呢?国庆节后我打算彻底解决这个问题,各位熟悉ACE的能不能出点主意呢?

csfreebird 发表于 2008-9-28 21:20:24

参考发送和接收的代码

下面是一个参考文件,包含了发送和接收的代码,我的格式总是Header+Body,
Header格式如下:
数据头中前4个字节存放了一个布尔值,表示本机的字节顺序;
后面四个字节存放了整数0,代表这是数据头;
后面四个字节表示头后面的body的实际长度;
CDRHelper::generateHeader负责这个工作

Body格式如下:
前4个字节存放了一个布尔值,表示本机的字节顺序
然后四个字节是对象的type值
后面若干字节是对象序列化的字节数组,具体格式由各对象决定,并由各对象解析
CDRHelper::generateBody负责这个工作


#pragma once
#include <memory>
#include <vector>
using namespace std;
#include "ace/cdr_stream.h"
#include "ace/asynch_io.h"
#include "FileBlock.h"
#include "FileBlockAnswer.h"
#include "NetworkException.h"
#include "SerializeException.h"
#include "FileException.h"
class CDRHelper
{
public:
//数据头中前4个字节存放了一个布尔值,表示本机的字节顺序;
//后面四个字节存放了整数0,代表这是数据头;
//后面四个字节表示头后面的数据块的实际长度;
//最后是8字节ACE专用于对齐的,不在网络上传输,但是ACE_OutputCDR构造函数需要
static auto_ptr<ACE_OutputCDR> generateHeader(ACE_CDR::ULong bodySize)
{
auto_ptr<ACE_OutputCDR> spHeader(new ACE_OutputCDR(4+4+4+ACE_CDR::MAX_ALIGNMENT));
(*spHeader)<<ACE_OutputCDR::from_boolean(ACE_CDR_BYTE_ORDER);
ACE_CDR::ULong type=0;
(*spHeader)<<type;
(*spHeader)<<bodySize;//数据块的长度=4+4+data.size();
return spHeader;
}

//对象序列化后存放在body对象中,
//前4个字节存放了一个布尔值,表示本机的字节顺序;
//然后四个字节是对象的type值,
//后面若干字节是对象序列化的字节数组,具体格式由各对象决定,并由各对象解析
template<typename Data>
static auto_ptr<ACE_OutputCDR> generateBody(Data const& data)
{
auto_ptr<ACE_OutputCDR> spBody(new ACE_OutputCDR(4+4+data.size()+ACE_CDR::MAX_ALIGNMENT));
(*spBody)<<ACE_OutputCDR::from_boolean(ACE_CDR_BYTE_ORDER);
(*spBody)<<data.type_;
(*spBody)<<data;
return spBody;
}

//生成头和数据,并放入iovec数组中发送,result参数必须包含两个元素
template<typename Data>
static void send(Data const& data,ACE_SOCK_Stream& stream)//throw NetworkException
{
auto_ptr<ACE_OutputCDR> spBody(CDRHelper::generateBody<Data>(data));
auto_ptr<ACE_OutputCDR> spHeader(CDRHelper::generateHeader(static_cast<ACE_CDR::ULong>(spBody->total_length())));
iovec iov;
iov.iov_base=spHeader->begin()->rd_ptr();
iov.iov_len=static_cast<u_long>(spHeader->total_length());
iov.iov_base=spBody->begin()->rd_ptr();
iov.iov_len=static_cast<u_long>(spBody->total_length());
ACE_Time_Value time(4);
ACE_OS::last_error(0);
if(stream.sendv_n(iov,2,&time)==-1)
{
   string errorMsg=NetworkException::format("CDRHelper::send","if(stream.sendv_n(iov,2,&time)==-1)");
   throw NetworkException(errorMsg);
}
}
//1)每次根据队列头部的FileInfo元素信息打开一个文件,读取最多4096字节,然后发送,并等待服务器回答。
//如果服务器回答成功,则修改offset,如果offset已到文件尾,则删除FileInfo对象,然后返回。如果服务器回答失败,则重发,直到服务器回答成功或者通信超时。
//2)如果FileInfo的状态为文件不存在,发送FileBlock告诉服务器文件不存在,不等待服务器回答就返回。
//3)如果FileInfo对象表示的文件不可读或者其他错误,则不发送FileBlock对象,弹出FileInfo对象,继续从1)开始
//4)5秒为通信超时时间,如果通信超时,则抛出异常
static void send(deque<FileInfo>& files,ACE_SOCK_Stream& stream)//throw NetworkException,FileException,SerializeException
{
if(files.empty())
   return;
FileInfo& info=files;
if(info.status_==FileInfo::none)
{
   FileBlock block;
   block.status_=FileBlock::none;
   send<FileBlock>(block,stream);
   return;
}
try
{
   ifstream file(info.fileName_.c_str(),ios::binary|ios::in);
   file.exceptions(ifstream::eofbit|ifstream::failbit|ifstream::badbit);
   file.seekg(info.offset_);
   filesystem::path path(info.fileName_.c_str());
   uintmax_t fileSize=filesystem::file_size(path);
   std::streamsize size=0;
   if(fileSize>=FileBlock::blockMaxSize)
   {
    if((fileSize-info.offset_)>=FileBlock::blockMaxSize)
    {
   size=FileBlock::blockMaxSize;
    }
    else
    {
   size=static_cast<std::streamsize>(fileSize-info.offset_);
    }
   }
   else
   {
    size=static_cast<ACE_CDR::ULong>(fileSize-info.offset_);
   }
   char* pBuffer=new char;
   file.read(pBuffer,size);
   FileBlock block;
   block.fileName_=info.fileName_.c_str();
   string::size_type idx=block.fileName_.rfind('\\');
   block.fileName_=block.fileName_.substr(idx+1);
   block.pData_=pBuffer;
   block.dataSize_=size;
   block.offset_=info.offset_;
   block.status_=FileBlock::exist;
   file.close();
   FileBlockAnswer answer;
   answer.recordResult_=false;
   while(!answer.recordResult_)
   {
    if(block.offset_>6341732)
    {
   int i=0;
    }
    send<FileBlock>(block,stream);
    CDRHelper::receiveData<FileBlockAnswer>(stream,answer);
    if(answer.recordResult_)
    {
   info.offset_+=size;
   if(info.offset_>=fileSize)
   {
      filesystem::remove(filesystem::path(info.fileName_.c_str()));
      ACE_CString::size_type idx=info.fileName_.rfind('.');
      ACE_CString desName=info.fileName_.substr(0,idx);
      desName+=".des";
      filesystem::remove(filesystem::path(desName.c_str()));
      files.pop_front();
   }
   return;
    }
    else
    {
   answer.recordResult_=false;
    }
   }
}
catch(fstream::failure const& e)
{
   string msg("CDRHelper::send function failed:");
   msg+=e.what();
   throw FileException(msg);
}
}
template<typename Data>
static void sendAsynch(Data const& data,ACE_Asynch_Write_Stream& stream)//throw NetworkException
{
auto_ptr<ACE_OutputCDR> spBody(CDRHelper::generateBody<Data>(data));
auto_ptr<ACE_OutputCDR> spHeader(CDRHelper::generateHeader(static_cast<ACE_CDR::ULong>(spBody->total_length())));
ACE_Message_Block* pHeader=spHeader->begin()->clone();
ACE_Message_Block* pBody=spBody->begin()->clone();
pHeader->cont(pBody);
int size=stream.writev(*pHeader,pHeader->total_size());
if(size!=1)
{
   string errorMsg=NetworkException::format("sendAsy","if(stream.writev(*pHeader,pHeader->total_size())!=0)");
   pHeader->release();
   throw NetworkException(errorMsg);
}
}
template<typename Data>
static void receiveData(ACE_SOCK_Stream& stream,Data& data)//throw NetworkException,SerializeException
{
ACE_Message_Block block1(12);
ACE_CDR::mb_align(&block1);
ACE_OS::last_error(0);
if(stream.recv_n(block1.wr_ptr(),12)==12)//receive the header of CDR
{
   //parse the cdr header
   block1.wr_ptr(12);
   ACE_InputCDR cdr(&block1);
   ACE_CDR::Boolean byteOrder;
   cdr>>static_cast<ACE_InputCDR::to_boolean>(byteOrder);
   cdr.reset_byte_order(byteOrder);
   ACE_CDR::ULong type=-1;
   cdr>>type;
   if(type==0)//如果是header的话,则发起读body的操作
   {
    ACE_CDR::ULong bodyLength;
    cdr>>bodyLength;
    ACE_Message_Block block2(bodyLength);
    ACE_CDR::mb_align(&block2);
    if(stream.recv_n(block2.wr_ptr(),bodyLength)==bodyLength)
    {
   block2.wr_ptr(bodyLength);
   ACE_InputCDR cdr(&block2);
   ACE_CDR::Boolean byteOrder;
   cdr>>static_cast<ACE_InputCDR::to_boolean>(byteOrder);
   cdr.reset_byte_order(byteOrder);
   ACE_CDR::ULong type;
   cdr>>type;
   if(type==data.type_)
   {
      cdr>>data;
   }
   else//错误处理
   {
      string msg=SerializeException::format("CDRHelper::receiveData","can not receive correct body data");
      throw SerializeException(msg);
   }
    }
    else
    {
   string errorMsg=NetworkException::format("receiveData(ACE_SOCK_Stream& stream,Data& data)","if(stream.recv_n(block1.wr_ptr(),12)==12)");
   throw NetworkException(errorMsg);
    }
   }
   else//错误处理
   {
    string msg=SerializeException::format("CDRHelper::receiveData","can not receive correct head data");
    throw SerializeException(msg);
   }
}
else
{
   string errorMsg=NetworkException::format("receiveData(ACE_SOCK_Stream& stream,Data& data)","if(stream.recv_n(block1.wr_ptr(),12)==12)");
   throw NetworkException(errorMsg);
}
}
};

[ 本帖最后由 csfreebird 于 2008-9-28 21:24 编辑 ]

csfreebird 发表于 2008-10-11 12:03:56

最新进展:本机传输10K大小数据问题解决

经过这段时间的仔细检查,修改了自己的很多代码,但是问题仍然没有解决,最后在客户端和服务端加了很多日志,仔细跟踪收发数据的流程。ACE的日志宏还是不错的。&lt;br /&gt;一直怀疑是自己的问题,终于发现本机传送大数据的错误原因是proactor框架。文档中所说handle_read_stream函数会在系统完成读数据后被调用,结果的确回调了,但是我发起的异步读10266字节数据的操作,总是只能被完成10211,少了55个字节。
所以我每次判断是否读完了我要的数据,如果没有读完,要发起再读,最后要将多次读取获得的ACE_Message_Block对象链接在一起。
至于另一个局域网间两台机器传送文件的问题,还没有测试,或许已经好了,或许还有别的原因。

[ 本帖最后由 csfreebird 于 2008-10-11 12:05 编辑 ]

csfreebird 发表于 2008-10-15 10:54:39

结论

局域网内两台机器传输文件的测试结果表明同样是handle_read_stream的方法并不能保证一定会在接收到所有字节后才会被调用。问题已经查明。希望后来者在使用proactor框架的时候能够看到我这篇文章,我现在每次发送80k数据都试过了,非常好。注意,我的ACE编译时config.h文件加了几个宏,用于支持stl和禁止ACE_OutputCDR使用字节对齐和cdr格式。
#define ACE_HAS_STANDARD_CPP_LIBRARY 1
#define ACE_CDR_IGNORE_ALIGNMENT
#define ACE_LACKS_CDR_ALIGNMENT
#include "ace/config-win32.h"

winston 发表于 2008-10-15 13:40:19

把你的问题给漏掉了。刚看到。

“测试结果表明同样是handle_read_stream的方法并不能保证一定会在接收到所有字节后才会被调用”
谁保证了?ACE是API的完美封装,API什么样,ACE改变不了。异步模式决定了,肯定不是你原来预想的样子。所以,先读读网络编程基础再认识ACE,会更深刻。
页: [1]
查看完整版本: 关于文件传输的解码出错问题