找回密码
 用户注册

QQ登录

只需一步,快速开始

查看: 4050|回复: 7

关于完成端口的问题,请各位高人们指点一下

[复制链接]
发表于 2009-3-9 10:00:17 | 显示全部楼层 |阅读模式
我在网上看到一些关于完成端口的例子,如下:
首先是 ServerHandler.h
  1. #ifndef _SERVER_HANDER_H_
  2. #define _SERVER_HANDER_H_
  3. #pragma once
  4. #include "ace/Proactor.h"
  5. #include "ace/Asynch_IO.h"
  6. #include "ace/message_block.h"
  7. class ServerHander :public ACE_Service_Handler
  8. {
  9. public:
  10.         ServerHander(void);
  11.         virtual ~ServerHander(void);
  12.         static void SetSleepTime(const DWORD t)
  13.         {
  14.                 m_sleepTime = t;
  15.         };
  16.         virtual void open(ACE_HANDLE h, ACE_Message_Block& _mb);
  17. protected:
  18.         virtual void handle_read_stream(const ACE_Asynch_Read_Stream::Result &result);
  19.         virtual void handle_write_stream(const ACE_Asynch_Write_Stream::Result &result);
  20.         virtual void  handle_time_out(const ACE_Time_Value &tv, const void *p);
  21.         void DisplayInfo(ACE_HANDLE h, char* str) const;
  22. private:
  23.         ACE_Asynch_Read_Stream        m_reader;
  24.         ACE_Asynch_Write_Stream m_writer;
  25.         static DWORD                        m_sleepTime;
  26.         time_t                                        m_lastTime;
  27. };
  28. #endif
  29. ----------------------------------------------------------------------------------------
  30. -------------------------------------------
  31. ServerHander.cpp
  32. #include "StdAfx.h"
  33. #include "ServerHander.h"
  34. #include "ace/OS_NS_sys_socket.h"
  35. #include "ace/INET_Addr.h"
  36. #include "ace/SOCK_SEQPACK_Association.h"
  37. #include "ace/OS.h"
  38. #define TIME_OUT 10
  39. DWORD ServerHander::m_sleepTime = 0;
  40. ServerHander::ServerHander(void):m_lastTime(0)
  41. {
  42. }
  43. ServerHander::~ServerHander(void)
  44. {
  45.         //关闭
  46.         if (this->handle() != ace_invalid_handle)               
  47.         {
  48.                 //显示客户端连接地址和端口
  49.                 displayinfo(this->handle(), " disconnected.");
  50.                 ace_proactor::instance()->cancel_timer(*this,1);
  51.                 ace_os::shutdown(this->handle(), sd_both);
  52.                 ace_os::closesocket( this->handle() );
  53.                 this->handle(ace_invalid_handle);
  54.         }
  55. }
  56. //客户端连接
  57. void ServerHander::open(ACE_HANDLE h,
  58.                                                 ACE_Message_Block& _mb)
  59. {
  60.         this->handle(h);
  61.         //记录时间
  62.         m_lastTime = ACE_OS::time(NULL);
  63.         //ACE_Proactor::instance()->schedule_timer(*this, 0, ACE_Time_Value(0),
  64. ACE_Time_Value(TIME_OUT));
  65.         //构造I/O流
  66.         if( this->m_reader.open(*this) != 0 || this->m_writer.open(*this) != 0 )
  67.         {
  68.                 cout<<"m_reader or m_writer open failed..."<<endl;
  69.                 delete this;
  70.                 return;
  71.         }
  72.         //显示客户端连接地址和端口        
  73.         DisplayInfo(this->handle(), " connected.");
  74.         ACE_Message_Block* mb = NULL;
  75.         ACE_NEW_NORETURN(mb, ACE_Message_Block(1024));        
  76.         //发起读操作
  77.         if( this->m_reader.read( *mb, mb->space() ) != 0 )
  78.         {
  79.                 cout<<"m_reader read failed..."<<endl;
  80.                 mb->release();
  81.                 delete this;
  82.         }
  83. }
  84. //读操作完成
  85. void ServerHander::handle_read_stream(const ACE_Asynch_Read_Stream::Result &result)
  86. {
  87.         //记录时间
  88.         m_lastTime = ACE_OS::time(NULL);
  89.         ACE_Message_Block &mb = result.message_block();
  90.         //传输不成功
  91.         if ( (!result.success()) || (result.bytes_transferred() == 0) )
  92.         {
  93.                 cout<<"Read failed..."<<endl;
  94.                 mb.release();
  95.                 delete this;  
  96.         }
  97.         else        //接收完成
  98.         {
  99.                 //等待 模拟过载导致的响应速度变慢
  100.                 //Sleep( m_sleepTime );
  101.                /////////////////////////////////////////
  102.                ///////////////////////////////////////
  103.                //加载上自己所需要的类
  104.               ///////////////////////////////////////////
  105.               ////////////////////////////////////////
  106.                 //写回
  107.                 //mb.wr_ptr(0);
  108.                 //mb.wr_ptr()[-2] = 0x03;
  109.                 if (this->m_writer.write( mb, mb.length() ) == -1)
  110.                 {
  111.                         cout<<"Server write failed..."<<endl;
  112.                         mb.release();
  113.                 }
  114.                
  115.                 else        //写回成功,再继续读下一组数据
  116.                 {
  117.                         ACE_Message_Block *new_mb = NULL;
  118.                         ACE_NEW_NORETURN(new_mb, ACE_Message_Block(1024));
  119.                         this->m_reader.read(*new_mb, new_mb->space());
  120.                         cout<<"Read again."<<endl;
  121.                 }
  122.         }
  123. }
  124. //写操作完成
  125. void ServerHander::handle_write_stream(const ACE_Asynch_Write_Stream::Result &result)
  126. {
  127.         cout<<"Write completed."<<endl;
  128.         //释放消息
  129.         result.message_block().release();
  130. }
  131. //超时
  132. void ServerHander::handle_time_out(const ACE_Time_Value &tv,
  133.                                                                         const void *p)
  134. {
  135.         time_t curTime = ACE_OS::time(NULL);
  136.         if( curTime - m_lastTime > TIME_OUT )
  137.         {
  138.                 cout<<"TimeOut"<<endl;
  139.                 delete this;
  140.         }
  141. }
  142. //显示信息
  143. void ServerHander::DisplayInfo(ACE_HANDLE h,
  144.                                                            char* str) const
  145. {
  146.         //获取客户端连接地址和端口
  147.         ACE_INET_Addr addr;
  148.         ACE_SOCK_SEQPACK_Association ass = ACE_SOCK_SEQPACK_Association(h);
  149.         size_t addr_size=1;
  150.         ass.get_remote_addr(addr);
  151.         cout<< addr.get_host_addr() <<":"<< addr.get_port_number() <<str<<endl;
  152. }
复制代码

[ 本帖最后由 winston 于 2007-12-19 12:20 编辑 ]
2007-12-19 11:30 rosebush
我将echoserver的代码贴出供大家研究测试2

多线程的proactor部分
  1. proactorTask.h
  2. #ifndef _CPROACTOR_TASK_H_
  3. #define _CPROACTOR_TASK_H_
  4. #pragma once
  5. #include "ace\Task_T.h"
  6. #include "ace\Thread_Semaphore.h"
  7. #include "ace\Proactor.h"
  8. #include "ace\WIN32_Proactor.h"
  9. class CProactorTask :public ACE_Task<ACE_MT_SYNCH>
  10. {
  11. public:
  12.         CProactorTask(void);
  13.         virtual ~CProactorTask(void);
  14.         int Start(const int nMax);
  15.         int Stop(void);
  16.         int Create(void);
  17.         int Release(void);
  18.         virtual int svc(void);
  19. protected:
  20.         ACE_Thread_Semaphore        m_sem;                                //信号量
  21.         ACE_Proactor*                        m_pProactor;                //完成端口对象指
  22. };
  23. #endif
  24. ----------------------------------------------------------------------------------------
  25. -----------
  26. cpp部分
  27. #include "StdAfx.h"
  28. #include "ProactorTask.h"
  29. CProactorTask::CProactorTask(void)
  30. {
  31. }
  32. CProactorTask::~CProactorTask(void)
  33. {
  34. }
  35. //
  36. //创建完成端口对象
  37. //
  38. int CProactorTask::Create(void)
  39. {
  40.         ACE_WIN32_Proactor *proactor_impl = 0;
  41.         //新建
  42.         ACE_NEW_RETURN(proactor_impl, ACE_WIN32_Proactor, -1);
  43.         //关联
  44.         ACE_NEW_RETURN(this->m_pProactor, ACE_Proactor(proactor_impl, 1 ), -1);
  45.         //保存
  46.         ACE_Proactor::instance(this->m_pProactor, 1);
  47.         return 0;
  48. }
  49. //
  50. //启动线程池
  51. //
  52. int CProactorTask::Start(const int nMax)        //线程数量
  53. {
  54.         //创建完成端口对象
  55.         Create();
  56.         //创建线程
  57.         this->activate(THR_NEW_LWP, nMax);
  58.         int i;
  59.         //保证所有线程已启动
  60.         for(i = nMax; i>0; i--)
  61.         {
  62.                 m_sem.acquire();        //Block the thread until the semaphore count
  63. becomes greater than 0, then decrement it.
  64.         }
  65.         cout<<"Start."<<endl;
  66.         return 0;
  67. }
  68. //
  69. //删除线程池
  70. //
  71. int CProactorTask::Stop(void)
  72. {
  73.         ACE_Proactor::end_event_loop();
  74.         this->wait();
  75.         return 0;
  76. }
  77. //
  78. //每个线程调用
  79. //
  80. int CProactorTask::svc(void)
  81. {
  82.         ACE_DEBUG((LM_INFO,ACE_TEXT("svc函数调用!\n")));
  83.         //Increment the semaphore by 1
  84.         m_sem.release(1);
  85.         ACE_Proactor::run_event_loop();
  86.         return 0;
  87. }
  88. //
  89. //释放
  90. //
  91. int CProactorTask::Release(void)
  92. {
  93.         ACE_Proactor::close_singleton();
  94.         m_pProactor = 0;
  95.         cout<<"Release."<<endl;
  96.         return 0;
  97. }
  98. ----------------------------------------------------------------------------------------
  99. ----------------
  100. main.cpp部分
  101. // EchoServer.cpp : Defines the entry point for the console application.
  102. //
  103. // 2007.12.6
  104. // Echo Server Proactor模式
  105. #include "Winbase.h"
  106. #include "stdafx.h"
  107. #include "ace/INET_Addr.h"
  108. #include "ace/Asynch_Acceptor.h"
  109. #include "ServerHander.h"
  110. #include "ProactorTask.h"
  111. int _tmain(int argc, _TCHAR* argv[])
  112. {
  113.         cout<<"******* Echo Server *******"<<endl<<endl;
  114.         //获得CPU数量
  115.         SYSTEM_INFO sysInfo;
  116.         GetSystemInfo(&sysInfo);
  117.         int threadNum = sysInfo.dwNumberOfProcessors<<1;        // CPU * 2
  118.         //开启线程
  119.         CProactorTask task;
  120.         task.Start( threadNum );
  121.         ACE_Asynch_Acceptor<ServerHander> MyAcceptor;
  122.         ACE_INET_Addr addr(5050);
  123.         if(MyAcceptor.open(addr) == -1)
  124.         {
  125.                 cout<<"acceptor open failed..."<<endl;
  126.                 return 1;
  127.         }
  128.         cout<<"Listening on "<< addr.get_port_number() <<"..."<<endl;
  129.         DWORD sleepTime = 0;
  130.         while(1)
  131.         {
  132.                 cin>>sleepTime;
  133.                 ServerHander::SetSleepTime(sleepTime);
  134.                 cout<<"********** Set sleep time to "<<sleepTime<<" ************"<<endl;
  135.         }
  136.         return 0;
  137. }
复制代码


遇到了以下几个问题,请高人们指点一下:
1 , 在 "加载上自己所需要的类“ 的地方加载上我需要用的内容,是不是就可以达到并发的效果了

,还是在其他的地方加载呢?

2,当在 "加载上自己所需要的类" 地方写上个循环作为替代如下:
                for (int i=0;i<= 99999;i++)
                {
                        for (int j=0;j<= 50000;j++)
                        {
                               
                                if ( (i == 55500) && (j  == 1110))
                                {
                                        cout<<"la la la ..."<<endl;
                                }
                        }
                }
                cout<<"finish"<<endl;

当在服务器端启用多个线程时,当有多个客户端发出请求时:
我会发现 :
当有第一个进入 handle_read_stream 后,就开始执行我的循环去了,其他的请求还有open里,而无

法得到响应,不是按照时间片原则,应该会得到响应吗?这是为什么呢? 而我要在 我的循环前面加

个 sleep 等几个请求同时进入了 handle_read_stream 后,他们几个到是并行工作的,这又是为什么

呢? 总之希望在什么时候都能达到并行工作的效果以达到并发,请高人指点一下~~~

3, 在 handle_time_out 里,如果超时就delete this他,因为不在同一个线程中,可能会造成崩溃,

有没有什么解决办法呢?

谢谢各位高人给我一些指点~~~~
发表于 2009-3-9 10:39:36 | 显示全部楼层
1、并发是Proactor提供的。ServerHander 只是数据的处理,不管并发。
2、写个死循环,占满了CPU,OS怎么给你切换?
3、这个问题是因为你同步工作没做好。做好同步,就不会出任何问题。
 楼主| 发表于 2009-3-9 10:56:50 | 显示全部楼层

回复 #2 winston 的帖子

那请问您我该把我所需要用的类放到哪里呢?
怎么做才能达到并发的效果呢?
能不能给个例子呢,谢谢~~~
 楼主| 发表于 2009-3-9 13:22:08 | 显示全部楼层
请高人们赐教一下~~~
发表于 2009-3-9 13:47:05 | 显示全部楼层
是这样的,在你标记的地方加载你的处理类,当然不是一个good idea啦,
如版主所说,你把CPU都占满了,完成端口上哪调度去。

一般的做法是,开一个task的线程池,在你标记的地方,把你从网络接受过来的
数据包扔到线程队列里去处理,在线程队列内部,加载上你自己实现的报文解析类,处理类等等
这样才可以达到并发的效果。

delete崩溃的问题肯定是你自己没有使用正确的加锁保护,
这个得根据实际情况具体分析了。
发表于 2009-3-9 15:06:47 | 显示全部楼层
modern 说了我想表达的一切,正解!
 楼主| 发表于 2009-3-10 09:33:52 | 显示全部楼层

回复 #5 modern 的帖子

谢谢高人指点,回都研究一下,再向您们讨教~~~
发表于 2009-3-10 09:36:39 | 显示全部楼层

新手上路

望大家多多指导
您需要登录后才可以回帖 登录 | 用户注册

本版积分规则

Archiver|手机版|小黑屋|ACE Developer ( 京ICP备06055248号 )

GMT+8, 2024-11-23 03:32 , Processed in 0.012887 second(s), 6 queries , Redis On.

Powered by Discuz! X3.5

© 2001-2023 Discuz! Team.

快速回复 返回顶部 返回列表