lihaowei2028 发表于 2014-4-15 18:04:23

自动驱动工作模型的两种实现

自动驱动工作模型: 让程序自动的进行任务执行, 直到全部任务完成.
应用场景是, 当你的任务分为很多子任务, 子任务又包括很多的子模块, 一个子任务中的各个子模块的的执行是可以并行的, 而子任务是串行执行的,
一个子任务必须必须等待它前一个子任务结束后才能执行.

实现版本一(这个版本是版主设计):

#include "stdafx.h"

#include "ace/SString.h"
#include "ace/Malloc.h"
#include "ace/Malloc_T.h"
#include "ace/Task_T.h"
#include "ace/Local_Memory_Pool.h"
#include "ace/Time_Value.h"
#include "ace/OS_main.h"
#include "ace/OS_NS_sys_stat.h"
#include "ace/OS_NS_sys_socket.h"
#include "ace/OS_NS_unistd.h"
#include "ace/Thread.h"

#include <vector>

using namespace std;

enum ENUM_STATE
{
        WORKER_CREATE = 0,
        WORKER_BEGIN,
        WORKER_ERROR,
        WORKER_END
};

//工作线程对象信息
struct _ThreadInfo
{
        int      m_nThreadID;      //序列号
        int      m_nGrpID;         //线程的实际ID
        ENUM_STATE m_emState;      //工作线程状态

        bool (*fnCallBack)(int nThreadID, ENUM_STATE m_emState);

        _ThreadInfo()
        {
                m_nThreadID = 0;
                m_nGrpID    = 0;
                SetState(WORKER_CREATE);
        }

        void Init(bool (*fn)(int nThreadID, ENUM_STATE m_emState))
        {
                fnCallBack = fn;
        }

        void SetState(ENUM_STATE emState = WORKER_CREATE)
        {
                m_emState = emState;
                if(m_emState == WORKER_END || emState == WORKER_ERROR)
                {
                        fnCallBack(m_nThreadID, emState);
                }
        }
};

typedef vector<_ThreadInfo*> vecThreadInfo;

void KillThread(int nThreadID)
{
#ifdef WIN32
        ACE_hthread_t hthread = 0;
        if (ACE_Thread_Manager::instance()->hthread_grp_list(nThreadID, &hthread, 1) == 1)
        {
                ::TerminateThread ((HANDLE)hthread, -1);
        }
#else
        ACE_Thread_Manager::instance()->kill_grp(nThreadID, SIGUSR1);
#endif
}

bool ThreadCallBack(int nThreadID, ENUM_STATE m_emState);

//全局变量部分
//*********************************************
int g_Rule_id = 0;               //裁决者线程ID
vecThreadInfo objvecThreadInfo;    //所有工作线程信息
int g_Index = 0;                   //当前运行次数
#define MAX_RUN_COUNT 10         //运行10次
//*********************************************

//工作线程
ACE_THR_FUNC worker(void* arg)
{
        _ThreadInfo* pThreadInfo = (_ThreadInfo* )arg;
        pThreadInfo->SetState(WORKER_BEGIN);

        ACE_Time_Value tvSleep(pThreadInfo->m_nThreadID + 1, 0);

        //这里模拟工作线程执行时间
        //执行时间为挂起ThreadID + 1秒
        ACE_OS::sleep(tvSleep);

        pThreadInfo->SetState(WORKER_END);

        return NULL;
}

void Event_Begin()
{
        int nThreadCount = 5;

        for(int i = 0; i < nThreadCount; i++)
        {
                _ThreadInfo* pThreadInfo = new _ThreadInfo();
                pThreadInfo->m_nThreadID = i;
                pThreadInfo->Init(&ThreadCallBack);

                pThreadInfo->m_nGrpID = ACE_Thread_Manager::instance()->spawn((ACE_THR_FUNC)worker, (void* )pThreadInfo);

                objvecThreadInfo.push_back(pThreadInfo);
        }
}

void Event_End()
{
        //最后清理
        for(int i = 0; i < (int)objvecThreadInfo.size(); i++)
        {
                _ThreadInfo* pThreadInfo = (_ThreadInfo* )objvecThreadInfo;
                delete pThreadInfo;
        }

        objvecThreadInfo.clear();
}


//裁决者线程
ACE_THR_FUNC Event_Rule(void* arg)
{
        //这个假设等待15秒,作为最多等待15秒的最长等待时间。
        ACE_Time_Value tvCheck(3, 0);
        ACE_OS::sleep(tvCheck);

        for(int i = 0; i < (int)objvecThreadInfo.size(); i++)
        {
                ACE_DEBUG((LM_INFO, "nThreadID=%d, m_emState=%d.\n", objvecThreadInfo->m_nThreadID, objvecThreadInfo->m_emState));
                if(WORKER_END != objvecThreadInfo->m_emState)
                {
                        //当前工作线程还没结束
                        KillThread(objvecThreadInfo->m_nGrpID);
                        ACE_DEBUG((LM_INFO, "nThreadID=%d, 你被裁决了!!\n", objvecThreadInfo->m_nThreadID));
                }
        }

        g_Index++;
        if(g_Index < MAX_RUN_COUNT)
        {
                //全部运行结束,进行下一轮
                ACE_DEBUG((LM_INFO, "全部工作结束,进行下一轮[%d].\n", g_Index));

                //进行线程清理工作
                Event_End();

                //开始新一轮
                Event_Begin();

                //创建一个裁决者
                g_Rule_id = ACE_Thread_Manager::instance()->spawn((ACE_THR_FUNC)Event_Rule, (void* )NULL);

        }
        else
        {
                //彻底全部结束
                ACE_DEBUG((LM_INFO, "彻底的工作结束啦.\n"));
        }

        return NULL;
}

//监控者(运行10次)
ACE_THR_FUNC Run(void* arg)
{
        g_Index = 0;

        //创建一个裁决者
        g_Rule_id = ACE_Thread_Manager::instance()->spawn((ACE_THR_FUNC)Event_Rule, (void* )NULL);

        //开始循环
        Event_Begin();

        return NULL;
}

//事件通知函数,当线程执行结束或者出错的时候
bool ThreadCallBack(int nThreadID, ENUM_STATE m_emState)
{
        //在这里可以处理你的逻辑
        //比如一共多少线程,线程全部结束后怎么处理等等。
        ACE_DEBUG((LM_INFO, "nThreadID=%d, m_emState=%d.\n", nThreadID, m_emState));

        //判断所有的工作线程是否已经结束
        bool blAllFinish = true;
        for(int i = 0; i < (int)objvecThreadInfo.size(); i++)
        {
                if(WORKER_END != objvecThreadInfo->m_emState)
                {
                        blAllFinish = false;
                }
        }

        if(blAllFinish == true)
        {
                g_Index++;
                if(g_Index < MAX_RUN_COUNT)
                {
                        //全部运行结束,进行下一轮
                        ACE_DEBUG((LM_INFO, "全部工作结束,进行下一轮[%d].\n", g_Index));

                        //杀死裁决者,裁决者你不用再等了
                        KillThread(g_Rule_id);

                        //进行线程清理工作
                        Event_End();

                        //开始新一轮
                        Event_Begin();

                        //创建一个裁决者
                        g_Rule_id = ACE_Thread_Manager::instance()->spawn((ACE_THR_FUNC)Event_Rule, (void* )NULL);

                }
                else
                {
                        //彻底全部结束
                        ACE_DEBUG((LM_INFO, "彻底的工作结束啦.\n"));
                }
        }

        return true;
}

int ACE_TMAIN(int argc, ACE_TCHAR* argv[])
{

        ACE_Thread_Manager::instance()->spawn((ACE_THR_FUNC)Run, (void* )NULL);

        getchar();
        return 0;
}

实现版本二(根据眼总设计版本改造版本):

#include <stdio.h>
#include <stdlib.h>

#include "ace/SString.h"
#include "ace/Malloc.h"
#include "ace/Malloc_T.h"
#include "ace/Task_T.h"
#include "ace/Local_Memory_Pool.h"
#include "ace/Time_Value.h"
#include "ace/Date_Time.h"
#include "ace/OS_main.h"
#include "ace/OS_NS_sys_stat.h"
#include "ace/OS_NS_sys_socket.h"
#include "ace/OS_NS_unistd.h"
#include "ace/Thread.h"
#include "ace/Condition_T.h"
#include "ace/Synch.h"


#include <vector>

using namespace std;

typedefACE_Malloc<ACE_LOCAL_MEMORY_POOL, ACE_SYNCH_MUTEX> MUTEX_MALLOC;
typedef ACE_Allocator_Adapter<MUTEX_MALLOC> Mutex_Allocator;
Mutex_Allocator _module_service_mb_allocator;

//工作人员等待工作时间(30s)
#define WORKER_WAITTIME                30000000

//等待一个工作组完成时间(20s)
#define WORKGROUP_WAITTIME        20*1*1000*1000


//工作组数量
#define WORKGROUP_COUNT                10

//工作数量
#define WORK_COUNT                        10

//工作
struct WORKINFO
{
        int nTaskNo;
};

//工作组, 包含多个工作
struct WORKGROUPINFO
{
        vector<WORKINFO*>vecWorks;
};

//任务, 包含多个工作组
struct TASKINFO
{
        vector<WORKGROUPINFO*>vecWorkGroups;
};

//创建任务
int CreateTaskInfo(TASKINFO** ppTaskInfo)
{
        TASKINFO* pTaskInfo = new TASKINFO();

        int nWorkNo=0;
        for (int i=0; i<WORKGROUP_COUNT;i++)
        {
WORKGROUPINFO* pWGInfo = new WORKGROUPINFO();
                for (int j=0; j<WORK_COUNT; j++)
                {
                        WORKINFO* pWorkInfo = new WORKINFO();

                        pWorkInfo->nTaskNo = nWorkNo++;
                        pWGInfo->vecWorks.push_back(pWorkInfo);
                }

                pTaskInfo->vecWorkGroups.push_back(pWGInfo);
        }

        *ppTaskInfo = pTaskInfo;

        return 0;
}


//清理任务
int DestroyTaskInfo(TASKINFO* pTaskInfo)
{
        if (pTaskInfo == NULL)
        {
                return 0;
        }

        int nWorkGroupCount = pTaskInfo->vecWorkGroups.size();

        for (int i=0;i<nWorkGroupCount;i++)
        {
                WORKGROUPINFO* pWGInfo = (WORKGROUPINFO*)pTaskInfo->vecWorkGroups;
                if (pWGInfo != NULL)
                {
                        int nWorkCount = pWGInfo->vecWorks.size();
                        for (int j=0; j<nWorkCount;j++)
                        {
                                WORKINFO* pWorkInfo = (WORKINFO*)pWGInfo->vecWorks;
                                if (pWorkInfo != NULL)
                                {
                                        delete pWorkInfo;
                                        pWorkInfo = NULL;
                                }
                        }

                        pWGInfo->vecWorks.clear();
                        delete pWGInfo;
                        pWGInfo = NULL;
                }
        }

        pTaskInfo->vecWorkGroups.clear();
        delete pTaskInfo;
        pTaskInfo = NULL;

        return 0;
}

//工作者, 执行工作
class CWorkers : public ACE_Task<ACE_MT_SYNCH>
{
public:
        CWorkers() : m_mySema(0), m_nThreadCount(0), m_myMutex(0) {};
        ~CWorkers()
        {
                msg_queue()->deactivate();
                msg_queue()->flush();


        //杀死阻塞线程
#ifdef WIN32
                ACE_hthread_t hthread = {0x00};
                int ngrp_id = grp_id();
                int nCount = thr_mgr()->hthread_grp_list(ngrp_id, hthread, 10);

                for (int i=0; i<nCount;i++)
                {
                        int ret = ::TerminateThread (hthread, -1);
                        thr_mgr()->wait_grp (ngrp_id);
                        ACE_DEBUG((LM_DEBUG, "-kill One Thread %d\n", ret));
                }
#else
                int ngrp_id = grp_id();
                int ret = thr_mgr()->kill_grp(ngrp_id, SIGUSR1);
                ACE_DEBUG((LM_DEBUG, "-kill One Thread %d\n", ret));
#endif

                ACE_DEBUG((LM_INFO, "-Workers Exit!\n"));
        };

        bool Open(int nThreadCount)
        {
                if(activate(THR_NEW_LWP | THR_DETACHED | THR_JOINABLE, nThreadCount) == -1)
                {
                        return false;
                }

                m_nThreadCount   = nThreadCount;

                return true;
        }

        int       Wait(ACE_Time_Value& tvWait)
        {
                return m_mySema.acquire(tvWait);
        }

        intPutWorkInfo(WORKINFO* pWorkInfo)
        {
                ACE_Message_Block* mb = NULL;
                ACE_NEW_RETURN(mb, ACE_Message_Block(sizeof(WORKINFO*)), -1);

                WORKINFO** loadin = (WORKINFO **)mb->base();
                *loadin = pWorkInfo;

                ACE_Time_Value xtime;
                xtime = ACE_OS::gettimeofday();
                if(this->putq(mb, &xtime) == -1)
                {       
                        mb->release();
                        return -1;
                }

                return 0;

        }

        virtual int svc()
        {
                ACE_Message_Block* mb = NULL;

                while (true)
                {
                        ACE_Time_Value waitTime(0, WORKER_WAITTIME);
                        ACE_Time_Value tvNow01(ACE_OS::gettimeofday());
                        tvNow01 += waitTime;
                        if(getq(mb, &tvNow01) == -1)
                        {
                                break;
                        }

                        if (mb == NULL)
                        {
                                break;
                        }

                        WORKINFO* pWorkInfo = *((WORKINFO**)mb->base());
                        if (pWorkInfo == NULL)
                        {
                                mb->release();
                                break;
                        }

                        int nTaskNo = pWorkInfo->nTaskNo;

                        mb->release();

                        int nWaitTime = (nTaskNo>=7 && (nTaskNo%7)==0) ? nTaskNo*10 : nTaskNo;

                        ACE_OS::sleep(ACE_Time_Value(0, nWaitTime*1000*100));//等待以100ms为单位的时间

                        ACE_DEBUG((LM_INFO, "----WORK [%03d] HAVEN END !\n", nTaskNo, nWaitTime));

                        break;
                }


                m_myMutex.acquire();
                int nCount = (--m_nThreadCount);
                m_myMutex.release();

                if (nCount<=0)
                {
                        m_mySema.release();
                }

                return 0;
        };


private:
        ACE_Thread_Semaphore        m_mySema;                        //信号量, 用来等待任务超时
        int                                                m_nThreadCount;                //工作线程数
        ACE_Thread_Mutex                m_myMutex;                        //线程锁

};

//管理者, 负责分发任务
ACE_THR_FUNC Manager(void * arg)
{
        TASKINFO* pTaskInfo = (TASKINFO*)arg;

        if (pTaskInfo == NULL)
        {
                return NULL;
        }

        //工作组数
        int nWorkGroupCount = pTaskInfo->vecWorkGroups.size();

        //开始任务
        for (int i=0; i<nWorkGroupCount;i++)
        {
                //开始工作组
                WORKGROUPINFO* pWGINFO = pTaskInfo->vecWorkGroups;
                if (pWGINFO != NULL)
                {
                        ACE_DEBUG((LM_INFO, "--Work Group [%03d] Begin!\n", i));

                        //获得工作组中的工作数量
                        int nWorkCount = pWGINFO->vecWorks.size();

                        //启动工作线程
                        CWorkersworkers;
                        workers.Open(nWorkCount);

                        //分派任务
                        for (int j=0; j<nWorkCount;j++)
                        {
                                WORKINFO* pWorkInfo = pWGINFO->vecWorks;
                                workers.PutWorkInfo(pWorkInfo);
                        }

                        //等待任务结束或超时
                        ACE_Time_Value tvWait(0, WORKGROUP_WAITTIME);

                        tvWait += ACE_OS::gettimeofday();
                        workers.Wait(tvWait);

                        ACE_DEBUG((LM_INFO, "--Work Group [%03d] End!\n", i));

                }
                else
                {
                        ACE_DEBUG((LM_INFO, "--Work Group [%03d] Failure!\n", i));
                }
        }

        //清理任务信息
        DestroyTaskInfo(pTaskInfo);

        return NULL;
}


int ACE_TMAIN(int argc, ACE_TCHAR* argv[])
{
        //创建任务
        TASKINFO* pTaskInfo = NULL;
        CreateTaskInfo(&pTaskInfo);

        //分派任务
        ACE_DEBUG((LM_INFO, "TASK BEGIN!\n"));
        ACE_Thread_Manager::instance()->spawn((ACE_THR_FUNC)Manager, (void* )pTaskInfo);

        getchar();
        return 0;
}





webmaster 发表于 2014-4-16 08:59:37

非常棒!宇/ka程枫

lihaowei2028 发表于 2014-4-16 10:43:16

webmaster 发表于 2014-4-16 08:59
非常棒!宇/ka程枫

谢谢, 还请多提意见

freeeyes 发表于 2014-4-16 13:46:14

不错,你用了ace_task维护了你的进程组,非常好。
页: [1]
查看完整版本: 自动驱动工作模型的两种实现