Python中先进先出队列queue的基本使用

生活中非常常见的一种场景就是排队,早期的鸟儿🐦有虫🐛吃,越早排队就越早能办理业务。本篇文章介绍Python中的“排队系统”,先进先出队列的基本使用

put数据

消息队列的长度

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# 先进先出队列
import queue

# 最多接收10个数据
q = queue.Queue(10)

# put 向队列中添加数据
q.put(15)
q.put(59)

# 获取当前队列长度
print(q.qsize())

# 取出最前面的一个数据
print(q.get())

超时时间

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
# 先进先出队列
import queue

# 最多接收10个数据
q = queue.Queue(2)

# put 向队列中添加数据
q.put(15)
q.put(59)
print(q.qsize())

# 超时时间为2秒
q.put('PolarSnow', timeout=2)

# 获取当前队列长度
print(q.qsize())

# 取出最前面的一个数据
print(q.get())

------------
2
Traceback (most recent call last):
File "/Users/lvrui/PycharmProjects/untitled/10/temp.py", line 157, in <module>
q.put('PolarSnow', timeout=2)
File "/Library/Frameworks/Python.framework/Versions/3.5/lib/python3.5/queue.py", line 141, in put
raise Full
queue.Full

现在队列的最大长度设置为2,当第三个数据向里面插入时,最多等待两秒,两秒后还没有进入到队列中就报错

设置队列不阻塞

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
# 先进先出队列
import queue

# 最多接收10个数据
q = queue.Queue(2)

# put 向队列中添加数据
q.put(15)
q.put(59)

# 设置队列不阻塞(当队列满的时候再插入数据,直接报错)
q.put('PolarSnow', block=False)

# 获取当前队列长度
print(q.qsize())

# 取出最前面的一个数据
print(q.get())

默认程序会阻塞,等待新的值插入到队列当中,使用了block=False参数后,强制设置为不阻塞,一旦超出队列长度,立即抛出异常

get数据

设置超时时间

1
2
3
4
5
6
7
8
9
10
import queue

q = queue.Queue(2)
q.put(15)
q.put(59)

print(q.qsize())
print(q.get())
print(q.get())
print(q.get(timeout=2))

当取值的次数大于队列的长度的时候就会产生阻塞,设置超时时间意为最多等待x秒,队列中再没有数据,就抛出异常

设置不阻塞

1
2
3
4
5
6
7
8
9
10
import queue

q = queue.Queue(2)
q.put(15)
q.put(59)

print(q.qsize())
print(q.get())
print(q.get())
print(q.get(block=False))

获取队列的次数大于队列长度时,默认会阻塞,通过设置block=False来实现非阻塞,立即抛出异常

Queue类

__init__

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
def __init__(self, maxsize=0):
self.maxsize = maxsize
self._init(maxsize)

# mutex must be held whenever the queue is mutating. All methods
# that acquire mutex must release it before returning. mutex
# is shared between the three conditions, so acquiring and
# releasing the conditions also acquires and releases mutex.
self.mutex = threading.Lock()

# Notify not_empty whenever an item is added to the queue; a
# thread waiting to get is notified then.
self.not_empty = threading.Condition(self.mutex)

# Notify not_full whenever an item is removed from the queue;
# a thread waiting to put is notified then.
self.not_full = threading.Condition(self.mutex)

# Notify all_tasks_done whenever the number of unfinished tasks
# drops to zero; thread waiting to join() is notified to resume
self.all_tasks_done = threading.Condition(self.mutex)
self.unfinished_tasks = 0

如果不给这个构造方法传参数,队列的长度为无限大

get

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
def get(self, block=True, timeout=None):
'''Remove and return an item from the queue.

If optional args 'block' is true and 'timeout' is None (the default),
block if necessary until an item is available. If 'timeout' is
a non-negative number, it blocks at most 'timeout' seconds and raises
the Empty exception if no item was available within that time.
Otherwise ('block' is false), return an item if one is immediately
available, else raise the Empty exception ('timeout' is ignored
in that case).
'''
with self.not_empty:
if not block:
if not self._qsize():
raise Empty
elif timeout is None:
while not self._qsize():
self.not_empty.wait()
elif timeout < 0:
raise ValueError("'timeout' must be a non-negative number")
else:
endtime = time() + timeout
while not self._qsize():
remaining = endtime - time()
if remaining <= 0.0:
raise Empty
self.not_empty.wait(remaining)
item = self._get()
self.not_full.notify()
return item

参数为是否阻塞和超时时间

get_nowait

1
2
3
4
5
6
7
def get_nowait(self):
'''Remove and return an item from the queue without blocking.

Only get an item if one is immediately available. Otherwise
raise the Empty exception.
'''
return self.get(block=False)

非阻塞获取队列中的值

put && put_nowait

get & get_nowait同理

empty

1
2
3
4
5
6
7
8
9
10
11
12
13
def empty(self):
'''Return True if the queue is empty, False otherwise (not reliable!).

This method is likely to be removed at some point. Use qsize() == 0
as a direct substitute, but be aware that either approach risks a race
condition where a queue can grow before the result of empty() or
qsize() can be used.

To create code that needs to wait for all queued tasks to be
completed, the preferred technique is to use the join() method.
'''
with self.mutex:
return not self._qsize()

检查队列是否为空,为空返回True,不为空返回False

full

1
2
3
4
5
6
7
8
9
10
def full(self):
'''Return True if the queue is full, False otherwise (not reliable!).

This method is likely to be removed at some point. Use qsize() >= n
as a direct substitute, but be aware that either approach risks a race
condition where a queue can shrink before the result of full() or
qsize() can be used.
'''
with self.mutex:
return 0 < self.maxsize <= self._qsize()

判断队列是否已经满了

qsize

1
2
3
4
def qsize(self):
'''Return the approximate size of the queue (not reliable!).'''
with self.mutex:
return self._qsize()

返回队列中元素的个数(真实个数)

构造方法中封装了maxsize字段,可以使用对象.maxsize来获取当前队列的最大值

join & task_done

1
2
3
4
5
import queue

q = queue.Queue(2)
q.put(15)
q.put(59)

程序执行完这5行代码后会退出,退出前队列中还有值,退出后被清空

1
2
3
4
5
6
7
import queue

q = queue.Queue(2)
q.put(15)
q.put(59)

q.join()

程序会一直卡在第7行,只要队列中还有值,程序就不会退出

1
2
3
4
5
6
7
8
9
10
import queue

q = queue.Queue(2)
q.put(15)
q.put(59)

q.get()
q.get()

q.join()

队列中插入两个元素,后面取出了两个元素,执行后你会发现,程序还是卡在第10行的那个join代码

1
2
3
4
5
6
7
8
9
10
11
12
import queue

q = queue.Queue(2)
q.put(15)
q.put(59)

q.get()
q.task_done() # get取完队列中的一个值后,使用task_done方法告诉队列,我已经取出了一个值并处理完毕
q.get()
q.task_done()

q.join()

在每次get取值之后,还需要在跟队列声明一下,我已经取出了数据并处理完毕,这样执行到join代码的时候才不会被卡住

Python提供的所有队列类型

  • 先进先出队列 queue.Queue
  • 后进先出队列 queue.LifoQueue (Queue的基础上进行的封装)
  • 优先级队列 queue.PriorityQueue (Queue的基础上进行的封装)
  • 双向队列 queue.deque

后进先出队列

1
2
3
4
5
6
7
8
9
10
import queue

q = queue.LifoQueue()
q.put('Polar')
q.put('Snow')

print(q.get())

------------
Snow

优先级队列

1
2
3
4
5
6
7
8
9
10
import queue

q = queue.PriorityQueue()
q.put((1, 'Snow'))
q.put((0, 'Polar'))

print(q.get())

------------
Polar

数字越小优先级越高,数字相同,先进先出

双向队列

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
import queue

q = queue.deque()

q.append('A') # 在右侧追加 A
q.append('X') # 在右侧追加 A X
q.appendleft('Z') # 在左侧追加 Z A X
q.appendleft('V') # 在左侧追加 V Z A X

print(q.pop()) # 从右侧取 --> X
print(q.popleft()) # 从左侧取 --> V

------------
X
V