|
斑竹好,大家好,学习ACE中,使用proactor的程序在使用中偶尔会在ace的这段代码里面出错
ACE_INLINE void
ACE_Message_Block::wr_ptr (size_t n)
{
ACE_TRACE ("ACE_Message_Block::wr_ptr");
this->wr_ptr_ += n;
}
也就是说this已经是一个非法指针了,仍旧在访问wr_ptr,检查来检查去也不知道是哪里的问题,希望大家帮忙,先谢谢了
程序只有一个线程,流程是这样的,一个异步的acceptor一直处于监听网络连接状态,当有新的连接进来时,创建一个ClientNetworkHandler对象和
一个Client对象,在Client对象的构造函数里面启动一个定时器,当这个client过了10秒钟还没有收到512个字节的数据的话,就将这个Client和ClientNetworkHandler
删除,ClientNetworkHandler用于从网络上接收数据,接收到数据之后放到Client对象的缓冲区中,由Client对象处理接收到的数据,
每从网络上接收到4个字节的数据,则通过ClientNetworkHandler回应10K的数据。
ClientNetworkHandler从ACE_Service_Handler和ACE_Task<ACE_MT_SYNCH>上派生,当socket不可写时,则先通过putq将数据放在
ClientNetworkHandler的缓冲区(也就是ACE_Task<ACE_MT_SYNCH>的message_queue)中,在下一次socket可写时,再继续发送
如果网络出错,则将Client和ClientNetworkHandler的对象删除
在ClientNetworkHandler的析构函数里面关闭socket句柄和释放message_queue中的message_block,下面是ClientNetworkHandler与Client的代码
Class Client;
class ClientNetworkHandler : public ACE_Service_Handler, public ACE_Task<ACE_MT_SYNCH>
{
public:
static const int client_message_head_size = 8;
static const int max_http_request_size = 10240;
private:
Client* m_client; //the client of this handler
rw::net::NetAddress m_remote; //the client address of this handler
rw::net::NetAddress m_local; //the server address of this hanlder
ACE_Asynch_Read_Stream m_reader; //reader read from network
ACE_Asynch_Write_Stream m_writer; //writer write to network
ACE_Message_Block *m_readBlock; //temp data read from network
bool m_canWrite; //if the socket can write now, if we've pending a write operation on this client, we assume that the socket cannot
//be write, else the socket can be write
public:
ClientNetworkHandler(Client* client = NULL);
virtual ~ClientNetworkHandler();
public:
void dispatchReceived(ACE_Message_Block* data);
void write(ACE_Message_Block* block = NULL);
public:
virtual void open(ACE_HANDLE new_handle, ACE_Message_Block& message_block);
virtual void handle_read_stream(const ACE_Asynch_Read_Stream::Result& result);
virtual void handle_write_stream(const ACE_Asynch_Write_Stream::Result& result);
virtual void addresses(const ACE_INET_Addr& remote_address, const ACE_INET_Addr& local_address);
public:
void drop();
void send(const ACE_Message_Block* data);
const rw::net::NetAddress& remote() const{ return m_remote; }
};
ClientNetworkHandler::ClientNetworkHandler(Client* client)
: m_readBlock(NULL), m_canWrite(true)
{
m_client = client;
}
ClientNetworkHandler::~ClientNetworkHandler()
{
//关闭socket句柄
if(handle() != ACE_INVALID_HANDLE){
ACE_OS::closesocket(handle());
}
//释放从网络上读取数据的message_block
if(m_readBlock) m_readBlock->release();
m_readBlock = NULL;
//将这个对象里面存储的还没有发送出去的message_block释放掉
ACE_Message_Block* block = NULL;
ACE_Time_Value non_block(0, 0);
this->getq(block, &non_block);
while(block != NULL){
block->release();
block = NULL;
this->getq(block, &non_block);
}
m_client = NULL;
m_canWrite = false;
//释放读stream
m_reader.cancel();
//释放写stream
m_writer.cancel();
}
//将从网络上读取的数据交给对应的client处理
void
ClientNetworkHandler::dispatchReceived(ACE_Message_Block *data)
{
if(m_client) m_client->handleReceived(data);
}
//往网络上写数据
void
ClientNetworkHandler::write(ACE_Message_Block *block)
{
//如果写的是null,则从本对象的发送缓存里面取message_block进行发送
if(block == NULL){
ACE_Time_Value non_block(0);
this->getq(block, &non_block);
}
if(block){
m_canWrite = false;
if(m_writer.write(*block, block->length()) == -1){
//将发送出去的block从本对象的发送缓存中移除
ungetq(block);
}
}
}
//初始化
void
ClientNetworkHandler::open(ACE_HANDLE new_handle, ACE_Message_Block &message_block)
{
if(new_handle != ACE_INVALID_HANDLE) handle(new_handle);
if(m_reader.open(*this) < 0){
rf_log(RW_TEXT("open reader failed: ") << errno, RFLogger::LL_fatal);
drop();
return;
}
if(m_writer.open(*this) < 0){
rf_log(RW_TEXT("open writer failed: ") << errno, RFLogger::LL_fatal);
drop();
return;
}
ACE_NEW_NORETURN(m_readBlock, ACE_Message_Block(max_http_request_size));
if(m_readBlock == NULL){
rf_log(RW_TEXT("new message block failed: ") << errno, RFLogger::LL_fatal);
drop();
return;
}
m_reader.read(*m_readBlock, max_http_request_size);
}
//处理从网络上读取的数据
void
ClientNetworkHandler::handle_read_stream(const ACE_Asynch_Read_Stream::Result& result)
{
if(result.handle() != handle()){
return ;
}
if(!result.success() ||
(result.bytes_to_read() != 0 && result.bytes_transferred() == 0)){
//如果读取失败,则关闭这个对象
rf_log(RW_TEXT("socket closed by client: ") << charsetAlign(m_remote.toString()).c_str(), RFLogger::LL_debug);
drop();
return;
} else {
//如果读取到了数据则将数据交给相应的client对象处理
dispatchReceived(m_readBlock);
m_readBlock->reset();
m_reader.read(*m_readBlock, max_http_request_size);
}
}
//处理网络的写事件
void
ClientNetworkHandler::handle_write_stream(const ACE_Asynch_Write_Stream::Result &result)
{
ACE_Message_Block &temp_block = result.message_block();
if(!result.success()){
//如果写失败了,则关闭这个对象
drop();
return;
} else {
if(handle() != result.handle()){
//如果穿回来了错误的句柄,则关闭这个对象
m_canWrite = false;
drop();
return;
}
m_canWrite = true;
if (temp_block.length () == 0) {
//如果返回的message_block已经完全发送了,则将这个message_block释放
temp_block.release();
//同时从缓冲区中取出新的message_block进行发送
write();
} else {
//如果返回的message_block还没有发送完成,则继续发送这个message_block剩下的数据
write(&temp_block);
}
}
}
void
ClientNetworkHandler::addresses(const ACE_INET_Addr &remote_address, const ACE_INET_Addr &local_address)
{
m_remote.family(remote_address.get_type());
m_remote.ip(ACE_HTONL(remote_address.get_ip_address()));
m_remote.port(remote_address.get_port_number());
m_local.family(local_address.get_type());
m_local.ip(ACE_HTONL(local_address.get_ip_address()));
m_local.port(local_address.get_port_number());
//cout << "remote address: " << m_remote.toString() << " local address: " << m_local.toString() << endl;
}
//关闭这个网络数据处理对象
void
ClientNetworkHandler::drop()
{
//如果这个网络对象有相应的client,则告诉这个client这个网络对象已经关闭
if(m_client) m_client->handleHandlerClosed();
else{
//否则把自己给删除了
rf_log(RW_TEXT("no client for this handler: ") << charsetAlign(m_remote.toString()).c_str(), RFLogger::LL_fatal);
delete this;
}
}
void
ClientNetworkHandler::send(const ACE_Message_Block* data)
{
//如果可以直接往网络上发送数据,则将data发送给网络
if(m_canWrite) write(const_cast<ACE_Message_Block*>(data));
//如果不行,则将这个数据先放到缓冲区中,等待下次调用handle_write_stream时再进行发送
else this->putq(const_cast<ACE_Message_Block*>(data));
}
class Client : public ACE_Handler
{
public:
static const int client_id_size = 20; //id size of client
static const int max_initial_timeout = 10; //the maximum time we wait for the client to set it's initial imformation, 10 seconds
protected:
ClientNetworkHandler* m_networkHandler; //handler of this client
char m_buffer[1024]; //接收客户数据的缓冲区(客户发送的最多数据不会超过1024个字节)
int m_currentBufferLength; //当期缓冲区数据总长度
int m_currentParseOffset; //当前解析到的数据偏移
public:
Client();
virtual ~Client();
public:
virtual void handle_time_out(const ACE_Time_Value& tv, const void* art = 0);
void handleReceived(ACE_Message_Block* msg_block);
void handleHandlerClosed();
void dropSelf();
public:
int processInputBuffer();
void sendData(const ACE_Message_Block* data);
};
Client::Client()
: m_networkHandler(NULL), m_currentBufferLength(0),m_currentParseOffset(0)
{
ACE_Proactor::instance()->schedule_timer(*this, (void*)1, ACE_Time_Value(10));
}
Client::~Client()
{
}
void
Client::handle_time_out(const ACE_Time_Value &tv, const void *art)
{
int timer = (int)art;
switch(timer)
{
case 1:
{
if(m_currentBufferLength < 512){
dropSelf();
}
}
break;
}
}
void
Client::handleReceived(ACE_Message_Block* msg_block)
{
if(msg_block->length() + m_currentBufferLength > 1024){
//如果数据大于所规定的最大数据,则关闭这个客户
dropSelf();
return;
}
//将数据复制到客户的缓冲区中
memcpy(m_buffer + m_currentBufferLength, msg_block->rd_ptr(), msg_block->length());
m_currentBufferLength += msg_block->length();
//处理缓冲区中的数据
processInputBuffer();
}
void
Client::handleHandlerClosed()
{
//如果客户的网络处理对象关了,则把这个客户也关了
dropSelf();
}
void
Client::dropSelf()
{
//delete这个客户的网络处理对象
if(m_networkHandler) delete m_networkHandler;
m_networkHandler = NULL;
//delete这个客户
delete this;
}
int
Client::processInputBuffer()
{
while(true){
if(m_currentParseOffset + 4 > m_currentBufferLength) break;//如果当前解析数据的便宜已经大于当前缓冲区长度
m_currentParseOffset += 4; //解析偏移4个字节
//客户没发送4个字节的数据上来,回应10K的数据回去
ACE_Message_Block* ret_data;
ACE_NEW_NORETURN(ret_data, ACE_Message_Block(10240));
send(ret_data);
}
return 0;
}
void
Client::sendData(const ACE_Message_Block* data)
{
m_networkHandler->send(data);
} |
|