并发

《OSTEP》线程并发小结

理解多线程为什么会有竞争状态问题

多线程对共享资源访问,可能不是原子操作,导致不确定结果。

条件量使用

1
2
int pthread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mutex);
int pthread_cond_signal(pthread_cond_t *cond);

wait 操作传入参数除了条件变量还有锁,这样做的目的是为了把进程休眠同时释放锁。否则其他线程就没法获得锁,唤醒休眠线程。

下面代码做线程之间同步存在什么问题

  1. spin 忙等待浪费cpu

但是这个能够实现线程之间同步操作么?

1
2
3
4
5
6
7
//全局标志变量
int flag=0;
//thread A
while(flag==0); //spin

//thread B singal
flag=1;

锁的设计

在设计锁之前,有几个指标能够来衡量设计方案的好坏。

  1. 能否实现互斥

  2. 公平性(防止饥饿)

  3. 实现锁带来的额外负担

最直接的想法是通过关闭中断来实现互斥,竞态条件下执行结果的不稳定,就是因为线程执行过程中被打断,调度。我们通过关闭中断,就能够实现对资源的互斥访问,但是这个在实际情况不太可能实现(多CPU,以及关闭中断带来的巨大风险)。

原子操作

通过硬件支持的同步原语来实现互斥,首先我们看一个没有原子操作的互斥锁方案

1
2
3
4
5
6
7
8
9
10
11
12
typedef struct __lock_t { int flag; } lock_t;
void init(lock_t *mutex) {
// 0 -> lock is available, 1 -> held
mutex->flag = 0;}

void lock(lock_t *mutex) {
while (mutex->flag == 1);// TEST the flag spin-wait (do nothing)
mutex->flag = 1;// now SET it!
}

void unlock(lock_t *mutex) {
mutex->flag = 0;}

上述方案的问题在于检查flag和设置flag不是原子操作,可能存在一种情况线程A执行完while()然后被调度,导致两个线程都获得锁。

改进方案如下

CAS(compare and swap)一种原子操作,

bool CAS(V, A,B) // 检查现有V 是否与A== 如果== 将V设置为B 同时返回true

自旋锁伪码实现

1
2
3
4
5
6
7
8
9
10
11
CAS(compare and swap)一种原子操作,
bool CAS(V, A,B) // 检查现有V 是否与A== 如果==  将V设置为B 同时返回true

自旋锁伪码实现
bool flag =false; //锁为true 表示被占用

while(!CAS(flag,false,true));
//do-something

//release lock
flag=false;

自旋锁虽然能够实现对临界区的互斥访问,但是带来了性能问题,spin通过忙等待形式去获取锁,导致了CPU资源的浪费。

比如场景:单核cpu运行多个线程

线程1获得lock进入临界区,然后被打断,线程2被调度,一直自旋等待锁被释放。

yield

针对上述问题,一种解决方法是通过主动放弃CPU,当线程发现锁被占用,主动将自己的状态由RUNNING->READY, 避免了自旋等待占用CPU。

1
while(!CAS(flag,false,true)) yield();

yield 带来的问题

  1. 大量的上下文切换操作

  2. 仍然没有解决饥饿问题

sleep队列

无论自旋,还是yield, 调度器的选择很多,导致了资源的浪费。如果线程发现自己暂时不能获取锁,将自己休眠,就带来了效率提升。

下面实现代码还是很巧妙的

  • guard作用: 用于自旋等待变量

相比于自旋等待整个临界区,gaurd的获取和释放间隔很短;eg.多核CPU, 当一个进程A拿到gaurd,然后另一个进程B被调度,此时B就拿不到gaurd,自旋等待,但是由于A临界区代码很短,很快A就会释放gaurd.

  • unlock()中, 唤醒线程之后,没有将flag设置为0

这么做的原因是线程被唤醒之后,可以看作是从park()返回,此时线程没有拿到guard,锁没有释放就直接传递给了被唤醒线程。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
typedef struct __lock_t {
int flag;
int guard;
queue_t *q;
} lock_t;

void lock_init(lock_t *m) {
m->flag= 0;
m->guard = 0;
queue_init(m->q);
}

void lock(lock_t *m) {
while (TestAndSet(&m->guard, 1) == 1); //acquire guard lock by spinning
if (m->flag == 0) {
m->flag = 1; // lock is acquired
m->guard = 0;}
else {
queue_add(m->q, gettid());
m->guard = 0;
park(); //线程休眠
}
}

void unlock(lock_t *m) {
while (TestAndSet(&m->guard, 1) == 1); //acquire guard lock by spinning
if (queue_empty(m->q))
m->flag = 0; // let go of lock; no one wants it
else
unpark(queue_remove(m->q)); // hold lock (for next thread!)
m->guard = 0;}

基于锁的数据结构

并发链表设计

关键在于数据结构的定义,节点如何定义(int, node*) ,链表怎么定义(锁+链表头)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
// basic node structure
typedef struct __node_t {
int key;
struct __node_t *next;
} node_t;

// basic list structure (one used per list)
typedef struct __list_t {
node_t * head;
pthread_mutex_t lock;
} list_t;

void List_Init(list_t * L) {
L->head = NULL;
pthread_mutex_init(&L->lock, NULL);
}

int List_Insert(list_t * L, int key) {
node_t * new = malloc(sizeof(node_t)); //先分配节点,只有在更新全局资源时候加锁,而不是一上来就全部加锁
if (new == NULL) {
perror("malloc");
return -1; // fail
}
pthread_mutex_lock(&L->lock);
new->key = key;
new->next = L->head;
L->head = new;
pthread_mutex_unlock(&L->lock);
return 0; // success
}
//链表查找需要加锁么? 不涉及到全局变量的修改,但是如果不加锁,一个线程在修改正在查找的链表,也会导致race condition
int List_Lookup(list_t * L, int key) {
pthread_mutex_lock(&L->lock);
node_t * curr = L->head;
int rv=-1;
while (curr) {
if (curr->key == key) {
rv=0;//返回0 表示找到
break;
}
curr = curr->next;
}
pthread_mutex_unlock(&L->lock);
return rv; // faulure
}
并发哈希表

有了这个基础,设计并发哈希表就很简单,通过拉链方法来避免哈希冲突

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
#define BUCKETS (101)

typedef struct __hash_t {
list_t lists[BUCKETS];
} hash_t;

void Hash_Init(hash_t * H) {
int i;
for (i = 0; i < BUCKETS; i++) {
List_Init(&H->lists[i]);
}
}

int Hash_Insert(hash_t * H, int key) {
int bucket = key % BUCKETS;
return List_Insert(&H->lists[bucket], key);
}

int Hash_Lookup(hash_t * H, int key) {
int bucket = key % BUCKETS;
return List_Lookup(&H->lists[bucket], key);
}

信号量

什么是信号量?

信号量和条件变量区别

信号量用来做lock,也能够用来做条件变量,关键在于信号量的初始化。

信号量=变量+操作

信号量操作伪码

1
2
3
4
5
6
7
8
9
int sem_wait(sem_t *s) {
decrement the value of semaphore s by one
wait if value of semaphore s is negative
}

int sem_post(sem_t *s) {
increment the value of semaphore s by one
if there are one or more threads waiting, wake one
}

信号量做二元锁

1
2
3
4
5
sem_t m;
sem_init(&m, 0, 1); // initialize semaphore to 1
sem_wait(&m);
// critical section here
sem_post(&m);

信号量做条件变量

下面代码目的是同步父子进程,父进程等待子进程结束。

注意条件变量被初始化为0,分两个情况考虑,如果子进程被创建之后先执行,sem_post将信号量+1,父进程执行sem_wait正常返回;如果父进程先执行,sem-1小于0,父进程被挂起,子进程执行,将sem+1,然后唤醒父进程。 所以符合我们预定执行顺序。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
sem_t s
void *child(void *arg) {
printf("child\n");
sem_post(&s); // signal here: child is done
return NULL;}

int main(int argc, char *argv[]) {
sem_init(&s, 0, 0); //
printf("parent: begin\n");
pthread_t c;
Pthread_create(c, NULL, child, NULL);
sem_wait(&s); // wait here for child
printf("parent: end\n");
return 0;}
读写锁

允许多个读,一个写,第一个读者自动获得写锁,最后一个读锁释放,写锁也释放。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
typedef struct __lock_t{
sem_t lock;
sem_t write;
int readers;
} rw_lock;

init(rw_lock*m){
sem_init(m->lock,0,1);
sem_init(m->write,0,1);
m->readers=0;
}

void rw_acquire_read(rw_lock*m){
sem_wait(m->lock);
m->readers++;
if(m->readers==1) sem_wait(m->write);
sem_post(m->lock);
}

void rw_release_read(rw_lock*m){
sem_wait(m->lock);
m->readers--;
if(m->readers==0) sem_post(m->write);
sem_post(m->lock);
}

void rw_acquire_write(rw_lock*m){
sem_wait(m->write);
}

void rw_acquire_write(rw_lock*m){
sem_post(m->write);
}