Python下使用rabbitmq之提高消息安全性

消息不丢失

程序在去消息队列中取数据时,如果在没有对该消息处理完毕时,机器宕机,那么将丢失掉对这条消息的处理,下面介绍如何避免这样的情况发生

no_ack

no_ack = False, 如果消费者遇到问题,没有完成对消息的处理,那么rabbitmq会重新将该任务添加到队列中

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
import pika
import time

# 创建连接对象
connection = pika.BlockingConnection(pika.ConnectionParameters(host='123.57.233.243'))

# 创建频道
channel = connection.channel()

# 切换到指定的队列中,如果队列不存在,则创建
channel.queue_declare(queue='hello')

# 取到消息后执行的回调函数
def callback(ch, method, properties, body):
print(" [x] Received %r" % body)
time.sleep(10)
print('ok')
# 通知rabbitmq server,已经对消息处理完毕,可以释放掉保存的这个消息资源
ch.basic_ack(delivery_tag=method.delivery_tag)

# no_ack=False 设置为消息处理完毕后,消费者必须明确告知rabbitmq server已经处理完毕
# 否则rabbitmq server将视为消息处理失败,把该消息重新放回到队列当中
channel.basic_consume(callback, queue='hello', no_ack=False)

print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()

durable

上面处理了消费者宕机保证数据安全的情况,那么rabbitmq server如果宕机怎么办?rabbitmq提供了数据持久化的机制,利用server端的数据持久化机制和消费者端的no_ack特性,可以更高的保障数据的安全

1
2
3
4
5
6
7
8
9
10
11
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host='123.57,233,243'))
channel = connection.channel()

# make message persistent
channel.queue_declare(queue='hello', durable=True)

channel.basic_publish(exchange='', routing_key='hello', body='Hello World!', properties=pika.BasicProperties(delivery_mode=2,))
print(" [x] Sent 'Hello World!'")
connection.close()

获取消息的顺序

rabbitmq消费者默认获取消息的顺序是根据消息的索引

假如有4个消费者去消费同一个队列中的数据,那么

  • 第一个消费者消费:0 4 8 12 16 20……
  • 第二个消费者消费:1 5 9 13 17 21……
  • 第三个消费者消费:2 6 10 14 18 22……
  • 第四个消费者消费:3 7 11 15 19 23……

如果第一个消费者处理消息的速度非常慢,有可能发生下面的情况

1
2
3
4
第一个消费者:0 4
第二个消费者:1 5 9 13 17 21
第三个消费者:2 6 10 14 18 22
第四个消费者:3 7 11 15 19 23

就是其他消费者已经处理了很多消息了,第一个消费者还在处理第4个消息

以下代码为更改索引的方式取数据顺序方式取数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host='123.57.233.243'))
channel = connection.channel()

# make message persistent
channel.queue_declare(queue='hello')


def callback(ch, method, properties, body):
print(" [x] Received %r" % body)
import time
time.sleep(10)
print('ok')
ch.basic_ack(delivery_tag = method.delivery_tag)

# 加入这一行就可以保证rabbitmq上的消息是顺序取出的
channel.basic_qos(prefetch_count=1)

channel.basic_consume(callback, queue='hello', no_ack=False)

print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()