|
楼主 |
发表于 2008-9-28 21:20:24
|
显示全部楼层
参考发送和接收的代码
下面是一个参考文件,包含了发送和接收的代码,我的格式总是Header+Body,
Header格式如下:
数据头中前4个字节存放了一个布尔值,表示本机的字节顺序;
后面四个字节存放了整数0,代表这是数据头;
后面四个字节表示头后面的body的实际长度;
CDRHelper::generateHeader负责这个工作
Body格式如下:
前4个字节存放了一个布尔值,表示本机的字节顺序
然后四个字节是对象的type值
后面若干字节是对象序列化的字节数组,具体格式由各对象决定,并由各对象解析
CDRHelper::generateBody负责这个工作
#pragma once
#include <memory>
#include <vector>
using namespace std;
#include "ace/cdr_stream.h"
#include "ace/asynch_io.h"
#include "FileBlock.h"
#include "FileBlockAnswer.h"
#include "NetworkException.h"
#include "SerializeException.h"
#include "FileException.h"
class CDRHelper
{
public:
//数据头中前4个字节存放了一个布尔值,表示本机的字节顺序;
//后面四个字节存放了整数0,代表这是数据头;
//后面四个字节表示头后面的数据块的实际长度;
//最后是8字节ACE专用于对齐的,不在网络上传输,但是ACE_OutputCDR构造函数需要
static auto_ptr<ACE_OutputCDR> generateHeader(ACE_CDR::ULong bodySize)
{
auto_ptr<ACE_OutputCDR> spHeader(new ACE_OutputCDR(4+4+4+ACE_CDR::MAX_ALIGNMENT));
(*spHeader)<<ACE_OutputCDR::from_boolean(ACE_CDR_BYTE_ORDER);
ACE_CDR::ULong type=0;
(*spHeader)<<type;
(*spHeader)<<bodySize;//数据块的长度=4+4+data.size();
return spHeader;
}
//对象序列化后存放在body对象中,
//前4个字节存放了一个布尔值,表示本机的字节顺序;
//然后四个字节是对象的type值,
//后面若干字节是对象序列化的字节数组,具体格式由各对象决定,并由各对象解析
template<typename Data>
static auto_ptr<ACE_OutputCDR> generateBody(Data const& data)
{
auto_ptr<ACE_OutputCDR> spBody(new ACE_OutputCDR(4+4+data.size()+ACE_CDR::MAX_ALIGNMENT));
(*spBody)<<ACE_OutputCDR::from_boolean(ACE_CDR_BYTE_ORDER);
(*spBody)<<data.type_;
(*spBody)<<data;
return spBody;
}
//生成头和数据,并放入iovec数组中发送,result参数必须包含两个元素
template<typename Data>
static void send(Data const& data,ACE_SOCK_Stream& stream)//throw NetworkException
{
auto_ptr<ACE_OutputCDR> spBody(CDRHelper::generateBody<Data>(data));
auto_ptr<ACE_OutputCDR> spHeader(CDRHelper::generateHeader(static_cast<ACE_CDR::ULong>(spBody->total_length())));
iovec iov[2];
iov[0].iov_base=spHeader->begin()->rd_ptr();
iov[0].iov_len=static_cast<u_long>(spHeader->total_length());
iov[1].iov_base=spBody->begin()->rd_ptr();
iov[1].iov_len=static_cast<u_long>(spBody->total_length());
ACE_Time_Value time(4);
ACE_OS::last_error(0);
if(stream.sendv_n(iov,2,&time)==-1)
{
string errorMsg=NetworkException::format("CDRHelper::send","if(stream.sendv_n(iov,2,&time)==-1)");
throw NetworkException(errorMsg);
}
}
//1)每次根据队列头部的FileInfo元素信息打开一个文件,读取最多4096字节,然后发送,并等待服务器回答。
//如果服务器回答成功,则修改offset,如果offset已到文件尾,则删除FileInfo对象,然后返回。如果服务器回答失败,则重发,直到服务器回答成功或者通信超时。
//2)如果FileInfo的状态为文件不存在,发送FileBlock告诉服务器文件不存在,不等待服务器回答就返回。
//3)如果FileInfo对象表示的文件不可读或者其他错误,则不发送FileBlock对象,弹出FileInfo对象,继续从1)开始
//4)5秒为通信超时时间,如果通信超时,则抛出异常
static void send(deque<FileInfo>& files,ACE_SOCK_Stream& stream)//throw NetworkException,FileException,SerializeException
{
if(files.empty())
return;
FileInfo& info=files[0];
if(info.status_==FileInfo::none)
{
FileBlock block;
block.status_=FileBlock::none;
send<FileBlock>(block,stream);
return;
}
try
{
ifstream file(info.fileName_.c_str(),ios::binary|ios::in);
file.exceptions(ifstream::eofbit|ifstream::failbit|ifstream::badbit);
file.seekg(info.offset_);
filesystem::path path(info.fileName_.c_str());
uintmax_t fileSize=filesystem::file_size(path);
std::streamsize size=0;
if(fileSize>=FileBlock::blockMaxSize)
{
if((fileSize-info.offset_)>=FileBlock::blockMaxSize)
{
size=FileBlock::blockMaxSize;
}
else
{
size=static_cast<std::streamsize>(fileSize-info.offset_);
}
}
else
{
size=static_cast<ACE_CDR::ULong>(fileSize-info.offset_);
}
char* pBuffer=new char[size];
file.read(pBuffer,size);
FileBlock block;
block.fileName_=info.fileName_.c_str();
string::size_type idx=block.fileName_.rfind('\\');
block.fileName_=block.fileName_.substr(idx+1);
block.pData_=pBuffer;
block.dataSize_=size;
block.offset_=info.offset_;
block.status_=FileBlock::exist;
file.close();
FileBlockAnswer answer;
answer.recordResult_=false;
while(!answer.recordResult_)
{
if(block.offset_>6341732)
{
int i=0;
}
send<FileBlock>(block,stream);
CDRHelper::receiveData<FileBlockAnswer>(stream,answer);
if(answer.recordResult_)
{
info.offset_+=size;
if(info.offset_>=fileSize)
{
filesystem::remove(filesystem::path(info.fileName_.c_str()));
ACE_CString::size_type idx=info.fileName_.rfind('.');
ACE_CString desName=info.fileName_.substr(0,idx);
desName+=".des";
filesystem::remove(filesystem::path(desName.c_str()));
files.pop_front();
}
return;
}
else
{
answer.recordResult_=false;
}
}
}
catch(fstream::failure const& e)
{
string msg("CDRHelper::send function failed:");
msg+=e.what();
throw FileException(msg);
}
}
template<typename Data>
static void sendAsynch(Data const& data,ACE_Asynch_Write_Stream& stream)//throw NetworkException
{
auto_ptr<ACE_OutputCDR> spBody(CDRHelper::generateBody<Data>(data));
auto_ptr<ACE_OutputCDR> spHeader(CDRHelper::generateHeader(static_cast<ACE_CDR::ULong>(spBody->total_length())));
ACE_Message_Block* pHeader=spHeader->begin()->clone();
ACE_Message_Block* pBody=spBody->begin()->clone();
pHeader->cont(pBody);
int size=stream.writev(*pHeader,pHeader->total_size());
if(size!=1)
{
string errorMsg=NetworkException::format("sendAsy","if(stream.writev(*pHeader,pHeader->total_size())!=0)");
pHeader->release();
throw NetworkException(errorMsg);
}
}
template<typename Data>
static void receiveData(ACE_SOCK_Stream& stream,Data& data)//throw NetworkException,SerializeException
{
ACE_Message_Block block1(12);
ACE_CDR::mb_align(&block1);
ACE_OS::last_error(0);
if(stream.recv_n(block1.wr_ptr(),12)==12)//receive the header of CDR
{
//parse the cdr header
block1.wr_ptr(12);
ACE_InputCDR cdr(&block1);
ACE_CDR::Boolean byteOrder;
cdr>>static_cast<ACE_InputCDR::to_boolean>(byteOrder);
cdr.reset_byte_order(byteOrder);
ACE_CDR::ULong type=-1;
cdr>>type;
if(type==0)//如果是header的话,则发起读body的操作
{
ACE_CDR::ULong bodyLength;
cdr>>bodyLength;
ACE_Message_Block block2(bodyLength);
ACE_CDR::mb_align(&block2);
if(stream.recv_n(block2.wr_ptr(),bodyLength)==bodyLength)
{
block2.wr_ptr(bodyLength);
ACE_InputCDR cdr(&block2);
ACE_CDR::Boolean byteOrder;
cdr>>static_cast<ACE_InputCDR::to_boolean>(byteOrder);
cdr.reset_byte_order(byteOrder);
ACE_CDR::ULong type;
cdr>>type;
if(type==data.type_)
{
cdr>>data;
}
else//错误处理
{
string msg=SerializeException::format("CDRHelper::receiveData","can not receive correct body data");
throw SerializeException(msg);
}
}
else
{
string errorMsg=NetworkException::format("receiveData(ACE_SOCK_Stream& stream,Data& data)","if(stream.recv_n(block1.wr_ptr(),12)==12)");
throw NetworkException(errorMsg);
}
}
else//错误处理
{
string msg=SerializeException::format("CDRHelper::receiveData","can not receive correct head data");
throw SerializeException(msg);
}
}
else
{
string errorMsg=NetworkException::format("receiveData(ACE_SOCK_Stream& stream,Data& data)","if(stream.recv_n(block1.wr_ptr(),12)==12)");
throw NetworkException(errorMsg);
}
}
};
[ 本帖最后由 csfreebird 于 2008-9-28 21:24 编辑 ] |
|