#include <ace\OS_main.h>
#include <ace\Future.h>
#include <ace\Task.h>
#include <ace\Method_Request.h>
#include <ace\Activation_Queue.h>
#include <ace\Reactor_Notification_Strategy.h>
#include <ace\Reactor.h>
#include <ace\Message_Queue.h>
#include <iostream>
using std::cout;
using std::endl;
class readFileResult
{
public:
readFileResult(char* buff, int buffer_Len)
{
m_buff = buff;
m_buffer_Len = buffer_Len;
}
readFileResult()
{
}
readFileResult(const readFileResult & other)
{
m_buff = other.m_buff;
m_buffer_Len = other.m_buffer_Len;
}
char * GetBuffer()
{
return m_buff;
}
int GetBufferLen()
{
return m_buffer_Len;
}
void displayBuffer()
{
cout<< m_buff<<endl;
}
private:
char *m_buff;
int m_buffer_Len;
};
class ReadFile_Observer1:public ACE_Future_Observer<readFileResult>
{
public:
ReadFile_Observer1()
{
}
~ReadFile_Observer1()
{
}
virtual void update (const ACE_Future<readFileResult> &future)
{
readFileResult rs ;
future.get(rs);
cout<<"ReadFile_Observer1 :"<<rs.GetBuffer()<<endl;
}
};
class ReadFile_Observer2:public ACE_Future_Observer<readFileResult>
{
public:
ReadFile_Observer2()
{
}
~ReadFile_Observer2()
{
}
virtual void update (const ACE_Future<readFileResult> &future)
{
readFileResult rs ;
future.get(rs);
cout<<"ReadFile_Observer2 :"<<rs.GetBuffer()<<endl;
}
};
class Function1_Request : public ACE_Method_Request
{
public:
Function1_Request (readFileResult &rs,
ACE_Future<readFileResult> returnVal)
:rs_(rs)//, returnVal_(returnVal)
{
returnVal_= returnVal;
}
virtual int call (void)
{
::Sleep(500);
strcpy(rs_.GetBuffer(), "end function1");
this->returnVal_.set (rs_);
return 0;
}
private:
readFileResult &rs_;
ACE_Future<readFileResult> returnVal_;
};
class ActivityObject : public ACE_Task<ACE_NULL_SYNCH>
{
public:
ActivityObject()
: notifier_ (0, this, ACE_Event_Handler::WRITE_MASK)
{
this->reactor (ACE_Reactor::instance());
this->reactor ()->register_handler(this, ACE_Event_Handler::READ_MASK);
this->notifier_.reactor (this->reactor ());
this->m_activity_queue.queue()->notification_strategy (&this->notifier_);
this->activate();
}
~ActivityObject()
{
}
virtual int svc(void)
{
this->reactor ()->owner(ACE_Thread::self());
return this->reactor ()->run_event_loop();
}
ACE_Future<readFileResult> function1(readFileResult &rfile )
{
ACE_Future<readFileResult> test;
m_activity_queue.enqueue(new Function1_Request(rfile, test));
return test;
}
int handle_input(ACE_HANDLE fd = 0)
{
ACE_Method_Request * pMethodRequest = m_activity_queue.dequeue();
if (pMethodRequest->call() == -1)
{
return -1;
}
delete pMethodRequest;
return 0;
}
private:
ACE_Activation_Queue m_activity_queue;
ACE_Reactor_Notification_Strategy notifier_;
};
int ACE_TMAIN(int, ACE_TCHAR *[])
{
ActivityObject obj;
char buff[1024];
memset(buff,0,sizeof(buff)/sizeof(char));
readFileResult rfile(buff, 1024);
ACE_Future<readFileResult> read_result = obj.function1(rfile);
read_result.attach(new ReadFile_Observer1);
read_result.attach(new ReadFile_Observer2);
obj.wait();
return 0;
}
为什么int handle_input(ACE_HANDLE fd = 0)总是不能被调用?我在线程中已经改变了ACE_Reactor的拥有者了啊! |