消息不丢失
程序在去消息队列中取数据时,如果在没有对该消息处理完毕时,机器宕机,那么将丢失掉对这条消息的处理,下面介绍如何避免这样的情况发生
no_ack
no_ack = False, 如果消费者遇到问题,没有完成对消息的处理,那么rabbitmq会重新将该任务添加到队列中
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
| import pika import time
connection = pika.BlockingConnection(pika.ConnectionParameters(host='123.57.233.243'))
channel = connection.channel()
channel.queue_declare(queue='hello')
def callback(ch, method, properties, body): print(" [x] Received %r" % body) time.sleep(10) print('ok') ch.basic_ack(delivery_tag=method.delivery_tag)
channel.basic_consume(callback, queue='hello', no_ack=False)
print(' [*] Waiting for messages. To exit press CTRL+C') channel.start_consuming()
|
durable
上面处理了消费者宕机保证数据安全的情况,那么rabbitmq server如果宕机怎么办?rabbitmq提供了数据持久化的机制,利用server端的数据持久化机制和消费者端的no_ack特性,可以更高的保障数据的安全
1 2 3 4 5 6 7 8 9 10 11
| import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(host='123.57,233,243')) channel = connection.channel()
channel.queue_declare(queue='hello', durable=True)
channel.basic_publish(exchange='', routing_key='hello', body='Hello World!', properties=pika.BasicProperties(delivery_mode=2,)) print(" [x] Sent 'Hello World!'") connection.close()
|
获取消息的顺序
rabbitmq消费者默认获取消息的顺序是根据消息的索引
假如有4个消费者去消费同一个队列中的数据,那么
- 第一个消费者消费:0 4 8 12 16 20……
- 第二个消费者消费:1 5 9 13 17 21……
- 第三个消费者消费:2 6 10 14 18 22……
- 第四个消费者消费:3 7 11 15 19 23……
如果第一个消费者处理消息的速度非常慢,有可能发生下面的情况
1 2 3 4
| 第一个消费者:0 4 第二个消费者:1 5 9 13 17 21 第三个消费者:2 6 10 14 18 22 第四个消费者:3 7 11 15 19 23
|
就是其他消费者已经处理了很多消息了,第一个消费者还在处理第4个消息
以下代码为更改索引的方式取数据
为顺序方式取数据
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
| import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(host='123.57.233.243')) channel = connection.channel()
channel.queue_declare(queue='hello')
def callback(ch, method, properties, body): print(" [x] Received %r" % body) import time time.sleep(10) print('ok') ch.basic_ack(delivery_tag = method.delivery_tag)
channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback, queue='hello', no_ack=False)
print(' [*] Waiting for messages. To exit press CTRL+C') channel.start_consuming()
|