《OSTEP》线程并发小结
理解多线程为什么会有竞争状态问题 多线程对共享资源访问,可能不是原子操作,导致不确定结果。
条件量使用 1 2 int pthread_cond_wait (pthread_cond_t *cond, pthread_mutex_t *mutex) ;int pthread_cond_signal (pthread_cond_t *cond) ;
wait 操作传入参数除了条件变量还有锁,这样做的目的是为了把进程休眠同时释放锁。否则其他线程就没法获得锁,唤醒休眠线程。
下面代码做线程之间同步存在什么问题
spin 忙等待浪费cpu
但是这个能够实现线程之间同步操作么?
1 2 3 4 5 6 7 int flag=0 ;while (flag==0 ); flag=1 ;
锁的设计 在设计锁之前,有几个指标能够来衡量设计方案的好坏。
能否实现互斥
公平性(防止饥饿)
实现锁带来的额外负担
最直接的想法是通过关闭中断来实现互斥,竞态条件下执行结果的不稳定,就是因为线程执行过程中被打断,调度。我们通过关闭中断,就能够实现对资源的互斥访问,但是这个在实际情况不太可能实现(多CPU,以及关闭中断带来的巨大风险)。
原子操作 通过硬件支持的同步原语来实现互斥,首先我们看一个没有原子操作的互斥锁方案
1 2 3 4 5 6 7 8 9 10 11 12 typedef struct __lock_t { int flag; } lock_t ;void init (lock_t *mutex) { mutex->flag = 0 ;} void lock (lock_t *mutex) { while (mutex->flag == 1 ); mutex->flag = 1 ; } void unlock (lock_t *mutex) { mutex->flag = 0 ;}
上述方案的问题在于检查flag和设置flag不是原子操作,可能存在一种情况线程A执行完while()然后被调度,导致两个线程都获得锁。
改进方案如下
CAS(compare and swap)一种原子操作,
bool CAS(V, A,B) // 检查现有V 是否与A== 如果== 将V设置为B 同时返回true
自旋锁伪码实现
1 2 3 4 5 6 7 8 9 10 11 CAS(compare and swap)一种原子操作, bool CAS (V, A,B) 自旋锁伪码实现 bool flag =false ; while (!CAS(flag,false ,true ));flag=false ;
自旋锁虽然能够实现对临界区的互斥访问,但是带来了性能问题,spin通过忙等待形式去获取锁,导致了CPU资源的浪费。
比如场景:单核cpu运行多个线程
线程1获得lock进入临界区,然后被打断,线程2被调度,一直自旋等待锁被释放。
yield 针对上述问题,一种解决方法是通过主动放弃CPU,当线程发现锁被占用,主动将自己的状态由RUNNING->READY, 避免了自旋等待占用CPU。
1 while (!CAS(flag,false ,true )) yield ();
yield 带来的问题
大量的上下文切换操作
仍然没有解决饥饿问题
sleep队列 无论自旋,还是yield, 调度器的选择很多,导致了资源的浪费。如果线程发现自己暂时不能获取锁,将自己休眠,就带来了效率提升。
下面实现代码还是很巧妙的
相比于自旋等待整个临界区,gaurd的获取和释放间隔很短;eg.多核CPU, 当一个进程A拿到gaurd,然后另一个进程B被调度,此时B就拿不到gaurd,自旋等待,但是由于A临界区代码很短,很快A就会释放gaurd.
unlock()中, 唤醒线程之后,没有将flag设置为0
这么做的原因是线程被唤醒之后,可以看作是从park()返回,此时线程没有拿到guard,锁没有释放就直接传递给了被唤醒线程。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 typedef struct __lock_t { int flag; int guard; queue_t *q; } lock_t ; void lock_init (lock_t *m) { m->flag= 0 ; m->guard = 0 ; queue_init(m->q); } void lock (lock_t *m) { while (TestAndSet(&m->guard, 1 ) == 1 ); if (m->flag == 0 ) { m->flag = 1 ; m->guard = 0 ;} else { queue_add(m->q, gettid()); m->guard = 0 ; park(); } } void unlock (lock_t *m) { while (TestAndSet(&m->guard, 1 ) == 1 ); if (queue_empty(m->q)) m->flag = 0 ; else unpark(queue_remove(m->q)); m->guard = 0 ;}
基于锁的数据结构 并发链表设计 关键在于数据结构的定义,节点如何定义(int, node*) ,链表怎么定义(锁+链表头)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 typedef struct __node_t { int key; struct __node_t *next ; } node_t ; typedef struct __list_t { node_t * head; pthread_mutex_t lock; } list_t ; void List_Init (list_t * L) { L->head = NULL ; pthread_mutex_init(&L->lock, NULL ); } int List_Insert (list_t * L, int key) { node_t * new = malloc (sizeof (node_t )); if (new == NULL ) { perror("malloc" ); return -1 ; } pthread_mutex_lock(&L->lock); new ->key = key; new ->next = L->head; L->head = new ; pthread_mutex_unlock(&L->lock); return 0 ; } int List_Lookup (list_t * L, int key) { pthread_mutex_lock(&L->lock); node_t * curr = L->head; int rv=-1 ; while (curr) { if (curr->key == key) { rv=0 ; break ; } curr = curr->next; } pthread_mutex_unlock(&L->lock); return rv; }
并发哈希表 有了这个基础,设计并发哈希表就很简单,通过拉链方法来避免哈希冲突
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 #define BUCKETS (101) typedef struct __hash_t { list_t lists[BUCKETS]; } hash_t ; void Hash_Init (hash_t * H) { int i; for (i = 0 ; i < BUCKETS; i++) { List_Init(&H->lists[i]); } } int Hash_Insert (hash_t * H, int key) { int bucket = key % BUCKETS; return List_Insert(&H->lists[bucket], key); } int Hash_Lookup (hash_t * H, int key) { int bucket = key % BUCKETS; return List_Lookup(&H->lists[bucket], key); }
信号量
什么是信号量?
信号量和条件变量区别
信号量用来做lock,也能够用来做条件变量,关键在于信号量的初始化。
信号量=变量+操作
信号量操作伪码
1 2 3 4 5 6 7 8 9 int sem_wait (sem_t *s) { decrement the value of semaphore s by one wait if value of semaphore s is negative } int sem_post (sem_t *s) { increment the value of semaphore s by one if there are one or more threads waiting, wake one }
信号量做二元锁
1 2 3 4 5 sem_t m;sem_init(&m, 0 , 1 ); sem_wait(&m); sem_post(&m);
信号量做条件变量
下面代码目的是同步父子进程,父进程等待子进程结束。
注意条件变量被初始化为0,分两个情况考虑,如果子进程被创建之后先执行,sem_post将信号量+1,父进程执行sem_wait正常返回;如果父进程先执行,sem-1小于0,父进程被挂起,子进程执行,将sem+1,然后唤醒父进程。 所以符合我们预定执行顺序。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 sem_t svoid *child (void *arg) {printf ("child\n" );sem_post(&s); return NULL ;}int main (int argc, char *argv[]) { sem_init(&s, 0 , 0 ); printf ("parent: begin\n" ); pthread_t c; Pthread_create(c, NULL , child, NULL ); sem_wait(&s); printf ("parent: end\n" ); return 0 ;}
读写锁 允许多个读,一个写,第一个读者自动获得写锁,最后一个读锁释放,写锁也释放。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 typedef struct __lock_t { sem_t lock; sem_t write ; int readers; } rw_lock; init(rw_lock*m){ sem_init(m->lock,0 ,1 ); sem_init(m->write ,0 ,1 ); m->readers=0 ; } void rw_acquire_read (rw_lock*m) { sem_wait(m->lock); m->readers++; if (m->readers==1 ) sem_wait(m->write ); sem_post(m->lock); } void rw_release_read (rw_lock*m) { sem_wait(m->lock); m->readers--; if (m->readers==0 ) sem_post(m->write ); sem_post(m->lock); } void rw_acquire_write (rw_lock*m) { sem_wait(m->write ); } void rw_acquire_write (rw_lock*m) { sem_post(m->write ); }