|
楼主 |
发表于 2009-11-16 09:06:54
|
显示全部楼层
在发静态配置类(ClientService类)
int ClientService::open(void *p)
{
if(super::open(p) == -1)
return -1;
ACE_TCHAR peer_name[MAXHOSTNAMELEN];
ACE_INET_Addr peer_addr;
if(this->peer().get_remote_addr(peer_addr) == 0 &&
peer_addr.addr_to_string(peer_name,MAXHOSTNAMELEN))
ACE_DEBUG((LM_DEBUG,ACE_TEXT("(%P|%t) Connection from %s\n"),peer_name));
return 0;
}
int ClientService::handle_input(ACE_HANDLE fd)
{
const size_t INPUT_SIZE = 4096;
char buffer[INPUT_SIZE];
ssize_t recv_cnt,send_cnt;
recv_cnt = this->peer().recv(buffer,sizeof(buffer));
if(recv_cnt <=0)
{
ACE_DEBUG((LM_DEBUG,ACE_TEXT("(%P|%t) Connection closed \n")));
return -1;
}
send_cnt = this->peer().send(buffer,ACE_static_cast(size_t,recv_cnt));
if(send_cnt == recv_cnt)
return 0;
if(send_cnt == -1 && ACE_OS::last_error() != EWOULDBLOCK)
ACE_ERROR_RETURN((LM_ERROR,ACE_TEXT("(%P|%t) %p\n"),ACE_TEXT("Send")),0);
if(send_cnt == -1)
send_cnt = 0;
ACE_Message_Block *mb;
size_t remaining = ACE_static_cast(size_t,(recv_cnt-send_cnt));
ACE_NEW_RETURN(mb,ACE_Message_Block(&buffer[send_cnt],remaining),-1);
int output_off = this->msg_queue()->is_empty();
ACE_Time_Value nowait(ACE_OS::gettimeofday());
if(this->putq(mb,&nowait) == -1)
{
ACE_ERROR((LM_ERROR,ACE_TEXT("(%P|%t) %p; discarding data\n"),ACE_TEXT("enqueue failed")));
mb->release();
return 0;
}
if(output_off)
return this->reactor()->register_handler(this,ACE_Event_Handler::WRITE_MASK);
return 0;
}
int ClientService::handle_output(ACE_HANDLE fd)
{
ACE_Message_Block *mb;
ACE_Time_Value nowait(ACE_OS::gettimeofday());
while(this->getq(mb,&nowait) != -1)
{
ssize_t send_cnt = this->peer().send(mb->rd_ptr(),mb->length());
if(send_cnt == -1)
ACE_ERROR((LM_ERROR,ACE_TEXT("(%P|%t) %p\n"),ACE_TEXT("Send")));
else
mb->rd_ptr(ACE_static_cast(size_t,send_cnt));
if(mb->length()>0)
{
this->ungetq(mb);
break;
}
mb->release();
}
return (this->msg_queue()->is_empty()) ? -1:0;
}
int ClientService::handle_close(ACE_HANDLE fd,ACE_Reactor_Mask close_mask)
{
if(close_mask == ACE_Event_Handler::WRITE_MASK)
return 0;
return super::handle_close(fd,close_mask);
}
int HA_Status::init(int argc, ACE_TCHAR *argv[])
{
char config_file[MAXPATHLEN];
memset(config_file,'\0',MAXPATHLEN);
ACE_OS::strcpy(config_file,ACE_TEXT("HAStatus.conf"));
ACE_Configuration_Heap config;
if (config.open () == -1)
ACE_ERROR_RETURN((LM_ERROR, ACE_TEXT ("%p\n"), ACE_TEXT ("config")), -1);
ACE_Registry_ImpExp config_importer (config);
if (config_importer.import_config (config_file) == -1)
ACE_ERROR_RETURN((LM_ERROR, ACE_TEXT ("%p\n"), config_file), -1);
ACE_Configuration_Section_Key status_section;
//指定打开HAStatus 节进行读取
if (config.open_section (config.root_section (),
ACE_TEXT ("HAStatus"),
0,
status_section) == -1)
ACE_ERROR_RETURN ((LM_ERROR, ACE_TEXT ("%p\n"),
ACE_TEXT ("Can't open HAStatus section")),
-1);
//分别用来保存读取的字符串类型的地址和整形的端口号
u_int status_port;
//读取整形的端口属性
if (config.get_integer_value (status_section,
ACE_TEXT ("ListenPort"),
status_port) == -1)
ACE_ERROR_RETURN((LM_ERROR,
ACE_TEXT ("HAStatus ListenPort does not exist\n")),
-1);
this->listen_addr_.set (ACE_static_cast(u_short,status_port));
if(this->ClientAcceptor.open(this->listen_addr_) != 0)
ACE_ERROR_RETURN((LM_ERROR,ACE_TEXT ("Acceptor error.\n")),-1);
return 0;
}
int HA_Status::fini(void)
{
this->ClientAcceptor.close();
return 0;
}
ACE_FACTORY_DEFINE(ACE_Local_Service,HA_Status)
ACE_STATIC_SVC_DEFINE(HA_Status_Descriptor,ACE_TEXT("HA_Status_Static_Service"),ACE_SVC_OBJ_T,&ACE_SVC_NAME(HA_Status),ACE_Service_Type::DELETE_OBJ|ACE_Service_Type::DELETE_THIS,0)
ACE_STATIC_SVC_REQUIRE(HA_Status_Descriptor) |
|