关于完成端口的问题,请各位高人们指点一下
我在网上看到一些关于完成端口的例子,如下:首先是 ServerHandler.h
#ifndef _SERVER_HANDER_H_
#define _SERVER_HANDER_H_
#pragma once
#include "ace/Proactor.h"
#include "ace/Asynch_IO.h"
#include "ace/message_block.h"
class ServerHander :public ACE_Service_Handler
{
public:
ServerHander(void);
virtual ~ServerHander(void);
static void SetSleepTime(const DWORD t)
{
m_sleepTime = t;
};
virtual void open(ACE_HANDLE h, ACE_Message_Block& _mb);
protected:
virtual void handle_read_stream(const ACE_Asynch_Read_Stream::Result &result);
virtual void handle_write_stream(const ACE_Asynch_Write_Stream::Result &result);
virtual voidhandle_time_out(const ACE_Time_Value &tv, const void *p);
void DisplayInfo(ACE_HANDLE h, char* str) const;
private:
ACE_Asynch_Read_Stream m_reader;
ACE_Asynch_Write_Stream m_writer;
static DWORD m_sleepTime;
time_t m_lastTime;
};
#endif
----------------------------------------------------------------------------------------
-------------------------------------------
ServerHander.cpp
#include "StdAfx.h"
#include "ServerHander.h"
#include "ace/OS_NS_sys_socket.h"
#include "ace/INET_Addr.h"
#include "ace/SOCK_SEQPACK_Association.h"
#include "ace/OS.h"
#define TIME_OUT 10
DWORD ServerHander::m_sleepTime = 0;
ServerHander::ServerHander(void):m_lastTime(0)
{
}
ServerHander::~ServerHander(void)
{
//关闭
if (this->handle() != ace_invalid_handle)
{
//显示客户端连接地址和端口
displayinfo(this->handle(), " disconnected.");
ace_proactor::instance()->cancel_timer(*this,1);
ace_os::shutdown(this->handle(), sd_both);
ace_os::closesocket( this->handle() );
this->handle(ace_invalid_handle);
}
}
//客户端连接
void ServerHander::open(ACE_HANDLE h,
ACE_Message_Block& _mb)
{
this->handle(h);
//记录时间
m_lastTime = ACE_OS::time(NULL);
//ACE_Proactor::instance()->schedule_timer(*this, 0, ACE_Time_Value(0),
ACE_Time_Value(TIME_OUT));
//构造I/O流
if( this->m_reader.open(*this) != 0 || this->m_writer.open(*this) != 0 )
{
cout<<"m_reader or m_writer open failed..."<<endl;
delete this;
return;
}
//显示客户端连接地址和端口
DisplayInfo(this->handle(), " connected.");
ACE_Message_Block* mb = NULL;
ACE_NEW_NORETURN(mb, ACE_Message_Block(1024));
//发起读操作
if( this->m_reader.read( *mb, mb->space() ) != 0 )
{
cout<<"m_reader read failed..."<<endl;
mb->release();
delete this;
}
}
//读操作完成
void ServerHander::handle_read_stream(const ACE_Asynch_Read_Stream::Result &result)
{
//记录时间
m_lastTime = ACE_OS::time(NULL);
ACE_Message_Block &mb = result.message_block();
//传输不成功
if ( (!result.success()) || (result.bytes_transferred() == 0) )
{
cout<<"Read failed..."<<endl;
mb.release();
delete this;
}
else //接收完成
{
//等待 模拟过载导致的响应速度变慢
//Sleep( m_sleepTime );
/////////////////////////////////////////
///////////////////////////////////////
//加载上自己所需要的类
///////////////////////////////////////////
////////////////////////////////////////
//写回
//mb.wr_ptr(0);
//mb.wr_ptr()[-2] = 0x03;
if (this->m_writer.write( mb, mb.length() ) == -1)
{
cout<<"Server write failed..."<<endl;
mb.release();
}
else //写回成功,再继续读下一组数据
{
ACE_Message_Block *new_mb = NULL;
ACE_NEW_NORETURN(new_mb, ACE_Message_Block(1024));
this->m_reader.read(*new_mb, new_mb->space());
cout<<"Read again."<<endl;
}
}
}
//写操作完成
void ServerHander::handle_write_stream(const ACE_Asynch_Write_Stream::Result &result)
{
cout<<"Write completed."<<endl;
//释放消息
result.message_block().release();
}
//超时
void ServerHander::handle_time_out(const ACE_Time_Value &tv,
const void *p)
{
time_t curTime = ACE_OS::time(NULL);
if( curTime - m_lastTime > TIME_OUT )
{
cout<<"TimeOut"<<endl;
delete this;
}
}
//显示信息
void ServerHander::DisplayInfo(ACE_HANDLE h,
char* str) const
{
//获取客户端连接地址和端口
ACE_INET_Addr addr;
ACE_SOCK_SEQPACK_Association ass = ACE_SOCK_SEQPACK_Association(h);
size_t addr_size=1;
ass.get_remote_addr(addr);
cout<< addr.get_host_addr() <<":"<< addr.get_port_number() <<str<<endl;
}
[ 本帖最后由 winston 于 2007-12-19 12:20 编辑 ]
2007-12-19 11:30 rosebush
我将echoserver的代码贴出供大家研究测试2
多线程的proactor部分
proactorTask.h
#ifndef _CPROACTOR_TASK_H_
#define _CPROACTOR_TASK_H_
#pragma once
#include "ace\Task_T.h"
#include "ace\Thread_Semaphore.h"
#include "ace\Proactor.h"
#include "ace\WIN32_Proactor.h"
class CProactorTask :public ACE_Task<ACE_MT_SYNCH>
{
public:
CProactorTask(void);
virtual ~CProactorTask(void);
int Start(const int nMax);
int Stop(void);
int Create(void);
int Release(void);
virtual int svc(void);
protected:
ACE_Thread_Semaphore m_sem; //信号量
ACE_Proactor* m_pProactor; //完成端口对象指
针
};
#endif
----------------------------------------------------------------------------------------
-----------
cpp部分
#include "StdAfx.h"
#include "ProactorTask.h"
CProactorTask::CProactorTask(void)
{
}
CProactorTask::~CProactorTask(void)
{
}
//
//创建完成端口对象
//
int CProactorTask::Create(void)
{
ACE_WIN32_Proactor *proactor_impl = 0;
//新建
ACE_NEW_RETURN(proactor_impl, ACE_WIN32_Proactor, -1);
//关联
ACE_NEW_RETURN(this->m_pProactor, ACE_Proactor(proactor_impl, 1 ), -1);
//保存
ACE_Proactor::instance(this->m_pProactor, 1);
return 0;
}
//
//启动线程池
//
int CProactorTask::Start(const int nMax) //线程数量
{
//创建完成端口对象
Create();
//创建线程
this->activate(THR_NEW_LWP, nMax);
int i;
//保证所有线程已启动
for(i = nMax; i>0; i--)
{
m_sem.acquire(); //Block the thread until the semaphore count
becomes greater than 0, then decrement it.
}
cout<<"Start."<<endl;
return 0;
}
//
//删除线程池
//
int CProactorTask::Stop(void)
{
ACE_Proactor::end_event_loop();
this->wait();
return 0;
}
//
//每个线程调用
//
int CProactorTask::svc(void)
{
ACE_DEBUG((LM_INFO,ACE_TEXT("svc函数调用!\n")));
//Increment the semaphore by 1
m_sem.release(1);
ACE_Proactor::run_event_loop();
return 0;
}
//
//释放
//
int CProactorTask::Release(void)
{
ACE_Proactor::close_singleton();
m_pProactor = 0;
cout<<"Release."<<endl;
return 0;
}
----------------------------------------------------------------------------------------
----------------
main.cpp部分
// EchoServer.cpp : Defines the entry point for the console application.
//
// 2007.12.6
// Echo Server Proactor模式
#include "Winbase.h"
#include "stdafx.h"
#include "ace/INET_Addr.h"
#include "ace/Asynch_Acceptor.h"
#include "ServerHander.h"
#include "ProactorTask.h"
int _tmain(int argc, _TCHAR* argv[])
{
cout<<"******* Echo Server *******"<<endl<<endl;
//获得CPU数量
SYSTEM_INFO sysInfo;
GetSystemInfo(&sysInfo);
int threadNum = sysInfo.dwNumberOfProcessors<<1; // CPU * 2
//开启线程
CProactorTask task;
task.Start( threadNum );
ACE_Asynch_Acceptor<ServerHander> MyAcceptor;
ACE_INET_Addr addr(5050);
if(MyAcceptor.open(addr) == -1)
{
cout<<"acceptor open failed..."<<endl;
return 1;
}
cout<<"Listening on "<< addr.get_port_number() <<"..."<<endl;
DWORD sleepTime = 0;
while(1)
{
cin>>sleepTime;
ServerHander::SetSleepTime(sleepTime);
cout<<"********** Set sleep time to "<<sleepTime<<" ************"<<endl;
}
return 0;
}
遇到了以下几个问题,请高人们指点一下:
1 , 在 "加载上自己所需要的类“ 的地方加载上我需要用的内容,是不是就可以达到并发的效果了
,还是在其他的地方加载呢?
2,当在 "加载上自己所需要的类" 地方写上个循环作为替代如下:
for (int i=0;i<= 99999;i++)
{
for (int j=0;j<= 50000;j++)
{
if ( (i == 55500) && (j== 1110))
{
cout<<"la la la ..."<<endl;
}
}
}
cout<<"finish"<<endl;
当在服务器端启用多个线程时,当有多个客户端发出请求时:
我会发现 :
当有第一个进入 handle_read_stream 后,就开始执行我的循环去了,其他的请求还有open里,而无
法得到响应,不是按照时间片原则,应该会得到响应吗?这是为什么呢? 而我要在 我的循环前面加
个 sleep 等几个请求同时进入了 handle_read_stream 后,他们几个到是并行工作的,这又是为什么
呢? 总之希望在什么时候都能达到并行工作的效果以达到并发,请高人指点一下~~~
3, 在 handle_time_out 里,如果超时就delete this他,因为不在同一个线程中,可能会造成崩溃,
有没有什么解决办法呢?
谢谢各位高人给我一些指点~~~~ 1、并发是Proactor提供的。ServerHander 只是数据的处理,不管并发。
2、写个死循环,占满了CPU,OS怎么给你切换?
3、这个问题是因为你同步工作没做好。做好同步,就不会出任何问题。
回复 #2 winston 的帖子
那请问您我该把我所需要用的类放到哪里呢?怎么做才能达到并发的效果呢?
能不能给个例子呢,谢谢~~~ 请高人们赐教一下~~~ 是这样的,在你标记的地方加载你的处理类,当然不是一个good idea啦,
如版主所说,你把CPU都占满了,完成端口上哪调度去。
一般的做法是,开一个task的线程池,在你标记的地方,把你从网络接受过来的
数据包扔到线程队列里去处理,在线程队列内部,加载上你自己实现的报文解析类,处理类等等
这样才可以达到并发的效果。
delete崩溃的问题肯定是你自己没有使用正确的加锁保护,
这个得根据实际情况具体分析了。 modern 说了我想表达的一切,正解!
回复 #5 modern 的帖子
谢谢高人指点,回都研究一下,再向您们讨教~~~新手上路
望大家多多指导
页:
[1]