|
作者:阿彪
地址:阿彪的Blog
使用ACE中的Proactor的话,会要比我们使用我们直接写的要来得简单。
在说Proactor之前我们需要了解Windows里的完成端口的工作原理。
完成端口是WinNT内核里的一个框架。我们可以为我们的一些异步的操作
新建一个完成端口,然后这个完成端口会有几个工作线程来处理。我们
可以将socket,或是一个文件读写,或是一个串口的收发数据的句柄,
梆定到这个完成端口之上,当一个读或是写的事件完成之后,完成端口
机制将会自动将一个完成消息放到完成队列中,完成端口的工作线程池
将会被触发调用。回调的时候我们梆定时将一些基础的信息也梆在其中,
当工作线程也会通过一种叫做完成项的指针返回给你。就是说,你可能
梆定了多个socket或是文件都是没有问题的。按微软人写的文档里说的
可以面对百万个这样的异步对象。
这里我就不再使用WinAPI写完成端口了。
现在是使用ACE框架来写一个。
使用他来做一个完成端口步骤也是一样的。
开始的时候需要一个完成端口,还有完成端口的工作线程池。在ACE框架
里提供了一种叫ACE_Task的线程池模块类
和一般的线程类一样,它的工作时调用的函数是
virtual int svc (void);
只是如何使用呢。无非是开启线程与关闭线程两个操作。
在此类中定义一个ACE_Thread_Semaphore sem_;变量
然后开户n个线程的过程就是这样的:
int Proactor_Task::star(int nMax)
{
...
this->activate (THR_NEW_LWP, nMax);
for (;nMax>0;nMax--)
{
sem_.acquire();
}
return 0;
}
一个是创建,二个是一个一个的触发。让这一些线程都工作.
当然工作线程都要释放自己:
int Proactor_Task::svc()
{
ACE_DEBUG((LM_INFO,ACE_TEXT("svc函数调用!\n")));
sem_.release(1);
...
return 0;
}
好了。这个线程池开始工作了。接下来,我们要做将完成端口对象给创建出来:
在这个线程池里定义一个完成端口对象指针:
ACE_Proactor * proactor_;
创建的过程是这样的。
//是在Win32下,就使用这个Proactor的实现
ACE_WIN32_Proactor *proactor_impl = new ACE_WIN32_Proactor(); //新建proactor的实现
proactor_=new ACE_Proactor(proactor_impl,1); //与proactor关联
ACE_Proactor::instance (this->proactor_, 1); //将新建出来的proactor保存在静态框架里
如何删除呢。
ACE_Proactor::end_event_loop();
this->wait();
之后来写线程池里的函数
ACE_Proactor::run_event_loop();
只要写一句就OK了。
这就完成了一个完成端口对象的创建过程。我们只要做一下封装就OK了。
给它一个工作线程的大小。之后它就会自动的新建一个完成端口在ACE_Proactor::instance里。
接下来我们要做Acceptor与recv。
实计上ACE里已经为我们写好了。它们就是:
ACE_Asynch_Acceptor类ACE_Service_Handler类
class Accepte : public ACE_Asynch_Acceptor<Receive>
class Receive : public ACE_Service_Handler
这样一继承,工作就已经完成了。
如果我们想得到这一些网络事件的话,可以做一些继承就OK了。
他们内部调用的过程是这样的:
当有一个新的用户连接上来之后。
Accepte会有一个函数回调。
virtual HANDLER *make_handler (void);
这个函数里,我们必需写一个new Receive对象。
new完成之后Receive的open函数将会回调
open函数调用的时候,此接收对象的socket句柄就得到了。我们就在这个时候需要将一个
读、写的流梆定在其中。还有就是做一步异步的接收数据。这个如果你写过重叠方式的话就会
比较的了解,这也叫做异步的I/O的投递。等这个recv完成之后就会回调。
好了。这就算完成了。现在把代码贴出来。
我不知道如何做一个下载点。不好意思只有大家自己复制下来
我使用的VC6.0,ACE5.4.1的编程环境-
- // Accepte.h: interface for the Accepte class.
- //
- //////////////////////////////////////////////////////////////////////
- #if !defined(AFX_ACCEPTE_H__DCEC809D_E5D2_48D1_A8A7_C9FD3C4D7C15__INCLUDED_)
- #define AFX_ACCEPTE_H__DCEC809D_E5D2_48D1_A8A7_C9FD3C4D7C15__INCLUDED_
- #if _MSC_VER > 1000
- #pragma once
- #endif // _MSC_VER > 1000
- #include <ace/Asynch_Acceptor.h>
- #include "Receive.h"
- class Accepte : public ACE_Asynch_Acceptor<Receive>
- {
- public:
- Receive* make_handler (void);
- Accepte();
- virtual ~Accepte();
- };
- #endif // !defined(AFX_ACCEPTE_H__DCEC809D_E5D2_48D1_A8A7_C9FD3C4D7C15__INCLUDED_)
- // Accepte.cpp: implementation of the Accepte class.
- //
- //////////////////////////////////////////////////////////////////////
- #include "Accepte.h"
- //////////////////////////////////////////////////////////////////////
- // Construction/Destruction
- //////////////////////////////////////////////////////////////////////
- Accepte::Accepte()
- {
- }
- Accepte::~Accepte()
- {
- }
- Receive* Accepte::make_handler(void)
- {
- return new Receive();
- }
- // Proactor_Task.h: interface for the Proactor_Task class.
- //
- //////////////////////////////////////////////////////////////////////
- #if !defined(AFX_PROACTOR_TASK_H__12F37C95_9872_4923_89A2_5A59AE7AC1FD__INCLUDED_)
- #define AFX_PROACTOR_TASK_H__12F37C95_9872_4923_89A2_5A59AE7AC1FD__INCLUDED_
- #if _MSC_VER > 1000
- #pragma once
- #endif // _MSC_VER > 1000
- #include "ace\Task_T.h"
- #include "ace\Thread_Semaphore.h"
- #include "ace\Proactor.h"
- #include "ace\WIN32_Proactor.h"
- class Proactor_Task : public ACE_Task<ACE_MT_SYNCH>
- {
- public:
- Proactor_Task();
- virtual ~Proactor_Task();
- int star(int nMax);
- int stop();
- virtual int svc (void);
- int create_proactor();
- int release_proactor();
- ACE_Thread_Semaphore sem_;
- ACE_Proactor * proactor_;
- };
- #endif // !defined(AFX_PROACTOR_TASK_H__12F37C95_9872_4923_89A2_5A59AE7AC1FD__INCLUDED_)
- // Proactor_Task.cpp: implementation of the Proactor_Task class.
- //
- //////////////////////////////////////////////////////////////////////
- #include "Proactor_Task.h"
- //////////////////////////////////////////////////////////////////////
- // Construction/Destruction
- //////////////////////////////////////////////////////////////////////
- Proactor_Task::Proactor_Task()
- {
- }
- Proactor_Task::~Proactor_Task()
- {
- }
- int Proactor_Task::star(int nMax)
- {
- create_proactor();
- this->activate (THR_NEW_LWP, nMax);
- for (;nMax>0;nMax--)
- {
- sem_.acquire();
- }
- return 0;
- }
- int Proactor_Task::stop()
- {
- ACE_Proactor::end_event_loop();
- this->wait();
- return 0;
- }
- int Proactor_Task::release_proactor()
- {
- ACE_Proactor::close_singleton ();
- proactor_ = 0;
- return 0;
- }
- int Proactor_Task::create_proactor()
- {
- ACE_WIN32_Proactor *proactor_impl = 0;
- ACE_NEW_RETURN (proactor_impl,
- ACE_WIN32_Proactor,
- -1);
- // always delete implementation 1 , not !(proactor_impl == 0)
- ACE_NEW_RETURN (this->proactor_,
- ACE_Proactor (proactor_impl, 1 ),
- -1);
- // Set new singleton and delete it in close_singleton()
- ACE_Proactor::instance (this->proactor_, 1);
- return 0;
- }
- int Proactor_Task::svc()
- {
- ACE_DEBUG((LM_INFO,ACE_TEXT("svc函数调用!\n")));
- sem_.release(1);
- ACE_Proactor::run_event_loop();
- return 0;
- }
- // Receive.h: interface for the Receive class.
- //
- //////////////////////////////////////////////////////////////////////
- #if !defined(AFX_RECEIVE_H__0E7EF8C0_465F_4D9C_8A29_0C2A0F1EAFFE__INCLUDED_)
- #define AFX_RECEIVE_H__0E7EF8C0_465F_4D9C_8A29_0C2A0F1EAFFE__INCLUDED_
- #if _MSC_VER > 1000
- #pragma once
- #endif // _MSC_VER > 1000
- #include <ace/Asynch_io.h>
- #include <ace/Message_Block.h>
- #include <ace/Log_Msg.h>
- #include <ace/OS_Memory.h>
- class Receive : public ACE_Service_Handler
- {
- public:
- Receive();
- virtual ~Receive()
- {
- if (this->handle() != ACE_INVALID_HANDLE )
- {
- closesocket(SOCKET(this->handle()));
- }
- }
- virtual void open(ACE_HANDLE h,ACE_Message_Block& );
- virtual void handle_read_stream(const ACE_Asynch_Read_Stream::Result &result);
- virtual void handle_write_stream(const ACE_Asynch_Write_Stream::Result &result);
- private:
- ACE_Asynch_Write_Stream write_;
- ACE_Asynch_Read_Stream reader_;
- };
- #endif // !defined(AFX_RECEIVE_H__0E7EF8C0_465F_4D9C_8A29_0C2A0F1EAFFE__INCLUDED_)
- // Receive.cpp: implementation of the Receive class.
- //
- //////////////////////////////////////////////////////////////////////
- #include "Receive.h"
- //////////////////////////////////////////////////////////////////////
- // Construction/Destruction
- //////////////////////////////////////////////////////////////////////
- Receive::Receive()
- {
- }
- void Receive::open(ACE_HANDLE h,ACE_Message_Block& )
- {
- this->handle(h);
- if (this->write_.open(*this)!=0 ||
- this->reader_.open(*this) != 0 )
- {
- delete this;
- return ;
- }
- ACE_Message_Block *mb;
- ACE_NEW_NORETURN(mb,ACE_Message_Block(1024));
- if ( this->reader_.read(*mb,mb->space()) != 0)
- {
- ACE_ERROR((LM_ERROR,ACE_TEXT(" (%t) error information %p.")));
- mb->release();
- delete this;
- return;
- }
- }
- void Receive::handle_read_stream (const ACE_Asynch_Read_Stream::Result &result)
- {
- ACE_Message_Block &mb = result.message_block();
- if ( !result.success() || result.bytes_transferred() == 0)
- {
- mb.release();
- delete this;
- }
- else
- {
- ACE_Message_Block* new_mb;
- ACE_NEW_NORETURN(new_mb,ACE_Message_Block(1024));
- this->reader_.read(*new_mb,new_mb->space());
- }
- return ;
- }
- void Receive::handle_write_stream (const ACE_Asynch_Write_Stream::Result &result)
- {
- result.message_block().release();
- return ;
- }
- ////////////////////////////////////////////////////////////////////////////////////////////////////
- //main.cpp
- #ifdef _DEBUG
- #pragma comment(lib,"aced")
- #else
- #pragma comment(lib,"ace")
- #endif
- #include <ace\ace.h>
- #include "Accepte.h"
- #include "Proactor_Task.h"
- int ACE_TMAIN(int ,char*[])
- {
- Proactor_Task task;
- task.star(3);
- Accepte accepte;
- accepte.open(ACE_INET_Addr (2222), 0, 1,ACE_DEFAULT_BACKLOG,1,ACE_Proactor::instance());
- int nExit=0;
- while (nExit==0)
- scanf("%d",&nExit);
- return 0;
- }
复制代码 |
|