找回密码
 用户注册

QQ登录

只需一步,快速开始

查看: 5860|回复: 3

创建一个可正常结束的Proactor服务器

[复制链接]
发表于 2008-9-10 22:37:33 | 显示全部楼层 |阅读模式
作者:Qinglan
这是APG上关于Proactor使用的例子,去掉了trace信息,做了一点小修改:

  1. #include <ace/Os_main.h>
  2. #include <ace/Asynch_Acceptor.h>
  3. #include <ace/Proactor.h>
  4. #define LISTEN_PORT 5222
  5. class HA_Proactive_Service : public ACE_Service_Handler
  6. {
  7. public:
  8.        ~HA_Proactive_Service ()
  9.        {
  10.               if (this->handle () != ACE_INVALID_HANDLE)
  11.                      ACE_OS::closesocket (this->handle ());
  12.        }
  13.        virtual void open (ACE_HANDLE h, ACE_Message_Block&)
  14.        {
  15.               this->handle (h);
  16.               if (this->reader_.open (*this) != 0 || this->writer_.open (*this) != 0   )
  17.               {
  18.                      delete this;
  19.                      return;
  20.               }
  21.               ACE_Message_Block *mb;
  22.               ACE_NEW_NORETURN (mb, ACE_Message_Block (1024));
  23.               if (this->reader_.read (*mb, mb->space ()) != 0)
  24.               {
  25.                      mb->release ();
  26.                      delete this;
  27.                      return;
  28.               }
  29.        }
  30.        virtual void handle_read_stream (const ACE_Asynch_Read_Stream::Result &result)
  31.        {
  32.               ACE_Message_Block &mb = result.message_block ();
  33.               if (!result.success () || result.bytes_transferred () == 0)
  34.               {
  35.                      mb.release ();
  36.                      delete this;
  37.               }
  38.               else
  39.               {
  40.                      ACE_DEBUG ((LM_DEBUG, ACE_TEXT("Received Data : %c\n"), *mb.rd_ptr()));
  41.                      mb.release();
  42.                      ACE_Message_Block *new_mb;
  43.                      ACE_NEW_NORETURN (new_mb, ACE_Message_Block (1024));
  44.                      this->reader_.read (*new_mb, new_mb->space ());
  45.               }
  46.        }
  47.        virtual void handle_write_stream (const ACE_Asynch_Write_Stream::Result &result)
  48.        { result.message_block ().release (); }
  49. private:
  50.        ACE_Asynch_Read_Stream reader_;
  51.        ACE_Asynch_Write_Stream writer_;
  52. };
  53. class HA_Proactive_Acceptor : public ACE_Asynch_Acceptor<HA_Proactive_Service>
  54. {
  55. };
  56. int ACE_TMAIN (int, ACE_TCHAR *[])
  57. {
  58.        ACE_INET_Addr listen_addr( LISTEN_PORT );
  59.        HA_Proactive_Acceptor aio_acceptor;
  60.        if (0 != aio_acceptor.open (listen_addr, 0, 0, 5, 1, 0, 0))
  61.               ACE_ERROR_RETURN ((LM_ERROR, ACE_TEXT ("%p\n"), ACE_TEXT ("acceptor open")), 1);
  62.        ACE_Proactor::instance ()->proactor_run_event_loop ();
  63.        return 0;
  64. }
复制代码

程序在5222端口上监听,对于每个已建立的连接,服务器将所有收到的数据打印到控制台上。这就是整个程序所实现的功能。
但是这个程序存在一个很大的问题:没有办法正常结束。当运行这个程序后,只能强制结束这个进程,因为程序没有提供从proactor事件循环中退出来的方法。
让程序能够正常退出是很有必要的,比如在进行内存泄漏的检测时,如果程序没有办法正常退出,那么内存泄漏的检测工作将很难进行。

为了实现能够正常关闭程序的目的,需要稍微修改一下上面的程序实现。
这次程序将会有两个线程在运行,一个是Proactor事件循环线程,另一个则为控制线程。在控制线程中,要能够关闭Proactor线程,然后将程序干净地退出。
得益于ACE强大的封装,所做的改动并不大:

  1. class ProactorThread : public ACE_Task_Base
  2. {
  3. public:
  4.        int open()
  5.        {
  6.               return this->activate();
  7.        }
  8.        int close()
  9.        {
  10.               ACE_Proactor::instance()->proactor_end_event_loop();
  11.               this->wait();
  12.               return 0;
  13.        }
  14.        virtual int svc()
  15.        {
  16.               ACE_INET_Addr listen_addr( LISTEN_PORT );
  17.               HA_Proactive_Acceptor aio_acceptor;
  18.               if (0 != aio_acceptor.open (listen_addr, 0, 0, 5, 1, 0, 0))
  19.                      ACE_ERROR_RETURN ((LM_ERROR, ACE_TEXT ("%p\n"), ACE_TEXT ("acceptor open")), 1);
  20.               ACE_Proactor::instance ()->proactor_run_event_loop ();
  21.               ACE_DEBUG ((LM_DEBUG, ACE_TEXT("Thread ended\n")));
  22.               return 0;
  23.        }
  24. };
  25. int ACE_TMAIN (int, ACE_TCHAR *[])
  26. {
  27.        ProactorThread thread;
  28.        thread.open();
  29.        int x;
  30.        std::cin >> x;
  31.        thread.close();
  32.        return 0;
  33. }
复制代码

仅仅只是添加了一个新的类,用于封装Proactor事件循环线程,另外修改了程序的主函数。这次程序将首先创建一个Proactor事件循环线程,然后主线程等待用户输入任意数后结束Proactor线程,在Proactor线程正常退出后主线程退出。

注意这里有两个要点:
1.当执行proactor_end_event_loop()之后,Proactor线程是从proactor_run_event_loop()处退出,所以会接着执行下面的代码,打印出” Thread ended”信息。
2.主线程必须使用wait()等待Proactor线程正常结束后再退出,否则如果主线程退出后,Proactor线程仍有一些资源释放工作未完成,那么就会存在资源未正确释放的问题。
 楼主| 发表于 2008-9-10 22:37:51 | 显示全部楼层
更深入的处理可以是这样的:

       int x;
       std::cin >> x;
改为注册成信号,用户按ctrl+c,向系统插入一个特殊的ACE_Message_Block的实例,各线程收到后,均正常结束,在ACE_Stream框架中,常用这种办法.
这处结束过程还可以把一些未处理的消息写入磁盘,下次启动后载入后接着处理.
发表于 2009-9-29 11:27:50 | 显示全部楼层
我把这个程序移植到MFC里面的时候,在ProactorThread中aio_acceptor.open的时候跳转不到HA_Proactive_Service中,为什么?
发表于 2009-9-29 22:37:54 | 显示全部楼层
错误是什么?
您需要登录后才可以回帖 登录 | 用户注册

本版积分规则

Archiver|手机版|小黑屋|ACE Developer ( 京ICP备06055248号 )

GMT+8, 2024-11-23 17:35 , Processed in 0.015870 second(s), 5 queries , Redis On.

Powered by Discuz! X3.5

© 2001-2023 Discuz! Team.

快速回复 返回顶部 返回列表