找回密码
 用户注册

QQ登录

只需一步,快速开始

查看: 3654|回复: 5

新手搞Proactor出问题鸟~

[复制链接]
发表于 2008-8-4 09:28:10 | 显示全部楼层 |阅读模式
ClientHandler.h
//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
#ifndef CLIENT_HANDLER_H
#define CLIENT_HANDLER_H
#include <ace/OS.h>
#include <ace/Asynch_IO.h>
#include <ace/Atomic_Op.h>
#include <ace/Message_Block.h>
#include <ace/INet_Addr.h>
#include <ace/Recursive_Thread_Mutex.h>

class ClientAcceptor;

class ClientHandler : public ACE_Service_Handler
{
public:
ClientHandler(){}
~ClientHandler(){}
void Init();

virtual void open(ACE_HANDLE new_handle, ACE_Message_Block &message_block);
virtual void handle_read_stream(const ACE_Asynch_Read_Stream::Result &result);
virtual void handle_write_stream(const ACE_Asynch_Write_Stream::Result &result);
virtual void handle_time_out(const ACE_Time_Value &tv, const void *act = 0);
int initiate_read_stream();
int initiate_write_stream(ACE_Message_Block &mb, size_t numBytes);

private:
ACE_Asynch_Read_Stream mReader;
ACE_Asynch_Write_Stream mWriter;
ClientAcceptor*   mAcceptor;
char     mRemoteAddress[MAXHOSTNAMELEN];
u_short     mRemotePort;
ACE_Message_Block*  mReadMsgBlock;
time_t     mLastNetIO;
ACE_Recursive_Thread_Mutex mLock;
};
#endif

ClientHandler.cpp
///////////////////////////////////////////////////////////////////////////////////////////////////
#include <ace/CDR_Stream.h>
#include "ClientHandler.h"
#include "Packet.h"
#include "ClientManager.h"
#include "MsgProcessTask.h"


void ClientHandler::open(ACE_HANDLE new_handle, ACE_Message_Block &message_block)
{
mLastNetIO = ACE_OS::time(NULL);
mIOCount = 0;
if (mReader.open(*this, this->handle()) == -1)
{
  ACE_ERROR((LM_ERROR, ACE_TEXT("<%P|%t>ACE_Asynch_Read_Stream::open\n")));
}
else if (mWriter.open(*this, this->handle()) == -1)
{
  ACE_ERROR((LM_ERROR, ACE_TEXT("<%P|%t>ACE_Asynch_Write_Stream::open\n")));
}
else
{
  ACE_DEBUG((LM_DEBUG, ACE_TEXT("<%P|%t>ClientHandler::open, Remote ip is : %s\n"), mRemoteAddress));
  initiate_read_stream();
}
}
void ClientHandler::handle_read_stream(const ACE_Asynch_Read_Stream::Result &result)
{
size_t bytesRecved = result.bytes_transferred();
if (result.success() && (bytesRecved != 0))
{
  ACE_DEBUG((LM_DEBUG, ACE_TEXT("<%P|%t>ClientHandler::handle_read_stream complete : %d\n"), bytesRecved));
  if (mReadMsgBlock->length() < sizeof(PacketHeader)) //消息头尚未接收完全
  {
   mReader.read(*mReadMsgBlock, mReadMsgBlock->space());
   return;
  }
  PacketHeader *header = (PacketHeader *) (mReadMsgBlock->rd_ptr());
  ACE_Message_Block * mb = mReadMsgBlock->cont(); //cont()为数据信息
  if (!mb)
  {
   // 只是接收完长度信息
   ACE_NEW (mb, ACE_Message_Block(header->mClientHeader.mDataLength));
   mReadMsgBlock->cont (mb);
  }
  if (mb->length() == header->mClientHeader.mDataLength) // 数据已接收完,再继续接收下一个数据包
  {
   MsgProcessTaskSingleton::instance()->putq(mb);
   initiate_read_stream();
   return;
  }
  else // 否则继续接收该数据包
  {
   mReader.read (*mb, mb->space ());
  }
}
else
{

}
}
void ClientHandler::handle_write_stream(const ACE_Asynch_Write_Stream::Result &result)
{
}
int ClientHandler::initiate_read_stream()
{
ACE_Guard<ACE_Recursive_Thread_Mutex> locker(mLock);
//先接收包头
int headerSize = sizeof(PacketHeader);
ACE_HANDLE handle = this->handle();
mReadMsgBlock->copy((const char*)&handle, sizeof(ACE_HANDLE));
if (mReader.read(*mReadMsgBlock, mReadMsgBlock->space()) == -1)
{
  ACE_ERROR_RETURN((LM_ERROR, ACE_TEXT("<%P|%t>ACE_Asynch_Read_Stream::read")), -1);
}

mIOCount ++;
return 0;
}
int ClientHandler::initiate_write_stream(ACE_Message_Block &mb, size_t numBytes)
{
ACE_Guard<ACE_Recursive_Thread_Mutex> locker(mLock);
if (mWriter.write(mb, numBytes) == -1)
{
  ACE_ERROR_RETURN((LM_ERROR, ACE_TEXT("<%P|%t>ACE_Asynch_Write_Stream::write")), -1);
}
  return 0;
}
void ClientHandler::handle_time_out(const ACE_Time_Value &tv, const void *act)
{
ACE_DEBUG((LM_DEBUG, ACE_TEXT("<%P|%t>ClientHandler::handle_time_out")));
}

感觉handle_read_stream有问题,客户端循环发100次消息,服务器端只收到4,5次,而且客户端关闭了handle_read_stream也不会断点,请高人指点~
 楼主| 发表于 2008-8-4 09:31:35 | 显示全部楼层

EventLoopTask.h

#ifndef EVENT_LOOP_TASK_H
#define EVENT_LOOP_TASK_H

#include <ace/Task.h>

class EventLoopTask : public ACE_Task<ACE_MT_SYNCH>
{
public:
        int Open(int numThreads);{
                    return activate(THR_NEW_LWP, numThreads);
                }

        void Close() {
            ACE_Proactor::instance()->proactor_end_event_loop();
                    wait();
                }

        virtual int svc() {
            while(1)
            {
                        ACE_Proactor::instance()->proactor_run_event_loop();
            }
            return 0;
                }
};
#endi
 楼主| 发表于 2008-8-4 09:34:23 | 显示全部楼层

MsgProcessTask.h

#ifndef NET_PROCESS_TASK_H
#define NET_PROCESS_TASK_H

#include <ace/Task.h>
#include <ace/Singleton.h>

class MsgProcessTask : public ACE_Task<ACE_MT_SYNCH>
{
public:
        int Open(int numThreads) {
            mNumThreads = numThreads;
            return activate(THR_NEW_LWP, numThreads);
        }
        void Close() {
            ACE_Message_Block* hangup;
            for (int i=0; i<mNumThreads; i++)
            {
                ACE_NEW_NORETURN(hangup, ACE_Message_Block(0, ACE_Message_Block::MB_HANGUP));
                putq(hangup);
            }
        }


        virtual int svc() {
           ACE_Message_Block* msgBlock;
           while (getq(msgBlock) != -1) //如果没有消息会阻塞在这里
           {
                if (msgBlock->msg_type() == ACE_Message_Block::MB_HANGUP)
               {
                ACE_DEBUG((LM_DEBUG, ACE_TEXT("<%P|%t>MsgProcessTask.thread hangup\n")));
                msgBlock->release();
                break;
               }
               else
               {
                //TODO: 处理消息
                ACE_HEX_DUMP((LM_TRACE, msgBlock->base(), msgBlock->length()));
                msgBlock->release();
               }
            }
            return 0;
        }
private:
        int mNumThreads;
};

typedef ACE_Singleton<MsgProcessTask, ACE_Null_Mutex> MsgProcessTaskSingleton;


#endif

[ 本帖最后由 glen_dai 于 2008-8-4 09:39 编辑 ]
 楼主| 发表于 2008-8-4 09:39:03 | 显示全部楼层

main.cpp

int ACE_TMAIN (int, ACE_TCHAR *[])
{
        ServerConfigSingleton::instance()->Init();

        ClientAcceptor acceptor;
        acceptor.Start(ServerConfigSingleton::instance()->GetServerPort());

        MsgProcessTaskSingleton::instance()->Open(ServerConfigSingleton::instance()->GetNumThreads());
        EventLoopTask evtLoopTask;
        evtLoopTask.Open(ServerConfigSingleton::instance()->GetNumThreads());

        char c;
        cout << "Press any key to exit" << endl;
        cin >> c;

        evtLoopTask.Close();
        MsgProcessTaskSingleton::instance()->Close();
        return 0;
}

省略了部分代码,我想问题应该在我发的这些代码里面
发表于 2008-8-4 12:39:54 | 显示全部楼层
感觉handle_read_stream有问题,客户端循环发100次消息,服务器端只收到4,5次,而且客户端关闭了handle_read_stream也不会断点,请高人指点~

----------------------
数据是否完整?数据完整的话,就是正确的。客户端发送的次数和服务器端收取的次数,不对等很正常啊,因为TCP是流协议,会有流控措施。 客户端关闭,服务器端一般是不会立刻知道的。这是TCP/IP的设计而已。加心跳支持即可。
 楼主| 发表于 2008-8-5 10:02:52 | 显示全部楼层
谢谢楼上,数据是完整的,我没有正确处理。
您需要登录后才可以回帖 登录 | 用户注册

本版积分规则

Archiver|手机版|小黑屋|ACE Developer ( 京ICP备06055248号 )

GMT+8, 2024-12-23 18:06 , Processed in 0.075012 second(s), 6 queries , Redis On.

Powered by Discuz! X3.5

© 2001-2023 Discuz! Team.

快速回复 返回顶部 返回列表