0%

互斥锁的实现

本文整理自 OSTEP 第 28 章。

在并发编程中,为了能够原子的执行一系列指令,避免单处理器的中断和多处理器的并行执行带来的问题,程序员通常使用互斥锁放在临界区周围,保证了临界区能够像单条指令一样原子执行。

自旋锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
typedef struct __lock_t {int flag;} lock_t;

void init(lock_t *mutex) {
// 0 -> lock is available, 1 -> held
mutex->flag = 0;
}

void lock(lock_t *mutex) {
while (mutex->flag == 1) // TEST the flag (Line 1)
; // spin-wait (do nothing)
mutex->flag = 1; // now SET it! (Line 2)
}

void unlock(lock_t *mutex) {
mutex->flag = 0;
}

这个版本的锁实现很简单。当程序加锁的时候,flag 置为 1,当程序释放锁时,flag 置为 0。当一个线程想要获得锁,但是 flag==0,那么这个线程就会不停的执行 while 循环,直到获得这个锁。这个过程称为自旋。

这个实现有一个很严重的问题,由于处理器的中断,不能保证 Line 1 和 Line 2 之间不会插入别的线程的执行。假设有两个线程 A 和 B,它们都需要访问临界区。如果线程 A 在 Line 1 执行之后被操作系统切换,线程 B 开始执行并且获得了锁,然后再次切换线程 A 执行,这个时候同时有两个线程持有一把锁,锁的正确性不能保证。

因此,锁的实现通常需要借助硬件的特殊指令,保证 Line 1 和 Line 2 之间不会插入其他线程的代码。这种指令称为原子交换(atomic exchange)。一种原子交换指令 TestAndSet 的 C 伪代码实现如下。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
static int flag=0;

void lock(){
while(TestAndSet(&flag,1)==1); // spin-wait
}
void unlock(){
flag=0;
}

int TestAndSet(int *ptr, int new) {
int old = *ptr;
*ptr = new;
return old;
}

硬件还提供了一种原子交换指令 CompareAndSwap,使用的范围更广。

1
2
3
4
5
6
7
int CompareAndSwap(int *ptr, int expected, int new) {
int actual = *ptr;
if (actual == expected) {
*ptr = new;
}
return actual;
}

在处理器层面,CompareAndSwap 依赖于 cmpxchgl 指令实现。

1
2
3
4
5
6
7
8
9
10
11
12
char compare_and_swap(int *ptr, int old, int new) {
unsigned char ret;
// Note that sete sets a ’byte’ not the word
__asm__ __volatile__ (
" lock\n"
" cmpxchgl %2,%1\n"
" sete %0\n"
: "=q" (ret), "=m" (*ptr)
: "r" (new), "m" (*ptr), "a" (old)
: "memory");
return ret;
}

上面两种自旋锁的实现不能保证等待锁的线程最终能够一定获得锁。一种解决思路是采用彩票机制,为每个等待锁的线程分配一个彩票,每个彩票对应一个值,这个值是递增的。锁的内部维护一个标识位。当标识位和线程的彩票对应的数值相等时,线程获得锁,当线程释放锁时,标识位加一。

我们可以通过硬件提供的 FetchAndAdd 指令实现。下面是这种方法的伪代码描述。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
int FetchAndAdd(int *ptr) {
int old = *ptr;
*ptr = old + 1;
return old;
}

typedef struct __lock_t {
int ticket;
int turn;
} lock_t;

void lock_init(lock_t *lock) {
lock->ticket = 0;
lock->turn = 0;
}

void lock(lock_t *lock) {
int myturn = FetchAndAdd(&lock->ticket);
while (lock->turn != myturn)
; // spin
}

void unlock(lock_t *lock) {
lock->turn = lock->turn + 1;
}

休眠锁

自旋锁存在一个问题:当一个线程拿不到锁时,会不停的执行 while 循环,造成时间的浪费。一种改进思路是:当一个线程没有拿到锁时,直接睡眠,当锁可用时再唤醒这个线程。

我们使用 queue 作为底层数据结构,使用链表保存所有等待锁的线程。

数据结构

1
2
3
4
5
6
7
8
9
10
11
typedef struct __lock_t {
int flag;
int guard;
queue_t *q;
} lock_t;

void lock_init(lock_t *m) {
m->flag = 0;
m->guard = 0;
queue_init(m->q);
}

其中,guard 是一个基于 TestAndSet 思路实现的自旋锁,用于访问 flag。flag 表明这个锁是否被占用。

lock

1
2
3
4
5
6
7
8
9
10
11
void lock(lock_t *m) {
while (TestAndSet(&m->guard, 1) == 1) ;
if (m->flag == 0) {
m->flag = 1; // lock is acquired
m->guard = 0;
} else {
queue_add(m->q, gettid());
m->guard = 0;
park();
}
}

flag 为 0:把 flag 置为 1,表明锁被占用。
flag 为 1 :把线程加入到休眠队列中,等待锁的释放后被唤醒。

unlock

1
2
3
4
5
6
7
8
9
10
void unlock(lock_t *m) {
while (TestAndSet(&m->guard, 1) == 1)
; //acquire guard lock by spinning
if (queue_empty(m->q))
m->flag = 0; // let go of lock; no one wants it
else
unpark(queue_remove(m->q)); // hold lock
// (for next thread!)
m->guard = 0;
}

当等待队列为空:把 flag 设置为 0,释放锁。
当等待队列不为空:唤醒一个等待队列中的线程。

存在的问题

如果 queue_add()park() 执行之间,有一个线程释放了锁,那么当前的线程在加入到阻塞队列后,立马就会被唤醒,这样会造成无谓的上下文切换,带来时间上的浪费。

一种解决方法是:使用一个新的系统调用 setpark(),让线程进入准备休眠的状态,如果此时有另一个线程执行了唤醒操作,当前线程立刻返回。

两阶段锁

实际情况中,锁的实现融合了两者的实现思路,Linux 中的 mutex 锁实现如下。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
void mutex_lock (int *mutex) {
int v;
/* Bit 31 was clear, we got the mutex (the fastpath) */
if (atomic_bit_test_set (mutex, 31) == 0)
return;
atomic_increment (mutex);
while (1) {
if (atomic_bit_test_set (mutex, 31) == 0) {
atomic_decrement (mutex);
return;
}
/* We have to waitFirst make sure the futex value
we are monitoring is truly negative (locked). */

v = *mutex;
if (v>= 0)
continue;
futex_wait (mutex, v);
}
}

void mutex_unlock (int *mutex) {
/* Adding 0x80000000 to counter results in 0 if and
only if there are not other interested threads */
if (atomic_add_zero (mutex, 0x80000000))
return;

/* There are other threads waiting for this mutex,
wake one of them up. */
futex_wake (mutex);
}

两阶段锁的设计思路如下:如果线程等待锁的过程中,另一个线程将要马上释放锁,这种情况更加适合使用自旋锁。

第一阶段会自旋若干次,试图获得锁。

如果在第一阶段没有获得锁,那么线程就会休眠,一直等待直到另一个线程唤醒它。

参考资料