自动驱动工作模型的两种实现
自动驱动工作模型: 让程序自动的进行任务执行, 直到全部任务完成.应用场景是, 当你的任务分为很多子任务, 子任务又包括很多的子模块, 一个子任务中的各个子模块的的执行是可以并行的, 而子任务是串行执行的,
一个子任务必须必须等待它前一个子任务结束后才能执行.
实现版本一(这个版本是版主设计):
#include "stdafx.h"
#include "ace/SString.h"
#include "ace/Malloc.h"
#include "ace/Malloc_T.h"
#include "ace/Task_T.h"
#include "ace/Local_Memory_Pool.h"
#include "ace/Time_Value.h"
#include "ace/OS_main.h"
#include "ace/OS_NS_sys_stat.h"
#include "ace/OS_NS_sys_socket.h"
#include "ace/OS_NS_unistd.h"
#include "ace/Thread.h"
#include <vector>
using namespace std;
enum ENUM_STATE
{
WORKER_CREATE = 0,
WORKER_BEGIN,
WORKER_ERROR,
WORKER_END
};
//工作线程对象信息
struct _ThreadInfo
{
int m_nThreadID; //序列号
int m_nGrpID; //线程的实际ID
ENUM_STATE m_emState; //工作线程状态
bool (*fnCallBack)(int nThreadID, ENUM_STATE m_emState);
_ThreadInfo()
{
m_nThreadID = 0;
m_nGrpID = 0;
SetState(WORKER_CREATE);
}
void Init(bool (*fn)(int nThreadID, ENUM_STATE m_emState))
{
fnCallBack = fn;
}
void SetState(ENUM_STATE emState = WORKER_CREATE)
{
m_emState = emState;
if(m_emState == WORKER_END || emState == WORKER_ERROR)
{
fnCallBack(m_nThreadID, emState);
}
}
};
typedef vector<_ThreadInfo*> vecThreadInfo;
void KillThread(int nThreadID)
{
#ifdef WIN32
ACE_hthread_t hthread = 0;
if (ACE_Thread_Manager::instance()->hthread_grp_list(nThreadID, &hthread, 1) == 1)
{
::TerminateThread ((HANDLE)hthread, -1);
}
#else
ACE_Thread_Manager::instance()->kill_grp(nThreadID, SIGUSR1);
#endif
}
bool ThreadCallBack(int nThreadID, ENUM_STATE m_emState);
//全局变量部分
//*********************************************
int g_Rule_id = 0; //裁决者线程ID
vecThreadInfo objvecThreadInfo; //所有工作线程信息
int g_Index = 0; //当前运行次数
#define MAX_RUN_COUNT 10 //运行10次
//*********************************************
//工作线程
ACE_THR_FUNC worker(void* arg)
{
_ThreadInfo* pThreadInfo = (_ThreadInfo* )arg;
pThreadInfo->SetState(WORKER_BEGIN);
ACE_Time_Value tvSleep(pThreadInfo->m_nThreadID + 1, 0);
//这里模拟工作线程执行时间
//执行时间为挂起ThreadID + 1秒
ACE_OS::sleep(tvSleep);
pThreadInfo->SetState(WORKER_END);
return NULL;
}
void Event_Begin()
{
int nThreadCount = 5;
for(int i = 0; i < nThreadCount; i++)
{
_ThreadInfo* pThreadInfo = new _ThreadInfo();
pThreadInfo->m_nThreadID = i;
pThreadInfo->Init(&ThreadCallBack);
pThreadInfo->m_nGrpID = ACE_Thread_Manager::instance()->spawn((ACE_THR_FUNC)worker, (void* )pThreadInfo);
objvecThreadInfo.push_back(pThreadInfo);
}
}
void Event_End()
{
//最后清理
for(int i = 0; i < (int)objvecThreadInfo.size(); i++)
{
_ThreadInfo* pThreadInfo = (_ThreadInfo* )objvecThreadInfo;
delete pThreadInfo;
}
objvecThreadInfo.clear();
}
//裁决者线程
ACE_THR_FUNC Event_Rule(void* arg)
{
//这个假设等待15秒,作为最多等待15秒的最长等待时间。
ACE_Time_Value tvCheck(3, 0);
ACE_OS::sleep(tvCheck);
for(int i = 0; i < (int)objvecThreadInfo.size(); i++)
{
ACE_DEBUG((LM_INFO, "nThreadID=%d, m_emState=%d.\n", objvecThreadInfo->m_nThreadID, objvecThreadInfo->m_emState));
if(WORKER_END != objvecThreadInfo->m_emState)
{
//当前工作线程还没结束
KillThread(objvecThreadInfo->m_nGrpID);
ACE_DEBUG((LM_INFO, "nThreadID=%d, 你被裁决了!!\n", objvecThreadInfo->m_nThreadID));
}
}
g_Index++;
if(g_Index < MAX_RUN_COUNT)
{
//全部运行结束,进行下一轮
ACE_DEBUG((LM_INFO, "全部工作结束,进行下一轮[%d].\n", g_Index));
//进行线程清理工作
Event_End();
//开始新一轮
Event_Begin();
//创建一个裁决者
g_Rule_id = ACE_Thread_Manager::instance()->spawn((ACE_THR_FUNC)Event_Rule, (void* )NULL);
}
else
{
//彻底全部结束
ACE_DEBUG((LM_INFO, "彻底的工作结束啦.\n"));
}
return NULL;
}
//监控者(运行10次)
ACE_THR_FUNC Run(void* arg)
{
g_Index = 0;
//创建一个裁决者
g_Rule_id = ACE_Thread_Manager::instance()->spawn((ACE_THR_FUNC)Event_Rule, (void* )NULL);
//开始循环
Event_Begin();
return NULL;
}
//事件通知函数,当线程执行结束或者出错的时候
bool ThreadCallBack(int nThreadID, ENUM_STATE m_emState)
{
//在这里可以处理你的逻辑
//比如一共多少线程,线程全部结束后怎么处理等等。
ACE_DEBUG((LM_INFO, "nThreadID=%d, m_emState=%d.\n", nThreadID, m_emState));
//判断所有的工作线程是否已经结束
bool blAllFinish = true;
for(int i = 0; i < (int)objvecThreadInfo.size(); i++)
{
if(WORKER_END != objvecThreadInfo->m_emState)
{
blAllFinish = false;
}
}
if(blAllFinish == true)
{
g_Index++;
if(g_Index < MAX_RUN_COUNT)
{
//全部运行结束,进行下一轮
ACE_DEBUG((LM_INFO, "全部工作结束,进行下一轮[%d].\n", g_Index));
//杀死裁决者,裁决者你不用再等了
KillThread(g_Rule_id);
//进行线程清理工作
Event_End();
//开始新一轮
Event_Begin();
//创建一个裁决者
g_Rule_id = ACE_Thread_Manager::instance()->spawn((ACE_THR_FUNC)Event_Rule, (void* )NULL);
}
else
{
//彻底全部结束
ACE_DEBUG((LM_INFO, "彻底的工作结束啦.\n"));
}
}
return true;
}
int ACE_TMAIN(int argc, ACE_TCHAR* argv[])
{
ACE_Thread_Manager::instance()->spawn((ACE_THR_FUNC)Run, (void* )NULL);
getchar();
return 0;
}
实现版本二(根据眼总设计版本改造版本):
#include <stdio.h>
#include <stdlib.h>
#include "ace/SString.h"
#include "ace/Malloc.h"
#include "ace/Malloc_T.h"
#include "ace/Task_T.h"
#include "ace/Local_Memory_Pool.h"
#include "ace/Time_Value.h"
#include "ace/Date_Time.h"
#include "ace/OS_main.h"
#include "ace/OS_NS_sys_stat.h"
#include "ace/OS_NS_sys_socket.h"
#include "ace/OS_NS_unistd.h"
#include "ace/Thread.h"
#include "ace/Condition_T.h"
#include "ace/Synch.h"
#include <vector>
using namespace std;
typedefACE_Malloc<ACE_LOCAL_MEMORY_POOL, ACE_SYNCH_MUTEX> MUTEX_MALLOC;
typedef ACE_Allocator_Adapter<MUTEX_MALLOC> Mutex_Allocator;
Mutex_Allocator _module_service_mb_allocator;
//工作人员等待工作时间(30s)
#define WORKER_WAITTIME 30000000
//等待一个工作组完成时间(20s)
#define WORKGROUP_WAITTIME 20*1*1000*1000
//工作组数量
#define WORKGROUP_COUNT 10
//工作数量
#define WORK_COUNT 10
//工作
struct WORKINFO
{
int nTaskNo;
};
//工作组, 包含多个工作
struct WORKGROUPINFO
{
vector<WORKINFO*>vecWorks;
};
//任务, 包含多个工作组
struct TASKINFO
{
vector<WORKGROUPINFO*>vecWorkGroups;
};
//创建任务
int CreateTaskInfo(TASKINFO** ppTaskInfo)
{
TASKINFO* pTaskInfo = new TASKINFO();
int nWorkNo=0;
for (int i=0; i<WORKGROUP_COUNT;i++)
{
WORKGROUPINFO* pWGInfo = new WORKGROUPINFO();
for (int j=0; j<WORK_COUNT; j++)
{
WORKINFO* pWorkInfo = new WORKINFO();
pWorkInfo->nTaskNo = nWorkNo++;
pWGInfo->vecWorks.push_back(pWorkInfo);
}
pTaskInfo->vecWorkGroups.push_back(pWGInfo);
}
*ppTaskInfo = pTaskInfo;
return 0;
}
//清理任务
int DestroyTaskInfo(TASKINFO* pTaskInfo)
{
if (pTaskInfo == NULL)
{
return 0;
}
int nWorkGroupCount = pTaskInfo->vecWorkGroups.size();
for (int i=0;i<nWorkGroupCount;i++)
{
WORKGROUPINFO* pWGInfo = (WORKGROUPINFO*)pTaskInfo->vecWorkGroups;
if (pWGInfo != NULL)
{
int nWorkCount = pWGInfo->vecWorks.size();
for (int j=0; j<nWorkCount;j++)
{
WORKINFO* pWorkInfo = (WORKINFO*)pWGInfo->vecWorks;
if (pWorkInfo != NULL)
{
delete pWorkInfo;
pWorkInfo = NULL;
}
}
pWGInfo->vecWorks.clear();
delete pWGInfo;
pWGInfo = NULL;
}
}
pTaskInfo->vecWorkGroups.clear();
delete pTaskInfo;
pTaskInfo = NULL;
return 0;
}
//工作者, 执行工作
class CWorkers : public ACE_Task<ACE_MT_SYNCH>
{
public:
CWorkers() : m_mySema(0), m_nThreadCount(0), m_myMutex(0) {};
~CWorkers()
{
msg_queue()->deactivate();
msg_queue()->flush();
//杀死阻塞线程
#ifdef WIN32
ACE_hthread_t hthread = {0x00};
int ngrp_id = grp_id();
int nCount = thr_mgr()->hthread_grp_list(ngrp_id, hthread, 10);
for (int i=0; i<nCount;i++)
{
int ret = ::TerminateThread (hthread, -1);
thr_mgr()->wait_grp (ngrp_id);
ACE_DEBUG((LM_DEBUG, "-kill One Thread %d\n", ret));
}
#else
int ngrp_id = grp_id();
int ret = thr_mgr()->kill_grp(ngrp_id, SIGUSR1);
ACE_DEBUG((LM_DEBUG, "-kill One Thread %d\n", ret));
#endif
ACE_DEBUG((LM_INFO, "-Workers Exit!\n"));
};
bool Open(int nThreadCount)
{
if(activate(THR_NEW_LWP | THR_DETACHED | THR_JOINABLE, nThreadCount) == -1)
{
return false;
}
m_nThreadCount = nThreadCount;
return true;
}
int Wait(ACE_Time_Value& tvWait)
{
return m_mySema.acquire(tvWait);
}
intPutWorkInfo(WORKINFO* pWorkInfo)
{
ACE_Message_Block* mb = NULL;
ACE_NEW_RETURN(mb, ACE_Message_Block(sizeof(WORKINFO*)), -1);
WORKINFO** loadin = (WORKINFO **)mb->base();
*loadin = pWorkInfo;
ACE_Time_Value xtime;
xtime = ACE_OS::gettimeofday();
if(this->putq(mb, &xtime) == -1)
{
mb->release();
return -1;
}
return 0;
}
virtual int svc()
{
ACE_Message_Block* mb = NULL;
while (true)
{
ACE_Time_Value waitTime(0, WORKER_WAITTIME);
ACE_Time_Value tvNow01(ACE_OS::gettimeofday());
tvNow01 += waitTime;
if(getq(mb, &tvNow01) == -1)
{
break;
}
if (mb == NULL)
{
break;
}
WORKINFO* pWorkInfo = *((WORKINFO**)mb->base());
if (pWorkInfo == NULL)
{
mb->release();
break;
}
int nTaskNo = pWorkInfo->nTaskNo;
mb->release();
int nWaitTime = (nTaskNo>=7 && (nTaskNo%7)==0) ? nTaskNo*10 : nTaskNo;
ACE_OS::sleep(ACE_Time_Value(0, nWaitTime*1000*100));//等待以100ms为单位的时间
ACE_DEBUG((LM_INFO, "----WORK [%03d] HAVEN END !\n", nTaskNo, nWaitTime));
break;
}
m_myMutex.acquire();
int nCount = (--m_nThreadCount);
m_myMutex.release();
if (nCount<=0)
{
m_mySema.release();
}
return 0;
};
private:
ACE_Thread_Semaphore m_mySema; //信号量, 用来等待任务超时
int m_nThreadCount; //工作线程数
ACE_Thread_Mutex m_myMutex; //线程锁
};
//管理者, 负责分发任务
ACE_THR_FUNC Manager(void * arg)
{
TASKINFO* pTaskInfo = (TASKINFO*)arg;
if (pTaskInfo == NULL)
{
return NULL;
}
//工作组数
int nWorkGroupCount = pTaskInfo->vecWorkGroups.size();
//开始任务
for (int i=0; i<nWorkGroupCount;i++)
{
//开始工作组
WORKGROUPINFO* pWGINFO = pTaskInfo->vecWorkGroups;
if (pWGINFO != NULL)
{
ACE_DEBUG((LM_INFO, "--Work Group [%03d] Begin!\n", i));
//获得工作组中的工作数量
int nWorkCount = pWGINFO->vecWorks.size();
//启动工作线程
CWorkersworkers;
workers.Open(nWorkCount);
//分派任务
for (int j=0; j<nWorkCount;j++)
{
WORKINFO* pWorkInfo = pWGINFO->vecWorks;
workers.PutWorkInfo(pWorkInfo);
}
//等待任务结束或超时
ACE_Time_Value tvWait(0, WORKGROUP_WAITTIME);
tvWait += ACE_OS::gettimeofday();
workers.Wait(tvWait);
ACE_DEBUG((LM_INFO, "--Work Group [%03d] End!\n", i));
}
else
{
ACE_DEBUG((LM_INFO, "--Work Group [%03d] Failure!\n", i));
}
}
//清理任务信息
DestroyTaskInfo(pTaskInfo);
return NULL;
}
int ACE_TMAIN(int argc, ACE_TCHAR* argv[])
{
//创建任务
TASKINFO* pTaskInfo = NULL;
CreateTaskInfo(&pTaskInfo);
//分派任务
ACE_DEBUG((LM_INFO, "TASK BEGIN!\n"));
ACE_Thread_Manager::instance()->spawn((ACE_THR_FUNC)Manager, (void* )pTaskInfo);
getchar();
return 0;
}
非常棒!宇/ka程枫 webmaster 发表于 2014-4-16 08:59
非常棒!宇/ka程枫
谢谢, 还请多提意见 不错,你用了ace_task维护了你的进程组,非常好。
页:
[1]