我大致看了下Dev_Poll_Reactor的实现源码,内部有个锁的,但它到底在多线程下是不是安全的?
我写了一个简单的Demo,已经确保了所有的代码全部Reactor内部运行,但还是会出现程序崩溃的现象。
代码非常简单,在多个线程中执行
while(1)
ACE_Reactor::instance()->handle_events();
运行情况是这个的,如果只有一个线程在做上面的循环,则一切正常,跑了很久都没事,但如果在多个线程中做上面的循环,则很快就崩溃了。
事件处理:只是把收到的包原样送回去,参考的都是书上的Demo
int CSessionHandler::handle_input(ACE_HANDLE h)
{
ssize_t bytes_transferred = peer().recv(buf_, ONE_PACKET_SIZE);
if (bytes_transferred <= 0)
{
return -1;
}
else
{
ACE_Message_Block* mb = NULL;
ACE_NEW_NORETURN(mb, ACE_Message_Block(bytes_transferred));
mb->copy(buf_, bytes_transferred);
ACE_Time_Value nowait(ACE_OS::gettimeofday());
if (putq(mb, &nowait) == -1)
{
mb->release();
return -1;
}
if (this->msg_queue()->message_count() == 1)
{
this->reactor()->register_handler(this, ACE_Event_Handler::WRITE_MASK);
}
}
return 0;
}
int CSessionHandler::handle_output(ACE_HANDLE fd /* = ACE_INVALID_HANDLE */)
{
ACE_Message_Block* mb = NULL;
ACE_Time_Value nowait(ACE_OS::gettimeofday());
while (-1 != this->getq(mb, &nowait))
{
ssize_t send_cnt = peer().send(mb->rd_ptr(), mb->length());
if (-1 == send_cnt)
{
ACE_OS::printf("Send data error!\n");
}
else
{
mb->rd_ptr(send_cnt);
}
if (mb->length() > 0)
{
this->ungetq(mb);
break;
}
mb->release();
}
return msg_queue()->is_empty()? -1 : 0;
}
int CSessionHandler::handle_close(ACE_HANDLE h, ACE_Reactor_Mask mask)
{
if (mask == ACE_Event_Handler::WRITE_MASK)
{
return -1;
}
return supper::handle_close(h, mask);
}
到底是怎么回事,该怎么处理,请高手指点。 |