生产者向rabbitmq server发送消息,其实消息并不是由生产者直接向rabbitmq server中的队列中插入数据,在队列的前面,还挡着一层exchange
,数据是由生产者发送给rabbitmq的exchange,然后由exchange存入对应的队列的
发布与订阅-fanout
exchange type= fanout
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
| import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(host='123.57.233.243'))
channel = connection.channel()
channel.exchange_declare(exchange='logs', type='fanout')
result = channel.queue_declare(exclusive=True) queue_name = result.method.queue
channel.queue_bind(exchange='logs', queue=queue_name)
print(' [*] Waiting for logs. To exit press CTRL+C')
def callback(ch, method, properties, body): print(" [x] %r" % body)
channel.basic_consume(callback, queue=queue_name, no_ack=True)
channel.start_consuming()
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| import pika
connection = pika.BlockingConnection(pika.ConnectionParameters( host='123.57.233.243')) channel = connection.channel()
channel.exchange_declare(exchange='logs', type='fanout')
message = "PolarSnow" channel.basic_publish(exchange='logs', routing_key='', body=message) print(" [x] Sent %r" % message) connection.close()
|
启动多个消费者,那么多个消费者就创建了多个队列(因为每一个消费者都是随机的队列名字),且所有的消费者创建的队列都和名字为logs的exchange做了绑定,也就是说,如果有生产者向exchange='logs'
中发送消息,那么所有的消费者对应的队列都会收到消息
关键字匹配-direct
exchange type=direct
在指定exchange名称的基础之上,还可以通过匹配关键字来分流消息
就像上图一样,同一个队列可以绑定多个关键字,且同一个关键字可以被不同的队列重复绑定
下面就来实现一个上图效果的队列
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
| import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(host='123.57.233.243'))
channel = connection.channel()
channel.exchange_declare(exchange='direct_logs', type='direct')
result = channel.queue_declare(exclusive=True) queue_name = result.method.queue
severities = ['info', 'error', 'warning']
for severity in severities: channel.queue_bind(exchange='direct_logs', queue=queue_name, routing_key=severity)
print(' [*] Waiting for logs. To exit press CTRL+C')
def callback(ch, method, properties, body): print(" [x] %r:%r" % (method.routing_key, body))
channel.basic_consume(callback, queue=queue_name, no_ack=True)
channel.start_consuming()
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
| import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(host='123.57.233.243'))
channel = connection.channel()
channel.exchange_declare(exchange='direct_logs', type='direct')
result = channel.queue_declare(exclusive=True) queue_name = result.method.queue
severities = ['error',]
for severity in severities: channel.queue_bind(exchange='direct_logs', queue=queue_name, routing_key=severity)
print(' [*] Waiting for logs. To exit press CTRL+C')
def callback(ch, method, properties, body): print(" [x] %r:%r" % (method.routing_key, body))
channel.basic_consume(callback, queue=queue_name, no_ack=True)
channel.start_consuming()
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| import pika
connection = pika.BlockingConnection(pika.ConnectionParameters( host='123.57.233.243')) channel = connection.channel()
channel.exchange_declare(exchange='direct_logs', type='direct')
severity = 'info'
message = 'PolarSnow' channel.basic_publish(exchange='direct_logs', routing_key=severity, body=message) print(" [x] Sent %r:%r" % (severity, message)) connection.close()
|
模糊匹配-topic
exchange type = topic
在topic类型下,可以让队列绑定几个模糊的关键字,之后发送者将数据发送到exchange,exchange将传入”路由值“和 ”关键字“进行匹配,匹配成功,则将数据发送到指定队列。
1 2 3
| 发送者路由值 队列中 docs.20150509.cn docs.* -- 不匹配 docs.20150509.cn docs.# -- 匹配
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
| import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(host='123.57.233.243'))
channel = connection.channel()
channel.exchange_declare(exchange='topic_logs', type='direct')
result = channel.queue_declare(exclusive=True) queue_name = result.method.queue
severities = ['*.info.*', '*.*.*error', 'warning.#']
for severity in severities: channel.queue_bind(exchange='topic_logs', queue=queue_name, routing_key=severity)
print(' [*] Waiting for logs. To exit press CTRL+C')
def callback(ch, method, properties, body): print(" [x] %r:%r" % (method.routing_key, body))
channel.basic_consume(callback, queue=queue_name, no_ack=True)
channel.start_consuming()
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
| import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(host='123.57.233.243'))
channel = connection.channel()
channel.exchange_declare(exchange='topic_logs', type='direct')
result = channel.queue_declare(exclusive=True) queue_name = result.method.queue
severities = ['*.*.*error',]
for severity in severities: channel.queue_bind(exchange='topic_logs', queue=queue_name, routing_key=severity)
print(' [*] Waiting for logs. To exit press CTRL+C')
def callback(ch, method, properties, body): print(" [x] %r:%r" % (method.routing_key, body))
channel.basic_consume(callback, queue=queue_name, no_ack=True)
channel.start_consuming()
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| import pika
connection = pika.BlockingConnection(pika.ConnectionParameters( host='123.57.233.243')) channel = connection.channel()
channel.exchange_declare(exchange='topic_logs', type='direct')
severity = 'a.info.b'
message = 'PolarSnow' channel.basic_publish(exchange='direct_logs', routing_key=severity, body=message) print(" [x] Sent %r:%r" % (severity, message)) connection.close()
|
基于消息队列的主机管理系统
- CLI在向server端发送命令时,指定了主机列表和一个随机的md5值
- 消息队列中匹配主机对应的队列,将需要执行的命令插入到队列中
- 对应的消费者(agent)监听到有新的消息后,在本地执行命令,并将执行结果发送给server端。在server端创建以md5为名字的临时队列,并将执行结果放在该队列中
- 此次任务的所有消费者拿到的md5值都是相同的,所以,所有的消费者执行命令之后,都会将执行结果放入以md5命令的临时消息队列中
- 全部消费者执行完毕后,在TempQ(md5)中保存了此次任务,所有消费者的执行结果
- 最后临时队列中的结果可以一一被读出展示给CLI,最后删除该临时队列