pengxiqin 发表于 2009-12-16 00:53:07

Client用iovec填充数据发送成功。Server端recv_n接收数据错误

用vc.net编译的。用下面的客户端发送数据,客户端int LoggingClient::send(const ACE_Log_Record &logRecord)函数里
      iovec iov;
      iov.iov_base = header.begin()->rd_ptr();
      iov.iov_len= 8;
      iov.iov_base = payLoad.begin()->rd_ptr();
      iov.iov_len= length;
      return loggingPeer_.send_n(iov, 2)填充数据正确,发送数据也成功,而下面的服务端代码总是被阻塞在LoggingHandler类中的recvLogRecord函数中的if ( peer_.recv_n(payload->wr_ptr(), 8) == 8 )处,显示接收到的数据不正确。但是我换成如下的接收方式
iovec iov;
peer_.recv_n(iov, 2);
接收到的数据内容还是不正确。为什么呢?

请高手们帮忙解答,谢谢!

客户端,服务端代码如下,谢谢指教,也找不到询问的朋友,愿大家指导?
服务端代码如下:
#include <ace\Event_Handler.h>
#include <ace\INET_Addr.h>
#include <ace\Reactor.h>
#include <ace\SOCK_Acceptor.h>
#include <ace\SOCK_Stream.h>
#include <ace\Log_Record.h>
#include <ace\Log_Msg.h>
#include <ace\Message_Block.h>
#include <ace\OS.h>
#include <ace\FILE.h>
#include <ace\FILE_IO.h>
#include <ace\CDR_Base.h>
#include <iostream>
#include <ace\CDR_Stream.h>
#include <ace\FILE_Connector.h>
#include <ace\Thread_Manager.h>
using namespace std;
#define PORT 20000
class LoggingHandler
{
public:
LoggingHandler( ACE_FILE_IO &logFile )
:logFile_(logFile)
{
}

LoggingHandler( ACE_FILE_IO &logFile, ACE_HANDLE handle )
:logFile_(logFile)
{
   peer_.set_handle(handle);
}

LoggingHandler( ACE_FILE_IO &logFile, const ACE_SOCK_Stream &loggingPeer )
:logFile_(logFile), peer_(loggingPeer)
{
}

int close()
{
   return peer_.close();
}

int recvLogRecord( ACE_Message_Block *&mb );
int writeLogRecord( ACE_Message_Block *mb );
int logRecrod();

ACE_SOCK_Stream &peer()
{
   return peer_;
}

private:
// Reference to a log file
ACE_FILE_IO &logFile_;
// Connect to the client
ACE_SOCK_Stream peer_;
};

int LoggingHandler::recvLogRecord( ACE_Message_Block *&mb )
{
   ACE_INET_Addr remoteAddr;
   peer_.get_remote_addr(remoteAddr);
   mb = new ACE_Message_Block(MAXHOSTNAMELEN + 1);

   remoteAddr.get_host_name( mb->wr_ptr(), MAXHOSTNAMELEN );
   mb->wr_ptr(strlen(mb->wr_ptr()) + 1);

   ACE_Message_Block *payload = new ACE_Message_Block(ACE_DEFAULT_CDR_BUFSIZE);
   ACE_CDR::mb_align(payload);

   // 用iovec当缓冲区去接收数据结果还是不对,这是为什么?
   // 底下的客户端代码正确,调试发现数据填充也正确。
   // iovec iov;
   // peer_.recv_n(iov, 2);

    if ( peer_.recv_n(payload->wr_ptr(), 8) == 8 )
   {
         payload->wr_ptr(8);
         ACE_InputCDR cdr(payload);
         ACE_CDR::Boolean byteOrder;
         cdr>>ACE_InputCDR::to_boolean(byteOrder);
         ACE_CDR::ULong length = 0;
         cdr>>length;
         payload->size(length + 8 + ACE_CDR::MAX_ALIGNMENT);

          if ( peer_.recv_n(payload->wr_ptr(), length) > 0 )
          {
               payload->wr_ptr(length);
               mb->cont(payload);
               return length;
         }
   }

   payload->release();
   mb->release();
   payload = NULL;
   mb = NULL;

   return -1;
}

int LoggingHandler::writeLogRecord( ACE_Message_Block *mb )
{
   if( logFile_.send_n(mb) == -1 )
   {
         return -1;
   }
   if (ACE::debug())
    {
      ACE_InputCDR cdr(mb->cont());
      ACE_CDR::Boolean byteOrder;
      ACE_CDR::ULong len;
      cdr >> ACE_InputCDR::to_boolean(byteOrder);
      cdr.reset_byte_order(byteOrder);
      cdr >> len;
      ACE_Log_Record log_recrod;
      cdr >> log_recrod;
      log_recrod.print(mb->rd_ptr(), 1, cerr);
   }

   return mb->total_length();
}

int LoggingHandler::logRecrod()
{
   ACE_Message_Block *mb = 0;
   if (recvLogRecord(mb) == -1)
   {
          return -1;
      }
      else
      {
          int result = writeLogRecord(mb);
          mb->release();
          return result == -1 ? -1 : 0;
       }
}

class LoggingEventHandler : public ACE_Event_Handler
{
public:
// Initialize the base class and logging handler
LoggingEventHandler(ACE_Reactor *r)
:ACE_Event_Handler(r), loggingHandler_(logFile_)
{
}

virtual ~LoggingEventHandler()
{
// No-op destructor
}

// Activate the object
virtual int open();
// Called by reactor when logging events arrive
virtual int handle_input(ACE_HANDLE fd = ACE_INVALID_HANDLE);

// Called when this object was destroyed
virtual int handle_close(ACE_HANDLE handle = ACE_INVALID_HANDLE, ACE_Reactor_Mask closeMask = 0);

// Return socket handle of the contained<Logging_Handler>
virtual ACE_HANDLE get_handle()const
{
      LoggingHandler &h = const_cast<LoggingHandler &>(loggingHandler_);//.peer().get_handle();
      return h.peer().get_handle();
}

// Get a reference to the contained<ACE_SOCK_Stream>
ACE_SOCK_Stream &peer()
{
      return loggingHandler_.peer();
}

// Return a reference to the <logFile>
const ACE_FILE_IO &logFile()const
{
      return logFile_;
}

private:
   // File where log records are written.
   ACE_FILE_IO logFile_;
   // Connection to remote peer.
   LoggingHandler loggingHandler_;
};

int LoggingEventHandler::open()
{
   static const char LOGFILE_SUBFIX[] = ".log";
   char fileName;
   ACE_INET_Addr remoteAddr;
   loggingHandler_.peer().get_remote_addr(remoteAddr);
   remoteAddr.get_host_name(fileName, MAXHOSTNAMELEN);
   ACE_OS::strcat(fileName, LOGFILE_SUBFIX);
   ACE_DEBUG((LM_DEBUG, "Create a log file %s\n", fileName));
   ACE_FILE_Connector connector;
   connector.connect(logFile_, ACE_FILE_Addr(fileName), 0, ACE_Addr::sap_any, 0, O_RDWR|O_CREAT|O_APPEND, ACE_DEFAULT_FILE_PERMS);

   return reactor()->register_handler(this, ACE_Event_Handler::READ_MASK);
}

int LoggingEventHandler::handle_input(ACE_HANDLE handle)
{
   if (loggingHandler_.logRecrod() == -1)
   {
         ACE_DEBUG((LM_DEBUG, "Write log record failed!\n"));
         return -1;
   }
   else
   {
         ACE_DEBUG((LM_DEBUG, "Write log record succeed!\n"));
         return 0;
   }
   return 0;
}
int LoggingEventHandler::handle_close(ACE_HANDLE handle, ACE_Reactor_Mask closeMask)
{
   loggingHandler_.close();
   logFile_.close();
   delete this;
   return 0;
}

class LoggingAcceptor : public ACE_Event_Handler
{
public:
// constructor
LoggingAcceptor(ACE_Reactor *r = ACE_Reactor::instance())
:ACE_Event_Handler(r)
{
}

// Initialization method
virtual int open(const ACE_INET_Addr &localAddr);
virtual int handle_input(ACE_HANDLE fd = ACE_INVALID_HANDLE);
virtual int handle_close(ACE_HANDLE fd = ACE_INVALID_HANDLE, ACE_Reactor_Mask closeMask = 0);

virtual ACE_HANDLE get_handle()const
{
      return acceptor_.get_handle();
}

ACE_SOCK_Acceptor &acceptor()
{
      return acceptor_;
}

protected:
~LoggingAcceptor()
{
// No-op destructor
}

private:
   ACE_SOCK_Acceptor acceptor_;
};

int LoggingAcceptor::open(const ACE_INET_Addr &localAddr)
{
   if ( acceptor_.open(localAddr) == -1 )
   {
         ACE_DEBUG((LM_DEBUG, "Open acceptor failed\n"));
         return -1;
   }

   ACE_DEBUG((LM_DEBUG, "Open the server to accept the connectors!\n"));
   return reactor()->register_handler(this, ACE_Event_Handler::ACCEPT_MASK);
}

int LoggingAcceptor::handle_input(ACE_HANDLE handle)
{
   LoggingEventHandler *peerHandle = NULL;
   ACE_NEW_RETURN(peerHandle, LoggingEventHandler(reactor()), -1);

   if ( acceptor_.accept(peerHandle->peer()) == -1 )
   {
         ACE_DEBUG((LM_DEBUG, "Accept connect failed\n"));
         delete peerHandle;
         return -1;
   }
   ACE_DEBUG((LM_DEBUG, "Established a connect\n"));
   if (peerHandle->open() == -1)
   {
          ACE_DEBUG((LM_DEBUG, "Activate LoggingEventHandler object failed\n"));
          peerHandle->handle_close();
          return -1;
      }
      return 0;
}
int LoggingAcceptor::handle_close(ACE_HANDLE handle, ACE_Reactor_Mask closeMask)
{
   ACE_DEBUG((LM_DEBUG, "Close LoggingAcceptor\n"));
   acceptor_.close();
   delete this;
   return 0;
}

template<class ACCEPTOR>
class ReactorLoggingServer : public ACCEPTOR
{
public:
   ReactorLoggingServer(ACE_Reactor *reactor);
};

template<class ACCEPTOR>
ReactorLoggingServer<ACCEPTOR>::ReactorLoggingServer(ACE_Reactor *reactor)
:ACCEPTOR(reactor)
{
   int result;
   ACE_INET_Addr serverAddr;
   result = serverAddr.set(PORT);
   if ( result != -1 )
   {
         result = ACCEPTOR::open(serverAddr);
   }
   if ( result == -1 )
   {
         reactor->end_reactor_event_loop();
   }
}

typedef ReactorLoggingServer<LoggingAcceptor>ServerDemon;

int main( int argc, char *argv[] )
{
   ACE_DEBUG((LM_DEBUG, "Server[%P] Starting\n"));
   ACE_Reactor reactor;
   ServerDemon *server = NULL;
   ACE_NEW_RETURN(server, ServerDemon(&reactor), -1);
   ACE_DEBUG((LM_DEBUG, "Looping the event...\n"));

   if (reactor.run_reactor_event_loop() == -1)
   {
         ACE_ERROR_RETURN((LM_ERROR, "%p\n", "run_reactor_event_loop()"), 1);
   }

   return 0;
}

客户端代码如下:
#include <ace\OS.h>
#include <ace\CDR_Stream.h>
#include <ace\INET_Addr.h>
#include <ace\SOCK_Connector.h>
#include <ace\SOCK_Stream.h>
#include <ace\Log_Record.h>
#include <ace\streams.h>
#include <ace\Log_Msg.h>
#include <string>
#define PORT   20000
#define SERVER "127.0.0.1"

class LoggingClient
{
public:
// Send <Log_Record> to the server
int send(const ACE_Log_Record &logRecord);

// Access method
ACE_SOCK_Stream &peer()
{
      return loggingPeer_;
}
// Close the connection to the server
~LoggingClient()
{
      loggingPeer_.close();
}
private:
   ACE_SOCK_Stream loggingPeer_;
};

int LoggingClient::send(const ACE_Log_Record &logRecord)
{
    const size_t maxPayloadSize =
       4                           // type
      +8                            // timestamp
      +4                            // process id
      +4                            // data length
      +ACE_Log_Record::MAXLOGMSGLEN // data
      +ACE_CDR::MAX_ALIGNMENT;         // padding

       ACE_OutputCDR payLoad(maxPayloadSize);
       payLoad << logRecord;
       ACE_CDR::ULong length = payLoad.total_length();

      ACE_OutputCDR header(ACE_CDR::MAX_ALIGNMENT + 8);
      header << ACE_OutputCDR::from_boolean(ACE_CDR_BYTE_ORDER);
      header << length;

      iovec iov;
      iov.iov_base = header.begin()->rd_ptr();
      iov.iov_len= 8;
      iov.iov_base = payLoad.begin()->rd_ptr();
      iov.iov_len= length;
      return loggingPeer_.send_n(iov, 2);
}

int main(int argc, char *argv[])
{
   ACE_INET_Addr serverAddr(PORT, SERVER);
   ACE_SOCK_Connector connector;
   LoggingClient logClient;
   ACE_Time_Value t(10,100);
   if ( connector.connect(logClient.peer(), serverAddr, &t) < 0 )
   {
         ACE_DEBUG((LM_DEBUG, "Connect to %s failed\n", SERVER));
         return 0;
   }

   ACE_DEBUG((LM_DEBUG, "Connect to %s succeed\n", SERVER));
   // Limit the number of characters read on each record
   cin.width(ACE_Log_Record::MAXLOGMSGLEN );

   while (1)
   {
         std::cout<<"get data from client(exit):";
         std::string userInput;
         getline(cin, userInput, '\n');
         if ( userInput.compare("exit") == 0 )
         {
            break;
         }

         ACE_Time_Value now(ACE_OS::gettimeofday());
         ACE_Log_Record logRecord(LM_INFO, now, ACE_OS::getpid());
         logRecord.msg_data(userInput.c_str());
         if ( logClient.send(logRecord) <= 0 )
         {
             ACE_DEBUG((LM_DEBUG, "Send record to %s failed\n", SERVER));
             break;
         }
         ACE_DEBUG((LM_DEBUG, "Send record to %s succeed\n", SERVER));
         ACE_OS::sleep(1);
   }

   return 0;
}

[ 本帖最后由 pengxiqin 于 2009-12-16 23:33 编辑 ]

pengxiqin 发表于 2009-12-16 18:50:11

哪位大侠帮我解答下吧,小弟真的找不到解答的人了,谢谢!!!!!!!!

winston 发表于 2009-12-16 20:01:12

这类问题,可以用抓包、输出的办法来解决。
阻塞到这里面,是因为要收取的数据数量收不到。
页: [1]
查看完整版本: Client用iovec填充数据发送成功。Server端recv_n接收数据错误