使用ACE Proactor的一个问题
斑竹好,大家好,学习ACE中,使用proactor的程序在使用中偶尔会在ace的这段代码里面出错ACE_INLINE void
ACE_Message_Block::wr_ptr (size_t n)
{
ACE_TRACE ("ACE_Message_Block::wr_ptr");
this->wr_ptr_ += n;
}
也就是说this已经是一个非法指针了,仍旧在访问wr_ptr,检查来检查去也不知道是哪里的问题,希望大家帮忙,先谢谢了
程序只有一个线程,流程是这样的,一个异步的acceptor一直处于监听网络连接状态,当有新的连接进来时,创建一个ClientNetworkHandler对象和
一个Client对象,在Client对象的构造函数里面启动一个定时器,当这个client过了10秒钟还没有收到512个字节的数据的话,就将这个Client和ClientNetworkHandler
删除,ClientNetworkHandler用于从网络上接收数据,接收到数据之后放到Client对象的缓冲区中,由Client对象处理接收到的数据,
每从网络上接收到4个字节的数据,则通过ClientNetworkHandler回应10K的数据。
ClientNetworkHandler从ACE_Service_Handler和ACE_Task<ACE_MT_SYNCH>上派生,当socket不可写时,则先通过putq将数据放在
ClientNetworkHandler的缓冲区(也就是ACE_Task<ACE_MT_SYNCH>的message_queue)中,在下一次socket可写时,再继续发送
如果网络出错,则将Client和ClientNetworkHandler的对象删除
在ClientNetworkHandler的析构函数里面关闭socket句柄和释放message_queue中的message_block,下面是ClientNetworkHandler与Client的代码
Class Client;
class ClientNetworkHandler : public ACE_Service_Handler, public ACE_Task<ACE_MT_SYNCH>
{
public:
static const int client_message_head_size = 8;
static const int max_http_request_size = 10240;
private:
Client* m_client; //the client of this handler
rw::net::NetAddress m_remote; //the client address of this handler
rw::net::NetAddress m_local; //the server address of this hanlder
ACE_Asynch_Read_Stream m_reader; //reader read from network
ACE_Asynch_Write_Stream m_writer; //writer write to network
ACE_Message_Block *m_readBlock; //temp data read from network
bool m_canWrite; //if the socket can write now, if we've pending a write operation on this client, we assume that the socket cannot
//be write, else the socket can be write
public:
ClientNetworkHandler(Client* client = NULL);
virtual ~ClientNetworkHandler();
public:
void dispatchReceived(ACE_Message_Block* data);
void write(ACE_Message_Block* block = NULL);
public:
virtual void open(ACE_HANDLE new_handle, ACE_Message_Block& message_block);
virtual void handle_read_stream(const ACE_Asynch_Read_Stream::Result& result);
virtual void handle_write_stream(const ACE_Asynch_Write_Stream::Result& result);
virtual void addresses(const ACE_INET_Addr& remote_address, const ACE_INET_Addr& local_address);
public:
void drop();
void send(const ACE_Message_Block* data);
const rw::net::NetAddress& remote() const{ return m_remote; }
};
ClientNetworkHandler::ClientNetworkHandler(Client* client)
: m_readBlock(NULL), m_canWrite(true)
{
m_client = client;
}
ClientNetworkHandler::~ClientNetworkHandler()
{
//关闭socket句柄
if(handle() != ACE_INVALID_HANDLE){
ACE_OS::closesocket(handle());
}
//释放从网络上读取数据的message_block
if(m_readBlock) m_readBlock->release();
m_readBlock = NULL;
//将这个对象里面存储的还没有发送出去的message_block释放掉
ACE_Message_Block* block = NULL;
ACE_Time_Value non_block(0, 0);
this->getq(block, &non_block);
while(block != NULL){
block->release();
block = NULL;
this->getq(block, &non_block);
}
m_client = NULL;
m_canWrite = false;
//释放读stream
m_reader.cancel();
//释放写stream
m_writer.cancel();
}
//将从网络上读取的数据交给对应的client处理
void
ClientNetworkHandler::dispatchReceived(ACE_Message_Block *data)
{
if(m_client) m_client->handleReceived(data);
}
//往网络上写数据
void
ClientNetworkHandler::write(ACE_Message_Block *block)
{
//如果写的是null,则从本对象的发送缓存里面取message_block进行发送
if(block == NULL){
ACE_Time_Value non_block(0);
this->getq(block, &non_block);
}
if(block){
m_canWrite = false;
if(m_writer.write(*block, block->length()) == -1){
//将发送出去的block从本对象的发送缓存中移除
ungetq(block);
}
}
}
//初始化
void
ClientNetworkHandler::open(ACE_HANDLE new_handle, ACE_Message_Block &message_block)
{
if(new_handle != ACE_INVALID_HANDLE) handle(new_handle);
if(m_reader.open(*this) < 0){
rf_log(RW_TEXT("open reader failed: ") << errno, RFLogger::LL_fatal);
drop();
return;
}
if(m_writer.open(*this) < 0){
rf_log(RW_TEXT("open writer failed: ") << errno, RFLogger::LL_fatal);
drop();
return;
}
ACE_NEW_NORETURN(m_readBlock, ACE_Message_Block(max_http_request_size));
if(m_readBlock == NULL){
rf_log(RW_TEXT("new message block failed: ") << errno, RFLogger::LL_fatal);
drop();
return;
}
m_reader.read(*m_readBlock, max_http_request_size);
}
//处理从网络上读取的数据
void
ClientNetworkHandler::handle_read_stream(const ACE_Asynch_Read_Stream::Result& result)
{
if(result.handle() != handle()){
return ;
}
if(!result.success() ||
(result.bytes_to_read() != 0 && result.bytes_transferred() == 0)){
//如果读取失败,则关闭这个对象
rf_log(RW_TEXT("socket closed by client:") << charsetAlign(m_remote.toString()).c_str(), RFLogger::LL_debug);
drop();
return;
} else {
//如果读取到了数据则将数据交给相应的client对象处理
dispatchReceived(m_readBlock);
m_readBlock->reset();
m_reader.read(*m_readBlock, max_http_request_size);
}
}
//处理网络的写事件
void
ClientNetworkHandler::handle_write_stream(const ACE_Asynch_Write_Stream::Result &result)
{
ACE_Message_Block &temp_block = result.message_block();
if(!result.success()){
//如果写失败了,则关闭这个对象
drop();
return;
} else {
if(handle() != result.handle()){
//如果穿回来了错误的句柄,则关闭这个对象
m_canWrite = false;
drop();
return;
}
m_canWrite = true;
if (temp_block.length () == 0) {
//如果返回的message_block已经完全发送了,则将这个message_block释放
temp_block.release();
//同时从缓冲区中取出新的message_block进行发送
write();
} else {
//如果返回的message_block还没有发送完成,则继续发送这个message_block剩下的数据
write(&temp_block);
}
}
}
void
ClientNetworkHandler::addresses(const ACE_INET_Addr &remote_address, const ACE_INET_Addr &local_address)
{
m_remote.family(remote_address.get_type());
m_remote.ip(ACE_HTONL(remote_address.get_ip_address()));
m_remote.port(remote_address.get_port_number());
m_local.family(local_address.get_type());
m_local.ip(ACE_HTONL(local_address.get_ip_address()));
m_local.port(local_address.get_port_number());
//cout << "remote address: " << m_remote.toString() << " local address: " << m_local.toString() << endl;
}
//关闭这个网络数据处理对象
void
ClientNetworkHandler::drop()
{
//如果这个网络对象有相应的client,则告诉这个client这个网络对象已经关闭
if(m_client) m_client->handleHandlerClosed();
else{
//否则把自己给删除了
rf_log(RW_TEXT("no client for this handler: ") << charsetAlign(m_remote.toString()).c_str(), RFLogger::LL_fatal);
delete this;
}
}
void
ClientNetworkHandler::send(const ACE_Message_Block* data)
{
//如果可以直接往网络上发送数据,则将data发送给网络
if(m_canWrite) write(const_cast<ACE_Message_Block*>(data));
//如果不行,则将这个数据先放到缓冲区中,等待下次调用handle_write_stream时再进行发送
else this->putq(const_cast<ACE_Message_Block*>(data));
}
class Client : public ACE_Handler
{
public:
static const int client_id_size = 20; //id size of client
static const int max_initial_timeout = 10; //the maximum time we wait for the client to set it's initial imformation, 10 seconds
protected:
ClientNetworkHandler* m_networkHandler; //handler of this client
char m_buffer; //接收客户数据的缓冲区(客户发送的最多数据不会超过1024个字节)
int m_currentBufferLength; //当期缓冲区数据总长度
int m_currentParseOffset; //当前解析到的数据偏移
public:
Client();
virtual ~Client();
public:
virtual void handle_time_out(const ACE_Time_Value& tv, const void* art = 0);
void handleReceived(ACE_Message_Block* msg_block);
void handleHandlerClosed();
void dropSelf();
public:
int processInputBuffer();
void sendData(const ACE_Message_Block* data);
};
Client::Client()
: m_networkHandler(NULL), m_currentBufferLength(0),m_currentParseOffset(0)
{
ACE_Proactor::instance()->schedule_timer(*this, (void*)1, ACE_Time_Value(10));
}
Client::~Client()
{
}
void
Client::handle_time_out(const ACE_Time_Value &tv, const void *art)
{
int timer = (int)art;
switch(timer)
{
case 1:
{
if(m_currentBufferLength < 512){
dropSelf();
}
}
break;
}
}
void
Client::handleReceived(ACE_Message_Block* msg_block)
{
if(msg_block->length() + m_currentBufferLength > 1024){
//如果数据大于所规定的最大数据,则关闭这个客户
dropSelf();
return;
}
//将数据复制到客户的缓冲区中
memcpy(m_buffer + m_currentBufferLength, msg_block->rd_ptr(), msg_block->length());
m_currentBufferLength += msg_block->length();
//处理缓冲区中的数据
processInputBuffer();
}
void
Client::handleHandlerClosed()
{
//如果客户的网络处理对象关了,则把这个客户也关了
dropSelf();
}
void
Client::dropSelf()
{
//delete这个客户的网络处理对象
if(m_networkHandler) delete m_networkHandler;
m_networkHandler = NULL;
//delete这个客户
delete this;
}
int
Client::processInputBuffer()
{
while(true){
if(m_currentParseOffset + 4 > m_currentBufferLength) break;//如果当前解析数据的便宜已经大于当前缓冲区长度
m_currentParseOffset += 4; //解析偏移4个字节
//客户没发送4个字节的数据上来,回应10K的数据回去
ACE_Message_Block* ret_data;
ACE_NEW_NORETURN(ret_data, ACE_Message_Block(10240));
send(ret_data);
}
return 0;
}
void
Client::sendData(const ACE_Message_Block* data)
{
m_networkHandler->send(data);
} 怎么这么多人路过不回复啊。。。
大家帮帮忙啊 路过的人很多,还是没人回答,是太简单了么?还是大家都过周末去了啊?
继续求助 代码问题很多的。
下面的代码,完全没有考虑到任何同步的问题。
Proactor架构难点就在于线程、异步、对象相互结合,稍有理解不当,就会出现错误。
void Client::dropSelf()
{
//delete这个客户的网络处理对象
if(m_networkHandler) delete m_networkHandler;//删除时候,此对象正在注册接受信息,怎么能如此删除?
m_networkHandler = NULL;
//delete这个客户
delete this;
} 你可以把所有的delete this先屏蔽,看看结果是不是不同。 谢谢管理员的回答,由于刚刚接触ACE没多久,看了它所介绍的那两本书(n1/n2),看了它所介绍的关于reactor与proactor的代码,参照它的一个示例所写的一个程序进行测试,没有考虑太多其它的东西,结果运行一段时间后就会出现上面的问题
这个错误比较很难重现,不过管理员说得很有道理,这里delete自身的话可能会有问题
dropSelf有两种情况下可能会被调用
1.网络错误,也就是对方关闭了这个socket的连接,这个时候应该不会出现你说的这个问题
2.超时时仍旧没有接收到足够多的数据,这时可能出现你说的这个问题
如果不这样删除的话,想实现超时时把这个socket关闭并删除这个对象应该如何实现呢?
或者有没有办法把相应的socket从proactor里面解注册?难道在析构函数中的closesocket和取消reader、writer不能将这个socket的事件从proactor中解注册掉吗?
还有就是管理员说代码问题很多,还有其它哪些问题呢?大家可否多加指点?先谢谢了
[ 本帖最后由 redf0x 于 2009-7-18 17:46 编辑 ] 这个涉及到对Proactor的理解。
看完下面的帖子:
http://www.acejoy.com/bbs/viewthread.php?tid=1126
http://www.acejoy.com/bbs/viewthread.php?tid=1178&extra=page%3D1
http://www.acejoy.com/bbs/viewthread.php?tid=690&extra=page%3D1 谢谢管理员的回答,昨晚发贴时,已经把这三个帖子看过了,没有仔细理解,刚才又仔细看了一下
我的理解是这样的:
在主动调用closesocket时,ClientNetworkHandler投递了几个操作在proactor里面,ace就一定还会回调回调几次该ClientNetworkHandler相应的handle_read_stream或者相应的handle_write_stream,调用完这些相应的函数之后,proactor就会把ClientNetworkHandler从里面解注册,不知是否这样?这个时候调用的handle_read_stream或者handle_write_stream中的result的应该是失败吧?也就是result的success()函数返回假?
那么在我的实例里面的话超时析构之后有可能handle_read_stream和handle_write_stream都还会被回调?虽然我的是单线程? 但是如果是这样的话,为什么出错是在ACE的这段代码里面呢?
ACE_INLINE void
ACE_Message_Block::wr_ptr (size_t n)
{
ACE_TRACE ("ACE_Message_Block::wr_ptr");
this->wr_ptr_ += n;
}
是因为即使我close了ClientNetworkHandler的这个socket之后proactor还在试图从ClientNetworkHandler的buffer里面来读取数据进行发送吗? 我刚才测试了一下,在open的时候用reader去read网络事件,超时时关闭这个socket,发现ClientNetworkHandler的handler_read_stream并没有被调用,这是为什么?
页:
[1]
2