找回密码
 用户注册

QQ登录

只需一步,快速开始

查看: 4378|回复: 0

Lock-Free的链表实现及与加锁实现的性能对比

[复制链接]
发表于 2011-12-22 11:25:57 | 显示全部楼层 |阅读模式
在编写多线程程序时,临界资源的处理常常需要互斥量、读写锁等来加以保护。这时需要考虑锁的粒度问题,粒度太粗,会出现很多线程阻塞等待相同的锁,源自并发性的改善微乎其微;如果锁的粒度太细,那么过多的锁开销会使系统性能受到影响,而且代码变得相当复杂。除此之外,还要细致的考虑各种dead lock问题。
因此,对于某些关键数据结构(临界资源),可以考虑使用Lock Free的实现手段。一个Lock Free的程序能够确保执行它的所有线程至少有一个能够继续往下执行,从而免疫了死锁等问题。Lock Free算法需要对应的原子操作加以支持,比如CAS(compare-and-swap)及其变种。CAS实现的逻辑如下:bool CAS(inptr_t* addr, intptr_t oldv, intptr_t newv)
atomically{
     if(*addr == oldv)
     {
          *addr = newv;
          return true;
     }
     else
    {
     return false
     }

}

接下来给出有锁和无锁的链表实现,以及最后的性能测试数据。
有锁实现的链表:
lock_stack.h 1 #ifndef __LOCK_STACK_H
2 #define __LOCK_STACK_H
3 #include<pthread.h>
4
5 typedef int value_t;
6
7 struct cell
8 {
9     struct cell *next;
10     value_t value;
11 };
12
13 struct stack
14 {
15     struct cell *top;
16     pthread_mutex_t lock;
17 };
18
19 struct stack *stack_alloc(void);
20 void stack_push(struct stack *fp,struct cell *cl);
21 struct cell *stack_pop(struct stack *fp);
22
23 #endif

lock_stack.c 1 #include "lock_stack.h"
2 #include <pthread.h>
3 #include <stdlib.h>
4
5 struct stack *stack_alloc(void)
6 {
7     struct stack *fp;
8     if((fp = (struct stack*)malloc(sizeof(struct stack))) != NULL)
9     {
10         fp->top = NULL;
11         if(pthread_mutex_init(&fp->lock,NULL) != 0)
12         {
13             free(fp);
14             return NULL;
15         }
16     }
17     return fp;
18 }
19
20 void stack_push(struct stack *fp,struct cell *cl)
21 {
22     pthread_mutex_lock(&fp->lock);
23     
24     cl->next = fp->top;
25     fp->top = cl;
26
27     pthread_mutex_unlock(&fp->lock);
28 }
29
30 struct cell *stack_pop(struct stack *fp)
31 {
32     cell *head;
33     pthread_mutex_lock(&fp->lock);
34     
35     head = fp->top;
36     if(head == NULL)
37     {
38         pthread_mutex_unlock(&fp->lock);
39         return head;
40     }
41
42     fp->top = head->next;
43     pthread_mutex_unlock(&fp->lock);
44 }

无锁实现的链表:
lock_free_stack.h 1 #ifndef __LOCK_FREE_STACK_H
2 #define __LOCK_FREE_STACK_H
3 #include<pthread.h>
4
5 typedef int value_t;
6
7 struct cell
8 {
9     struct cell *next;
10     value_t value;
11 };
12
13 struct stack
14 {
15     struct cell *top;
16 };
17
18 struct stack *stack_alloc(void);
19 void stack_push(struct stack *fp,struct cell *cl);
20 struct cell *stack_pop(struct stack *fp);
21
22 #endif

lock_free_stack.c 1 #include "lock_free_stack.h"
2 #include <pthread.h>
3 #include <stdlib.h>
4
5 struct stack *stack_alloc(void)
6 {
7     struct stack *fp;
8     if((fp = (struct stack*)malloc(sizeof(struct stack))) != NULL)
9     {
10         fp->top = NULL;
11     }
12     return fp;
13 }
14
15 void stack_push(struct stack *fp,struct cell *cl)
16 {
17     do{
18         cl->next = fp->top;
19     }while(!__sync_bool_compare_and_swap(&fp->top,cl->next,cl));
20 }
21
22 struct cell *stack_pop(struct stack *fp)
23 {
24     cell *head,*next;
25     
26     do
27     {
28         head = fp->top;
29         if(head == NULL)
30         {
31             break;
32         }
33         next = head->next;
34     }while(!__sync_bool_compare_and_swap(&fp->top,head,next));
35     return head;
36 }

主程序使用100个线程,每个线程对同一链表进行500000次随机的push/pop操作(具体进行某个操作rand确定),并记录了每个线程的执行时间,代码如下: 1 #ifdef __LOCK_VERSION
2     #include "lock_stack.h"
3 #endif
4 #ifdef __LOCK_FREE_VERSION
5     #include "lock_free_stack.h"
6 #endif
7 #include <stdlib.h>
8 #include <stdio.h>
9 #include <unistd.h>
10 #include <time.h>
11
12 void* thr_fn(void *fp)
13 {
14     time_t t1 = time(NULL);
15     for(int i = 0; i != 500000; i++)
16     {
17         int r = rand()%2;
18         struct cell *cl;
19         
20       if(r == 0)
21       {
22           if((cl = (struct cell*)malloc(sizeof(struct cell))) != NULL)
23           {
24             stack_push((struct stack*)fp,cl);
25           }
26           printf("push!\n");
27       }
28       else
29       {
30           cl = stack_pop((struct stack*)fp);
31           if(cl != NULL)
32           {
33                 //free(cl);
34           }
35           printf("pop!\n");
36         }
37     }
38
39     printf("cost time: %d\n",time(NULL)-t1);
40
41     return NULL;
42 }
43
44 int main()
45 {
46     struct stack *fp;
47     fp = stack_alloc();
48
49     pthread_t tid;
50     for(int i = 0; i < 100; i++)
51     {
52         pthread_create(&tid,NULL,thr_fn,(void *)fp);
53     }
54     
55     sleep(1000);
56
57     return 0;
58 }

makefile文件如下:1 lock_free:
2     g++ lock_free_stack.c main.c -lpthread -D__LOCK_FREE_VERSION -march=nocona -o lock_free
3 lock:
4     g++ lock_stack.c main.c -lpthread -D__LOCK_VERSION -o lock
5
6 clean:
7     rm lock lock_free

得到最终的测试数据如下所示:

相比较而言,在性能上lock_free_stack有微弱的优势。
注:
1)大家在看代码时可能已经注意到了,main.c 33行free()函数被注释掉了,这是有意为之,在主动进行内存管理时,Lock Free结构会遇到ABA问题,需要CAS2配合Sequence Number加以解决。感兴趣的读者可以继续阅读文后的参考文献;
2)主线程需要等待所有的子线程执行完毕,这里不好使用pthread_join,本文作者采用取巧的办法,调用sleep函数进行等待,具体等待时间可能和测试环境相关;
3)__sync_bool_compare_and_swap 是由GCC提供的内置函数(GCC 4.1或更高版本),编译时需要指定-march参数;

参考资料:
[1]搜狗实验室   C10K与高性能程序续篇  http://www.sogou.com/labs/report/4-2.pdf
[2]W.Richard Stevens     Advanced Programming in the Unix Environment  11.6节 线程同步
[3]设计不适用互斥锁的并发数据结构  http://www.ibm.com/developerworks/cn/aix/library/au-multithreaded_structures2/
[4]解决了“undefined reference to `__sync_bool_compare_and_swap_4”的编译错误   http://raylinn.iteye.com/blog/347323
[5]compare_and_swap  http://en.wikipedia.org/wiki/Compare-and-swap
本文链接
您需要登录后才可以回帖 登录 | 用户注册

本版积分规则

Archiver|手机版|小黑屋|ACE Developer ( 京ICP备06055248号 )

GMT+8, 2024-5-4 05:23 , Processed in 0.018528 second(s), 7 queries , Redis On.

Powered by Discuz! X3.5

© 2001-2023 Discuz! Team.

快速回复 返回顶部 返回列表