proactor连接数4000+出现拒绝连接的问题
#ifndef _terminal_processor_h
#define _terminal_processor_h
// C++ standard library (std::map backs the connection registries,
// std::cin is used in ACE_TMAIN).
#include <iostream>
#include <map>
// ACE
#include "ace/OS_NS_string.h"
#include "ace/OS_main.h"
#include "ace/Service_Config.h"
#include "ace/Proactor.h"
#include "ace/Asynch_IO.h"
#include "ace/Asynch_IO_Impl.h"
#include "ace/Asynch_Acceptor.h"
#include "ace/INET_Addr.h"
#include "ace/SOCK_Connector.h"
#include "ace/SOCK_Acceptor.h"
#include "ace/SOCK_Stream.h"
#include "ace/SOCK_SEQPACK_Association.h"
#include "ace/Message_Block.h"
#include "ace/Get_Opt.h"
#include "ace/Log_Msg.h"
#include "ace/OS_NS_sys_stat.h"
#include "ace/OS_NS_sys_socket.h"
#include "ace/OS_NS_unistd.h"
#include "ace/OS_NS_fcntl.h"
#include "ace/message_queue_t.h"
#include "ace/task_t.h"
// project
#include "../include/log_imp.h"
#include "../include/data_pool_imp.h"
#include "./terminal_message.h"
ACE_Recursive_Thread_Mutex map_lock;
// 消息类型
#define MB_NORMAL_PACKET 0x201 // 普通
#define MB_CONNECT_PACKET 0x202 // 连接建立
#define MB_CLOSE_PACKET 0x203 // 连接关闭
typedef std::map<ACE_HANDLE, void*> client_connection;
typedef ACE_Singleton<client_connection, ACE_SYNCH_MUTEX> _conn;
class PacketHeader{
public:
void* handler;
CommHead data_;
};
class NetworkThread;
class LogicThread;
NetworkThread *io_thread;
LogicThread *logic_thread;
class arg_{
public:
arg_(){
}
void* p_this;
ACE_Message_Block* mb;
size_t len;
typedef int (*_fun)(arg_);
_fun fun;
void set_fun(int (*fun)(arg_)){
this->fun = fun;
}
int write_(){
return fun(*this);
}
};
typedef std::map<void*, arg_> map_conns;
typedef ACE_Singleton<map_conns, ACE_Null_Mutex> conns;
ACE_Recursive_Thread_Mutex lock_conns;
class LogicThread : public ACE_Task<ACE_MT_SYNCH>
{
public:
LogicThread()
{
}
~LogicThread()
{
}
int open(){
return this->activate(THR_NEW_LWP, (int)para::instance()->thread_count);
}
int close(){
ACE_Message_Block * hangup;
for(int i=0; i<(int)para::instance()->thread_count; i++){
ACE_NEW_RETURN (hangup, ACE_Message_Block(0, ACE_Message_Block::MB_HANGUP), -1);
this->putq (hangup);
}
this->wait ();
return 0;
}
int svc()
{
ACE_Message_Block * message;
for (message = 0; ; )
{
if (this->getq (message) == -1)
ACE_ERROR_RETURN ((LM_ERROR,ACE_TEXT("%p\n"), ACE_TEXT("getq")), -1);
if (message->msg_type () == ACE_Message_Block::MB_HANGUP){
//ACE_DEBUG ((LM_DEBUG, ACE_TEXT("LogicThread::svc hangup\n")));
write_log("Process thread with parsing input exit.");
message->release ();
break;
}
// ...
switch (message->msg_type())
{
case MB_NORMAL_PACKET:
{
PacketHeader * hdr = reinterpret_cast<PacketHeader *> (message->rd_ptr ());
ACE_Message_Block * data_mb = message->cont();
//...校验处理,省略
// 回包
{
ACE_Guard<ACE_Recursive_Thread_Mutex> _lock(lock_conns);
map_conns::iterator it = conns::instance()->find(hdr->handler);
if(it != conns::instance()->end()){
arg_ arg = it->second;
if((size_t)process_cont.CommSend.commHead.wLength <= sizeof(CommMessage)){
ACE_Message_Block* mb;
ACE_NEW_NORETURN(mb, ACE_Message_Block((size_t)process_cont.CommSend.commHead.wLength));
mb->copy((char*)&process_cont.CommSend, mb->space());
arg.p_this = hdr->handler;
arg.mb = mb;
arg.len = (size_t)process_cont.CommSend.commHead.wLength;
arg.write_();
}
}
}
}
break;
default:
ACE_ERROR ((LM_ERROR, ACE_TEXT("LogicThread::svc unkown msg_type %d"), message->msg_type()));
break;
}
message->release();
}
return 0;
}
};
class HA_Proactive_Service : public ACE_Service_Handler
{
public:
HA_Proactive_Service() {
}
~HA_Proactive_Service (){
if (this->handle () != ACE_INVALID_HANDLE)
ACE_OS::closesocket (this->handle ());
}
void open (ACE_HANDLE h, ACE_Message_Block&){
this->handle (h);
if (this->reader_.open (*this) != 0 || this->writer_.open (*this) != 0 ){
delete this;
return;
}
//write_log("Current died conn amounts is %lu.", conns::instance()->size());
arg_ arg;
arg.set_fun(init_write_stream);
{
ACE_Guard<ACE_Recursive_Thread_Mutex> _lock(lock_conns);
conns::instance()->insert(map_conns::value_type((void*)this, arg));
}
if(this->init_read_stream() != 0){
return;
}
}
void handle_read_stream (const ACE_Asynch_Read_Stream::Result &result){
ACE_Message_Block &mb = result.message_block ();
if (!result.success () || result.bytes_transferred () == 0){
ACE_Guard<ACE_Recursive_Thread_Mutex> _lock(lock_conns);
if(conns::instance()->erase((void*)this) == 1){
recv_data_->release ();
delete this;
}
return;
}
if(recv_data_->space() != 0){
// 数据包长度信息还未接收完
if(this->reader_.read (*recv_data_, recv_data_->space ())){
ACE_Guard<ACE_Recursive_Thread_Mutex> _lock(lock_conns);
if(conns::instance()->erase((void*)this) == 1){
recv_data_->release ();
delete this;
}
return;
}
return;
}
if(para::instance()->b_debug){
ACE_INET_Addr addr;
ACE_SOCK_SEQPACK_Association ass(this->handle());
size_t addr_size = 1;
ass.get_local_addrs(&addr, addr_size);
ACE_TCHAR tmp;
ACE_OS::sprintf(tmp, "Recv from %s:%u", addr.get_host_addr(), addr.get_port_number());
ACE_HEX_DUMP((LM_DEBUG, result.message_block().rd_ptr(), result.bytes_transferred(), tmp));
}
PacketHeader * hdr = reinterpret_cast<PacketHeader *> (recv_data_->rd_ptr());
if(hdr->data_.wMessageType != MessageType ||
(size_t)hdr->data_.wLength >= sizeof(CommMessage)){
recv_data_->release ();
this->init_read_stream();
return;
}
ACE_Message_Block * data_mb = recv_data_->cont();
if (!data_mb){
// 刚刚接收完长度信息
ACE_NEW (data_mb, ACE_Message_Block((size_t)hdr->data_.wLength-sizeof(hdr->data_)));
recv_data_->cont (data_mb);
}
if(data_mb->space() == 0){
//write_log("read %s.", recv_data_->rd_ptr());
logic_thread->putq (recv_data_);
this->init_read_stream();
return;
}
// 否则继续接收该数据包
if(this->reader_.read (*data_mb, data_mb->space ()) == -1){
ACE_Guard<ACE_Recursive_Thread_Mutex> _lock(lock_conns);
if(conns::instance()->erase((void*)this) == 1){
recv_data_->release ();
delete this;
}
return;
}
}
void handle_write_stream (const ACE_Asynch_Write_Stream::Result &result){
if(para::instance()->b_debug){
ACE_INET_Addr addr;
ACE_SOCK_SEQPACK_Association ass(this->handle());
size_t addr_size = 1;
ass.get_local_addrs(&addr, addr_size);
ACE_TCHAR tmp;
ACE_OS::sprintf(tmp, "Recv from %s:%u", addr.get_host_addr(), addr.get_port_number());
ACE_HEX_DUMP((LM_DEBUG, result.message_block().base(), result.bytes_transferred(), tmp));
}
result.message_block ().release ();
}
private:
int init_read_stream(){
ACE_NEW_NORETURN (recv_data_, ACE_Message_Block (sizeof(PacketHeader), MB_NORMAL_PACKET));
//ACE_HANDLE handle = this->handle ();
//void* p_this = (void*)this;
recv_data_->copy ((const char *)(this), sizeof(this));
if(this->reader_.read (*recv_data_, recv_data_->space ()) == -1){
recv_data_->release();
ACE_ERROR_RETURN((LM_ERROR, "%p\n", "ACE_Asynch_Read_Stream::read"), -1);
{
ACE_Guard<ACE_Recursive_Thread_Mutex> _lock(lock_conns);
if(conns::instance()->erase((void*)this) == 1)
delete this;
}
return -1;
}
return 0;
}
int init_write_stream (ACE_Message_Block & mb, size_t nBytes){
if (this->writer_.write (mb , nBytes ) == -1){
mb.release ();
ACE_ERROR_RETURN((LM_ERROR, "%p\n", "ACE_Asynch_Write_File::write"), -1);
{
ACE_Guard<ACE_Recursive_Thread_Mutex> _lock(lock_conns);
if(conns::instance()->erase((void*)this) == 1)
delete this;
}
return -1;
}
return 0;
}
private:
ACE_Asynch_Read_Stream reader_;
ACE_Asynch_Write_Stream writer_;
ACE_Message_Block* recv_data_;
};
class NetworkThread : public ACE_Task_Base
{
public:
NetworkThread(){
}
~NetworkThread(){
}
int open(){
return this->activate(THR_NEW_LWP, 4);
}
int close(){
ACE_Proactor::instance()->proactor_end_event_loop();
this->wait();
return 0;
}
int svc(){
ACE_Proactor::instance ()->proactor_run_event_loop ();
write_log("Process thread with recving input exit.");
return 0;
}
};
#endif
代码如上,省略了几个具体业务流程模块,基本完整。
思路是仿造论坛上面的一个例子,唯一不同的是增加了对连接句柄的保存,目的是在处理数据之后回包时判断连接是否仍然有效;如果有别的方式,请给个意见。
目前的问题是:客户端用 tp_reactor 测试连接,发现连接数在 2000 以下时 CPU 占用率在 10% 以下;连接数到 4000+ 以后客户端连接失败,原因是拒绝连接,同时服务端已有连接出现个别掉线现象,反复几次都是如此。:(
int
ACE_TMAIN (int argc, ACE_TCHAR *argv[])
{
//ACE_LOG_MSG->open(".\\tp.log", ACE_Log_Msg::LOGGER, ACE_DEFAULT_LOGGER_KEY);
if (parse_args (argc, argv) == -1){
char x;
std::cin >> x;
return -1;
}
if(1){
io_thread = new NetworkThread ();
logic_thread = new LogicThread ();
ACE_INET_Addr listen_addr(para::instance()->port, para::instance()->ip.c_str());
ACE_Asynch_Acceptor<HA_Proactive_Service> aio_acceptor;
if (0 != aio_acceptor.open (listen_addr, 0, 0, 5, 1, 0, 0))
ACE_ERROR_RETURN ((LM_ERROR, ACE_TEXT ("%p\n"), ACE_TEXT ("NetworkThread acceptor open")), 1);
logic_thread->open ();
io_thread->open ();
int x;
std::cin >> x;
io_thread->close ();
logic_thread->close ();
delete io_thread;
delete logic_thread;
}
else{
}
return 0;
}
这个是主函数,估计是没有多少帮助。

(回复)把 aio_acceptor.open (listen_addr, 0, 0, 5, 1, 0, 0) 中的 5 改成 200 或者 2000,再试试。还得看你机器的内存、OS 情况。

(楼主)改成 aio_acceptor.open (listen_addr, 0, 0, SOMAXCONN, 1, 0, 0, 1, 5) 效果好多了,连接数到达 1.5w 了……
看了代码,是open时候的一个bug,如果换个写法或许就没事了:(
客户端用的tp_reactor模型
谢谢大家。

你好,我们现在做一个项目,服务器肯定要用 proactor 框架,我想请问客户端用什么最好。我们的流程是:客户端发送请求数据的信息给服务器端,服务器端分析后,发给客户端他所需要的数据……可能数据会很大,比如 1 个 G 的量。

不知道楼主是怎么把这句话编译通过的:arg.set_fun(init_write_stream);
我的编译环境(VS2008)一直提示“HA_Proactive_Service::init_write_stream”: 函数调用缺少参数列表;请使用“&HA_Proactive_Service::init_write_stream”创建指向成员的指针。
页:
[1]