避免实体先于接口被释放,COM有实现,可以借鉴。记得应该就是引用计数规则吧。
我将echoserver的代码贴出供大家研究测试
首先是 ServerHandler.h#ifndef _SERVER_HANDER_H_
#define _SERVER_HANDER_H_
#pragma once
#include "ace/Proactor.h"
#include "ace/Asynch_IO.h"
#include "ace/message_block.h"
class ServerHander :public ACE_Service_Handler
{
public:
ServerHander(void);
virtual ~ServerHander(void);
static void SetSleepTime(const DWORD t)
{
m_sleepTime = t;
};
virtual void open(ACE_HANDLE h, ACE_Message_Block& _mb);
protected:
virtual void handle_read_stream(const ACE_Asynch_Read_Stream::Result &result);
virtual void handle_write_stream(const ACE_Asynch_Write_Stream::Result &result);
virtual voidhandle_time_out(const ACE_Time_Value &tv, const void *p);
void DisplayInfo(ACE_HANDLE h, char* str) const;
private:
ACE_Asynch_Read_Stream m_reader;
ACE_Asynch_Write_Stream m_writer;
static DWORD m_sleepTime;
time_t m_lastTime;
};
#endif
-----------------------------------------------------------------------------------------------------------------------------------
ServerHander.cpp
#include "StdAfx.h"
#include "ServerHander.h"
#include "ace/OS_NS_sys_socket.h"
#include "ace/INET_Addr.h"
#include "ace/SOCK_SEQPACK_Association.h"
#include "ace/OS.h"
#define TIME_OUT 10
DWORD ServerHander::m_sleepTime = 0;
ServerHander::ServerHander(void):m_lastTime(0)
{
}
ServerHander::~ServerHander(void)
{
//关闭
if (this->handle() != ace_invalid_handle)
{
//显示客户端连接地址和端口
displayinfo(this->handle(), " disconnected.");
ace_proactor::instance()->cancel_timer(*this,1);
ace_os::shutdown(this->handle(), sd_both);
ace_os::closesocket( this->handle() );
this->handle(ace_invalid_handle);
}
}
//客户端连接
void ServerHander::open(ACE_HANDLE h,
ACE_Message_Block& _mb)
{
this->handle(h);
//记录时间
m_lastTime = ACE_OS::time(NULL);
//ACE_Proactor::instance()->schedule_timer(*this, 0, ACE_Time_Value(0), ACE_Time_Value(TIME_OUT));
//构造I/O流
if( this->m_reader.open(*this) != 0 || this->m_writer.open(*this) != 0 )
{
cout<<"m_reader or m_writer open failed..."<<endl;
delete this;
return;
}
//显示客户端连接地址和端口
DisplayInfo(this->handle(), " connected.");
ACE_Message_Block* mb = NULL;
ACE_NEW_NORETURN(mb, ACE_Message_Block(1024));
//发起读操作
if( this->m_reader.read( *mb, mb->space() ) != 0 )
{
cout<<"m_reader read failed..."<<endl;
mb->release();
delete this;
}
}
//读操作完成
void ServerHander::handle_read_stream(const ACE_Asynch_Read_Stream::Result &result)
{
//记录时间
m_lastTime = ACE_OS::time(NULL);
ACE_Message_Block &mb = result.message_block();
//传输不成功
if ( (!result.success()) || (result.bytes_transferred() == 0) )
{
cout<<"Read failed..."<<endl;
mb.release();
delete this;
}
else //接收完成
{
//等待 模拟过载导致的响应速度变慢
Sleep( m_sleepTime );
//写回
//mb.wr_ptr(0);
//mb.wr_ptr()[-2] = 0x03;
if (this->m_writer.write( mb, mb.length() ) == -1)
{
cout<<"Server write failed..."<<endl;
mb.release();
}
else //写回成功,再继续读下一组数据
{
ACE_Message_Block *new_mb = NULL;
ACE_NEW_NORETURN(new_mb, ACE_Message_Block(1024));
this->m_reader.read(*new_mb, new_mb->space());
cout<<"Read again."<<endl;
}
}
}
//写操作完成
void ServerHander::handle_write_stream(const ACE_Asynch_Write_Stream::Result &result)
{
cout<<"Write completed."<<endl;
//释放消息
result.message_block().release();
}
//超时
void ServerHander::handle_time_out(const ACE_Time_Value &tv,
const void *p)
{
time_t curTime = ACE_OS::time(NULL);
if( curTime - m_lastTime > TIME_OUT )
{
cout<<"TimeOut"<<endl;
delete this;
}
}
//显示信息
void ServerHander::DisplayInfo(ACE_HANDLE h,
char* str) const
{
//获取客户端连接地址和端口
ACE_INET_Addr addr;
ACE_SOCK_SEQPACK_Association ass = ACE_SOCK_SEQPACK_Association(h);
size_t addr_size=1;
ass.get_remote_addr(addr);
cout<< addr.get_host_addr() <<":"<< addr.get_port_number() <<str<<endl;
}
[ 本帖最后由 winston 于 2007-12-19 12:20 编辑 ]
我将echoserver的代码贴出供大家研究测试2
多线程的proactor部分proactorTask.h
#ifndef _CPROACTOR_TASK_H_
#define _CPROACTOR_TASK_H_
#pragma once
#include "ace\Task_T.h"
#include "ace\Thread_Semaphore.h"
#include "ace\Proactor.h"
#include "ace\WIN32_Proactor.h"
class CProactorTask :public ACE_Task<ACE_MT_SYNCH>
{
public:
CProactorTask(void);
virtual ~CProactorTask(void);
int Start(const int nMax);
int Stop(void);
int Create(void);
int Release(void);
virtual int svc(void);
protected:
ACE_Thread_Semaphore m_sem; //信号量
ACE_Proactor* m_pProactor; //完成端口对象指针
};
#endif
---------------------------------------------------------------------------------------------------
cpp部分
#include "StdAfx.h"
#include "ProactorTask.h"
CProactorTask::CProactorTask(void)
{
}
CProactorTask::~CProactorTask(void)
{
}
//
//创建完成端口对象
//
int CProactorTask::Create(void)
{
ACE_WIN32_Proactor *proactor_impl = 0;
//新建
ACE_NEW_RETURN(proactor_impl, ACE_WIN32_Proactor, -1);
//关联
ACE_NEW_RETURN(this->m_pProactor, ACE_Proactor(proactor_impl, 1 ), -1);
//保存
ACE_Proactor::instance(this->m_pProactor, 1);
return 0;
}
//
//启动线程池
//
int CProactorTask::Start(const int nMax) //线程数量
{
//创建完成端口对象
Create();
//创建线程
this->activate(THR_NEW_LWP, nMax);
int i;
//保证所有线程已启动
for(i = nMax; i>0; i--)
{
m_sem.acquire(); //Block the thread until the semaphore count becomes greater than 0, then decrement it.
}
cout<<"Start."<<endl;
return 0;
}
//
//删除线程池
//
int CProactorTask::Stop(void)
{
ACE_Proactor::end_event_loop();
this->wait();
return 0;
}
//
//每个线程调用
//
int CProactorTask::svc(void)
{
ACE_DEBUG((LM_INFO,ACE_TEXT("svc函数调用!\n")));
//Increment the semaphore by 1
m_sem.release(1);
ACE_Proactor::run_event_loop();
return 0;
}
//
//释放
//
int CProactorTask::Release(void)
{
ACE_Proactor::close_singleton();
m_pProactor = 0;
cout<<"Release."<<endl;
return 0;
}
--------------------------------------------------------------------------------------------------------
main.cpp部分
// EchoServer.cpp : Defines the entry point for the console application.
//
// 2007.12.6
// Echo Server Proactor模式
#include "Winbase.h"
#include "stdafx.h"
#include "ace/INET_Addr.h"
#include "ace/Asynch_Acceptor.h"
#include "ServerHander.h"
#include "ProactorTask.h"
int _tmain(int argc, _TCHAR* argv[])
{
cout<<"******* Echo Server *******"<<endl<<endl;
//获得CPU数量
SYSTEM_INFO sysInfo;
GetSystemInfo(&sysInfo);
int threadNum = sysInfo.dwNumberOfProcessors<<1; // CPU * 2
//开启线程
CProactorTask task;
task.Start( threadNum );
ACE_Asynch_Acceptor<ServerHander> MyAcceptor;
ACE_INET_Addr addr(5050);
if(MyAcceptor.open(addr) == -1)
{
cout<<"acceptor open failed..."<<endl;
return 1;
}
cout<<"Listening on "<< addr.get_port_number() <<"..."<<endl;
DWORD sleepTime = 0;
while(1)
{
cin>>sleepTime;
ServerHander::SetSleepTime(sleepTime);
cout<<"********** Set sleep time to "<<sleepTime<<" ************"<<endl;
}
return 0;
}
[ 本帖最后由 winston 于 2007-12-19 12:21 编辑 ]
说明
忘了禁用表情了.....抱歉一下程序只是测试用的,有许多问题,大家可以一起探讨
列出几个写代码中的疑点和自己想到的改进方向:
1.由于显示输出采用的是cout,没有同步处理,在多线程下可能会显示异常
2.TimeOut的处理,由于ACE的time是单独开的一个线程,所以,timer处理和Proactor其他事件可能并发,需要注意
3.资源申请和释放ACE_Message_Block是不停new,delete来着,可以考虑使用内存池,ACE提供了很多分配器,由于windows
下proactor使用的是完成端口,所以发起的完成事件越多,被锁定在物理内存中的页面也就越多,使用memory pool可以有效节省资源,当然对于echo这样的小服务来说还是太复杂了.
4.前面提到的runloader测试问题,在Hander的析构代码中如果调用了closesocket就会影响并发连接的效果,不知道为什么
5.accept的时候可以直接接受数据
页:
1
[2]