大数据学习
bigdata learning
Toggle navigation
大数据学习
主页
openGauss数据库
Flume
MongoDB
Hadoop
数据库实验
Kafka
Zookeeper
Hbase
Manual
Spark
Neo4j
InfluxDB
RabbitMQ
Flink
About Me
归档
标签
3 扇出模式
无
2023-06-21 20:17:43
13
0
0
bigdata
## 扇出模式 producer不需要指定队列,只需要指定exchange consumer才需要指定队列和交换机,consumer的队列 一般会用默认队列 连接 交换机 channel.exchange_declare(exchange=**'logs'**,exchange_type=**'fanout'**) result = channel.queue_declare(queue=**''**,exclusive=True) ### consumer > ***# coding=utf-8*** > > ***### 消费者*** > > import pika > > user_info = pika.PlainCredentials(**\'admin'**, **'123'**) > > connection = pika.BlockingConnection(pika.ConnectionParameters(**'123.60.216.176'**, 5673, **'/'**, user_info)) > > channel = connection.channel() > ***# 如果指定的queue不存在,则会创建一个queue,如果已经存在 则不会做其他动作,生产者和消费者都做这一步的好处是*** > > ***# 这样生产者和消费者就没有必要的先后启动顺序了*** > > channel.exchange_declare(exchange=**'logs'**,exchange_type=**'fanout'**) > > result = channel.queue_declare(queue=**''**,exclusive=True) > > queue_name = result.method.queue > > > > channel.queue_bind(exchange=**'logs'**,queue=queue_name) > ***# 回调函数*** > > def callback(ch, method, properties, body): > > print(**'消费者收到:{}'**.format(body)) > > ch.basic_ack(delivery_tag=method.delivery_tag) > ***# channel: 包含channel的一切属性和方法*** > > ***# method: 包含 consumer_tag, delivery_tag, exchange, redelivered, routing_key*** > > ***# properties: basic_publish 通过 properties 传入的参数*** > > ***# body: basic_publish发送的消息*** > > ***#channel.basic_qos(prefetch_count=1)*** > > channel.basic_consume(queue=queue_name, ***# 接收指定queue的消息*** > > auto_ack=False, ***# 指定为True,表示消息接收到后自动给消息发送方回复确认,已收到消息*** > > on_message_callback=callback ***# 设置收到消息的回调函数*** > > ) > > print(**'Waiting for messages. To exit press CTRL+C'**) > ***# 一直处于等待接收消息的状态,如果没收到消息就一直处于阻塞状态,收到消息就调用上面的回调函数*** > > channel.start_consuming() ### producer > ***# coding=utf-8*** > > ***### 生产者*** > > import json > > import pika > > import datetime > > import time > > credentials = pika.PlainCredentials(**'guest'**, **'guest'**) ***# mq用户名和密码*** > ***# 虚拟队列需要指定参数 virtual_host,如果是默认的可以不填。*** > > connection = pika.BlockingConnection(pika.ConnectionParameters(host = **'123.60.216.176'**,port = 5673,virtual_host = **'/'**,credentials = credentials)) > > channel=connection.channel() > > > > channel.exchange_declare(exchange=**'logs'**, > > exchange_type=**'fanout'**) > ***# 声明消息队列,消息将在这个队列传递,如不存在,则创建*** > > ***# result = channel.queue_declare(queue = 'python-test')*** > > for i in range(300): > > time.sleep(3) > > current_time = datetime.datetime.now() > > message=json.dumps({**'xiny'**:str(current_time)}) > ***# 将消息发送到名为log的exchange中*** > > ***# 因为是fanout类型的exchange,所以无需指定routing_key*** > > channel.basic_publish(exchange=**'logs'**, > > routing_key=**''**, > > body=message) > > print(message) > > connection.close()
上一篇:
2 生产者和消费者
下一篇:
4 direct模式
文档导航