找回密码
 用户注册

QQ登录

只需一步,快速开始

查看: 3659|回复: 7

通信模块开发(~~~~持续更新中~~~~)

[复制链接]
发表于 2009-6-3 17:33:57 | 显示全部楼层 |阅读模式
:lol 大家好!:lol
实验室项目要求做一个通信模块,供客户端缓存和服务器端缓存之间高效传递数据,经高层商量决定用ACE实现,并派发到偶身上啦:lol ,
是机会亦是挑战:L  ,因为之前我完全没有接触过ACE,赶紧看了2个星期的书和帖子,做出以下1.01版本,其漏洞百出,目不忍视:'( 。希望高手多多关注,做的不好的地方,还望帮忙指出来,以后我会一直把模块的新进展贴进来。

设计:
客户端:无框架
分2个线程,一个发请求,一个接收数据。
服务器端:proactor框架(其精髓我还未体会,但是高层点名道姓就是它啦)。

[ 本帖最后由 agent 于 2009-6-4 08:41 编辑 ]
 楼主| 发表于 2009-6-3 17:35:30 | 显示全部楼层
// ace_client.cpp : 定义控制台应用程序的入口点。
//版本:1.01
//功能:客户端,发送请求连接信息,若成功,发送数据请求

#include "stdafx.h"
#include "ace/OS.h"
#include <iostream>
#include <string>
#include "ace/LOG_Msg.h"
#include "ace/Task.h"
#include "ace/SOCK_connector.h"
#include "ace/INET_Addr.h"
#include <ofstream>
class WriteThread: public ACE_Task<ACE_MT_SYNCH>
{
private:
ACE_SOCK_Stream _stream;
std::string _requestMsg;
public:
int open(ACE_SOCK_Stream stream,int beginflag,int endflag,int datalength)
{
  _requestMsg = "01000610051000" ;
  _stream = stream;
  activate();
  return 0;
}
int close(u_long)
{
  ACE_DEBUG((LM_DEBUG, "(%T) 数据请求发送完毕!\n"));
  return 0;
}
int svc(void)
{
  _stream.send(_requestMsg.c_str(),_requestMsg.size());
  return 0;
}
};

class ReadThread:public ACE_Task<ACE_MT_SYNCH>
{
private:
void *_buf;
int _length;
ACE_SOCK_Stream _stream;
public:
int open(ACE_SOCK_Stream stream,int dataLength,void *buf)
{
  _length = dataLength;
  _buf = buf;
  _stream = stream;
  activate(THR_NEW_LWP,1);
  return 0;
}
int svc()
{
  _stream.recv(_buf,_length);
  return 0;
}
int close()
{
  _stream.close();
  return 0;
}
};
class Client
{
private:
ACE_SOCK_Stream _client_stream;
ACE_SOCK_Connector _connector;
std::string _userName;
std::string _userPswd;
ACE_INET_Addr _serverAddr;
public:
Client(std::string userName,std::string userPswd):_userName(userName),_userPswd(userPswd){}
int ConnectToServer(std::string userIpPort);
int requestData(int beginflag,int endflag,int length,char *databuf);
int close();
};
int Client::ConnectToServer(std::string userIpPort)
{
_serverAddr.string_to_addr(userIpPort.c_str());
ACE_DEBUG((LM_DEBUG,"(%P|%T)Starting to connect %s:%d\n",
  _serverAddr.get_host_name(),_serverAddr.get_port_number()));
if ((_connector.connect(_client_stream,_serverAddr))==-1)
{
  ACE_ERROR_RETURN((LM_ERROR,"(%P|T)%p\n","Failed to connect!\n"),0);
}
else
{
  ACE_DEBUG((LM_DEBUG,"(%P|%T)Success connect to %s!\n",_serverAddr.get_host_name()));
  
  //用户请求连接,“99”做数据报头
  std::string userStr = "99" + _userName + "*" + _userPswd + "*";
  _client_stream.send_n(userStr.c_str(),userStr.size(),0);
  char recvMsg[20]= "";
  int count = _client_stream.recv(recvMsg,20,0);
  //连接成功返回数据报头为1,否则为0
  if (recvMsg[2]=='0'){
   std::cout<<"不合法用户,请重新登录!\n";
   return 0;
  }else{
   std::cout<<"欢迎登录!\n";
   return 1;
  }
}
}
int Client::requestData(int beginflag,int endflag,int length,char *dataBuf)
{
WriteThread *writeThread = new WriteThread;
ReadThread *readThread = new ReadThread;
writeThread->open(_client_stream,beginflag,endflag,length);
readThread->open(_client_stream,length,(void*)dataBuf);
ACE_Thread_Manager::instance()->wait();
return 0;
}
int Client::close(){
if ((_client_stream.close())==-1)
{
  ACE_ERROR_RETURN((LM_ERROR,"(%P|%T)%s\n","Failed to close"),-1);
}
else
{
  ACE_DEBUG((LM_DEBUG,"关闭连接!\n"));
  return 0;
}
}
int _tmain(int argc, _TCHAR* argv[])
{
ACE::init();
std::string username;
std::string userpswd;
while(1){
std::cout<<"请输入用户名:\n";
std::cin>>username;
std::cout<<"请输入密码:\n";
std::cin>>userpswd;
Client client(username,userpswd);
if (client.ConnectToServer("192.168.0.58:1986"))
{
  client.requestData(6,1005,1000,data);
  std::cout<<data<<"\n";
}
else
{
  std::cout<<"连接服务器失败!";
}
client.close();
}
system("pause");
ACE::fini();
return 0;
}

[ 本帖最后由 agent 于 2009-6-4 17:08 编辑 ]
 楼主| 发表于 2009-6-3 17:36:19 | 显示全部楼层
// CommunicationServer.cpp : 定义控制台应用程序的入口点。
//版本:1.01
//功能:服务器端,监听客户端连接,检验客户端用户合法性,接收合法用户的数据请求,异步传回给客户端

#include "stdafx.h"
#include <iostream>
#include <string>
#include <map>
#include "ace/Proactor.h"
#include "ace/INET_Addr.h"
#include "ace/SOCK_Acceptor.h"
#include "ace/Asynch_Acceptor.h"
#include "ace/Asynch_IO.h"
#include "ace/SOCK_SEQPACK_Association.h"
#include "ace/Message_Block.h"
#include "ace/Task.h"
#define  USER_MSG_LENGTH 40
#define  REQUEST_DATA_MSG_LENGTH 20
class NetThread;
NetThread          *io_thread;
//测试的安全用户
typedef std::map<std::string, std::string>    validUser;
typedef std::map<std::string,std::string>::value_type  valtype;
validUser user;
//网络层主动对象
class NetThread:public ACE_Task_Base
{
public:
NetThread(){}
~NetThread(){}
int open(){
  return this->activate(THR_NEW_LWP,1);
}
int svc(){
  //启用事件分发处理器
  ACE_Proactor::instance()->proactor_run_event_loop();
  return 0;
}
int close(){
  //关闭事件分发处理器
  ACE_Proactor::instance()->proactor_end_event_loop();
  this->wait();
  return 0;
}
};
//创建服务处理器
class CommunicationServer:public ACE_Service_Handler
{
public:
void open(ACE_HANDLE new_handle, ACE_Message_Block &message_block)
{
  this->handle (new_handle);
  if (this->_readStream.open (*this) != 0 || this->_writeStream.open (*this) != 0 )
  {
   delete this;
   return;
  }
  ACE_INET_Addr clientAddr;
  ACE_SOCK_SEQPACK_Association ass = ACE_SOCK_SEQPACK_Association(new_handle);
  ass.get_remote_addr(clientAddr);
  std::cout<<"\nA new connector:"
   <<clientAddr.get_host_name()
   <<"  "
   <<clientAddr.get_port_number()
   <<"\n";
  ACE_Message_Block *rec;
  ACE_NEW_NORETURN(rec,ACE_Message_Block(USER_MSG_LENGTH));
  if(this->initReadStream(rec) != 0)
  {
   return;
  }
}
//异步读完后会调用这个函数
void handle_read_stream(const ACE_Asynch_Read_Stream::Result &result)
{
  ACE_DEBUG ((LM_DEBUG,"handle_read_stream called \n"));
  ACE_Message_Block &mb = result.message_block();
  size_t n = result.bytes_transferred();
  if (!result.success()||result.bytes_transferred()==0)
  {
   mb.release();
   delete this;
  }
  std::string str((char *)mb.rd_ptr(),40);
  std::string headStr = str.substr(0,2);
  if (headStr == "01") //数据请求信息
  {
   //测试传输数据
   std::string temp("0123456789");
   std::string data("01");
   for (int i = 0;i<100;i++)
   {
    data = data + temp;
   }
   ACE_Message_Block *dataBuf;
   ACE_NEW_NORETURN(dataBuf,ACE_Message_Block(1024));
   dataBuf->copy(data.c_str());
   if (0 != this->initWriteStream(*dataBuf,1024))
   {
    return ;
   }
  }  
  else if(headStr == "02")//其他消息类型.........
  {
   std::cout<<"02";
  }
  else if(headStr == "99")//用户请求连接
  {
   int firstx = str.find_first_of('*');
   int lastx = str.find_last_of('*');
   std::string name = str.substr(2,firstx-2);
   std::string pswd = str.substr(firstx+1,lastx-firstx-1);
   std::cout<<"name:"<<name
    <<"\npswd:"<<pswd<<"\n";
   ACE_Message_Block *validMsg;
   ACE_NEW_NORETURN(validMsg,ACE_Message_Block(10));
   if (isValidUser(name,pswd))
   {
    validMsg->copy("991");
    if (0 != this->initWriteStream(*validMsg,10))
    {
     return ;
    }   
    else
    {
     ACE_Message_Block *dataRequest;
     ACE_NEW_NORETURN(dataRequest,ACE_Message_Block(REQUEST_DATA_MSG_LENGTH));
     if(0 != this->initReadStream(dataRequest))
     {
      return;
     }
    }
   }
   else
   {
    validMsg->copy("990");
    if (0 != this->initWriteStream(*validMsg,10))
    {
     return ;
    }
   }
   
  }
  else
  {
   std::cout<<"unknownMSG";
  }
  return;
  }
//异步写完后会调用这个函数
void handle_write_stream(const ACE_Asynch_Write_Stream::Result &result)
{
  ACE_DEBUG ((LM_DEBUG,"handle_write_stream called \n"));
  result.message_block ().rd_ptr (result.message_block ().rd_ptr () - result.bytes_transferred ());
  ACE_DEBUG ((LM_DEBUG, "******************** \n"));
  ACE_DEBUG ((LM_DEBUG, "%s = %d \n", "bytes_to_write", result.bytes_to_write ()));
  ACE_DEBUG ((LM_DEBUG, "%s = %d \n", "handle", result.handle ()));
  ACE_DEBUG ((LM_DEBUG, "%s = %d \n", "bytes_transfered", result.bytes_transferred ()));
  ACE_DEBUG ((LM_DEBUG, "%s = %d \n", "act", (u_long) result.act ()));
  ACE_DEBUG ((LM_DEBUG, "%s = %d \n", "success", result.success ()));
  ACE_DEBUG ((LM_DEBUG, "%s = %d \n", "completion_key", (u_long) result.completion_key ()));
  ACE_DEBUG ((LM_DEBUG, "%s = %d \n", "error", result.error ()));
  ACE_DEBUG ((LM_DEBUG, "******************** \n"));
  std::string str((char *)result.message_block().rd_ptr(),3);
  if (str == "991")
  {
   ACE_DEBUG((LM_DEBUG,"合法客户,连接成功!\n"));
  }
  else if (str == "990")
  {
   ACE_OS::closesocket(this->handle());
   ACE_DEBUG((LM_DEBUG,"不合法客户,拒绝连接!\n"));
  }
  else
  {
   ACE_DEBUG((LM_DEBUG,"数据发送成功!\n"));
  }
  result.message_block ().release ();
  return ;
  }
~CommunicationServer()
{
  if (this->handle() != ACE_INVALID_HANDLE)
  {
   ACE_OS::closesocket(this->handle());
  }
}
private:
//异步读初始化
int initReadStream(ACE_Message_Block *_mbRecvData)
{
  if (0!=this->_readStream.read(*_mbRecvData,_mbRecvData->space()))
  {
   _mbRecvData->release();
   ACE_ERROR_RETURN((LM_ERROR, "%p\n", "ACE_Asynch_Read_Stream::read"), -1);
   return -1;
  }
  return 0;
}
//异步写初始化
int initWriteStream(ACE_Message_Block &mb ,size_t nBytes)
{
  if (this->_writeStream.write (mb , nBytes ) == -1)
  {
   mb.release ();
   ACE_ERROR_RETURN((LM_ERROR, "%p\n", "ACE_Asynch_Write_File::write"), -1);
   return -1;
  }
  return 0;
}
//检验合法用户
int isValidUser(std::string name,std::string pswd)
{
  user.insert(valtype("py","1235467"));
  user.insert(valtype("cug","12345678"));
  validUser::iterator it = user.find(name);
  if (it != user.end())
  {
   return it->second == pswd?1:0;
  }
  return 0;
}
private:
ACE_Asynch_Write_Stream _writeStream;
ACE_Asynch_Read_Stream _readStream;
};

int _tmain(int argc, _TCHAR* argv[])
{
ACE::init();
ACE_INET_Addr listenAddr(1986,"192.168.0.58");
ACE_OS::printf("Server port :%u",listenAddr.get_port_number());
io_thread = new NetThread ;
//使用接收器和远端进行连接
ACE_Asynch_Acceptor<CommunicationServer> server;
server.open(listenAddr);
io_thread->open();
getchar();
io_thread->close();
ACE::fini();
return 0;
}


//基本框架已经搭建完毕,基本功能完毕,不过没涉及到并发管理和数据压缩。。。。

[ 本帖最后由 agent 于 2009-6-4 17:10 编辑 ]
发表于 2009-6-3 23:00:45 | 显示全部楼层
兄弟,你的proactor代码真是没有理解透啊,还需要加强,bug太多了,n多地方。
我保证你这个程序到处都是异常和内存泄漏
 楼主| 发表于 2009-6-4 08:38:09 | 显示全部楼层

回复 #4 newzai 的帖子

呵呵,谢谢回复,经过修改,暂时已经达到了基本功能,估计肯定还有bug,如果发现还有什么BUG,请一定帮个忙,指出来。。。谢谢。。。

[ 本帖最后由 agent 于 2009-6-4 17:16 编辑 ]
 楼主| 发表于 2009-6-4 17:11:56 | 显示全部楼层
经过一天的修改,现在上面的代码已经能实现基本功能,暂时还没发现bug,不过估计肯定会有。。。。继续学习。。。。。
发表于 2009-6-4 18:12:53 | 显示全部楼层
鼓励一下,继续努力。其实推荐你把整个工程压缩包发送上来,给大家看看 - 排除可能涉及私密的代码,如果可行的话。
这样大家都能调试,帮你解决问题。
 楼主| 发表于 2009-6-8 10:38:18 | 显示全部楼层
现在要用MFC做一个服务器。。还要和别的模块结合。。任务愈来愈艰巨啦。。。。
您需要登录后才可以回帖 登录 | 用户注册

本版积分规则

Archiver|手机版|小黑屋|ACE Developer ( 京ICP备06055248号 )

GMT+8, 2024-11-23 03:33 , Processed in 0.021801 second(s), 6 queries , Redis On.

Powered by Discuz! X3.5

© 2001-2023 Discuz! Team.

快速回复 返回顶部 返回列表