自定义线程池

上一篇文章简单的介绍了一个线程池的基本实现思路。在上一篇的代码实现中,其实还有很多问题。第一,线程没有被重用,我们只是模拟了一个任务拿走一个线程,执行完毕后再“还”回去的过程,其实是在线程池中新创建了一个线程类,被使用过的线程将等待被Python的垃圾回收机制回收;第二,如果执行的任务的数量小于线程池的容量,那我们在线程池的构造方法中,预先填满的线程类就显得非常浪费了

改进思路:

  • 之前是将线程放入到队列中,启动线程池后,整个队列就会充满待使用的线程
    • 改进:将任务放入到队列中,然任务排队进入队列。线程的数量通过列表的长度控制,每创建一个线程,就在列表中append一下
  • 之前队列中的线程,每一个任务取走之后,执行完任务都会创建一个新的线程放入到队列中
    • 改进:每个线程执行完任务后,不再释放线程资源,循环去任务队列中获取任务去执行,直到判断拿到的是空值而不是一个任务时,退出自己的线程
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
import queue
import threading
import contextlib
import time

# 队列中的空值
StopEvent = object()


class ThreadPool(object):

def __init__(self, max_num, max_task_num = None):
"""
构造方法
:param max_num: 最多有多少个线程
:param max_task_num: 接收任务队列的长度
"""
if max_task_num:
self.q = queue.Queue(max_task_num)
else:
self.q = queue.Queue()
self.max_num = max_num
self.cancel = False
self.terminal = False
self.generate_list = [] # 当前已经创建的线程
self.free_list = [] # 当前还空闲多少线程

def run(self, func, args, callback=None):
"""
线程池执行一个任务
:param func: 任务函数
:param args: 任务函数所需参数
:param callback: 任务执行失败或成功后执行的回调函数,回调函数有两个参数1、任务函数执行状态;2、任务函数返回值(默认为None,即:不执行回调函数)
:return: 如果线程池已经终止,则返回True否则None
"""

# self.cancel是在构造方法和self.close方法中设置的值
# 在close方法中将该值设置为False,意为当前任务队列中的所有任务全部已经执行完毕
if self.cancel:
return
if len(self.free_list) == 0 and len(self.generate_list) < self.max_num:
# 如果空闲的线程为0 且 当前已经创建的线程数量小于设置的最大线程数量
# 调用本类的其他方法创建一个新的线程
self.generate_thread()
# 将run方法收到的三个参数组成一个数据
w = (func, args, callback,)
# 放入到任务队列当中
self.q.put(w)

def generate_thread(self):
"""
创建一个线程
"""
# 每一个线程被创建出来后都去执行了本类中的call方法
t = threading.Thread(target=self.call)
# 启动线程
t.start()

def call(self):
"""
循环去获取任务函数并执行任务函数
"""

# 获取当前线程
current_thread = threading.currentThread
# 将当前线程加入到"当前已创建线程"列表中
self.generate_list.append(current_thread)

# 在队列中获取一个任务
event = self.q.get()

# while循环,当event不为空对象的时候
while event != StopEvent:
# 队列中的每个元素由一个元祖构成(func, args, callback,)
func, arguments, callback = event
try:
# 去执行函数
# 这里func对应的是action函数
# arguments参数对应了i
# result接收这个函数的返回值
result = func(*arguments)
success = True
except Exception as ex:
success = False
result = None

# 判断callback函数是否为空
if callback is not None:
try:
# 执行这个回调函数,将执行的状态和返回的结果传递给回调函数
callback(success, result)
except Exception as ex:
pass

# 以上action函数执行完后,该线程变为空闲状态
# 以下代码将该线程加入到"当前空闲线程"列表中,等待线程复用
with self.worker_state(self.free_list, current_thread):
# 默认的self.terminal为False,会一直去队列中取任务执行
# 在terminate方法中,设置了self.terminal的值为True,也就是说,一旦调用terminate方法,这个线程将被回收,不再执行新的任务
if self.terminal:
event = StopEvent
else:
# 复用当前线程,再去队列中获取一个任务
event = self.q.get()
else:
# event为空对象,说明当前已经没有需要执行的任务了
# 每个线程把自己的线程对象从"当前已创建线程"列表中移除(不再复用)
self.generate_list.remove(current_thread)

def close(self):
"""
执行完所有的任务后,所有线程停止
"""
self.cancel = True
# 获取当前线程列表的长度(有多少个复用的线程)
full_size = len(self.generate_list)
# 因为任务已经执行完毕,任务队列为空,所有的线程都在等待获取新的任务
# 向任务队列中,插入StopEvent空值,让每个线程拿到这个任务的时候,退出掉自己的线程
while full_size:
# 当前可以复用的线程列表中有多少个线程对象就插入多少个空值
self.q.put(StopEvent)
full_size -= 1

def terminate(self):
"""
无论是否还有任务,终止线程
"""
self.terminal = True

# 把队列中的所有任务全部设置为空对象(结束线程的标识)
# 每个拿到这个空对象任务的线程都会结束自己的线程
while self.generate_list:
self.q.put(StopEvent)
# 所有的线程都被结束后,清空队列中的所有内容
self.q.empty()

# 上下文管理的装饰器,以下函数可以被with调用执行
@contextlib.contextmanager
def worker_state(self, state_list, worker_thread):
"""
用于记录线程中正在等待的线程数
"""
state_list.append(worker_thread)
try:
yield
finally:
state_list.remove(worker_thread)


if __name__ == '__main__':
# 创建一个线程池
# 最多同时创建3个线程在处理任务(消费者从队列中消费数据)
# 任务队列中同一时间最多存放5个任务(生产者向队列中存放数据)
pool = ThreadPool(3, max_task_num=5)

# 回调函数,每一个子线程处理完一个任务之后,都会调用这个方法
# 用来处理子线程执行任务后的执行状态和返回结果
def callback(status, result):
# status, execute action status
# result, execute action return value
pass

# 真正实现多线程的函数
def action(i):
print(i)

# 创建30个任务
for i in range(30):
# 每一次迭代就执行一次run方法
ret = pool.run(action, (i,), callback)

time.sleep(5)
print(len(pool.generate_list), len(pool.free_list))
print(len(pool.generate_list), len(pool.free_list))

# 关闭线程池
pool.close()

# 终止线程池并清空任务队列
# pool.terminate()