|
楼主 |
发表于 2010-3-30 11:10:24
|
显示全部楼层
我系统是vista
发现我的ACE中ACE_Reactor与AC_WFMO_Reactor不一样
class ACE_Export ACE_Reactor : public ACE_Reactor_Timer_Interface{...};
class ACE_Export ACE_WFMO_Reactor : public ACE_Reactor_Impl{...};
但这两个都不能监控ACE_SPIPE_Stream的句柄
能否帮忙试一下
我给个源码
#include <ace/log_msg.h>
#include <ace/os.h>
#include <ace/process.h>
#include <ace/SPIPE_Addr.h>
#include <ace/SPIPE_Acceptor.h>
#include <ace/SPIPE_Connector.h>
#include <ace/Task.h>
#include <ace/synch.h>
#include <ace/handle_set.h>
#include <ace/event_handler.h>
#include <ace/wfmo_reactor.h>
#include <iostream>
#include <string>
#include <sstream>
#include <iomanip>
using namespace std;
#if defined (ACE_WIN32)
#define MAKE_PIPE_NAME(X) ACE_TEXT ("\\\\.\\pipe\\") ACE_TEXT (X)
#endif
#ifdef _DEBUG
#pragma comment(lib, "aced.lib")
#else
#pragma comment(lib, "ace.lib")
#endif
class Pipe_Event_Read : public ACE_Event_Handler
{
public:
Pipe_Event_Read(ACE_HANDLE handle)
{
pipe_stream_.set_handle(handle);
}
ACE_HANDLE get_handle(void) const
{
return pipe_stream_.get_handle();
}
int handle_input(ACE_HANDLE handle)
{
char buf[10] = {0};
pipe_stream_.recv_n(buf, 9);
int len = ACE_OS::atoi(buf);
char *bufdata = new char[len+1];
bufdata[len] = 0;
pipe_stream_.recv_n(bufdata, len);
cout << "len from child: " << buf << endl;
cout << "data from child: " << bufdata << endl;
return 0;
}
private:
ACE_SPIPE_Stream pipe_stream_;
};
class Pipe_Reactor : public ACE_Task<ACE_MT_SYNCH>
{
public:
int open(void* args)
{
wfmo_reactor_ = new ACE_WFMO_Reactor;
ACE_HANDLE handle = (ACE_HANDLE)args;
//master_handles_.set_bit(handle);
Pipe_Event_Read *pipe_event = new Pipe_Event_Read(handle);
wfmo_reactor_->register_handler(pipe_event, ACE_Event_Handler::READ_MASK);
activate();
return 0;
}
int svc()
{
cout << "thread start\n";
for(;;)
{
// active_handles_ = master_handles_;
// int width = (int)active_handles_.max_set()+1;
// if(select(width, active_handles_.fdset(), 0, 0, 0) == -1)
// cout << "select fail\n";
// ACE_Handle_Set_Iterator it(active_handles_);
// for(ACE_HANDLE handle; (handle = it()) != ACE_INVALID_HANDLE;)
// {
//// ACE_OS::sleep(5);
// ACE_SPIPE_Stream pipe_stream;
// pipe_stream.set_handle(handle);
// char buf[10] = {0};
// pipe_stream.recv(buf, 9);
// int len = ACE_OS::atoi(buf);
// char *bufdata = new char[len+1];
// bufdata[len] = 0;
// pipe_stream.recv(bufdata, len);
// cout << "len from child: " << buf << endl;
// cout << "data from child: " << bufdata << endl;
// delete [] bufdata;
// }
//reactor
wfmo_reactor_->handle_events();
cout << "one circle\n";
}
return 0;
}
int close(u_long)
{
return 0;
}
private:
ACE_Handle_Set master_handles_;
ACE_Handle_Set active_handles_;
ACE_WFMO_Reactor *wfmo_reactor_;
};
int main(int argc, char* argv[])
{
if(argc == 1)
{
//主线程
ACE_Process_Options options;
const char* pipe_name = MAKE_PIPE_NAME("acepipe");
options.command_line("%s %s", argv[0], pipe_name);
ACE_Process child;
child.spawn(options);
ACE_OS::sleep(1);
ACE_SPIPE_Stream client_stream;
ACE_SPIPE_Connector connector;
if(connector.connect(client_stream, ACE_SPIPE_Addr(pipe_name)) == -1)
{
ACE_DEBUG((LM_DEBUG, "Can't conn\n"));
child.wait();
return 0;
}
Pipe_Reactor *pipe_reactor = new Pipe_Reactor;
pipe_reactor->open(client_stream.get_handle());
cout << "now write\n";
for(;;)
{
string strData;
cin >> strData;
stringstream ss;
ss << setw(9) << setfill('0') << strData.length();
// cout << strData;
client_stream.send(ss.str().c_str(), ss.str().length());
client_stream.send(strData.c_str(), strData.length());
// cout << "send over\n";
}
delete pipe_reactor;
}
else
{
//子进程,提供接收服务
const char* pipe_name = argv[1];
ACE_SPIPE_Acceptor acceptor;
ACE_SPIPE_Stream new_stream;
if( acceptor.open(ACE_SPIPE_Addr(pipe_name)) == -1 )
ACE_ERROR_RETURN((LM_ERROR, ACE_TEXT("%p\n"), ACE_TEXT("open")), 1);
if(acceptor.accept(new_stream, 0) == -1)
ACE_ERROR_RETURN((LM_ERROR, ACE_TEXT("%p\n"), ACE_TEXT("acceptor")), 1);
cout << "a conn\n";
char buf[100] = {0};
int n = 0;
while( (n = new_stream.recv_n(buf, 9)) > 0 )
{
// ACE_OS::sleep(8);
//收数据
int len = ACE_OS::atoi(buf);
ACE_DEBUG((LM_DEBUG, "recv:%d %s\n", len, buf));
char *content = new char[len+1];
ACE_OS::memset(content, 0, len+1);
new_stream.recv(content, len);
ACE_DEBUG((LM_DEBUG, "content:%s\n", content));
ACE_OS::sleep(1);
//往主进程里写
new_stream.send(buf, 9);
new_stream.send(content, len);
cout << "child send over\n";
delete [] content;
ACE_OS::memset(buf, 0, 100);
}
cout << "child end, " << n << buf << endl;
}
system("pause");
return 0;
} |
|