线程锁

在多线程的使用过程中,需要考虑的一个问题就是线程安全。如果使用多线程对多个对象进行操作,也许不会有异常的情况的产生,因为每个对象是独立的,每个对象的改变对其他对象没有影响。但是如果使用多线程操作同一个对象,极有可能产生脏数据;但是同时访问同一个对象不会产生脏数据

多线程修改同一对象

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
import threading
import time

NUM = 5

def func():
global NUM
NUM -= 1
time.sleep(1)
print(NUM)

for i in range(5):
t = threading.Thread(target=func)
t.start()

------------
00
0
0
0

上面的小程序中,定义了一个全局变量NUM,在func方法中对全局变量NUM做自减一的操作,我们理想中的结果是

1
2
3
4
5
4
3
2
1
0

但实际结果却全都是0

这是因为每个线程运行到sleep(1)的时候都在这里卡住,最后,所有的线程都执行完了NUM -= 1,这个时候全局变量NUM已经=0,而第一个线程还没有执行完sleep(1),所以最后所有的线程打印出来的都是0

Lock

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
import threading
import time

NUM = 5

# 函数里接收一个锁对象
def func(lock):
global NUM

# 获取锁
lock.acquire()
NUM -= 1
time.sleep(1)
print(NUM)

# 释放锁
lock.release()

# 创建一个互斥锁
lock = threading.Lock()

for i in range(5):
# 把创建的锁对象当做参数传递给函数
t = threading.Thread(target=func, args=(lock,))
t.start()

------------
4
3
2
1
0

RLock

RLock和Lock的使用方法相同,但却是Lock的”升级版”,RLock支持在函数内的多层嵌套锁,例如

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
import threading
import time

NUM = 5

# 函数里接收一个锁对象
def func(lock):
global NUM

# 获取锁
lock.acquire()
NUM -= 1

# 嵌套锁
lock.acquire()
time.sleep(1)
lock.release()

print(NUM)

# 释放锁
lock.release()

# 创建一个互斥锁
lock = threading.RLock()

for i in range(5):
# 把创建的锁对象当做参数传递给函数
t = threading.Thread(target=func, args=(lock,))
t.start()

Semaphore

信号量,可以放出一批锁,类似于批量“授权”

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
import threading
import time

NUM = 5

# 函数里接收一个锁对象
def func(i, lock):
global NUM

# 获取锁,这里最多可以发放3把锁
lock.acquire()
NUM -= 1
time.sleep(1)
print(i, NUM)

# 释放锁
lock.release()

# 创建一个锁,锁的数量有3个
lock = threading.BoundedSemaphore(3)

for i in range(5):
# 把创建的锁对象当做参数传递给函数
t = threading.Thread(target=func, args=(i, lock,))
t.start()

------------
0 2
1 2
2 2
4 0
3 0

从结果中可以看出,总共有3把锁被放出,所以前三个线程打印值是相同的,三个线程依次修改NUM的值,然后依次进入time.sleep(1)等待(此时NUM已经被自减了3次,值=2),最后依次打印出NUM的值

event

事件, 同样是“批量授权”,但是和信号量指定创建锁的数量不同的是,事件要么锁住所有的线程,要么释放所有的线程,不能自定义锁的数量

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
import threading

# 接收一个event对象
def func(i, event):
print(i)

# 这里的wait()方法会阻塞所有的线程,但是是否阻塞取决于event维护的全局Flag
event.wait() # wait会去检测"Flag"的值的真假
print(i + 100)

# 创建一个event对象
event = threading.Event()

# 设置event对象中的"Flag"为False
# 默认情况下就是False
event.clear()

for i in range(5):
t = threading.Thread(target=func, args=(i, event))
t.start()

inp = input("> ")
if inp == "go":
# 设置event对象中的"Flag"为True
# 释放了所有的锁
event.set()

------------
0
1
2
3
4
> go
100
103
101
104
102
  • wait:检测“Flag”的值
  • clear:将“Flag”设置为False
  • set:将“Flag”设置为True

Condition

条件,上面说了事件的特性,就是要阻塞就阻塞住所有的子线程,要释放就释放掉所有的锁,不支持信号量中的自定义释放锁的数量

条件锁,既支持事件阻塞所有子线程的特性,又具有信号量指定释放锁数量的特性

condition的第一种用法

指定释放锁的数量

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
import threading

# 接收一个condition对象
def func(i, con):
print(i)

# 给线程上锁
con.acquire()
# wait会根据notify指定的数量去释放锁,默认锁住所有线程
con.wait()
print(i + 100)
con.release()


condition = threading.Condition()


for i in range(5):
t = threading.Thread(target=func, args=(i, condition))
t.start()

while True:
inp = input("> ")
if inp == "q":
break

# 指定释放锁数量的三剑客
# 语法中规定他们必须按照这个顺序写
condition.acquire()
condition.notify(int(inp)) # 这里的int类型的inp变量,就是指定释放锁的数量
condition.release()

------------
0
1
2
3
4
> 1
> 100
2
> 102
101
3
> 104
103

最后只打印两个数字,是因为总共有5个测试数据,前面已经累计释放了三个子线程处理的测试数据,所以第三次,我即使指定了释放3个锁(当时程序中只剩两个子线程被锁住),也只会出现2个值

condition的第二种用法

如果条件为真,释放一个锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
import threading

def select():
ret = False
inp = input("> ")
if inp == "true":
ret = True
else:
ret = False
return ret

# 接收一个condition对象
def func(i, con):
print(i)

# 给线程上锁
con.acquire()
# wait_for会根据参数的值的真假来选择是否释放一个锁
# 这里传入的是一个函数,也就是说如果函数的返回值为真,就释放一个锁
con.wait_for(select)
print(i + 100)
con.release()

# 创建一把条件锁
condition = threading.Condition()

for i in range(5):
t = threading.Thread(target=func, args=(i, condition))
t.start()

------------
0
> 1
2
3
4
true
100
> true
101
>

con.wait_for(select)中,我们传入了一个函数名,当第一个子线程执行到这一行的时候,会自动去执行这个函数,从而打印出了第一个>符号,之后就停在了等待用户输入的那一行input()

其他所有的子线程都停在了con.acquare()这一行,等待前面的子线程释放锁

这时,我输入一个trueselect函数返回了一个True, wait_for()接收到这个True之后,就继续向下执行,最后释放掉自己的锁

接着下一个被调度到的子线程执行到wait_for()又去执行了里面传入的函数……以此类推

Timer

Timer不是锁,是定时器,用来指定x秒后执行某些操作

1
2
3
4
5
6
7
8
9
10
11
12
from threading import Timer


def hello():
print("hello, world")


t = Timer(1, hello)
t.start()

------------
hello, world

结果没有立即输入,而是等了一秒钟之后输出了hello, world