|
使用Linux平台上现有的信号量sem_t相关的一组API,可以方便地进行线程同步。现在用pthread_mutex_t和pthread_cond_t相关的一组API实现信号量机制。这组API包括:pthread_mutex_init,pthread_cond_init,pthread_mutex_lock,pthread_cond_signal,pthread_mutex_unlock,pthread_cond_wait,pthread_cond_timedwait,pthread_cond_destroy和pthread_mutex_destroy,可以在
http://www.9linux.com找到各API的说明。下边,是封装的信号量类,以及测试代码。使用VS2005编辑,在虚拟机 Fedora 13中编译,测试通过。
MySemaphore.h- #ifndef Semaphore_Header
- #define Semaphore_Header
- #include <iostream>
- #include <pthread.h>
- #include <errno.h>
- #include <assert.h>
- using namespace std;
- //------------------------------------------------------------------------
- class CSemaphoreImpl
- {
- protected:
- CSemaphoreImpl(int n, int max);
- ~CSemaphoreImpl();
- void SetImpl();
- void WaitImpl();
- bool WaitImpl(long lMilliseconds);
- private:
- volatile int m_n;
- int m_max;
- pthread_mutex_t m_mutex;
- pthread_cond_t m_cond;
- };
- inline void CSemaphoreImpl::SetImpl()
- {
- if (pthread_mutex_lock(&m_mutex))
- cout<<"cannot signal semaphore (lock)"<<endl;
- if (m_n < m_max)
- {
- ++m_n;
- }
- else
- {
- pthread_mutex_unlock(&m_mutex);
- cout<<"cannot signal semaphore: count would exceed maximum"<<" m_n = "<<m_n<<"m_max = "<<m_max<<endl;
- }
- //重新开始等待m_cond的线程,可被调度
- if (pthread_cond_signal(&m_cond))
- {
- pthread_mutex_unlock(&m_mutex);
- cout<<"cannot signal semaphore"<<endl;
- }
- pthread_mutex_unlock(&m_mutex);
- }
- //------------------------------------------------------------------------
- /*
- 信号量同步机制
- 信号量提供一个计数值,可以进行原子操作。V 将计数值加1,使得
- 等待该信号量的线程可以被调用(调用Set()),P 将计数值减1,使
- 当前线程被挂起,进行睡眠(调用Wait())。
- 当信号量的计数值被初始化为0时,调用P操作,将挂起当前线程。
- 当信号量被激活,即调用V操作后,被挂起的线程就有机会被重新调度了。
- */
- class CMySemaphore: private CSemaphoreImpl
- {
- public:
- /*
- 创建一个信号量,信号量计数值当前值为参数n,最大值为max。
- 如果只有n,则n必须大于0;如果同时有n和max,则n必须不小
- 于0,且不大于max
- */
- CMySemaphore(int n);
- CMySemaphore(int n, int max);
-
- /*
- 销毁一个信号量
- */
- ~CMySemaphore();
- /*
- 对信号量计数值做加1动作,信号量变为有信号状态,使得
- 另一个等待该信号量的线程可以被调度
- */
- void Set();
- /*
- 对信号量计数值做减1动作,信号量变为无信号状态。若
- 计数值变得大于0时,信号量才会变为有信号状态。
- */
- void Wait();
- /*
- 在给定的时间间隔里等待信号量变为有信号状态,若成功,
- 则将计数值减1,否则将发生超时。
- */
- void Wait(long lMilliseconds);
- /*
- 在给定的时间间隔里等待信号量变为有信号状态,若成功,
- 则将计数值减1,返回true;否则返回false。
- */
- bool TryWait(long lMilliseconds);
- private:
- CMySemaphore();
- CMySemaphore(const CMySemaphore&);
- CMySemaphore& operator = (const CMySemaphore&);
- };
- inline void CMySemaphore::Set()
- {
- SetImpl();
- }
- inline void CMySemaphore::Wait()
- {
- WaitImpl();
- }
- inline void CMySemaphore::Wait(long lMilliseconds)
- {
- if (!WaitImpl(lMilliseconds))
- cout<<"time out"<<endl;
- }
- inline bool CMySemaphore::TryWait(long lMilliseconds)
- {
- return WaitImpl(lMilliseconds);
- }
- #endif
- MySemaphore.cpp
- #include "MySemaphore.h"
- #include <sys/time.h>
- CSemaphoreImpl::CSemaphoreImpl(int n, int max): m_n(n), m_max(max)
- {
- assert (n >= 0 && max > 0 && n <= max);
- if (pthread_mutex_init(&m_mutex, NULL))
- cout<<"cannot create semaphore (mutex)"<<endl;
- if (pthread_cond_init(&m_cond, NULL))
- cout<<"cannot create semaphore (condition)"<<endl;
- }
- CSemaphoreImpl::~CSemaphoreImpl()
- {
- pthread_cond_destroy(&m_cond);
- pthread_mutex_destroy(&m_mutex);
- }
- void CSemaphoreImpl::WaitImpl()
- {
- if (pthread_mutex_lock(&m_mutex))
- cout<<"wait for semaphore failed (lock)"<<endl;
- while (m_n < 1)
- {
- //对互斥体进行原子的解锁工作,然后等待状态信号
- if (pthread_cond_wait(&m_cond, &m_mutex))
- {
- pthread_mutex_unlock(&m_mutex);
- cout<<"wait for semaphore failed"<<endl;
- }
- }
- --m_n;
- pthread_mutex_unlock(&m_mutex);
- }
- bool CSemaphoreImpl::WaitImpl(long lMilliseconds)
- {
- int rc = 0;
- struct timespec abstime;
- struct timeval tv;
- gettimeofday(&tv, NULL);
- abstime.tv_sec = tv.tv_sec + lMilliseconds / 1000;
- abstime.tv_nsec = tv.tv_usec*1000 + (lMilliseconds % 1000)*1000000;
- if (abstime.tv_nsec >= 1000000000)
- {
- abstime.tv_nsec -= 1000000000;
- abstime.tv_sec++;
- }
- if (pthread_mutex_lock(&m_mutex) != 0)
- cout<<"wait for semaphore failed (lock)"<<endl;
- while (m_n < 1)
- {
- //自动释放互斥体并且等待m_cond状态,并且限制了最大的等待时间
- if ((rc = pthread_cond_timedwait(&m_cond, &m_mutex, &abstime)))
- {
- if (rc == ETIMEDOUT) break;
- pthread_mutex_unlock(&m_mutex);
- cout<<"cannot wait for semaphore"<<endl;
- }
- }
- if (rc == 0) --m_n;
- pthread_mutex_unlock(&m_mutex);
- return rc == 0;
- }
- CMySemaphore::CMySemaphore(int n): CSemaphoreImpl(n, n)
- {
- }
- CMySemaphore::CMySemaphore(int n, int max): CSemaphoreImpl(n, max)
- {
- }
- CMySemaphore::~CMySemaphore()
- {
- }
复制代码 下边是测试代码- // pthread_semaphore.cpp : 定义控制台应用程序的入口点。
- //
- #include "MySemaphore.h"
- //创建一个信号量,其计数值当前值为0,最大值为3
- CMySemaphore g_MySem(0, 3);
- //线程函数
- void * StartThread(void *pParam)
- {
- //休眠1秒,确保主线程函数main中
- //创建工作线程下一句g_MySem.Set();先执行
- sleep(1);
- g_MySem.Wait(); //信号量计数值减1
- cout<<"Do print StartThread"<<endl;
- return (void *)0;
- }
- int main(int argc, char* argv[])
- {
- pthread_t thread;
- pthread_attr_t attr;
- assert ( !g_MySem.TryWait(10) );
- g_MySem.Set(); //信号量计数值加1
- g_MySem.Wait(); //信号量计数值减1
- try
- {
- g_MySem.Wait(100);
- cout<<"must timeout"<<endl; //此处发生超时
- }
- catch (...)
- {
- cout<<"wrong exception"<<endl;
- }
- g_MySem.Set();
- g_MySem.Set();
- assert ( g_MySem.TryWait(0) );
- g_MySem.Wait();
- assert ( !g_MySem.TryWait(10) );
- //创建工作线程
- pthread_attr_init(&attr);
- pthread_attr_setdetachstate(&attr,PTHREAD_CREATE_JOINABLE);
- if (pthread_create(&thread,&attr, StartThread,NULL) == -1)
- {
- cout<<"StartThread: create failed"<<endl;
- }
- g_MySem.Set();
- //等待线程结束
- void *result;
- pthread_join(thread,&result);
- assert ( !g_MySem.TryWait(10) ); //若将断言中的 ! 去掉,则会发生断言错误
- //关闭线程句柄,释放资源
- pthread_attr_destroy(&attr);
- int iWait;
- cin>>iWait;
- return 0;
- }
复制代码 编译,运行。可以看到,与Win32平台上的测试结果相同
由此可见,信号量机制很关键的一点就是计数值 m_n。
作者:chexlong 发表于2011-12-23 22:25:10 原文链接 |
|