|
我在网上看到一些关于完成端口的例子,如下:
首先是 ServerHandler.h- #ifndef _SERVER_HANDER_H_
- #define _SERVER_HANDER_H_
- #pragma once
- #include "ace/Proactor.h"
- #include "ace/Asynch_IO.h"
- #include "ace/message_block.h"
- class ServerHander :public ACE_Service_Handler
- {
- public:
- ServerHander(void);
- virtual ~ServerHander(void);
- static void SetSleepTime(const DWORD t)
- {
- m_sleepTime = t;
- };
- virtual void open(ACE_HANDLE h, ACE_Message_Block& _mb);
- protected:
- virtual void handle_read_stream(const ACE_Asynch_Read_Stream::Result &result);
- virtual void handle_write_stream(const ACE_Asynch_Write_Stream::Result &result);
- virtual void handle_time_out(const ACE_Time_Value &tv, const void *p);
- void DisplayInfo(ACE_HANDLE h, char* str) const;
- private:
- ACE_Asynch_Read_Stream m_reader;
- ACE_Asynch_Write_Stream m_writer;
- static DWORD m_sleepTime;
- time_t m_lastTime;
- };
- #endif
- ----------------------------------------------------------------------------------------
- -------------------------------------------
- ServerHander.cpp
- #include "StdAfx.h"
- #include "ServerHander.h"
- #include "ace/OS_NS_sys_socket.h"
- #include "ace/INET_Addr.h"
- #include "ace/SOCK_SEQPACK_Association.h"
- #include "ace/OS.h"
- #define TIME_OUT 10
- DWORD ServerHander::m_sleepTime = 0;
- ServerHander::ServerHander(void):m_lastTime(0)
- {
- }
- ServerHander::~ServerHander(void)
- {
- //关闭
- if (this->handle() != ace_invalid_handle)
- {
- //显示客户端连接地址和端口
- displayinfo(this->handle(), " disconnected.");
- ace_proactor::instance()->cancel_timer(*this,1);
- ace_os::shutdown(this->handle(), sd_both);
- ace_os::closesocket( this->handle() );
- this->handle(ace_invalid_handle);
- }
- }
- //客户端连接
- void ServerHander::open(ACE_HANDLE h,
- ACE_Message_Block& _mb)
- {
- this->handle(h);
- //记录时间
- m_lastTime = ACE_OS::time(NULL);
- //ACE_Proactor::instance()->schedule_timer(*this, 0, ACE_Time_Value(0),
- ACE_Time_Value(TIME_OUT));
- //构造I/O流
- if( this->m_reader.open(*this) != 0 || this->m_writer.open(*this) != 0 )
- {
- cout<<"m_reader or m_writer open failed..."<<endl;
- delete this;
- return;
- }
- //显示客户端连接地址和端口
- DisplayInfo(this->handle(), " connected.");
- ACE_Message_Block* mb = NULL;
- ACE_NEW_NORETURN(mb, ACE_Message_Block(1024));
- //发起读操作
- if( this->m_reader.read( *mb, mb->space() ) != 0 )
- {
- cout<<"m_reader read failed..."<<endl;
- mb->release();
- delete this;
- }
- }
- //读操作完成
- void ServerHander::handle_read_stream(const ACE_Asynch_Read_Stream::Result &result)
- {
- //记录时间
- m_lastTime = ACE_OS::time(NULL);
- ACE_Message_Block &mb = result.message_block();
- //传输不成功
- if ( (!result.success()) || (result.bytes_transferred() == 0) )
- {
- cout<<"Read failed..."<<endl;
- mb.release();
- delete this;
- }
- else //接收完成
- {
- //等待 模拟过载导致的响应速度变慢
- //Sleep( m_sleepTime );
- /////////////////////////////////////////
- ///////////////////////////////////////
- //加载上自己所需要的类
- ///////////////////////////////////////////
- ////////////////////////////////////////
- //写回
- //mb.wr_ptr(0);
- //mb.wr_ptr()[-2] = 0x03;
- if (this->m_writer.write( mb, mb.length() ) == -1)
- {
- cout<<"Server write failed..."<<endl;
- mb.release();
- }
-
- else //写回成功,再继续读下一组数据
- {
- ACE_Message_Block *new_mb = NULL;
- ACE_NEW_NORETURN(new_mb, ACE_Message_Block(1024));
- this->m_reader.read(*new_mb, new_mb->space());
- cout<<"Read again."<<endl;
- }
- }
- }
- //写操作完成
- void ServerHander::handle_write_stream(const ACE_Asynch_Write_Stream::Result &result)
- {
- cout<<"Write completed."<<endl;
- //释放消息
- result.message_block().release();
- }
- //超时
- void ServerHander::handle_time_out(const ACE_Time_Value &tv,
- const void *p)
- {
- time_t curTime = ACE_OS::time(NULL);
- if( curTime - m_lastTime > TIME_OUT )
- {
- cout<<"TimeOut"<<endl;
- delete this;
- }
- }
- //显示信息
- void ServerHander::DisplayInfo(ACE_HANDLE h,
- char* str) const
- {
- //获取客户端连接地址和端口
- ACE_INET_Addr addr;
- ACE_SOCK_SEQPACK_Association ass = ACE_SOCK_SEQPACK_Association(h);
- size_t addr_size=1;
- ass.get_remote_addr(addr);
- cout<< addr.get_host_addr() <<":"<< addr.get_port_number() <<str<<endl;
- }
复制代码
[ 本帖最后由 winston 于 2007-12-19 12:20 编辑 ]
2007-12-19 11:30 rosebush
我将echoserver的代码贴出供大家研究测试2
多线程的proactor部分- proactorTask.h
- #ifndef _CPROACTOR_TASK_H_
- #define _CPROACTOR_TASK_H_
- #pragma once
- #include "ace\Task_T.h"
- #include "ace\Thread_Semaphore.h"
- #include "ace\Proactor.h"
- #include "ace\WIN32_Proactor.h"
- class CProactorTask :public ACE_Task<ACE_MT_SYNCH>
- {
- public:
- CProactorTask(void);
- virtual ~CProactorTask(void);
- int Start(const int nMax);
- int Stop(void);
- int Create(void);
- int Release(void);
- virtual int svc(void);
- protected:
- ACE_Thread_Semaphore m_sem; //信号量
- ACE_Proactor* m_pProactor; //完成端口对象指
- 针
- };
- #endif
- ----------------------------------------------------------------------------------------
- -----------
- cpp部分
- #include "StdAfx.h"
- #include "ProactorTask.h"
- CProactorTask::CProactorTask(void)
- {
- }
- CProactorTask::~CProactorTask(void)
- {
- }
- //
- //创建完成端口对象
- //
- int CProactorTask::Create(void)
- {
- ACE_WIN32_Proactor *proactor_impl = 0;
- //新建
- ACE_NEW_RETURN(proactor_impl, ACE_WIN32_Proactor, -1);
- //关联
- ACE_NEW_RETURN(this->m_pProactor, ACE_Proactor(proactor_impl, 1 ), -1);
- //保存
- ACE_Proactor::instance(this->m_pProactor, 1);
- return 0;
- }
- //
- //启动线程池
- //
- int CProactorTask::Start(const int nMax) //线程数量
- {
- //创建完成端口对象
- Create();
- //创建线程
- this->activate(THR_NEW_LWP, nMax);
- int i;
- //保证所有线程已启动
- for(i = nMax; i>0; i--)
- {
- m_sem.acquire(); //Block the thread until the semaphore count
- becomes greater than 0, then decrement it.
- }
- cout<<"Start."<<endl;
- return 0;
- }
- //
- //删除线程池
- //
- int CProactorTask::Stop(void)
- {
- ACE_Proactor::end_event_loop();
- this->wait();
- return 0;
- }
- //
- //每个线程调用
- //
- int CProactorTask::svc(void)
- {
- ACE_DEBUG((LM_INFO,ACE_TEXT("svc函数调用!\n")));
- //Increment the semaphore by 1
- m_sem.release(1);
- ACE_Proactor::run_event_loop();
- return 0;
- }
- //
- //释放
- //
- int CProactorTask::Release(void)
- {
- ACE_Proactor::close_singleton();
- m_pProactor = 0;
- cout<<"Release."<<endl;
- return 0;
- }
- ----------------------------------------------------------------------------------------
- ----------------
- main.cpp部分
- // EchoServer.cpp : Defines the entry point for the console application.
- //
- // 2007.12.6
- // Echo Server Proactor模式
- #include "Winbase.h"
- #include "stdafx.h"
- #include "ace/INET_Addr.h"
- #include "ace/Asynch_Acceptor.h"
- #include "ServerHander.h"
- #include "ProactorTask.h"
- int _tmain(int argc, _TCHAR* argv[])
- {
- cout<<"******* Echo Server *******"<<endl<<endl;
- //获得CPU数量
- SYSTEM_INFO sysInfo;
- GetSystemInfo(&sysInfo);
- int threadNum = sysInfo.dwNumberOfProcessors<<1; // CPU * 2
- //开启线程
- CProactorTask task;
- task.Start( threadNum );
- ACE_Asynch_Acceptor<ServerHander> MyAcceptor;
- ACE_INET_Addr addr(5050);
- if(MyAcceptor.open(addr) == -1)
- {
- cout<<"acceptor open failed..."<<endl;
- return 1;
- }
- cout<<"Listening on "<< addr.get_port_number() <<"..."<<endl;
- DWORD sleepTime = 0;
- while(1)
- {
- cin>>sleepTime;
- ServerHander::SetSleepTime(sleepTime);
- cout<<"********** Set sleep time to "<<sleepTime<<" ************"<<endl;
- }
- return 0;
- }
复制代码
遇到了以下几个问题,请高人们指点一下:
1 , 在 "加载上自己所需要的类“ 的地方加载上我需要用的内容,是不是就可以达到并发的效果了
,还是在其他的地方加载呢?
2,当在 "加载上自己所需要的类" 地方写上个循环作为替代如下:
for (int i=0;i<= 99999;i++)
{
for (int j=0;j<= 50000;j++)
{
if ( (i == 55500) && (j == 1110))
{
cout<<"la la la ..."<<endl;
}
}
}
cout<<"finish"<<endl;
当在服务器端启用多个线程时,当有多个客户端发出请求时:
我会发现 :
当有第一个进入 handle_read_stream 后,就开始执行我的循环去了,其他的请求还有open里,而无
法得到响应,不是按照时间片原则,应该会得到响应吗?这是为什么呢? 而我要在 我的循环前面加
个 sleep 等几个请求同时进入了 handle_read_stream 后,他们几个到是并行工作的,这又是为什么
呢? 总之希望在什么时候都能达到并行工作的效果以达到并发,请高人指点一下~~~
3, 在 handle_time_out 里,如果超时就delete this他,因为不在同一个线程中,可能会造成崩溃,
有没有什么解决办法呢?
谢谢各位高人给我一些指点~~~~ |
|