找回密码
 用户注册

QQ登录

只需一步,快速开始

查看: 4940|回复: 2

Client用iovec填充数据发送成功。Server端recv_n接收数据错误

[复制链接]
发表于 2009-12-16 00:53:07 | 显示全部楼层 |阅读模式
用vc.net编译的。用下面的客户端发送数据,客户端int LoggingClient::send(const ACE_Log_Record &logRecord)函数里
      iovec iov[2];
      iov[0].iov_base = header.begin()->rd_ptr();
      iov[0].iov_len  = 8;
      iov[1].iov_base = payLoad.begin()->rd_ptr();
      iov[1].iov_len  = length;
      return loggingPeer_.send_n(iov, 2);
经调试确认,iov 中填充的数据是正确的,发送也成功。但下面的服务端代码总是阻塞在 LoggingHandler 类 recvLogRecord 函数中的 if ( peer_.recv_n(payload->wr_ptr(), 8) == 8 ) 处,接收到的数据不正确。于是我换成了如下的接收方式:

iovec iov[2];
peer_.recv_n(iov, 2);
接收到的数据内容还是不正确。为什么呢?

请高手们帮忙解答,谢谢!

客户端和服务端的代码如下。我找不到可以请教的朋友,希望大家指点,谢谢!
服务端代码如下:
#include <ace\Event_Handler.h>
#include <ace\INET_Addr.h>
#include <ace\Reactor.h>
#include <ace\SOCK_Acceptor.h>
#include <ace\SOCK_Stream.h>
#include <ace\Log_Record.h>
#include <ace\Log_Msg.h>
#include <ace\Message_Block.h>
#include <ace\OS.h>
#include <ace\FILE.h>
#include <ace\FILE_IO.h>
#include <ace\CDR_Base.h>
#include <iostream>
#include <ace\CDR_Stream.h>
#include <ace\FILE_Connector.h>
#include <ace\Thread_Manager.h>
// NOTE(review): file-scope using-directive; the unqualified `cerr` used in
// LoggingHandler::writeLogRecord relies on it.
using namespace std;
// TCP port the logging server listens on.
#define PORT 20000
class LoggingHandler
{
public:
LoggingHandler( ACE_FILE_IO &logFile )
  :logFile_(logFile)
{
}

LoggingHandler( ACE_FILE_IO &logFile, ACE_HANDLE handle )
  :logFile_(logFile)
{
     peer_.set_handle(handle);
}

LoggingHandler( ACE_FILE_IO &logFile, const ACE_SOCK_Stream &loggingPeer )
  :logFile_(logFile), peer_(loggingPeer)
{
}

int close()
{
     return peer_.close();
}

int recvLogRecord( ACE_Message_Block *&mb );
int writeLogRecord( ACE_Message_Block *mb );
int logRecrod();

ACE_SOCK_Stream &peer()
{
     return peer_;
}

private:
// Reference to a log file
ACE_FILE_IO &logFile_;
// Connect to the client
ACE_SOCK_Stream peer_;
};

int LoggingHandler::recvLogRecord( ACE_Message_Block *&mb )
{
     ACE_INET_Addr remoteAddr;
     peer_.get_remote_addr(remoteAddr);
     mb = new ACE_Message_Block(MAXHOSTNAMELEN + 1);

     remoteAddr.get_host_name( mb->wr_ptr(), MAXHOSTNAMELEN );
     mb->wr_ptr(strlen(mb->wr_ptr()) + 1);

     ACE_Message_Block *payload = new ACE_Message_Block(ACE_DEFAULT_CDR_BUFSIZE);
     ACE_CDR::mb_align(payload);

     // 用iovec当缓冲区去接收数据结果还是不对,这是为什么?
   // 底下的客户端代码正确,调试发现数据填充也正确。
   // iovec iov[2];
   // peer_.recv_n(iov, 2);

    if ( peer_.recv_n(payload->wr_ptr(), 8) == 8 )
     {
           payload->wr_ptr(8);
           ACE_InputCDR cdr(payload);
           ACE_CDR::Boolean byteOrder;
           cdr>>ACE_InputCDR::to_boolean(byteOrder);
           ACE_CDR::ULong length = 0;
           cdr>>length;
           payload->size(length + 8 + ACE_CDR::MAX_ALIGNMENT);

          if ( peer_.recv_n(payload->wr_ptr(), length) > 0 )
          {
               payload->wr_ptr(length);
               mb->cont(payload);
               return length;
           }
     }

     payload->release();
     mb->release();
     payload = NULL;
     mb = NULL;

     return -1;
}


int LoggingHandler::writeLogRecord( ACE_Message_Block *mb )
{
     if( logFile_.send_n(mb) == -1 )
     {
         return -1;
     }
     if (ACE::debug())
    {
        ACE_InputCDR cdr(mb->cont());
        ACE_CDR::Boolean byteOrder;
        ACE_CDR::ULong len;
        cdr >> ACE_InputCDR::to_boolean(byteOrder);
        cdr.reset_byte_order(byteOrder);
        cdr >> len;
        ACE_Log_Record log_recrod;
        cdr >> log_recrod;
        log_recrod.print(mb->rd_ptr(), 1, cerr);
     }

     return mb->total_length();
}

int LoggingHandler::logRecrod()
{
     ACE_Message_Block *mb = 0;
     if (recvLogRecord(mb) == -1)
     {
          return -1;
      }
      else
      {
          int result = writeLogRecord(mb);
          mb->release();
          return result == -1 ? -1 : 0;
       }
}

class LoggingEventHandler : public ACE_Event_Handler
{
public:
// Initialize the base class and logging handler
LoggingEventHandler(ACE_Reactor *r)
  :ACE_Event_Handler(r), loggingHandler_(logFile_)
{
}

virtual ~LoggingEventHandler()
{
  // No-op destructor
}

// Activate the object
virtual int open();
// Called by reactor when logging events arrive
virtual int handle_input(ACE_HANDLE fd = ACE_INVALID_HANDLE);

// Called when this object was destroyed
virtual int handle_close(ACE_HANDLE handle = ACE_INVALID_HANDLE, ACE_Reactor_Mask closeMask = 0);

// Return socket handle of the contained<Logging_Handler>
virtual ACE_HANDLE get_handle()const
{
      LoggingHandler &h = const_cast<LoggingHandler &>(loggingHandler_);//.peer().get_handle();
      return h.peer().get_handle();
}

// Get a reference to the contained<ACE_SOCK_Stream>
ACE_SOCK_Stream &peer()
{
      return loggingHandler_.peer();
}

// Return a reference to the <logFile>
const ACE_FILE_IO &logFile()const
{
      return logFile_;
}

private:
     // File where log records are written.
     ACE_FILE_IO logFile_;
     // Connection to remote peer.
     LoggingHandler loggingHandler_;
};

int LoggingEventHandler::open()
{
     static const char LOGFILE_SUBFIX[] = ".log";
     char fileName[MAXHOSTNAMELEN+sizeof(LOGFILE_SUBFIX)];
     ACE_INET_Addr remoteAddr;
     loggingHandler_.peer().get_remote_addr(remoteAddr);
     remoteAddr.get_host_name(fileName, MAXHOSTNAMELEN);
     ACE_OS::strcat(fileName, LOGFILE_SUBFIX);
     ACE_DEBUG((LM_DEBUG, "Create a log file %s\n", fileName));
     ACE_FILE_Connector connector;
     connector.connect(logFile_, ACE_FILE_Addr(fileName), 0, ACE_Addr::sap_any, 0, O_RDWR|O_CREAT|O_APPEND, ACE_DEFAULT_FILE_PERMS);

     return reactor()->register_handler(this, ACE_Event_Handler::READ_MASK);
}

// Reactor callback: process one log record from the peer.
// Returns 0 on success and -1 on failure.
int LoggingEventHandler::handle_input(ACE_HANDLE handle)
{
     const bool failed = ( loggingHandler_.logRecrod() == -1 );
     if (failed)
     {
         ACE_DEBUG((LM_DEBUG, "Write log record failed!\n"));
         return -1;
     }

     ACE_DEBUG((LM_DEBUG, "Write log record succeed!\n"));
     return 0;
}
// Shut the handler down: close the peer connection and the log file, then
// destroy this object. Both arguments are unused. Always returns 0.
int LoggingEventHandler::handle_close(ACE_HANDLE handle, ACE_Reactor_Mask closeMask)
{
     loggingHandler_.close();
     logFile_.close();
     // Must stay last among the member accesses: the object is gone afterwards.
     delete this;
     return 0;
}

// Event handler wrapping the listening socket; creates a
// LoggingEventHandler for each accepted connection.
class LoggingAcceptor : public ACE_Event_Handler
{
public:
    // Attach to reactor <r>, defaulting to ACE_Reactor::instance().
    LoggingAcceptor(ACE_Reactor *r = ACE_Reactor::instance())
        : ACE_Event_Handler(r)
    {}

    // Start listening on <localAddr> and register with the reactor.
    virtual int open(const ACE_INET_Addr &localAddr);

    // Invoked when a connection request is pending.
    virtual int handle_input(ACE_HANDLE fd = ACE_INVALID_HANDLE);

    // Invoked when the acceptor is being shut down.
    virtual int handle_close(ACE_HANDLE fd = ACE_INVALID_HANDLE,
                             ACE_Reactor_Mask closeMask = 0);

    // Handle of the listening socket.
    virtual ACE_HANDLE get_handle() const { return acceptor_.get_handle(); }

    // The underlying passive-mode socket object.
    ACE_SOCK_Acceptor &acceptor() { return acceptor_; }

protected:
    // Protected destructor with an empty body.
    ~LoggingAcceptor() {}

private:
    ACE_SOCK_Acceptor acceptor_;
};

int LoggingAcceptor::open(const ACE_INET_Addr &localAddr)
{
     if ( acceptor_.open(localAddr) == -1 )
     {
         ACE_DEBUG((LM_DEBUG, "Open acceptor failed\n"));
         return -1;
     }

     ACE_DEBUG((LM_DEBUG, "Open the server to accept the connectors!\n"));
     return reactor()->register_handler(this, ACE_Event_Handler::ACCEPT_MASK);
}

int LoggingAcceptor::handle_input(ACE_HANDLE handle)
{
     LoggingEventHandler *peerHandle = NULL;
     ACE_NEW_RETURN(peerHandle, LoggingEventHandler(reactor()), -1);

     if ( acceptor_.accept(peerHandle->peer()) == -1 )
     {
         ACE_DEBUG((LM_DEBUG, "Accept connect failed\n"));
         delete peerHandle;
         return -1;
     }
     ACE_DEBUG((LM_DEBUG, "Established a connect\n"));
     if (peerHandle->open() == -1)
     {
          ACE_DEBUG((LM_DEBUG, "Activate LoggingEventHandler object failed\n"));
          peerHandle->handle_close();
          return -1;
      }
      return 0;
}
// Shut the acceptor down: log the event, close the listening socket and
// destroy this object. Both arguments are unused. Always returns 0.
int LoggingAcceptor::handle_close(ACE_HANDLE handle, ACE_Reactor_Mask closeMask)
{
     ACE_DEBUG((LM_DEBUG, "Close LoggingAcceptor\n"));
     acceptor_.close();
     // Must stay last among the member accesses: the object is gone afterwards.
     delete this;
     return 0;
}

template<class ACCEPTOR>
class ReactorLoggingServer : public ACCEPTOR
{
public:
     ReactorLoggingServer(ACE_Reactor *reactor);
};

template<class ACCEPTOR>
ReactorLoggingServer<ACCEPTOR>::ReactorLoggingServer(ACE_Reactor *reactor)
:ACCEPTOR(reactor)
{
     int result;
     ACE_INET_Addr serverAddr;
     result = serverAddr.set(PORT);
     if ( result != -1 )
     {
         result = ACCEPTOR::open(serverAddr);
     }
     if ( result == -1 )
     {
         reactor->end_reactor_event_loop();
     }
}

// Concrete server type: ReactorLoggingServer built on LoggingAcceptor.
typedef ReactorLoggingServer<LoggingAcceptor>ServerDemon;

int main( int argc, char *argv[] )
{
     ACE_DEBUG((LM_DEBUG, "Server[%P] Starting\n"));
     ACE_Reactor reactor;
     ServerDemon *server = NULL;
     ACE_NEW_RETURN(server, ServerDemon(&reactor), -1);
     ACE_DEBUG((LM_DEBUG, "Looping the event...\n"));

     if (reactor.run_reactor_event_loop() == -1)
     {
         ACE_ERROR_RETURN((LM_ERROR, "%p\n", "run_reactor_event_loop()"), 1);
     }

     return 0;
}

客户端代码如下:
#include <ace\OS.h>
#include <ace\CDR_Stream.h>
#include <ace\INET_Addr.h>
#include <ace\SOCK_Connector.h>
#include <ace\SOCK_Stream.h>
#include <ace\Log_Record.h>
#include <ace\streams.h>
#include <ace\Log_Msg.h>
#include <string>
// TCP port the client connects to.
#define PORT   20000
// Address of the logging server the client connects to.
#define SERVER "127.0.0.1"

class LoggingClient
{
public:
// Send <Log_Record> to the server
int send(const ACE_Log_Record &logRecord);

// Access method
ACE_SOCK_Stream &peer()
{
      return loggingPeer_;
}
// Close the connection to the server
~LoggingClient()
{
      loggingPeer_.close();
}
private:
     ACE_SOCK_Stream loggingPeer_;
};

int LoggingClient::send(const ACE_Log_Record &logRecord)
{
    const size_t maxPayloadSize =
       4                             // type
      +8                            // timestamp
      +4                            // process id
      +4                            // data length
      +ACE_Log_Record::MAXLOGMSGLEN // data
      +ACE_CDR::MAX_ALIGNMENT;         // padding


       ACE_OutputCDR payLoad(maxPayloadSize);
       payLoad << logRecord;
       ACE_CDR::ULong length = payLoad.total_length();

      ACE_OutputCDR header(ACE_CDR::MAX_ALIGNMENT + 8);
      header << ACE_OutputCDR::from_boolean(ACE_CDR_BYTE_ORDER);
      header << length;

      iovec iov[2];
      iov[0].iov_base = header.begin()->rd_ptr();
      iov[0].iov_len  = 8;
      iov[1].iov_base = payLoad.begin()->rd_ptr();
      iov[1].iov_len  = length;
      return loggingPeer_.send_n(iov, 2);
}


int main(int argc, char *argv[])
{
     ACE_INET_Addr serverAddr(PORT, SERVER);
     ACE_SOCK_Connector connector;
     LoggingClient logClient;
     ACE_Time_Value t(10,100);
     if ( connector.connect(logClient.peer(), serverAddr, &t) < 0 )
     {
         ACE_DEBUG((LM_DEBUG, "Connect to %s failed\n", SERVER));
         return 0;
     }

     ACE_DEBUG((LM_DEBUG, "Connect to %s succeed\n", SERVER));
     // Limit the number of characters read on each record
     cin.width(ACE_Log_Record::MAXLOGMSGLEN );

     while (1)
     {
         std::cout<<"get data from client(exit):";
         std::string userInput;
         getline(cin, userInput, '\n');
         if ( userInput.compare("exit") == 0 )
         {
              break;
         }

         ACE_Time_Value now(ACE_OS::gettimeofday());
         ACE_Log_Record logRecord(LM_INFO, now, ACE_OS::getpid());
         logRecord.msg_data(userInput.c_str());
         if ( logClient.send(logRecord) <= 0 )
         {
             ACE_DEBUG((LM_DEBUG, "Send record to %s failed\n", SERVER));
             break;
         }
         ACE_DEBUG((LM_DEBUG, "Send record to %s succeed\n", SERVER));
         ACE_OS::sleep(1);
     }

     return 0;
}

[ 本帖最后由 pengxiqin 于 2009-12-16 23:33 编辑 ]
 楼主| 发表于 2009-12-16 18:50:11 | 显示全部楼层
哪位大侠帮我解答下吧,小弟真的找不到解答的人了,谢谢!!!!!!!!
发表于 2009-12-16 20:01:12 | 显示全部楼层
这类问题可以通过抓包或打印输出的办法来排查。
阻塞在这里,是因为没有收到所要求数量的数据。
您需要登录后才可以回帖 登录 | 用户注册

本版积分规则

Archiver|手机版|小黑屋|ACE Developer ( 京ICP备06055248号 )

GMT+8, 2024-12-23 13:10 , Processed in 0.446726 second(s), 6 queries , Redis On.

Powered by Discuz! X3.5

© 2001-2023 Discuz! Team.

快速回复 返回顶部 返回列表