// CommunicationServer.cpp : 定义控制台应用程序的入口点。
//版本:1.01
//功能:服务器端,监听客户端连接,检验客户端用户合法性,接收合法用户的数据请求,异步传回给客户端
#include "stdafx.h"
#include <iostream>
#include <string>
#include <map>
#include "ace/Proactor.h"
#include "ace/INET_Addr.h"
#include "ace/SOCK_Acceptor.h"
#include "ace/Asynch_Acceptor.h"
#include "ace/Asynch_IO.h"
#include "ace/SOCK_SEQPACK_Association.h"
#include "ace/Message_Block.h"
#include "ace/Task.h"
// Capacity, in bytes, of the message block used for the first read on a new
// connection (see CommunicationServer::open).
#define USER_MSG_LENGTH 40
// Capacity, in bytes, of the message block used for the read that is issued
// after a user has been accepted (see CommunicationServer::handle_read_stream).
#define REQUEST_DATA_MSG_LENGTH 20
// Forward declaration so the global pointer below can be declared before the
// class is defined.
class NetThread;
// Global I/O thread object; it is allocated and started in _tmain.
NetThread *io_thread;
// Valid users for testing.
// validUser maps a user name to its password; valtype is the map's
// (name, password) element type.
typedef std::map<std::string, std::string> validUser;
typedef std::map<std::string,std::string>::value_type valtype;
// Table of accepted users; entries are inserted by
// CommunicationServer::isValidUser.
validUser user;
//网络层主动对象
class NetThread:public ACE_Task_Base
{
public:
NetThread(){}
~NetThread(){}
int open(){
return this->activate(THR_NEW_LWP,1);
}
int svc(){
//启用事件分发处理器
ACE_Proactor::instance()->proactor_run_event_loop();
return 0;
}
int close(){
//关闭事件分发处理器
ACE_Proactor::instance()->proactor_end_event_loop();
this->wait();
return 0;
}
};
//创建服务处理器
/**
 * Per-connection service handler.
 *
 * One instance is created for each accepted connection. The handler reads a
 * request, looks at its two-character type prefix and answers asynchronously:
 *   "99" - login request, formatted as  99<name>*<password>*
 *          reply "991" (accepted) or "990" (rejected, connection is closed)
 *   "01" - data request, answered with a fixed test payload
 *   "02" - placeholder for other message types
 *
 * The handler owns itself: it is destroyed with `delete this` when a read
 * fails, when the peer closes the connection, when the initial setup fails,
 * or after a "990" rejection has been sent.
 *
 * NOTE(review): a request is assumed to arrive in a single read completion;
 * requests split across several completions are not reassembled.
 */
class CommunicationServer : public ACE_Service_Handler
{
public:
    /**
     * Called once for a newly accepted connection.
     *
     * Opens the asynchronous read/write streams on the handle, prints the
     * peer address and starts the first read (USER_MSG_LENGTH bytes of
     * buffer). The handler deletes itself if any of these steps fails.
     *
     * @param new_handle     handle of the accepted connection.
     * @param message_block  not used by this handler.
     */
    void open(ACE_HANDLE new_handle, ACE_Message_Block &message_block)
    {
        this->handle(new_handle);
        if (this->_readStream.open(*this) != 0 || this->_writeStream.open(*this) != 0)
        {
            delete this;
            return;
        }

        ACE_INET_Addr clientAddr;
        ACE_SOCK_SEQPACK_Association ass = ACE_SOCK_SEQPACK_Association(new_handle);
        ass.get_remote_addr(clientAddr);
        std::cout << "\nA new connector:"
                  << clientAddr.get_host_name()
                  << " "
                  << clientAddr.get_port_number()
                  << "\n";

        ACE_Message_Block *rec = 0;
        ACE_NEW_NORETURN(rec, ACE_Message_Block(USER_MSG_LENGTH));
        if (this->initReadStream(rec) != 0)
        {
            // Nothing is pending on this connection, so nobody would ever
            // clean the handler up; do it here (the destructor closes the
            // socket).
            delete this;
            return;
        }
    }

    /**
     * Completion callback for an asynchronous read.
     *
     * On failure or end-of-stream the handler destroys itself. Otherwise the
     * received bytes are copied out, the message block is released and the
     * request is dispatched on its two-character prefix.
     */
    void handle_read_stream(const ACE_Asynch_Read_Stream::Result &result)
    {
        ACE_DEBUG ((LM_DEBUG,"handle_read_stream called \n"));
        ACE_Message_Block &mb = result.message_block();
        if (!result.success() || result.bytes_transferred() == 0)
        {
            mb.release();
            delete this;
            return; // the object is gone; nothing below may run
        }

        // Copy only the bytes that were actually received, then give the
        // buffer back: it is not needed any more.
        const std::string str(mb.rd_ptr(), result.bytes_transferred());
        mb.release();

        const std::string headStr = str.substr(0, 2);
        if (headStr == "01") // data request
        {
            // Test payload: "01" followed by 100 repetitions of "0123456789".
            const std::string temp("0123456789");
            std::string data("01");
            for (int i = 0; i < 100; ++i)
            {
                data += temp;
            }
            // NOTE(review): no further read is issued after this reply, so
            // the connection will not receive another request - confirm
            // whether that is intended.
            this->sendMessage(data, 1024);
        }
        else if (headStr == "02") // other message types...
        {
            std::cout << "02";
        }
        else if (headStr == "99") // login request
        {
            // A malformed request leaves both fields empty, which never
            // matches a known user and is therefore rejected below.
            std::string name;
            std::string pswd;
            parseCredentials(str, name, pswd);
            // NOTE(review): this prints the password in clear text.
            std::cout << "name:" << name
                      << "\npswd:" << pswd << "\n";

            if (isValidUser(name, pswd))
            {
                if (this->sendMessage("991", 10) != 0)
                {
                    return;
                }
                // Accepted: wait for the client's data request.
                ACE_Message_Block *dataRequest = 0;
                ACE_NEW_NORETURN(dataRequest, ACE_Message_Block(REQUEST_DATA_MSG_LENGTH));
                this->initReadStream(dataRequest);
            }
            else
            {
                // handle_write_stream closes the connection once this
                // rejection has been written.
                this->sendMessage("990", 10);
            }
        }
        else
        {
            std::cout << "unknownMSG";
        }
    }

    /**
     * Completion callback for an asynchronous write.
     *
     * Logs the result, then inspects the first three bytes of what was sent:
     * "991" only logs, "990" tears the connection down, anything else is
     * treated as a data reply. The message block is always released.
     */
    void handle_write_stream(const ACE_Asynch_Write_Stream::Result &result)
    {
        ACE_DEBUG ((LM_DEBUG,"handle_write_stream called \n"));
        ACE_Message_Block &mb = result.message_block ();
        // Move the read pointer back over the bytes that were written so the
        // sent content can be inspected.
        mb.rd_ptr (mb.rd_ptr () - result.bytes_transferred ());
        ACE_DEBUG ((LM_DEBUG, "******************** \n"));
        ACE_DEBUG ((LM_DEBUG, "%s = %d \n", "bytes_to_write", result.bytes_to_write ()));
        ACE_DEBUG ((LM_DEBUG, "%s = %d \n", "handle", result.handle ()));
        ACE_DEBUG ((LM_DEBUG, "%s = %d \n", "bytes_transfered", result.bytes_transferred ()));
        ACE_DEBUG ((LM_DEBUG, "%s = %d \n", "act", (u_long) result.act ()));
        ACE_DEBUG ((LM_DEBUG, "%s = %d \n", "success", result.success ()));
        ACE_DEBUG ((LM_DEBUG, "%s = %d \n", "completion_key", (u_long) result.completion_key ()));
        ACE_DEBUG ((LM_DEBUG, "%s = %d \n", "error", result.error ()));
        ACE_DEBUG ((LM_DEBUG, "******************** \n"));

        // Never look at more bytes than the block holds.
        const std::string::size_type available = mb.length ();
        const std::string str(mb.rd_ptr (), available < 3 ? available : 3);

        if (str == "990")
        {
            ACE_DEBUG((LM_DEBUG,"不合法客户,拒绝连接!\n"));
            mb.release ();
            // No read was issued after the rejection, so this was the last
            // operation on the connection. The destructor closes the socket
            // exactly once.
            delete this;
            return;
        }

        if (str == "991")
        {
            ACE_DEBUG((LM_DEBUG,"合法客户,连接成功!\n"));
        }
        else
        {
            ACE_DEBUG((LM_DEBUG,"数据发送成功!\n"));
        }
        mb.release ();
    }

    /** Closes the connection's socket if the handler still holds one. */
    ~CommunicationServer()
    {
        if (this->handle() != ACE_INVALID_HANDLE)
        {
            ACE_OS::closesocket(this->handle());
        }
    }

private:
    /**
     * Starts an asynchronous read that may fill all free space of the block.
     *
     * @param _mbRecvData  receive buffer; may be null (allocation failure).
     *                     It is released here if the read cannot be started.
     * @return 0 on success, -1 on failure.
     */
    int initReadStream(ACE_Message_Block *_mbRecvData)
    {
        if (_mbRecvData == 0)
        {
            ACE_ERROR_RETURN((LM_ERROR, "%p\n", "ACE_Message_Block"), -1);
        }
        if (this->_readStream.read(*_mbRecvData, _mbRecvData->space()) != 0)
        {
            // Log before releasing so errno still describes the read failure.
            ACE_ERROR((LM_ERROR, "%p\n", "ACE_Asynch_Read_Stream::read"));
            _mbRecvData->release();
            return -1;
        }
        return 0;
    }

    /**
     * Starts an asynchronous write of up to nBytes from the block.
     *
     * @param mb      data to send; released here if the write cannot be started.
     * @param nBytes  number of bytes requested to be written.
     * @return 0 on success, -1 on failure.
     */
    int initWriteStream(ACE_Message_Block &mb, size_t nBytes)
    {
        if (this->_writeStream.write(mb, nBytes) == -1)
        {
            ACE_ERROR((LM_ERROR, "%p\n", "ACE_Asynch_Write_Stream::write"));
            mb.release();
            return -1;
        }
        return 0;
    }

    /**
     * Allocates a message block of the given capacity, copies the payload
     * (including its terminating NUL) into it and starts writing it.
     *
     * @param payload   text to send.
     * @param capacity  size of the message block to allocate.
     * @return 0 on success, -1 if allocation, copy or write start failed.
     */
    int sendMessage(const std::string &payload, size_t capacity)
    {
        ACE_Message_Block *block = 0;
        ACE_NEW_NORETURN(block, ACE_Message_Block(capacity));
        if (block == 0)
        {
            ACE_ERROR_RETURN((LM_ERROR, "%p\n", "ACE_Message_Block"), -1);
        }
        if (block->copy(payload.c_str()) != 0)
        {
            block->release();
            ACE_ERROR_RETURN((LM_ERROR, "ACE_Message_Block::copy failed\n"), -1);
        }
        // Send exactly what was copied into the block.
        return this->initWriteStream(*block, block->length());
    }

    /**
     * Extracts name and password from a login request "99<name>*<password>*".
     *
     * @param msg   the complete request.
     * @param name  receives the text between the prefix and the first '*'.
     * @param pswd  receives the text between the first and the last '*'.
     * @return true if both separators were found; false otherwise, in which
     *         case name and pswd are left untouched.
     */
    static bool parseCredentials(const std::string &msg, std::string &name, std::string &pswd)
    {
        const std::string::size_type firstStar = msg.find('*', 2);
        const std::string::size_type lastStar = msg.rfind('*');
        if (firstStar == std::string::npos || lastStar == firstStar)
        {
            return false;
        }
        name = msg.substr(2, firstStar - 2);
        pswd = msg.substr(firstStar + 1, lastStar - firstStar - 1);
        return true;
    }

    /**
     * Checks a name/password pair against the global user table.
     *
     * The two test accounts are (re-)inserted on every call; inserting an
     * existing key leaves the map unchanged.
     *
     * @return 1 if the user exists and the password matches, otherwise 0.
     */
    int isValidUser(const std::string &name, const std::string &pswd)
    {
        user.insert(valtype("py","1235467"));
        user.insert(valtype("cug","12345678"));
        const validUser::const_iterator it = user.find(name);
        if (it == user.end())
        {
            return 0;
        }
        return it->second == pswd ? 1 : 0;
    }

private:
    ACE_Asynch_Write_Stream _writeStream;
    ACE_Asynch_Read_Stream _readStream;
};
int _tmain(int argc, _TCHAR* argv[])
{
ACE::init();
ACE_INET_Addr listenAddr(1986,"192.168.0.58");
ACE_OS::printf("Server port :%u",listenAddr.get_port_number());
io_thread = new NetThread ;
//使用接收器和远端进行连接
ACE_Asynch_Acceptor<CommunicationServer> server;
server.open(listenAddr);
io_thread->open();
getchar();
io_thread->close();
ACE::fini();
return 0;
}
//基本框架已经搭建完毕,基本功能完毕,不过没涉及到并发管理和数据压缩。。。。
// [ 本帖最后由 agent 于 2009-6-4 17:10 编辑 ]