|
自动驱动工作模型: 让程序自动的进行任务执行, 直到全部任务完成.
应用场景是, 当你的任务分为很多子任务, 子任务又包括很多的子模块, 一个子任务中的各个子模块的的执行是可以并行的, 而子任务是串行执行的,
一个子任务必须必须等待它前一个子任务结束后才能执行.
实现版本一(这个版本是版主设计):
- #include "stdafx.h"
- #include "ace/SString.h"
- #include "ace/Malloc.h"
- #include "ace/Malloc_T.h"
- #include "ace/Task_T.h"
- #include "ace/Local_Memory_Pool.h"
- #include "ace/Time_Value.h"
- #include "ace/OS_main.h"
- #include "ace/OS_NS_sys_stat.h"
- #include "ace/OS_NS_sys_socket.h"
- #include "ace/OS_NS_unistd.h"
- #include "ace/Thread.h"
- #include <vector>
- using namespace std;
- enum ENUM_STATE
- {
- WORKER_CREATE = 0,
- WORKER_BEGIN,
- WORKER_ERROR,
- WORKER_END
- };
- //工作线程对象信息
- struct _ThreadInfo
- {
- int m_nThreadID; //序列号
- int m_nGrpID; //线程的实际ID
- ENUM_STATE m_emState; //工作线程状态
- bool (*fnCallBack)(int nThreadID, ENUM_STATE m_emState);
- _ThreadInfo()
- {
- m_nThreadID = 0;
- m_nGrpID = 0;
- SetState(WORKER_CREATE);
- }
- void Init(bool (*fn)(int nThreadID, ENUM_STATE m_emState))
- {
- fnCallBack = fn;
- }
- void SetState(ENUM_STATE emState = WORKER_CREATE)
- {
- m_emState = emState;
- if(m_emState == WORKER_END || emState == WORKER_ERROR)
- {
- fnCallBack(m_nThreadID, emState);
- }
- }
- };
- typedef vector<_ThreadInfo*> vecThreadInfo;
- void KillThread(int nThreadID)
- {
- #ifdef WIN32
- ACE_hthread_t hthread = 0;
- if (ACE_Thread_Manager::instance()->hthread_grp_list(nThreadID, &hthread, 1) == 1)
- {
- ::TerminateThread ((HANDLE)hthread, -1);
- }
- #else
- ACE_Thread_Manager::instance()->kill_grp(nThreadID, SIGUSR1);
- #endif
- }
- bool ThreadCallBack(int nThreadID, ENUM_STATE m_emState);
- //全局变量部分
- //*********************************************
- int g_Rule_id = 0; //裁决者线程ID
- vecThreadInfo objvecThreadInfo; //所有工作线程信息
- int g_Index = 0; //当前运行次数
- #define MAX_RUN_COUNT 10 //运行10次
- //*********************************************
- //工作线程
- ACE_THR_FUNC worker(void* arg)
- {
- _ThreadInfo* pThreadInfo = (_ThreadInfo* )arg;
- pThreadInfo->SetState(WORKER_BEGIN);
- ACE_Time_Value tvSleep(pThreadInfo->m_nThreadID + 1, 0);
- //这里模拟工作线程执行时间
- //执行时间为挂起ThreadID + 1秒
- ACE_OS::sleep(tvSleep);
- pThreadInfo->SetState(WORKER_END);
- return NULL;
- }
- void Event_Begin()
- {
- int nThreadCount = 5;
- for(int i = 0; i < nThreadCount; i++)
- {
- _ThreadInfo* pThreadInfo = new _ThreadInfo();
- pThreadInfo->m_nThreadID = i;
- pThreadInfo->Init(&ThreadCallBack);
- pThreadInfo->m_nGrpID = ACE_Thread_Manager::instance()->spawn((ACE_THR_FUNC)worker, (void* )pThreadInfo);
- objvecThreadInfo.push_back(pThreadInfo);
- }
- }
- void Event_End()
- {
- //最后清理
- for(int i = 0; i < (int)objvecThreadInfo.size(); i++)
- {
- _ThreadInfo* pThreadInfo = (_ThreadInfo* )objvecThreadInfo[i];
- delete pThreadInfo;
- }
- objvecThreadInfo.clear();
- }
- //裁决者线程
- ACE_THR_FUNC Event_Rule(void* arg)
- {
- //这个假设等待15秒,作为最多等待15秒的最长等待时间。
- ACE_Time_Value tvCheck(3, 0);
- ACE_OS::sleep(tvCheck);
- for(int i = 0; i < (int)objvecThreadInfo.size(); i++)
- {
- ACE_DEBUG((LM_INFO, "[Rule]nThreadID=%d, m_emState=%d.\n", objvecThreadInfo[i]->m_nThreadID, objvecThreadInfo[i]->m_emState));
- if(WORKER_END != objvecThreadInfo[i]->m_emState)
- {
- //当前工作线程还没结束
- KillThread(objvecThreadInfo[i]->m_nGrpID);
- ACE_DEBUG((LM_INFO, "[Rule]nThreadID=%d, 你被裁决了!!\n", objvecThreadInfo[i]->m_nThreadID));
- }
- }
- g_Index++;
- if(g_Index < MAX_RUN_COUNT)
- {
- //全部运行结束,进行下一轮
- ACE_DEBUG((LM_INFO, "[Event_Rule]全部工作结束,进行下一轮[%d].\n", g_Index));
- //进行线程清理工作
- Event_End();
- //开始新一轮
- Event_Begin();
- //创建一个裁决者
- g_Rule_id = ACE_Thread_Manager::instance()->spawn((ACE_THR_FUNC)Event_Rule, (void* )NULL);
- }
- else
- {
- //彻底全部结束
- ACE_DEBUG((LM_INFO, "[Event_Rule]彻底的工作结束啦.\n"));
- }
- return NULL;
- }
- //监控者(运行10次)
- ACE_THR_FUNC Run(void* arg)
- {
- g_Index = 0;
- //创建一个裁决者
- g_Rule_id = ACE_Thread_Manager::instance()->spawn((ACE_THR_FUNC)Event_Rule, (void* )NULL);
- //开始循环
- Event_Begin();
- return NULL;
- }
- //事件通知函数,当线程执行结束或者出错的时候
- bool ThreadCallBack(int nThreadID, ENUM_STATE m_emState)
- {
- //在这里可以处理你的逻辑
- //比如一共多少线程,线程全部结束后怎么处理等等。
- ACE_DEBUG((LM_INFO, "[ThreadCallBack]nThreadID=%d, m_emState=%d.\n", nThreadID, m_emState));
- //判断所有的工作线程是否已经结束
- bool blAllFinish = true;
- for(int i = 0; i < (int)objvecThreadInfo.size(); i++)
- {
- if(WORKER_END != objvecThreadInfo[i]->m_emState)
- {
- blAllFinish = false;
- }
- }
- if(blAllFinish == true)
- {
- g_Index++;
- if(g_Index < MAX_RUN_COUNT)
- {
- //全部运行结束,进行下一轮
- ACE_DEBUG((LM_INFO, "[ThreadCallBack]全部工作结束,进行下一轮[%d].\n", g_Index));
- //杀死裁决者,裁决者你不用再等了
- KillThread(g_Rule_id);
- //进行线程清理工作
- Event_End();
- //开始新一轮
- Event_Begin();
- //创建一个裁决者
- g_Rule_id = ACE_Thread_Manager::instance()->spawn((ACE_THR_FUNC)Event_Rule, (void* )NULL);
- }
- else
- {
- //彻底全部结束
- ACE_DEBUG((LM_INFO, "[ThreadCallBack]彻底的工作结束啦.\n"));
- }
- }
- return true;
- }
- int ACE_TMAIN(int argc, ACE_TCHAR* argv[])
- {
- ACE_Thread_Manager::instance()->spawn((ACE_THR_FUNC)Run, (void* )NULL);
- getchar();
- return 0;
- }
复制代码 实现版本二(根据眼总设计版本改造版本):
- #include <stdio.h>
- #include <stdlib.h>
- #include "ace/SString.h"
- #include "ace/Malloc.h"
- #include "ace/Malloc_T.h"
- #include "ace/Task_T.h"
- #include "ace/Local_Memory_Pool.h"
- #include "ace/Time_Value.h"
- #include "ace/Date_Time.h"
- #include "ace/OS_main.h"
- #include "ace/OS_NS_sys_stat.h"
- #include "ace/OS_NS_sys_socket.h"
- #include "ace/OS_NS_unistd.h"
- #include "ace/Thread.h"
- #include "ace/Condition_T.h"
- #include "ace/Synch.h"
- #include <vector>
- using namespace std;
- typedef ACE_Malloc<ACE_LOCAL_MEMORY_POOL, ACE_SYNCH_MUTEX> MUTEX_MALLOC;
- typedef ACE_Allocator_Adapter<MUTEX_MALLOC> Mutex_Allocator;
- Mutex_Allocator _module_service_mb_allocator;
- //工作人员等待工作时间(30s)
- #define WORKER_WAITTIME 30000000
- //等待一个工作组完成时间(20s)
- #define WORKGROUP_WAITTIME 20*1*1000*1000
- //工作组数量
- #define WORKGROUP_COUNT 10
- //工作数量
- #define WORK_COUNT 10
- //工作
- struct WORKINFO
- {
- int nTaskNo;
- };
- //工作组, 包含多个工作
- struct WORKGROUPINFO
- {
- vector<WORKINFO*> vecWorks;
- };
- //任务, 包含多个工作组
- struct TASKINFO
- {
- vector<WORKGROUPINFO*> vecWorkGroups;
- };
- //创建任务
- int CreateTaskInfo(TASKINFO** ppTaskInfo)
- {
- TASKINFO* pTaskInfo = new TASKINFO();
- int nWorkNo=0;
- for (int i=0; i<WORKGROUP_COUNT;i++)
- {
- WORKGROUPINFO* pWGInfo = new WORKGROUPINFO();
- for (int j=0; j<WORK_COUNT; j++)
- {
- WORKINFO* pWorkInfo = new WORKINFO();
- pWorkInfo->nTaskNo = nWorkNo++;
- pWGInfo->vecWorks.push_back(pWorkInfo);
- }
- pTaskInfo->vecWorkGroups.push_back(pWGInfo);
- }
- *ppTaskInfo = pTaskInfo;
- return 0;
- }
- //清理任务
- int DestroyTaskInfo(TASKINFO* pTaskInfo)
- {
- if (pTaskInfo == NULL)
- {
- return 0;
- }
- int nWorkGroupCount = pTaskInfo->vecWorkGroups.size();
- for (int i=0;i<nWorkGroupCount;i++)
- {
- WORKGROUPINFO* pWGInfo = (WORKGROUPINFO*)pTaskInfo->vecWorkGroups[i];
- if (pWGInfo != NULL)
- {
- int nWorkCount = pWGInfo->vecWorks.size();
- for (int j=0; j<nWorkCount;j++)
- {
- WORKINFO* pWorkInfo = (WORKINFO*)pWGInfo->vecWorks[j];
- if (pWorkInfo != NULL)
- {
- delete pWorkInfo;
- pWorkInfo = NULL;
- }
- }
- pWGInfo->vecWorks.clear();
- delete pWGInfo;
- pWGInfo = NULL;
- }
- }
- pTaskInfo->vecWorkGroups.clear();
- delete pTaskInfo;
- pTaskInfo = NULL;
- return 0;
- }
- //工作者, 执行工作
- class CWorkers : public ACE_Task<ACE_MT_SYNCH>
- {
- public:
- CWorkers() : m_mySema(0), m_nThreadCount(0), m_myMutex(0) {};
- ~CWorkers()
- {
- msg_queue()->deactivate();
- msg_queue()->flush();
- //杀死阻塞线程
- #ifdef WIN32
- ACE_hthread_t hthread[16] = {0x00};
- int ngrp_id = grp_id();
- int nCount = thr_mgr()->hthread_grp_list(ngrp_id, hthread, 10);
- for (int i=0; i<nCount;i++)
- {
- int ret = ::TerminateThread (hthread[i], -1);
- thr_mgr()->wait_grp (ngrp_id);
- ACE_DEBUG((LM_DEBUG, "-kill One Thread %d\n", ret));
- }
- #else
- int ngrp_id = grp_id();
- int ret = thr_mgr()->kill_grp(ngrp_id, SIGUSR1);
- ACE_DEBUG((LM_DEBUG, "-kill One Thread %d\n", ret));
- #endif
- ACE_DEBUG((LM_INFO, "-Workers Exit!\n"));
- };
- bool Open(int nThreadCount)
- {
- if(activate(THR_NEW_LWP | THR_DETACHED | THR_JOINABLE, nThreadCount) == -1)
- {
- return false;
- }
- m_nThreadCount = nThreadCount;
- return true;
- }
- int Wait(ACE_Time_Value& tvWait)
- {
- return m_mySema.acquire(tvWait);
- }
- int PutWorkInfo(WORKINFO* pWorkInfo)
- {
- ACE_Message_Block* mb = NULL;
- ACE_NEW_RETURN(mb, ACE_Message_Block(sizeof(WORKINFO*)), -1);
- WORKINFO** loadin = (WORKINFO **)mb->base();
- *loadin = pWorkInfo;
- ACE_Time_Value xtime;
- xtime = ACE_OS::gettimeofday();
- if(this->putq(mb, &xtime) == -1)
- {
- mb->release();
- return -1;
- }
- return 0;
- }
- virtual int svc()
- {
- ACE_Message_Block* mb = NULL;
- while (true)
- {
- ACE_Time_Value waitTime(0, WORKER_WAITTIME);
- ACE_Time_Value tvNow01(ACE_OS::gettimeofday());
- tvNow01 += waitTime;
- if(getq(mb, &tvNow01) == -1)
- {
- break;
- }
- if (mb == NULL)
- {
- break;
- }
- WORKINFO* pWorkInfo = *((WORKINFO**)mb->base());
- if (pWorkInfo == NULL)
- {
- mb->release();
- break;
- }
- int nTaskNo = pWorkInfo->nTaskNo;
- mb->release();
- int nWaitTime = (nTaskNo>=7 && (nTaskNo%7)==0) ? nTaskNo*10 : nTaskNo;
- ACE_OS::sleep(ACE_Time_Value(0, nWaitTime*1000*100));//等待以100ms为单位的时间
- ACE_DEBUG((LM_INFO, "----WORK [%03d] HAVEN END [COST:%05d]!\n", nTaskNo, nWaitTime));
- break;
- }
- m_myMutex.acquire();
- int nCount = (--m_nThreadCount);
- m_myMutex.release();
- if (nCount<=0)
- {
- m_mySema.release();
- }
- return 0;
- };
- private:
- ACE_Thread_Semaphore m_mySema; //信号量, 用来等待任务超时
- int m_nThreadCount; //工作线程数
- ACE_Thread_Mutex m_myMutex; //线程锁
- };
- //管理者, 负责分发任务
- ACE_THR_FUNC Manager(void * arg)
- {
- TASKINFO* pTaskInfo = (TASKINFO*)arg;
- if (pTaskInfo == NULL)
- {
- return NULL;
- }
- //工作组数
- int nWorkGroupCount = pTaskInfo->vecWorkGroups.size();
- //开始任务
- for (int i=0; i<nWorkGroupCount;i++)
- {
- //开始工作组
- WORKGROUPINFO* pWGINFO = pTaskInfo->vecWorkGroups[i];
- if (pWGINFO != NULL)
- {
- ACE_DEBUG((LM_INFO, "--Work Group [%03d] Begin!\n", i));
- //获得工作组中的工作数量
- int nWorkCount = pWGINFO->vecWorks.size();
- //启动工作线程
- CWorkers workers;
- workers.Open(nWorkCount);
- //分派任务
- for (int j=0; j<nWorkCount;j++)
- {
- WORKINFO* pWorkInfo = pWGINFO->vecWorks[j];
- workers.PutWorkInfo(pWorkInfo);
- }
- //等待任务结束或超时
- ACE_Time_Value tvWait(0, WORKGROUP_WAITTIME);
- tvWait += ACE_OS::gettimeofday();
- workers.Wait(tvWait);
- ACE_DEBUG((LM_INFO, "--Work Group [%03d] End!\n", i));
- }
- else
- {
- ACE_DEBUG((LM_INFO, "--Work Group [%03d] Failure!\n", i));
- }
- }
- //清理任务信息
- DestroyTaskInfo(pTaskInfo);
- return NULL;
- }
- int ACE_TMAIN(int argc, ACE_TCHAR* argv[])
- {
- //创建任务
- TASKINFO* pTaskInfo = NULL;
- CreateTaskInfo(&pTaskInfo);
- //分派任务
- ACE_DEBUG((LM_INFO, "TASK BEGIN!\n"));
- ACE_Thread_Manager::instance()->spawn((ACE_THR_FUNC)Manager, (void* )pTaskInfo);
- getchar();
- return 0;
- }
复制代码
|
|