winston 发表于 2011-12-22 11:25:57

Lock-Free的链表实现及与加锁实现的性能对比

在编写多线程程序时,临界资源的处理常常需要互斥量、读写锁等来加以保护。这时需要考虑锁的粒度问题,粒度太粗,会出现很多线程阻塞等待相同的锁,源自并发性的改善微乎其微;如果锁的粒度太细,那么过多的锁开销会使系统性能受到影响,而且代码变得相当复杂。除此之外,还要细致的考虑各种dead lock问题。
因此,对于某些关键数据结构(临界资源),可以考虑使用Lock Free的实现手段。一个Lock Free的程序能够确保执行它的所有线程至少有一个能够继续往下执行,从而免疫了死锁等问题。Lock Free算法需要对应的原子操作加以支持,比如CAS(compare-and-swap)及其变种。CAS实现的逻辑如下:bool CAS(inptr_t* addr, intptr_t oldv, intptr_t newv)
atomically{
   if(*addr == oldv)
   {
          *addr = newv;
          return true;
   }
   else
    {
   return false
   }

}

接下来给出有锁和无锁的链表实现,以及最后的性能测试数据。
有锁实现的链表:
lock_stack.h 1 #ifndef __LOCK_STACK_H
2 #define __LOCK_STACK_H
3 #include<pthread.h>
4
5 typedef int value_t;
6
7 struct cell
8 {
9   struct cell *next;
10   value_t value;
11 };
12
13 struct stack
14 {
15   struct cell *top;
16   pthread_mutex_t lock;
17 };
18
19 struct stack *stack_alloc(void);
20 void stack_push(struct stack *fp,struct cell *cl);
21 struct cell *stack_pop(struct stack *fp);
22
23 #endif

lock_stack.c 1 #include "lock_stack.h"
2 #include <pthread.h>
3 #include <stdlib.h>
4
5 struct stack *stack_alloc(void)
6 {
7   struct stack *fp;
8   if((fp = (struct stack*)malloc(sizeof(struct stack))) != NULL)
9   {
10         fp->top = NULL;
11         if(pthread_mutex_init(&fp->lock,NULL) != 0)
12         {
13             free(fp);
14             return NULL;
15         }
16   }
17   return fp;
18 }
19
20 void stack_push(struct stack *fp,struct cell *cl)
21 {
22   pthread_mutex_lock(&fp->lock);
23   
24   cl->next = fp->top;
25   fp->top = cl;
26
27   pthread_mutex_unlock(&fp->lock);
28 }
29
30 struct cell *stack_pop(struct stack *fp)
31 {
32   cell *head;
33   pthread_mutex_lock(&fp->lock);
34   
35   head = fp->top;
36   if(head == NULL)
37   {
38         pthread_mutex_unlock(&fp->lock);
39         return head;
40   }
41
42   fp->top = head->next;
43   pthread_mutex_unlock(&fp->lock);
44 }

无锁实现的链表:
lock_free_stack.h 1 #ifndef __LOCK_FREE_STACK_H
2 #define __LOCK_FREE_STACK_H
3 #include<pthread.h>
4
5 typedef int value_t;
6
7 struct cell
8 {
9   struct cell *next;
10   value_t value;
11 };
12
13 struct stack
14 {
15   struct cell *top;
16 };
17
18 struct stack *stack_alloc(void);
19 void stack_push(struct stack *fp,struct cell *cl);
20 struct cell *stack_pop(struct stack *fp);
21
22 #endif

lock_free_stack.c 1 #include "lock_free_stack.h"
2 #include <pthread.h>
3 #include <stdlib.h>
4
5 struct stack *stack_alloc(void)
6 {
7   struct stack *fp;
8   if((fp = (struct stack*)malloc(sizeof(struct stack))) != NULL)
9   {
10         fp->top = NULL;
11   }
12   return fp;
13 }
14
15 void stack_push(struct stack *fp,struct cell *cl)
16 {
17   do{
18         cl->next = fp->top;
19   }while(!__sync_bool_compare_and_swap(&fp->top,cl->next,cl));
20 }
21
22 struct cell *stack_pop(struct stack *fp)
23 {
24   cell *head,*next;
25   
26   do
27   {
28         head = fp->top;
29         if(head == NULL)
30         {
31             break;
32         }
33         next = head->next;
34   }while(!__sync_bool_compare_and_swap(&fp->top,head,next));
35   return head;
36 }

主程序使用100个线程,每个线程对同一链表进行500000次随机的push/pop操作(具体进行某个操作rand确定),并记录了每个线程的执行时间,代码如下: 1 #ifdef __LOCK_VERSION
2   #include "lock_stack.h"
3 #endif
4 #ifdef __LOCK_FREE_VERSION
5   #include "lock_free_stack.h"
6 #endif
7 #include <stdlib.h>
8 #include <stdio.h>
9 #include <unistd.h>
10 #include <time.h>
11
12 void* thr_fn(void *fp)
13 {
14   time_t t1 = time(NULL);
15   for(int i = 0; i != 500000; i++)
16   {
17         int r = rand()%2;
18         struct cell *cl;
19         
20     if(r == 0)
21     {
22           if((cl = (struct cell*)malloc(sizeof(struct cell))) != NULL)
23           {
24             stack_push((struct stack*)fp,cl);
25           }
26           printf("push!\n");
27     }
28     else
29     {
30         cl = stack_pop((struct stack*)fp);
31           if(cl != NULL)
32           {
33               //free(cl);
34           }
35           printf("pop!\n");
36         }
37   }
38
39   printf("cost time: %d\n",time(NULL)-t1);
40
41   return NULL;
42 }
43
44 int main()
45 {
46   struct stack *fp;
47   fp = stack_alloc();
48
49   pthread_t tid;
50   for(int i = 0; i < 100; i++)
51   {
52         pthread_create(&tid,NULL,thr_fn,(void *)fp);
53   }
54   
55   sleep(1000);
56
57   return 0;
58 }

makefile文件如下:1 lock_free:
2   g++ lock_free_stack.c main.c -lpthread -D__LOCK_FREE_VERSION -march=nocona -o lock_free
3 lock:
4   g++ lock_stack.c main.c -lpthread -D__LOCK_VERSION -o lock
5
6 clean:
7   rm lock lock_free

得到最终的测试数据如下所示:
http://pic002.cnblogs.com/images/2011/305098/2011122112105458.jpg
相比较而言,在性能上lock_free_stack有微弱的优势。
注:
1)大家在看代码时可能已经注意到了,main.c 33行free()函数被注释掉了,这是有意为之,在主动进行内存管理时,Lock Free结构会遇到ABA问题,需要CAS2配合Sequence Number加以解决。感兴趣的读者可以继续阅读文后的参考文献;
2)主线程需要等待所有的子线程执行完毕,这里不好使用pthread_join,本文作者采用取巧的办法,调用sleep函数进行等待,具体等待时间可能和测试环境相关;
3)__sync_bool_compare_and_swap 是由GCC提供的内置函数(GCC 4.1或更高版本),编译时需要指定-march参数;

参考资料:
搜狗实验室   C10K与高性能程序续篇  http://www.sogou.com/labs/report/4-2.pdf
W.Richard Stevens   Advanced Programming in the Unix Environment  11.6节 线程同步
设计不适用互斥锁的并发数据结构http://www.ibm.com/developerworks/cn/aix/library/au-multithreaded_structures2/
解决了“undefined reference to `__sync_bool_compare_and_swap_4”的编译错误   http://raylinn.iteye.com/blog/347323
compare_and_swap  http://en.wikipedia.org/wiki/Compare-and-swaphttp://www.cnblogs.com/liuhao/aggbug/2295671.html?type=1
本文链接
页: [1]
查看完整版本: Lock-Free的链表实现及与加锁实现的性能对比