通信模块开发(~~~~持续更新中~~~~)
:lol 大家好!:lol实验室项目要求做一个通信模块,供客户端缓存和服务器端缓存之间高效传递数据,经高层商量决定用ACE实现,并派发到偶身上啦:lol ,
是机会亦是挑战:L,因为之前我完全没有接触过ACE,赶紧看了2个星期的书和帖子,做出以下1.01版本,其漏洞百出,目不忍视:'( 。希望高手多多关注,做的不好的地方,还望帮忙指出来,以后我会一直把模块的新进展贴进来。
设计:
客户端:无框架
分2个线程,一个发请求,一个接收数据。
服务器端:proactor框架(其精髓我还未体会,但是高层点名道姓就是它啦)。
[ 本帖最后由 agent 于 2009-6-4 08:41 编辑 ] // ace_client.cpp : 定义控制台应用程序的入口点。
//版本:1.01
//功能:客户端,发送请求连接信息,若成功,发送数据请求
#include "stdafx.h"
#include "ace/OS.h"
#include <iostream>
#include <string>
#include "ace/LOG_Msg.h"
#include "ace/Task.h"
#include "ace/SOCK_connector.h"
#include "ace/INET_Addr.h"
#include <ofstream>
class WriteThread: public ACE_Task<ACE_MT_SYNCH>
{
private:
ACE_SOCK_Stream _stream;
std::string _requestMsg;
public:
int open(ACE_SOCK_Stream stream,int beginflag,int endflag,int datalength)
{
_requestMsg = "01000610051000" ;
_stream = stream;
activate();
return 0;
}
int close(u_long)
{
ACE_DEBUG((LM_DEBUG, "(%T) 数据请求发送完毕!\n"));
return 0;
}
int svc(void)
{
_stream.send(_requestMsg.c_str(),_requestMsg.size());
return 0;
}
};
class ReadThread:public ACE_Task<ACE_MT_SYNCH>
{
private:
void *_buf;
int _length;
ACE_SOCK_Stream _stream;
public:
int open(ACE_SOCK_Stream stream,int dataLength,void *buf)
{
_length = dataLength;
_buf = buf;
_stream = stream;
activate(THR_NEW_LWP,1);
return 0;
}
int svc()
{
_stream.recv(_buf,_length);
return 0;
}
int close()
{
_stream.close();
return 0;
}
};
class Client
{
private:
ACE_SOCK_Stream _client_stream;
ACE_SOCK_Connector _connector;
std::string _userName;
std::string _userPswd;
ACE_INET_Addr _serverAddr;
public:
Client(std::string userName,std::string userPswd):_userName(userName),_userPswd(userPswd){}
int ConnectToServer(std::string userIpPort);
int requestData(int beginflag,int endflag,int length,char *databuf);
int close();
};
int Client::ConnectToServer(std::string userIpPort)
{
_serverAddr.string_to_addr(userIpPort.c_str());
ACE_DEBUG((LM_DEBUG,"(%P|%T)Starting to connect %s:%d\n",
_serverAddr.get_host_name(),_serverAddr.get_port_number()));
if ((_connector.connect(_client_stream,_serverAddr))==-1)
{
ACE_ERROR_RETURN((LM_ERROR,"(%P|T)%p\n","Failed to connect!\n"),0);
}
else
{
ACE_DEBUG((LM_DEBUG,"(%P|%T)Success connect to %s!\n",_serverAddr.get_host_name()));
//用户请求连接,“99”做数据报头
std::string userStr = "99" + _userName + "*" + _userPswd + "*";
_client_stream.send_n(userStr.c_str(),userStr.size(),0);
char recvMsg= "";
int count = _client_stream.recv(recvMsg,20,0);
//连接成功返回数据报头为1,否则为0
if (recvMsg=='0'){
std::cout<<"不合法用户,请重新登录!\n";
return 0;
}else{
std::cout<<"欢迎登录!\n";
return 1;
}
}
}
int Client::requestData(int beginflag,int endflag,int length,char *dataBuf)
{
WriteThread *writeThread = new WriteThread;
ReadThread *readThread = new ReadThread;
writeThread->open(_client_stream,beginflag,endflag,length);
readThread->open(_client_stream,length,(void*)dataBuf);
ACE_Thread_Manager::instance()->wait();
return 0;
}
int Client::close(){
if ((_client_stream.close())==-1)
{
ACE_ERROR_RETURN((LM_ERROR,"(%P|%T)%s\n","Failed to close"),-1);
}
else
{
ACE_DEBUG((LM_DEBUG,"关闭连接!\n"));
return 0;
}
}
int _tmain(int argc, _TCHAR* argv[])
{
ACE::init();
std::string username;
std::string userpswd;
while(1){
std::cout<<"请输入用户名:\n";
std::cin>>username;
std::cout<<"请输入密码:\n";
std::cin>>userpswd;
Client client(username,userpswd);
if (client.ConnectToServer("192.168.0.58:1986"))
{
client.requestData(6,1005,1000,data);
std::cout<<data<<"\n";
}
else
{
std::cout<<"连接服务器失败!";
}
client.close();
}
system("pause");
ACE::fini();
return 0;
}
[ 本帖最后由 agent 于 2009-6-4 17:08 编辑 ] // CommunicationServer.cpp : 定义控制台应用程序的入口点。
//版本:1.01
//功能:服务器端,监听客户端连接,检验客户端用户合法性,接收合法用户的数据请求,异步传回给客户端
#include "stdafx.h"
#include <iostream>
#include <string>
#include <map>
#include "ace/Proactor.h"
#include "ace/INET_Addr.h"
#include "ace/SOCK_Acceptor.h"
#include "ace/Asynch_Acceptor.h"
#include "ace/Asynch_IO.h"
#include "ace/SOCK_SEQPACK_Association.h"
#include "ace/Message_Block.h"
#include "ace/Task.h"
#defineUSER_MSG_LENGTH 40
#defineREQUEST_DATA_MSG_LENGTH 20
class NetThread;
NetThread *io_thread;
//测试的安全用户
typedef std::map<std::string, std::string> validUser;
typedef std::map<std::string,std::string>::value_typevaltype;
validUser user;
//网络层主动对象
class NetThread:public ACE_Task_Base
{
public:
NetThread(){}
~NetThread(){}
int open(){
return this->activate(THR_NEW_LWP,1);
}
int svc(){
//启用事件分发处理器
ACE_Proactor::instance()->proactor_run_event_loop();
return 0;
}
int close(){
//关闭事件分发处理器
ACE_Proactor::instance()->proactor_end_event_loop();
this->wait();
return 0;
}
};
//创建服务处理器
class CommunicationServer:public ACE_Service_Handler
{
public:
void open(ACE_HANDLE new_handle, ACE_Message_Block &message_block)
{
this->handle (new_handle);
if (this->_readStream.open (*this) != 0 || this->_writeStream.open (*this) != 0 )
{
delete this;
return;
}
ACE_INET_Addr clientAddr;
ACE_SOCK_SEQPACK_Association ass = ACE_SOCK_SEQPACK_Association(new_handle);
ass.get_remote_addr(clientAddr);
std::cout<<"\nA new connector:"
<<clientAddr.get_host_name()
<<""
<<clientAddr.get_port_number()
<<"\n";
ACE_Message_Block *rec;
ACE_NEW_NORETURN(rec,ACE_Message_Block(USER_MSG_LENGTH));
if(this->initReadStream(rec) != 0)
{
return;
}
}
//异步读完后会调用这个函数
void handle_read_stream(const ACE_Asynch_Read_Stream::Result &result)
{
ACE_DEBUG ((LM_DEBUG,"handle_read_stream called \n"));
ACE_Message_Block &mb = result.message_block();
size_t n = result.bytes_transferred();
if (!result.success()||result.bytes_transferred()==0)
{
mb.release();
delete this;
}
std::string str((char *)mb.rd_ptr(),40);
std::string headStr = str.substr(0,2);
if (headStr == "01") //数据请求信息
{
//测试传输数据
std::string temp("0123456789");
std::string data("01");
for (int i = 0;i<100;i++)
{
data = data + temp;
}
ACE_Message_Block *dataBuf;
ACE_NEW_NORETURN(dataBuf,ACE_Message_Block(1024));
dataBuf->copy(data.c_str());
if (0 != this->initWriteStream(*dataBuf,1024))
{
return ;
}
}
else if(headStr == "02")//其他消息类型.........
{
std::cout<<"02";
}
else if(headStr == "99")//用户请求连接
{
int firstx = str.find_first_of('*');
int lastx = str.find_last_of('*');
std::string name = str.substr(2,firstx-2);
std::string pswd = str.substr(firstx+1,lastx-firstx-1);
std::cout<<"name:"<<name
<<"\npswd:"<<pswd<<"\n";
ACE_Message_Block *validMsg;
ACE_NEW_NORETURN(validMsg,ACE_Message_Block(10));
if (isValidUser(name,pswd))
{
validMsg->copy("991");
if (0 != this->initWriteStream(*validMsg,10))
{
return ;
}
else
{
ACE_Message_Block *dataRequest;
ACE_NEW_NORETURN(dataRequest,ACE_Message_Block(REQUEST_DATA_MSG_LENGTH));
if(0 != this->initReadStream(dataRequest))
{
return;
}
}
}
else
{
validMsg->copy("990");
if (0 != this->initWriteStream(*validMsg,10))
{
return ;
}
}
}
else
{
std::cout<<"unknownMSG";
}
return;
}
//异步写完后会调用这个函数
void handle_write_stream(const ACE_Asynch_Write_Stream::Result &result)
{
ACE_DEBUG ((LM_DEBUG,"handle_write_stream called \n"));
result.message_block ().rd_ptr (result.message_block ().rd_ptr () - result.bytes_transferred ());
ACE_DEBUG ((LM_DEBUG, "******************** \n"));
ACE_DEBUG ((LM_DEBUG, "%s = %d \n", "bytes_to_write", result.bytes_to_write ()));
ACE_DEBUG ((LM_DEBUG, "%s = %d \n", "handle", result.handle ()));
ACE_DEBUG ((LM_DEBUG, "%s = %d \n", "bytes_transfered", result.bytes_transferred ()));
ACE_DEBUG ((LM_DEBUG, "%s = %d \n", "act", (u_long) result.act ()));
ACE_DEBUG ((LM_DEBUG, "%s = %d \n", "success", result.success ()));
ACE_DEBUG ((LM_DEBUG, "%s = %d \n", "completion_key", (u_long) result.completion_key ()));
ACE_DEBUG ((LM_DEBUG, "%s = %d \n", "error", result.error ()));
ACE_DEBUG ((LM_DEBUG, "******************** \n"));
std::string str((char *)result.message_block().rd_ptr(),3);
if (str == "991")
{
ACE_DEBUG((LM_DEBUG,"合法客户,连接成功!\n"));
}
else if (str == "990")
{
ACE_OS::closesocket(this->handle());
ACE_DEBUG((LM_DEBUG,"不合法客户,拒绝连接!\n"));
}
else
{
ACE_DEBUG((LM_DEBUG,"数据发送成功!\n"));
}
result.message_block ().release ();
return ;
}
~CommunicationServer()
{
if (this->handle() != ACE_INVALID_HANDLE)
{
ACE_OS::closesocket(this->handle());
}
}
private:
//异步读初始化
int initReadStream(ACE_Message_Block *_mbRecvData)
{
if (0!=this->_readStream.read(*_mbRecvData,_mbRecvData->space()))
{
_mbRecvData->release();
ACE_ERROR_RETURN((LM_ERROR, "%p\n", "ACE_Asynch_Read_Stream::read"), -1);
return -1;
}
return 0;
}
//异步写初始化
int initWriteStream(ACE_Message_Block &mb ,size_t nBytes)
{
if (this->_writeStream.write (mb , nBytes ) == -1)
{
mb.release ();
ACE_ERROR_RETURN((LM_ERROR, "%p\n", "ACE_Asynch_Write_File::write"), -1);
return -1;
}
return 0;
}
//检验合法用户
int isValidUser(std::string name,std::string pswd)
{
user.insert(valtype("py","1235467"));
user.insert(valtype("cug","12345678"));
validUser::iterator it = user.find(name);
if (it != user.end())
{
return it->second == pswd?1:0;
}
return 0;
}
private:
ACE_Asynch_Write_Stream _writeStream;
ACE_Asynch_Read_Stream _readStream;
};
int _tmain(int argc, _TCHAR* argv[])
{
ACE::init();
ACE_INET_Addr listenAddr(1986,"192.168.0.58");
ACE_OS::printf("Server port :%u",listenAddr.get_port_number());
io_thread = new NetThread ;
//使用接收器和远端进行连接
ACE_Asynch_Acceptor<CommunicationServer> server;
server.open(listenAddr);
io_thread->open();
getchar();
io_thread->close();
ACE::fini();
return 0;
}
//基本框架已经搭建完毕,基本功能完毕,不过没涉及到并发管理和数据压缩。。。。
[ 本帖最后由 agent 于 2009-6-4 17:10 编辑 ] 兄弟,你的proactor代码真是没有理解透啊,还需要加强,bug太多了,n多地方。
我保证你这个程序到处都是异常和内存泄漏
回复 #4 newzai 的帖子
呵呵,谢谢回复,经过修改,暂时已经达到了基本功能,估计肯定还有bug,如果发现还有什么BUG,请一定帮个忙,指出来。。。谢谢。。。[ 本帖最后由 agent 于 2009-6-4 17:16 编辑 ] 经过一天的修改,现在上面的代码已经能实现基本功能,暂时还没发现bug,不过估计肯定会有。。。。继续学习。。。。。 鼓励一下,继续努力。其实推荐你把整个工程压缩包发送上来,给大家看看 - 排除可能涉及私密的代码,如果可行的话。
这样大家都能调试,帮你解决问题。 现在要用MFC做一个服务器。。还要和别的模块结合。。任务愈来愈艰巨啦。。。。
页:
[1]