|
- #ifndef _terminal_processor_h
- #define _terminal_processor_h
- #include "ace/OS_NS_string.h"
- #include "ace/OS_main.h"
- #include "ace/Service_Config.h"
- #include "ace/Proactor.h"
- #include "ace/Asynch_IO.h"
- #include "ace/Asynch_IO_Impl.h"
- #include "ace/Asynch_Acceptor.h"
- #include "ace/INET_Addr.h"
- #include "ace/SOCK_Connector.h"
- #include "ace/SOCK_Acceptor.h"
- #include "ace/SOCK_Stream.h"
- #include "ace/SOCK_SEQPACK_Association.h"
- #include "ace/Message_Block.h"
- #include "ace/Get_Opt.h"
- #include "ace/Log_Msg.h"
- #include "ace/OS_NS_sys_stat.h"
- #include "ace/OS_NS_sys_socket.h"
- #include "ace/OS_NS_unistd.h"
- #include "ace/OS_NS_fcntl.h"
- #include "ace/message_queue_t.h"
- #include "ace/task_t.h"
- #include "../include/log_imp.h"
- #include "../include/data_pool_imp.h"
- #include "./terminal_message.h"
- ACE_Recursive_Thread_Mutex map_lock; // NOTE(review): defined in a header and not referenced by the visible code
- // Message types
- #define MB_NORMAL_PACKET 0x201 // normal packet
- #define MB_CONNECT_PACKET 0x202 // connection established
- #define MB_CLOSE_PACKET 0x203 // connection closed
- typedef std::map<ACE_HANDLE, void*> client_connection; // socket handle -> opaque pointer
- typedef ACE_Singleton<client_connection, ACE_SYNCH_MUTEX> _conn; // singleton wrapper around the map above; unused in the visible code
- // Layout of the first message block of every received packet: the owning
- // connection handler followed by the protocol header read from the socket.
- struct PacketHeader {
-     void* handler;   // connection handler the packet arrived on; used as the key into conns
-     CommHead data_;  // protocol header (wMessageType / wLength are checked on receipt)
- };
- class NetworkThread; // forward declaration; defined later in this header
- class LogicThread; // forward declaration; defined later in this header
- NetworkThread *io_thread; // global pointer; not assigned or used in the visible code
- LogicThread *logic_thread; // global pointer; completed packets are handed to it via putq()
- // Write request descriptor: bundles a target object, a payload and the
- // callback that performs the write. Stored by value in map_conns.
- class arg_{
- public:
-     // Start from a fully defined state so a default-constructed descriptor is
-     // inert instead of carrying indeterminate pointers.
-     arg_() : p_this(0), mb(0), len(0), fun(0) {
-     }
-     void* p_this;            // opaque target the callback operates on
-     ACE_Message_Block* mb;   // payload handed to the callback
-     size_t len;              // number of bytes to write
-     typedef int (*_fun)(arg_);
-     _fun fun;                // callback invoked by write_()
-     // Installs the callback that write_() will invoke.
-     void set_fun(int (*fun)(arg_)){
-         this->fun = fun;
-     }
-     // Invokes the installed callback with a copy of this descriptor.
-     // Returns the callback's result, or -1 when no callback is installed.
-     int write_(){
-         if (fun == 0)
-             return -1;
-         return fun(*this);
-     }
- };
- typedef std::map<void*, arg_> map_conns; // connection handler pointer -> write descriptor
- typedef ACE_Singleton<map_conns, ACE_Null_Mutex> conns; // singleton registry of connections; entries are inserted in open() and erased on teardown
- ACE_Recursive_Thread_Mutex lock_conns; // held around every access to conns::instance() in the visible code
- // Worker pool that consumes packets queued by the I/O side and sends the
- // replies back through the connection registry (conns).
- class LogicThread : public ACE_Task<ACE_MT_SYNCH>
- {
- public:
-     LogicThread()
-     {
-     }
-     ~LogicThread()
-     {
-     }
-     // Spawns para::instance()->thread_count threads running svc().
-     // Returns the result of activate().
-     int open(){
-         return this->activate(THR_NEW_LWP, (int)para::instance()->thread_count);
-     }
-     // Queues one MB_HANGUP block per worker, then waits for all workers to exit.
-     // Returns 0 on success, -1 if a hangup block could not be allocated.
-     int close(){
-         const int workers = (int)para::instance()->thread_count;
-         for(int i = 0; i < workers; ++i){
-             ACE_Message_Block * hangup = 0;
-             ACE_NEW_RETURN (hangup, ACE_Message_Block(0, ACE_Message_Block::MB_HANGUP), -1);
-             // The queue owns the block only if putq() succeeds; release it
-             // ourselves otherwise so it does not leak.
-             if (this->putq (hangup) == -1)
-                 hangup->release ();
-         }
-         this->wait ();
-         return 0;
-     }
-     // Worker loop: dequeues blocks until an MB_HANGUP block arrives.
-     // Returns 0 on a clean hangup, -1 if getq() fails.
-     int svc()
-     {
-         for (;;)
-         {
-             ACE_Message_Block * message = 0;
-             if (this->getq (message) == -1)
-                 ACE_ERROR_RETURN ((LM_ERROR,ACE_TEXT("%p\n"), ACE_TEXT("getq")), -1);
-             if (message->msg_type () == ACE_Message_Block::MB_HANGUP){
-                 write_log("Process thread with parsing input exit.");
-                 message->release ();
-                 break;
-             }
-             switch (message->msg_type())
-             {
-             case MB_NORMAL_PACKET:
-                 this->handle_normal_packet (message);
-                 break;
-             default:
-                 ACE_ERROR ((LM_ERROR, ACE_TEXT("LogicThread::svc unkown msg_type %d"), message->msg_type()));
-                 break;
-             }
-             message->release();
-         }
-         return 0;
-     }
- private:
-     // Processes one MB_NORMAL_PACKET block and sends the reply to the
-     // connection it came from, if that connection is still registered.
-     // The caller keeps ownership of `message`.
-     void handle_normal_packet(ACE_Message_Block * message)
-     {
-         PacketHeader * hdr = reinterpret_cast<PacketHeader *> (message->rd_ptr ());
-         // Packet body; consumed by the validation/processing step below.
-         ACE_Message_Block * data_mb = message->cont();
-         // ... validation and processing omitted in the original post; that
-         // code is expected to produce `process_cont`.
-
-         // Send the reply.
-         const size_t reply_len = (size_t)process_cont.CommSend.commHead.wLength;
-         if (reply_len > sizeof(CommMessage))
-             return;   // declared length exceeds the reply buffer; nothing is sent
-
-         // Hold the registry lock across lookup AND write so the handler cannot
-         // be erased and destroyed between the two.
-         ACE_Guard<ACE_Recursive_Thread_Mutex> _lock(lock_conns);
-         map_conns::iterator it = conns::instance()->find(hdr->handler);
-         if (it == conns::instance()->end())
-             return;   // connection is no longer registered
-
-         ACE_Message_Block* mb = 0;
-         ACE_NEW_NORETURN(mb, ACE_Message_Block(reply_len));
-         if (mb == 0){
-             ACE_ERROR ((LM_ERROR, ACE_TEXT("LogicThread::svc reply allocation failed\n")));
-             return;
-         }
-         if (mb->copy((const char*)&process_cont.CommSend, reply_len) == -1){
-             mb->release ();
-             return;
-         }
-         arg_ arg = it->second;
-         arg.p_this = hdr->handler;
-         arg.mb = mb;
-         arg.len = reply_len;
-         // The write callback takes over `mb`.
-         arg.write_();
-     }
- };
- // Per-connection proactor handler. It reads packets of the form
- // [CommHead][body], prefixes each with a pointer to this handler, and hands
- // complete packets to logic_thread. Replies are written back through the
- // descriptor registered in conns.
- //
- // Lifetime: the handler deletes itself when a read fails or the peer closes.
- // NOTE(review): a write still in flight at that moment would complete on a
- // destroyed handler; the original design has the same hazard.
- class HA_Proactive_Service : public ACE_Service_Handler
- {
- public:
-     HA_Proactive_Service() : recv_data_(0) {
-     }
-     ~HA_Proactive_Service (){
-         if (this->handle () != ACE_INVALID_HANDLE)
-             ACE_OS::closesocket (this->handle ());
-     }
-     // Called when a connection is accepted: binds the async streams,
-     // registers the handler in conns and starts the first header read.
-     void open (ACE_HANDLE h, ACE_Message_Block&){
-         this->handle (h);
-         if (this->reader_.open (*this) != 0 || this->writer_.open (*this) != 0 ){
-             delete this;
-             return;
-         }
-         arg_ arg;
-         arg.p_this = this;
-         arg.mb = 0;
-         arg.len = 0;
-         // arg_ needs a plain function pointer, so register a static trampoline
-         // that forwards to init_write_stream() on the right instance.
-         arg.set_fun(&HA_Proactive_Service::write_trampoline);
-         {
-             ACE_Guard<ACE_Recursive_Thread_Mutex> _lock(lock_conns);
-             conns::instance()->insert(map_conns::value_type((void*)this, arg));
-         }
-         // On failure the handler has already destroyed itself; do not touch
-         // any member after this call.
-         this->init_read_stream();
-     }
-     // Read completion: continues a partial header or body read, or queues
-     // the finished packet and starts reading the next one.
-     void handle_read_stream (const ACE_Asynch_Read_Stream::Result &result){
-         if (!result.success () || result.bytes_transferred () == 0){
-             this->destroy ();
-             return;
-         }
-         if(recv_data_->space() != 0){
-             // The header (packet length information) is not complete yet.
-             if(this->reader_.read (*recv_data_, recv_data_->space ()) != 0)
-                 this->destroy ();
-             return;
-         }
-         if(para::instance()->b_debug)
-             this->debug_dump_ (result.message_block().rd_ptr(), result.bytes_transferred());
-
-         PacketHeader * hdr = reinterpret_cast<PacketHeader *> (recv_data_->rd_ptr());
-         const size_t total_len = (size_t)hdr->data_.wLength;
-         // Reject wrong message types and lengths outside
-         // [sizeof(CommHead), sizeof(CommMessage)). The lower bound stops the
-         // body-size subtraction below from wrapping to a huge value.
-         if(hdr->data_.wMessageType != MessageType ||
-            total_len < sizeof(hdr->data_) ||
-            total_len >= sizeof(CommMessage)){
-             recv_data_->release ();
-             recv_data_ = 0;
-             this->init_read_stream();
-             return;
-         }
-         ACE_Message_Block * data_mb = recv_data_->cont();
-         if (!data_mb){
-             // The header has just been completed: allocate the body block.
-             ACE_NEW_NORETURN (data_mb, ACE_Message_Block(total_len - sizeof(hdr->data_)));
-             if (data_mb == 0){
-                 // No read would be pending after this point, so the
-                 // connection could never make progress; tear it down.
-                 this->destroy ();
-                 return;
-             }
-             recv_data_->cont (data_mb);
-         }
-         if(data_mb->space() == 0){
-             // Packet complete: hand it to the logic threads.
-             ACE_Message_Block * packet = recv_data_;
-             recv_data_ = 0;
-             if (logic_thread->putq (packet) == -1)
-                 packet->release ();   // the queue did not take ownership
-             this->init_read_stream();
-             return;
-         }
-         // Otherwise keep receiving the rest of this packet.
-         if(this->reader_.read (*data_mb, data_mb->space ()) == -1)
-             this->destroy ();
-     }
-     // Write completion: optionally dumps the data, then frees the block.
-     void handle_write_stream (const ACE_Asynch_Write_Stream::Result &result){
-         if(para::instance()->b_debug)
-             this->debug_dump_ (result.message_block().base(), result.bytes_transferred());
-         result.message_block ().release ();
-     }
- private:
-     // Removes this handler from conns and, if it was registered, releases
-     // the receive buffer and deletes the handler. Do not touch any member
-     // after calling this.
-     void destroy (){
-         ACE_Guard<ACE_Recursive_Thread_Mutex> _lock(lock_conns);
-         if(conns::instance()->erase((void*)this) == 1){
-             if (recv_data_ != 0)
-                 recv_data_->release ();
-             delete this;
-         }
-     }
-     // Hex-dumps `len` bytes at `data`, labelled with the socket's local address.
-     // NOTE(review): the label says "Recv from" and uses the LOCAL address for
-     // both reads and writes, as in the original; confirm whether the peer
-     // address and a send-specific label were intended.
-     void debug_dump_ (const char * data, size_t len){
-         ACE_INET_Addr addr;
-         ACE_SOCK_SEQPACK_Association ass(this->handle());
-         size_t addr_size = 1;
-         ass.get_local_addrs(&addr, addr_size);
-         ACE_TCHAR tmp[128];
-         ACE_OS::sprintf(tmp, "Recv from %s:%u", addr.get_host_addr(), addr.get_port_number());
-         ACE_HEX_DUMP((LM_DEBUG, data, len, tmp));
-     }
-     // Adapter with the plain-function signature required by arg_.
-     // Takes ownership of request.mb. Returns 0 if the write was started, else -1.
-     static int write_trampoline (arg_ request){
-         HA_Proactive_Service * self = static_cast<HA_Proactive_Service *> (request.p_this);
-         if (self == 0 || request.mb == 0){
-             if (request.mb != 0)
-                 request.mb->release ();
-             return -1;
-         }
-         return self->init_write_stream (*request.mb, request.len);
-     }
-     // Allocates a fresh header block, stamps it with this handler's address
-     // and starts reading the protocol header into the remaining space.
-     // Returns 0 on success. Returns -1 on failure, in which case the handler
-     // has been unregistered and deleted.
-     // NOTE(review): assumes there is no padding between PacketHeader::handler
-     // and PacketHeader::data_, and no trailing padding; confirm against CommHead.
-     int init_read_stream(){
-         ACE_NEW_NORETURN (recv_data_, ACE_Message_Block (sizeof(PacketHeader), MB_NORMAL_PACKET));
-         if (recv_data_ == 0){
-             ACE_ERROR ((LM_ERROR, "HA_Proactive_Service: out of memory\n"));
-             this->destroy ();
-             return -1;
-         }
-         // Store the pointer VALUE (the conns key), not the bytes of the object
-         // it points to.
-         void * self = this;
-         recv_data_->copy (reinterpret_cast<const char *> (&self), sizeof(self));
-         if(this->reader_.read (*recv_data_, recv_data_->space ()) == -1){
-             ACE_ERROR ((LM_ERROR, "%p\n", "ACE_Asynch_Read_Stream::read"));
-             recv_data_->release();
-             recv_data_ = 0;
-             // No read is pending any more, so nothing else would ever clean
-             // this handler up.
-             this->destroy ();
-             return -1;
-         }
-         return 0;
-     }
-     // Starts an asynchronous write of nBytes from mb.
-     // On success the block is released in handle_write_stream().
-     // On failure it is released here and -1 is returned.
-     int init_write_stream (ACE_Message_Block & mb, size_t nBytes){
-         if (this->writer_.write (mb , nBytes ) == -1){
-             mb.release ();
-             // The handler is deliberately not deleted here: every path through
-             // open()/handle_read_stream() leaves a read outstanding, and that
-             // read's completion must not run on a destroyed object. Teardown
-             // is left to the read completion path.
-             ACE_ERROR_RETURN((LM_ERROR, "%p\n", "ACE_Asynch_Write_Stream::write"), -1);
-         }
-         return 0;
-     }
- private:
-     ACE_Asynch_Read_Stream reader_;
-     ACE_Asynch_Write_Stream writer_;
-     ACE_Message_Block* recv_data_;   // packet being assembled; 0 when none is owned
- };
- // Thread pool that runs the process-wide ACE_Proactor event loop.
- class NetworkThread : public ACE_Task_Base
- {
- public:
-     NetworkThread(){
-     }
-     ~NetworkThread(){
-     }
-     // Starts the pool threads, each of which executes svc().
-     // Returns the result of activate().
-     int open(){
-         const int pool_size = 4;
-         return activate(THR_NEW_LWP, pool_size);
-     }
-     // Asks the proactor to leave its event loop and waits for the pool to exit.
-     // Always returns 0.
-     int close(){
-         ACE_Proactor * const proactor = ACE_Proactor::instance();
-         proactor->proactor_end_event_loop();
-         wait();
-         return 0;
-     }
-     // Thread body: blocks in the proactor event loop until it is ended,
-     // then logs the exit. Always returns 0.
-     int svc(){
-         ACE_Proactor * const proactor = ACE_Proactor::instance ();
-         proactor->proactor_run_event_loop ();
-         write_log("Process thread with recving input exit.");
-         return 0;
-     }
- };
- #endif
代码如上,省略了几个具体业务流程模块,其余基本完整。
思路是仿照论坛上的一个例子,唯一不同的是增加了对连接句柄的保存,目的是在处理完数据后回包时判断连接是否仍然有效;如果有别的实现方式,请给个意见。
目前的问题是:客户端用 TP_Reactor 测试连接,连接数在 2000 以下时 CPU 占用率低于 10%;连接数超过 4000 后客户端连接失败,原因是连接被拒绝,同时服务端已有的连接出现个别掉线现象,反复测试几次都是如此。:( |
|