Python下使用rabbitmq之exchange属性

生产者向rabbitmq server发送消息,其实消息并不是由生产者直接向rabbitmq server中的队列中插入数据,在队列的前面,还挡着一层exchange,数据是由生产者发送给rabbitmq的exchange,然后由exchange存入对应的队列的

发布与订阅-fanout

exchange type= fanout

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
# 消费者
import pika

# 创建连接对象
connection = pika.BlockingConnection(pika.ConnectionParameters(host='123.57.233.243'))
# 创建频道对象
channel = connection.channel()

# 指定exchange类型
channel.exchange_declare(exchange='logs', type='fanout')

# 随机创建一个队列名称
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

# 将该队列与前面创建的exchange做绑定
# 当绑定完成后,生产者再向 exchange='logs' 中发送消息时,将自动将该消息插入到该队列中
channel.queue_bind(exchange='logs', queue=queue_name)


print(' [*] Waiting for logs. To exit press CTRL+C')

# 取到数据后的回调函数
def callback(ch, method, properties, body):
print(" [x] %r" % body)

channel.basic_consume(callback, queue=queue_name, no_ack=True)

channel.start_consuming()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
# 生产者
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(
host='123.57.233.243'))
channel = connection.channel()

# 指定exchange='logs'
channel.exchange_declare(exchange='logs', type='fanout')

message = "PolarSnow"
channel.basic_publish(exchange='logs', routing_key='', body=message)
print(" [x] Sent %r" % message)
connection.close()

启动多个消费者,那么多个消费者就创建了多个队列(因为每一个消费者都是随机的队列名字),且所有的消费者创建的队列都和名字为logs的exchange做了绑定,也就是说,如果有生产者向exchange='logs'中发送消息,那么所有的消费者对应的队列都会收到消息

关键字匹配-direct

exchange type=direct

在指定exchange名称的基础之上,还可以通过匹配关键字来分流消息

就像上图一样,同一个队列可以绑定多个关键字,且同一个关键字可以被不同的队列重复绑定

下面就来实现一个上图效果的队列

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
# 消费者1[info, error, warning]
import pika

# 创建连接对象
connection = pika.BlockingConnection(pika.ConnectionParameters(host='123.57.233.243'))
# 创建频道对象
channel = connection.channel()

# 指定exchange的名称和类型
# 类型direct, 绑定关键字
channel.exchange_declare(exchange='direct_logs', type='direct')

# 创建一个名字随机的队列
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

# 这里需要为一个队列绑定多个关键字,在这里输入需要绑定的关键字列表
severities = ['info', 'error', 'warning']

# 循环绑定所有的关键字routing_key=severity
for severity in severities:
channel.queue_bind(exchange='direct_logs', queue=queue_name, routing_key=severity)

print(' [*] Waiting for logs. To exit press CTRL+C')

def callback(ch, method, properties, body):
print(" [x] %r:%r" % (method.routing_key, body))

channel.basic_consume(callback, queue=queue_name, no_ack=True)

channel.start_consuming()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
# 消费者2[error,]
import pika

# 创建连接对象
connection = pika.BlockingConnection(pika.ConnectionParameters(host='123.57.233.243'))
# 创建频道对象
channel = connection.channel()

# 指定exchange的名称和类型
# 类型direct, 绑定关键字
channel.exchange_declare(exchange='direct_logs', type='direct')

# 创建一个名字随机的队列
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

# 这里需要为一个队列绑定多个关键字,在这里输入需要绑定的关键字列表
severities = ['error',]

# 循环绑定所有的关键字routing_key=severity
for severity in severities:
channel.queue_bind(exchange='direct_logs', queue=queue_name, routing_key=severity)

print(' [*] Waiting for logs. To exit press CTRL+C')

def callback(ch, method, properties, body):
print(" [x] %r:%r" % (method.routing_key, body))

channel.basic_consume(callback, queue=queue_name, no_ack=True)

channel.start_consuming()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
# 生产者
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(
host='123.57.233.243'))
channel = connection.channel()

channel.exchange_declare(exchange='direct_logs', type='direct')

# 指定关键字为info时,只有第二个队列可以收到
severity = 'info'
# 指定关键字为error时,两个指定了exchange='direct_logs'的队列都可以收到
# serverity = 'error'

message = 'PolarSnow'
channel.basic_publish(exchange='direct_logs', routing_key=severity, body=message)
print(" [x] Sent %r:%r" % (severity, message))
connection.close()

模糊匹配-topic

exchange type = topic

在topic类型下,可以让队列绑定几个模糊的关键字,之后发送者将数据发送到exchange,exchange将传入”路由值“和 ”关键字“进行匹配,匹配成功,则将数据发送到指定队列。

  • # 表示可以匹配 0 个 或 多个 单词
    • 表示只能匹配 一个 单词
1
2
3
发送者路由值              队列中
docs.20150509.cn docs.* -- 不匹配
docs.20150509.cn docs.# -- 匹配
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
# 消费者1[*.info.*, *.*.*error, warning.#]
import pika

# 创建连接对象
connection = pika.BlockingConnection(pika.ConnectionParameters(host='123.57.233.243'))
# 创建频道对象
channel = connection.channel()

# 指定exchange的名称和类型
# 类型direct, 绑定关键字
channel.exchange_declare(exchange='topic_logs', type='direct')

# 创建一个名字随机的队列
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

# 跟上面的关键字匹配不同,这里需要绑定通配符
severities = ['*.info.*', '*.*.*error', 'warning.#']

# 循环绑定所有的关键字routing_key=severity
for severity in severities:
channel.queue_bind(exchange='topic_logs', queue=queue_name, routing_key=severity)

print(' [*] Waiting for logs. To exit press CTRL+C')

def callback(ch, method, properties, body):
print(" [x] %r:%r" % (method.routing_key, body))

channel.basic_consume(callback, queue=queue_name, no_ack=True)

channel.start_consuming()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
# 消费者2[*.*.*error,]
import pika

# 创建连接对象
connection = pika.BlockingConnection(pika.ConnectionParameters(host='123.57.233.243'))
# 创建频道对象
channel = connection.channel()

# 指定exchange的名称和类型
# 类型direct, 绑定关键字
channel.exchange_declare(exchange='topic_logs', type='direct')

# 创建一个名字随机的队列
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

# 跟上面的关键字匹配不同,这里需要绑定通配符
severities = ['*.*.*error',]

# 循环绑定所有的关键字routing_key=severity
for severity in severities:
channel.queue_bind(exchange='topic_logs', queue=queue_name, routing_key=severity)

print(' [*] Waiting for logs. To exit press CTRL+C')

def callback(ch, method, properties, body):
print(" [x] %r:%r" % (method.routing_key, body))

channel.basic_consume(callback, queue=queue_name, no_ack=True)

channel.start_consuming()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
# 生产者
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(
host='123.57.233.243'))
channel = connection.channel()

channel.exchange_declare(exchange='topic_logs', type='direct')

# 指定关键字为info时,只有第二个队列可以收到
# rabbitmq绑定了topic,支持模糊匹配
severity = 'a.info.b'
# 指定关键字为error时,两个指定了exchange='topic_logs'的队列都可以收到
# serverity = 'a.b.c.error'

message = 'PolarSnow'
channel.basic_publish(exchange='direct_logs', routing_key=severity, body=message)
print(" [x] Sent %r:%r" % (severity, message))
connection.close()

基于消息队列的主机管理系统

  • CLI在向server端发送命令时,指定了主机列表和一个随机的md5值
  • 消息队列中匹配主机对应的队列,将需要执行的命令插入到队列中
  • 对应的消费者(agent)监听到有新的消息后,在本地执行命令,并将执行结果发送给server端。在server端创建以md5为名字的临时队列,并将执行结果放在该队列中
  • 此次任务的所有消费者拿到的md5值都是相同的,所以,所有的消费者执行命令之后,都会将执行结果放入以md5命令的临时消息队列中
  • 全部消费者执行完毕后,在TempQ(md5)中保存了此次任务,所有消费者的执行结果
  • 最后临时队列中的结果可以一一被读出展示给CLI,最后删除该临时队列