|
用vc.net编译的。用下面的客户端发送数据,客户端int LoggingClient::send(const ACE_Log_Record &logRecord)函数里
iovec iov[2];
iov[0].iov_base = header.begin()->rd_ptr();
iov[0].iov_len = 8;
iov[1].iov_base = payLoad.begin()->rd_ptr();
iov[1].iov_len = length;
return loggingPeer_.send_n(iov, 2)填充数据正确,发送数据也成功,而下面的服务端代码总是被阻塞在LoggingHandler类中的recvLogRecord函数中的if ( peer_.recv_n(payload->wr_ptr(), 8) == 8 )处,显示接收到的数据不正确。但是我换成如下的接收方式
iovec iov[2];
peer_.recv_n(iov, 2);
接收到的数据内容还是不正确。为什么呢?
请高手们帮忙解答,谢谢!
客户端,服务端代码如下,谢谢指教,也找不到询问的朋友,愿大家指导?
服务端代码如下:
#include <ace\Event_Handler.h>
#include <ace\INET_Addr.h>
#include <ace\Reactor.h>
#include <ace\SOCK_Acceptor.h>
#include <ace\SOCK_Stream.h>
#include <ace\Log_Record.h>
#include <ace\Log_Msg.h>
#include <ace\Message_Block.h>
#include <ace\OS.h>
#include <ace\FILE.h>
#include <ace\FILE_IO.h>
#include <ace\CDR_Base.h>
#include <iostream>
#include <ace\CDR_Stream.h>
#include <ace\FILE_Connector.h>
#include <ace\Thread_Manager.h>
using namespace std;
#define PORT 20000
class LoggingHandler
{
public:
LoggingHandler( ACE_FILE_IO &logFile )
:logFile_(logFile)
{
}
LoggingHandler( ACE_FILE_IO &logFile, ACE_HANDLE handle )
:logFile_(logFile)
{
peer_.set_handle(handle);
}
LoggingHandler( ACE_FILE_IO &logFile, const ACE_SOCK_Stream &loggingPeer )
:logFile_(logFile), peer_(loggingPeer)
{
}
int close()
{
return peer_.close();
}
int recvLogRecord( ACE_Message_Block *&mb );
int writeLogRecord( ACE_Message_Block *mb );
int logRecrod();
ACE_SOCK_Stream &peer()
{
return peer_;
}
private:
// Reference to a log file
ACE_FILE_IO &logFile_;
// Connect to the client
ACE_SOCK_Stream peer_;
};
int LoggingHandler::recvLogRecord( ACE_Message_Block *&mb )
{
ACE_INET_Addr remoteAddr;
peer_.get_remote_addr(remoteAddr);
mb = new ACE_Message_Block(MAXHOSTNAMELEN + 1);
remoteAddr.get_host_name( mb->wr_ptr(), MAXHOSTNAMELEN );
mb->wr_ptr(strlen(mb->wr_ptr()) + 1);
ACE_Message_Block *payload = new ACE_Message_Block(ACE_DEFAULT_CDR_BUFSIZE);
ACE_CDR::mb_align(payload);
// 用iovec当缓冲区去接收数据结果还是不对,这是为什么?
// 底下的客户端代码正确,调试发现数据填充也正确。
// iovec iov[2];
// peer_.recv_n(iov, 2);
if ( peer_.recv_n(payload->wr_ptr(), 8) == 8 )
{
payload->wr_ptr(8);
ACE_InputCDR cdr(payload);
ACE_CDR::Boolean byteOrder;
cdr>>ACE_InputCDR::to_boolean(byteOrder);
ACE_CDR::ULong length = 0;
cdr>>length;
payload->size(length + 8 + ACE_CDR::MAX_ALIGNMENT);
if ( peer_.recv_n(payload->wr_ptr(), length) > 0 )
{
payload->wr_ptr(length);
mb->cont(payload);
return length;
}
}
payload->release();
mb->release();
payload = NULL;
mb = NULL;
return -1;
}
int LoggingHandler::writeLogRecord( ACE_Message_Block *mb )
{
if( logFile_.send_n(mb) == -1 )
{
return -1;
}
if (ACE::debug())
{
ACE_InputCDR cdr(mb->cont());
ACE_CDR::Boolean byteOrder;
ACE_CDR::ULong len;
cdr >> ACE_InputCDR::to_boolean(byteOrder);
cdr.reset_byte_order(byteOrder);
cdr >> len;
ACE_Log_Record log_recrod;
cdr >> log_recrod;
log_recrod.print(mb->rd_ptr(), 1, cerr);
}
return mb->total_length();
}
int LoggingHandler::logRecrod()
{
ACE_Message_Block *mb = 0;
if (recvLogRecord(mb) == -1)
{
return -1;
}
else
{
int result = writeLogRecord(mb);
mb->release();
return result == -1 ? -1 : 0;
}
}
class LoggingEventHandler : public ACE_Event_Handler
{
public:
// Initialize the base class and logging handler
LoggingEventHandler(ACE_Reactor *r)
:ACE_Event_Handler(r), loggingHandler_(logFile_)
{
}
virtual ~LoggingEventHandler()
{
// No-op destructor
}
// Activate the object
virtual int open();
// Called by reactor when logging events arrive
virtual int handle_input(ACE_HANDLE fd = ACE_INVALID_HANDLE);
// Called when this object was destroyed
virtual int handle_close(ACE_HANDLE handle = ACE_INVALID_HANDLE, ACE_Reactor_Mask closeMask = 0);
// Return socket handle of the contained<Logging_Handler>
virtual ACE_HANDLE get_handle()const
{
LoggingHandler &h = const_cast<LoggingHandler &>(loggingHandler_);//.peer().get_handle();
return h.peer().get_handle();
}
// Get a reference to the contained<ACE_SOCK_Stream>
ACE_SOCK_Stream &peer()
{
return loggingHandler_.peer();
}
// Return a reference to the <logFile>
const ACE_FILE_IO &logFile()const
{
return logFile_;
}
private:
// File where log records are written.
ACE_FILE_IO logFile_;
// Connection to remote peer.
LoggingHandler loggingHandler_;
};
int LoggingEventHandler::open()
{
static const char LOGFILE_SUBFIX[] = ".log";
char fileName[MAXHOSTNAMELEN+sizeof(LOGFILE_SUBFIX)];
ACE_INET_Addr remoteAddr;
loggingHandler_.peer().get_remote_addr(remoteAddr);
remoteAddr.get_host_name(fileName, MAXHOSTNAMELEN);
ACE_OS::strcat(fileName, LOGFILE_SUBFIX);
ACE_DEBUG((LM_DEBUG, "Create a log file %s\n", fileName));
ACE_FILE_Connector connector;
connector.connect(logFile_, ACE_FILE_Addr(fileName), 0, ACE_Addr::sap_any, 0, O_RDWR|O_CREAT|O_APPEND, ACE_DEFAULT_FILE_PERMS);
return reactor()->register_handler(this, ACE_Event_Handler::READ_MASK);
}
int LoggingEventHandler::handle_input(ACE_HANDLE handle)
{
if (loggingHandler_.logRecrod() == -1)
{
ACE_DEBUG((LM_DEBUG, "Write log record failed!\n"));
return -1;
}
else
{
ACE_DEBUG((LM_DEBUG, "Write log record succeed!\n"));
return 0;
}
return 0;
}
int LoggingEventHandler::handle_close(ACE_HANDLE handle, ACE_Reactor_Mask closeMask)
{
loggingHandler_.close();
logFile_.close();
delete this;
return 0;
}
class LoggingAcceptor : public ACE_Event_Handler
{
public:
// constructor
LoggingAcceptor(ACE_Reactor *r = ACE_Reactor::instance())
:ACE_Event_Handler(r)
{
}
// Initialization method
virtual int open(const ACE_INET_Addr &localAddr);
virtual int handle_input(ACE_HANDLE fd = ACE_INVALID_HANDLE);
virtual int handle_close(ACE_HANDLE fd = ACE_INVALID_HANDLE, ACE_Reactor_Mask closeMask = 0);
virtual ACE_HANDLE get_handle()const
{
return acceptor_.get_handle();
}
ACE_SOCK_Acceptor &acceptor()
{
return acceptor_;
}
protected:
~LoggingAcceptor()
{
// No-op destructor
}
private:
ACE_SOCK_Acceptor acceptor_;
};
int LoggingAcceptor::open(const ACE_INET_Addr &localAddr)
{
if ( acceptor_.open(localAddr) == -1 )
{
ACE_DEBUG((LM_DEBUG, "Open acceptor failed\n"));
return -1;
}
ACE_DEBUG((LM_DEBUG, "Open the server to accept the connectors!\n"));
return reactor()->register_handler(this, ACE_Event_Handler::ACCEPT_MASK);
}
int LoggingAcceptor::handle_input(ACE_HANDLE handle)
{
LoggingEventHandler *peerHandle = NULL;
ACE_NEW_RETURN(peerHandle, LoggingEventHandler(reactor()), -1);
if ( acceptor_.accept(peerHandle->peer()) == -1 )
{
ACE_DEBUG((LM_DEBUG, "Accept connect failed\n"));
delete peerHandle;
return -1;
}
ACE_DEBUG((LM_DEBUG, "Established a connect\n"));
if (peerHandle->open() == -1)
{
ACE_DEBUG((LM_DEBUG, "Activate LoggingEventHandler object failed\n"));
peerHandle->handle_close();
return -1;
}
return 0;
}
int LoggingAcceptor::handle_close(ACE_HANDLE handle, ACE_Reactor_Mask closeMask)
{
ACE_DEBUG((LM_DEBUG, "Close LoggingAcceptor\n"));
acceptor_.close();
delete this;
return 0;
}
template<class ACCEPTOR>
class ReactorLoggingServer : public ACCEPTOR
{
public:
ReactorLoggingServer(ACE_Reactor *reactor);
};
template<class ACCEPTOR>
ReactorLoggingServer<ACCEPTOR>::ReactorLoggingServer(ACE_Reactor *reactor)
:ACCEPTOR(reactor)
{
int result;
ACE_INET_Addr serverAddr;
result = serverAddr.set(PORT);
if ( result != -1 )
{
result = ACCEPTOR::open(serverAddr);
}
if ( result == -1 )
{
reactor->end_reactor_event_loop();
}
}
typedef ReactorLoggingServer<LoggingAcceptor>ServerDemon;
int main( int argc, char *argv[] )
{
ACE_DEBUG((LM_DEBUG, "Server[%P] Starting\n"));
ACE_Reactor reactor;
ServerDemon *server = NULL;
ACE_NEW_RETURN(server, ServerDemon(&reactor), -1);
ACE_DEBUG((LM_DEBUG, "Looping the event...\n"));
if (reactor.run_reactor_event_loop() == -1)
{
ACE_ERROR_RETURN((LM_ERROR, "%p\n", "run_reactor_event_loop()"), 1);
}
return 0;
}
客户端代码如下:
#include <ace\OS.h>
#include <ace\CDR_Stream.h>
#include <ace\INET_Addr.h>
#include <ace\SOCK_Connector.h>
#include <ace\SOCK_Stream.h>
#include <ace\Log_Record.h>
#include <ace\streams.h>
#include <ace\Log_Msg.h>
#include <string>
#define PORT 20000
#define SERVER "127.0.0.1"
class LoggingClient
{
public:
// Send <Log_Record> to the server
int send(const ACE_Log_Record &logRecord);
// Access method
ACE_SOCK_Stream &peer()
{
return loggingPeer_;
}
// Close the connection to the server
~LoggingClient()
{
loggingPeer_.close();
}
private:
ACE_SOCK_Stream loggingPeer_;
};
int LoggingClient::send(const ACE_Log_Record &logRecord)
{
const size_t maxPayloadSize =
4 // type
+8 // timestamp
+4 // process id
+4 // data length
+ACE_Log_Record::MAXLOGMSGLEN // data
+ACE_CDR::MAX_ALIGNMENT; // padding
ACE_OutputCDR payLoad(maxPayloadSize);
payLoad << logRecord;
ACE_CDR::ULong length = payLoad.total_length();
ACE_OutputCDR header(ACE_CDR::MAX_ALIGNMENT + 8);
header << ACE_OutputCDR::from_boolean(ACE_CDR_BYTE_ORDER);
header << length;
iovec iov[2];
iov[0].iov_base = header.begin()->rd_ptr();
iov[0].iov_len = 8;
iov[1].iov_base = payLoad.begin()->rd_ptr();
iov[1].iov_len = length;
return loggingPeer_.send_n(iov, 2);
}
int main(int argc, char *argv[])
{
ACE_INET_Addr serverAddr(PORT, SERVER);
ACE_SOCK_Connector connector;
LoggingClient logClient;
ACE_Time_Value t(10,100);
if ( connector.connect(logClient.peer(), serverAddr, &t) < 0 )
{
ACE_DEBUG((LM_DEBUG, "Connect to %s failed\n", SERVER));
return 0;
}
ACE_DEBUG((LM_DEBUG, "Connect to %s succeed\n", SERVER));
// Limit the number of characters read on each record
cin.width(ACE_Log_Record::MAXLOGMSGLEN );
while (1)
{
std::cout<<"get data from client(exit):";
std::string userInput;
getline(cin, userInput, '\n');
if ( userInput.compare("exit") == 0 )
{
break;
}
ACE_Time_Value now(ACE_OS::gettimeofday());
ACE_Log_Record logRecord(LM_INFO, now, ACE_OS::getpid());
logRecord.msg_data(userInput.c_str());
if ( logClient.send(logRecord) <= 0 )
{
ACE_DEBUG((LM_DEBUG, "Send record to %s failed\n", SERVER));
break;
}
ACE_DEBUG((LM_DEBUG, "Send record to %s succeed\n", SERVER));
ACE_OS::sleep(1);
}
return 0;
}
[ 本帖最后由 pengxiqin 于 2009-12-16 23:33 编辑 ] |
|