找回密码
 用户注册

QQ登录

只需一步,快速开始

查看: 4390|回复: 3

自动驱动工作模型的两种实现

[复制链接]
发表于 2014-4-15 18:04:23 | 显示全部楼层 |阅读模式
自动驱动工作模型: 让程序自动的进行任务执行, 直到全部任务完成.
应用场景是, 当你的任务分为很多子任务, 子任务又包括很多的子模块, 一个子任务中的各个子模块的的执行是可以并行的, 而子任务是串行执行的,
一个子任务必须必须等待它前一个子任务结束后才能执行.  

实现版本一(这个版本是版主设计):
  1. #include "stdafx.h"
  2. #include "ace/SString.h"
  3. #include "ace/Malloc.h"
  4. #include "ace/Malloc_T.h"
  5. #include "ace/Task_T.h"
  6. #include "ace/Local_Memory_Pool.h"
  7. #include "ace/Time_Value.h"
  8. #include "ace/OS_main.h"
  9. #include "ace/OS_NS_sys_stat.h"
  10. #include "ace/OS_NS_sys_socket.h"
  11. #include "ace/OS_NS_unistd.h"
  12. #include "ace/Thread.h"
  13. #include <vector>
  14. using namespace std;
  15. enum ENUM_STATE
  16. {
  17.         WORKER_CREATE = 0,
  18.         WORKER_BEGIN,
  19.         WORKER_ERROR,
  20.         WORKER_END
  21. };
  22. //工作线程对象信息
  23. struct _ThreadInfo
  24. {
  25.         int        m_nThreadID;      //序列号
  26.         int        m_nGrpID;         //线程的实际ID  
  27.         ENUM_STATE m_emState;        //工作线程状态
  28.         bool (*fnCallBack)(int nThreadID, ENUM_STATE m_emState);
  29.         _ThreadInfo()
  30.         {
  31.                 m_nThreadID = 0;
  32.                 m_nGrpID    = 0;
  33.                 SetState(WORKER_CREATE);
  34.         }
  35.         void Init(bool (*fn)(int nThreadID, ENUM_STATE m_emState))
  36.         {
  37.                 fnCallBack = fn;
  38.         }
  39.         void SetState(ENUM_STATE emState = WORKER_CREATE)
  40.         {
  41.                 m_emState = emState;
  42.                 if(m_emState == WORKER_END || emState == WORKER_ERROR)
  43.                 {
  44.                         fnCallBack(m_nThreadID, emState);
  45.                 }
  46.         }
  47. };
  48. typedef vector<_ThreadInfo*> vecThreadInfo;
  49. void KillThread(int nThreadID)
  50. {
  51. #ifdef WIN32
  52.         ACE_hthread_t hthread = 0;
  53.         if (ACE_Thread_Manager::instance()->hthread_grp_list(nThreadID, &hthread, 1) == 1)
  54.         {
  55.                 ::TerminateThread ((HANDLE)hthread, -1);
  56.         }
  57. #else
  58.         ACE_Thread_Manager::instance()->kill_grp(nThreadID, SIGUSR1);
  59. #endif
  60. }
  61. bool ThreadCallBack(int nThreadID, ENUM_STATE m_emState);
  62. //全局变量部分
  63. //*********************************************
  64. int g_Rule_id = 0;                 //裁决者线程ID
  65. vecThreadInfo objvecThreadInfo;    //所有工作线程信息
  66. int g_Index = 0;                   //当前运行次数
  67. #define MAX_RUN_COUNT 10           //运行10次  
  68. //*********************************************
  69. //工作线程
  70. ACE_THR_FUNC worker(void* arg)
  71. {
  72.         _ThreadInfo* pThreadInfo = (_ThreadInfo* )arg;
  73.         pThreadInfo->SetState(WORKER_BEGIN);
  74.         ACE_Time_Value tvSleep(pThreadInfo->m_nThreadID + 1, 0);
  75.         //这里模拟工作线程执行时间
  76.         //执行时间为挂起ThreadID + 1秒
  77.         ACE_OS::sleep(tvSleep);
  78.         pThreadInfo->SetState(WORKER_END);
  79.         return NULL;
  80. }
  81. void Event_Begin()
  82. {
  83.         int nThreadCount = 5;
  84.         for(int i = 0; i < nThreadCount; i++)
  85.         {
  86.                 _ThreadInfo* pThreadInfo = new _ThreadInfo();
  87.                 pThreadInfo->m_nThreadID = i;
  88.                 pThreadInfo->Init(&ThreadCallBack);
  89.                 pThreadInfo->m_nGrpID = ACE_Thread_Manager::instance()->spawn((ACE_THR_FUNC)worker, (void* )pThreadInfo);
  90.                 objvecThreadInfo.push_back(pThreadInfo);
  91.         }
  92. }
  93. void Event_End()
  94. {
  95.         //最后清理
  96.         for(int i = 0; i < (int)objvecThreadInfo.size(); i++)
  97.         {
  98.                 _ThreadInfo* pThreadInfo = (_ThreadInfo* )objvecThreadInfo[i];
  99.                 delete pThreadInfo;
  100.         }
  101.         objvecThreadInfo.clear();
  102. }
  103. //裁决者线程
  104. ACE_THR_FUNC Event_Rule(void* arg)
  105. {
  106.         //这个假设等待15秒,作为最多等待15秒的最长等待时间。
  107.         ACE_Time_Value tvCheck(3, 0);
  108.         ACE_OS::sleep(tvCheck);
  109.         for(int i = 0; i < (int)objvecThreadInfo.size(); i++)
  110.         {
  111.                 ACE_DEBUG((LM_INFO, "[Rule]nThreadID=%d, m_emState=%d.\n", objvecThreadInfo[i]->m_nThreadID, objvecThreadInfo[i]->m_emState));
  112.                 if(WORKER_END != objvecThreadInfo[i]->m_emState)
  113.                 {
  114.                         //当前工作线程还没结束
  115.                         KillThread(objvecThreadInfo[i]->m_nGrpID);
  116.                         ACE_DEBUG((LM_INFO, "[Rule]nThreadID=%d, 你被裁决了!!\n", objvecThreadInfo[i]->m_nThreadID));
  117.                 }
  118.         }
  119.         g_Index++;
  120.         if(g_Index < MAX_RUN_COUNT)
  121.         {
  122.                 //全部运行结束,进行下一轮
  123.                 ACE_DEBUG((LM_INFO, "[Event_Rule]全部工作结束,进行下一轮[%d].\n", g_Index));
  124.                 //进行线程清理工作
  125.                 Event_End();
  126.                 //开始新一轮
  127.                 Event_Begin();
  128.                 //创建一个裁决者
  129.                 g_Rule_id = ACE_Thread_Manager::instance()->spawn((ACE_THR_FUNC)Event_Rule, (void* )NULL);
  130.         }
  131.         else
  132.         {
  133.                 //彻底全部结束
  134.                 ACE_DEBUG((LM_INFO, "[Event_Rule]彻底的工作结束啦.\n"));
  135.         }
  136.         return NULL;
  137. }
  138. //监控者(运行10次)
  139. ACE_THR_FUNC Run(void* arg)
  140. {
  141.         g_Index = 0;
  142.         //创建一个裁决者
  143.         g_Rule_id = ACE_Thread_Manager::instance()->spawn((ACE_THR_FUNC)Event_Rule, (void* )NULL);
  144.         //开始循环
  145.         Event_Begin();
  146.         return NULL;
  147. }
  148. //事件通知函数,当线程执行结束或者出错的时候
  149. bool ThreadCallBack(int nThreadID, ENUM_STATE m_emState)
  150. {
  151.         //在这里可以处理你的逻辑
  152.         //比如一共多少线程,线程全部结束后怎么处理等等。
  153.         ACE_DEBUG((LM_INFO, "[ThreadCallBack]nThreadID=%d, m_emState=%d.\n", nThreadID, m_emState));
  154.         //判断所有的工作线程是否已经结束
  155.         bool blAllFinish = true;
  156.         for(int i = 0; i < (int)objvecThreadInfo.size(); i++)
  157.         {
  158.                 if(WORKER_END != objvecThreadInfo[i]->m_emState)
  159.                 {
  160.                         blAllFinish = false;
  161.                 }
  162.         }
  163.         if(blAllFinish == true)
  164.         {
  165.                 g_Index++;
  166.                 if(g_Index < MAX_RUN_COUNT)
  167.                 {
  168.                         //全部运行结束,进行下一轮
  169.                         ACE_DEBUG((LM_INFO, "[ThreadCallBack]全部工作结束,进行下一轮[%d].\n", g_Index));
  170.                         //杀死裁决者,裁决者你不用再等了
  171.                         KillThread(g_Rule_id);
  172.                         //进行线程清理工作
  173.                         Event_End();
  174.                         //开始新一轮
  175.                         Event_Begin();
  176.                         //创建一个裁决者
  177.                         g_Rule_id = ACE_Thread_Manager::instance()->spawn((ACE_THR_FUNC)Event_Rule, (void* )NULL);
  178.                 }
  179.                 else
  180.                 {
  181.                         //彻底全部结束
  182.                         ACE_DEBUG((LM_INFO, "[ThreadCallBack]彻底的工作结束啦.\n"));
  183.                 }
  184.         }
  185.         return true;
  186. }
  187. int ACE_TMAIN(int argc, ACE_TCHAR* argv[])
  188. {
  189.         ACE_Thread_Manager::instance()->spawn((ACE_THR_FUNC)Run, (void* )NULL);
  190.         getchar();
  191.         return 0;
  192. }
复制代码
实现版本二(根据眼总设计版本改造版本):
  1. #include <stdio.h>
  2. #include <stdlib.h>
  3. #include "ace/SString.h"
  4. #include "ace/Malloc.h"
  5. #include "ace/Malloc_T.h"
  6. #include "ace/Task_T.h"
  7. #include "ace/Local_Memory_Pool.h"
  8. #include "ace/Time_Value.h"
  9. #include "ace/Date_Time.h"
  10. #include "ace/OS_main.h"
  11. #include "ace/OS_NS_sys_stat.h"
  12. #include "ace/OS_NS_sys_socket.h"
  13. #include "ace/OS_NS_unistd.h"
  14. #include "ace/Thread.h"
  15. #include "ace/Condition_T.h"
  16. #include "ace/Synch.h"
  17. #include <vector>
  18. using namespace std;
  19. typedef  ACE_Malloc<ACE_LOCAL_MEMORY_POOL, ACE_SYNCH_MUTEX> MUTEX_MALLOC;
  20. typedef ACE_Allocator_Adapter<MUTEX_MALLOC> Mutex_Allocator;
  21. Mutex_Allocator _module_service_mb_allocator;
  22. //工作人员等待工作时间(30s)
  23. #define WORKER_WAITTIME                30000000
  24. //等待一个工作组完成时间(20s)
  25. #define WORKGROUP_WAITTIME        20*1*1000*1000
  26. //工作组数量
  27. #define WORKGROUP_COUNT                10
  28. //工作数量
  29. #define WORK_COUNT                        10
  30. //工作
  31. struct WORKINFO
  32. {
  33.         int nTaskNo;
  34. };
  35. //工作组, 包含多个工作
  36. struct WORKGROUPINFO
  37. {
  38.         vector<WORKINFO*>  vecWorks;
  39. };
  40. //任务, 包含多个工作组
  41. struct TASKINFO
  42. {
  43.         vector<WORKGROUPINFO*>  vecWorkGroups;
  44. };
  45. //创建任务
  46. int CreateTaskInfo(TASKINFO** ppTaskInfo)
  47. {
  48.         TASKINFO* pTaskInfo = new TASKINFO();
  49.         int nWorkNo=0;
  50.         for (int i=0; i<WORKGROUP_COUNT;i++)
  51.         {
  52. WORKGROUPINFO* pWGInfo = new WORKGROUPINFO();
  53.                 for (int j=0; j<WORK_COUNT; j++)
  54.                 {
  55.                         WORKINFO* pWorkInfo = new WORKINFO();
  56.                         pWorkInfo->nTaskNo = nWorkNo++;
  57.                         pWGInfo->vecWorks.push_back(pWorkInfo);
  58.                 }
  59.                 pTaskInfo->vecWorkGroups.push_back(pWGInfo);
  60.         }
  61.         *ppTaskInfo = pTaskInfo;
  62.         return 0;
  63. }
  64. //清理任务
  65. int DestroyTaskInfo(TASKINFO* pTaskInfo)
  66. {
  67.         if (pTaskInfo == NULL)
  68.         {
  69.                 return 0;
  70.         }
  71.         int nWorkGroupCount = pTaskInfo->vecWorkGroups.size();
  72.         for (int i=0;i<nWorkGroupCount;i++)
  73.         {
  74.                 WORKGROUPINFO* pWGInfo = (WORKGROUPINFO*)pTaskInfo->vecWorkGroups[i];
  75.                 if (pWGInfo != NULL)
  76.                 {
  77.                         int nWorkCount = pWGInfo->vecWorks.size();
  78.                         for (int j=0; j<nWorkCount;j++)
  79.                         {
  80.                                 WORKINFO* pWorkInfo = (WORKINFO*)pWGInfo->vecWorks[j];
  81.                                 if (pWorkInfo != NULL)
  82.                                 {
  83.                                         delete pWorkInfo;
  84.                                         pWorkInfo = NULL;
  85.                                 }
  86.                         }
  87.                         pWGInfo->vecWorks.clear();
  88.                         delete pWGInfo;
  89.                         pWGInfo = NULL;
  90.                 }
  91.         }
  92.         pTaskInfo->vecWorkGroups.clear();
  93.         delete pTaskInfo;
  94.         pTaskInfo = NULL;
  95.         return 0;
  96. }
  97. //工作者, 执行工作
  98. class CWorkers : public ACE_Task<ACE_MT_SYNCH>
  99. {
  100. public:
  101.         CWorkers() : m_mySema(0), m_nThreadCount(0), m_myMutex(0) {};
  102.         ~CWorkers()
  103.         {
  104.                 msg_queue()->deactivate();
  105.                 msg_queue()->flush();
  106.         //杀死阻塞线程
  107. #ifdef WIN32
  108.                 ACE_hthread_t hthread[16] = {0x00};
  109.                 int ngrp_id = grp_id();
  110.                 int nCount = thr_mgr()->hthread_grp_list(ngrp_id, hthread, 10);
  111.                 for (int i=0; i<nCount;i++)
  112.                 {
  113.                         int ret = ::TerminateThread (hthread[i], -1);
  114.                         thr_mgr()->wait_grp (ngrp_id);
  115.                         ACE_DEBUG((LM_DEBUG, "-kill One Thread %d\n", ret));
  116.                 }
  117. #else
  118.                 int ngrp_id = grp_id();
  119.                 int ret = thr_mgr()->kill_grp(ngrp_id, SIGUSR1);
  120.                 ACE_DEBUG((LM_DEBUG, "-kill One Thread %d\n", ret));
  121. #endif
  122.                 ACE_DEBUG((LM_INFO, "-Workers Exit!\n"));
  123.         };
  124.         bool Open(int nThreadCount)
  125.         {
  126.                 if(activate(THR_NEW_LWP | THR_DETACHED | THR_JOINABLE, nThreadCount) == -1)
  127.                 {
  128.                         return false;
  129.                 }
  130.                 m_nThreadCount   = nThreadCount;
  131.                 return true;
  132.         }
  133.         int         Wait(ACE_Time_Value& tvWait)
  134.         {
  135.                 return m_mySema.acquire(tvWait);
  136.         }
  137.         int  PutWorkInfo(WORKINFO* pWorkInfo)
  138.         {
  139.                 ACE_Message_Block* mb = NULL;
  140.                 ACE_NEW_RETURN(mb, ACE_Message_Block(sizeof(WORKINFO*)), -1);
  141.                 WORKINFO** loadin = (WORKINFO **)mb->base();
  142.                 *loadin = pWorkInfo;
  143.                 ACE_Time_Value xtime;
  144.                 xtime = ACE_OS::gettimeofday();
  145.                 if(this->putq(mb, &xtime) == -1)
  146.                 {       
  147.                         mb->release();
  148.                         return -1;
  149.                 }
  150.                 return 0;
  151.         }
  152.         virtual int svc()
  153.         {
  154.                 ACE_Message_Block* mb = NULL;
  155.                 while (true)
  156.                 {
  157.                         ACE_Time_Value waitTime(0, WORKER_WAITTIME);
  158.                         ACE_Time_Value tvNow01(ACE_OS::gettimeofday());
  159.                         tvNow01 += waitTime;
  160.                         if(getq(mb, &tvNow01) == -1)
  161.                         {
  162.                                 break;
  163.                         }
  164.                         if (mb == NULL)
  165.                         {
  166.                                 break;
  167.                         }
  168.                         WORKINFO* pWorkInfo = *((WORKINFO**)mb->base());
  169.                         if (pWorkInfo == NULL)
  170.                         {
  171.                                 mb->release();
  172.                                 break;
  173.                         }
  174.                         int nTaskNo = pWorkInfo->nTaskNo;
  175.                         mb->release();
  176.                         int nWaitTime = (nTaskNo>=7 && (nTaskNo%7)==0) ? nTaskNo*10 : nTaskNo;
  177.                         ACE_OS::sleep(ACE_Time_Value(0, nWaitTime*1000*100));//等待以100ms为单位的时间
  178.                         ACE_DEBUG((LM_INFO, "----WORK [%03d] HAVEN END [COST:%05d]!\n", nTaskNo, nWaitTime));
  179.                         break;
  180.                 }
  181.                 m_myMutex.acquire();
  182.                 int nCount = (--m_nThreadCount);
  183.                 m_myMutex.release();
  184.                 if (nCount<=0)
  185.                 {
  186.                         m_mySema.release();
  187.                 }
  188.                 return 0;
  189.         };
  190. private:
  191.         ACE_Thread_Semaphore        m_mySema;                        //信号量, 用来等待任务超时
  192.         int                                                m_nThreadCount;                //工作线程数
  193.         ACE_Thread_Mutex                m_myMutex;                        //线程锁
  194. };
  195. //管理者, 负责分发任务
  196. ACE_THR_FUNC Manager(void * arg)
  197. {
  198.         TASKINFO* pTaskInfo = (TASKINFO*)arg;
  199.         if (pTaskInfo == NULL)
  200.         {
  201.                 return NULL;
  202.         }
  203.         //工作组数
  204.         int nWorkGroupCount = pTaskInfo->vecWorkGroups.size();
  205.         //开始任务
  206.         for (int i=0; i<nWorkGroupCount;i++)
  207.         {
  208.                 //开始工作组
  209.                 WORKGROUPINFO* pWGINFO = pTaskInfo->vecWorkGroups[i];
  210.                 if (pWGINFO != NULL)
  211.                 {
  212.                         ACE_DEBUG((LM_INFO, "--Work Group [%03d] Begin!\n", i));
  213.                         //获得工作组中的工作数量
  214.                         int nWorkCount = pWGINFO->vecWorks.size();
  215.                         //启动工作线程
  216.                         CWorkers  workers;
  217.                         workers.Open(nWorkCount);
  218.                         //分派任务
  219.                         for (int j=0; j<nWorkCount;j++)
  220.                         {
  221.                                 WORKINFO* pWorkInfo = pWGINFO->vecWorks[j];
  222.                                 workers.PutWorkInfo(pWorkInfo);
  223.                         }
  224.                         //等待任务结束或超时
  225.                         ACE_Time_Value tvWait(0, WORKGROUP_WAITTIME);
  226.                         tvWait += ACE_OS::gettimeofday();
  227.                         workers.Wait(tvWait);
  228.                         ACE_DEBUG((LM_INFO, "--Work Group [%03d] End!\n", i));
  229.                 }
  230.                 else
  231.                 {
  232.                         ACE_DEBUG((LM_INFO, "--Work Group [%03d] Failure!\n", i));
  233.                 }
  234.         }
  235.         //清理任务信息
  236.         DestroyTaskInfo(pTaskInfo);
  237.         return NULL;
  238. }
  239. int ACE_TMAIN(int argc, ACE_TCHAR* argv[])
  240. {
  241.         //创建任务
  242.         TASKINFO* pTaskInfo = NULL;
  243.         CreateTaskInfo(&pTaskInfo);
  244.         //分派任务
  245.         ACE_DEBUG((LM_INFO, "TASK BEGIN!\n"));
  246.         ACE_Thread_Manager::instance()->spawn((ACE_THR_FUNC)Manager, (void* )pTaskInfo);
  247.         getchar();
  248.         return 0;
  249. }
复制代码





发表于 2014-4-16 08:59:37 | 显示全部楼层
非常棒!宇/ka程枫

点评

谢谢, 还请多提意见  详情 回复 发表于 2014-4-16 10:43
 楼主| 发表于 2014-4-16 10:43:16 | 显示全部楼层
webmaster 发表于 2014-4-16 08:59
非常棒!宇/ka程枫

谢谢, 还请多提意见
发表于 2014-4-16 13:46:14 | 显示全部楼层
不错,你用了ace_task维护了你的进程组,非常好。
您需要登录后才可以回帖 登录 | 用户注册

本版积分规则

Archiver|手机版|小黑屋|ACE Developer ( 京ICP备06055248号 )

GMT+8, 2024-4-28 20:05 , Processed in 0.018410 second(s), 6 queries , Redis On.

Powered by Discuz! X3.5

© 2001-2023 Discuz! Team.

快速回复 返回顶部 返回列表