大数据学习
bigdata learning
Toggle navigation
大数据学习
主页
openGauss数据库
Flume
MongoDB
Hadoop
数据库实验
Kafka
Zookeeper
Hbase
Manual
Spark
Neo4j
InfluxDB
RabbitMQ
Flink
About Me
归档
标签
2 生产者和消费者
无
2023-06-21 20:17:43
12
0
0
bigdata
## 生产者和消费者 ### 生产者 ```python \# coding=utf-8 \### 生产者 import json import pika import datetime import time credentials = pika.PlainCredentials('guest', 'guest') # mq用户名和密码 **\# 虚拟队列需要指定参数 virtual_host,如果是默认的可以不填。** connection = pika.BlockingConnection(pika.ConnectionParameters(host = '123.60.216.176',port = 5673,virtual_host = '/',credentials = credentials)) channel=connection.channel() **\# 声明消息队列,消息将在这个队列传递,如不存在,则创建** result = channel.queue_declare(queue = 'python-test') for i in range(3): time.sleep(3) current_time = datetime.datetime.now() message=json.dumps({'xiny':str(current_time)}) **\# 向队列插入数值 routing_key是队列名** channel.basic_publish(exchange = '',routing_key = 'python-test',body = message) print(message) connection.close() ``` ### 消费者 > \# coding=utf-8 > > \### 消费者 > > import pika > > user_info = pika.PlainCredentials('admin', '123') > > connection = pika.BlockingConnection(pika.ConnectionParameters('123.60.216.176', 5673, '/', user_info)) > > channel = connection.channel() > **\# 如果指定的queue不存在,则会创建一个queue,如果已经存在 则不会做其他动作,生产者和消费者都做这一步的好处是** > > **\# 这样生产者和消费者就没有必要的先后启动顺序了** > > channel.queue_declare(queue='python-test') > **\# 回调函数** > > def callback(ch, method, properties, body): > > print('消费者收到:{}'.format(body)) > **\# channel: 包含channel的一切属性和方法** > > **\# method: 包含 consumer_tag, delivery_tag, exchange, redelivered, routing_key** > > **\# properties: basic_publish 通过 properties 传入的参数** > > **\# body: basic_publish发送的消息** > > channel.basic_consume(queue='python-test', # 接收指定queue的消息 > > auto_ack=True, # 指定为True,表示消息接收到后自动给消息发送方回复确认,已收到消息 > > on_message_callback=callback # 设置收到消息的回调函数 > > ) > > print('Waiting for messages. To exit press CTRL+C') > **\# 一直处于等待接收消息的状态,如果没收到消息就一直处于阻塞状态,收到消息就调用上面的回调函数** > > channel.start_consuming() ### 设置手动输入 ##### 消费和生产都要启动 **durable = True** > channel.queue_declare(queue='python-test',durable=True) ##### 在消费端 需要auto_ack 需要为 False > channel.basic_consume(queue='python-test', ***# 接收指定queue的消息*** > > auto_ack=False, ***# 指定为True,表示消息接收到后自动给消息发送方回复确认,已收到消息*** > > on_message_callback=callback ***# 设置收到消息的回调函数*** > > ) ##### 公平派遣 代码前加: > channel.basic_qos(prefetch_count=1) 如果不加上面这一条,每个人的数量是平均的,加了以后,工作快的consumer的数量会变多,按能力分配
上一篇:
14-Flume案例-Ganglia的安装与部署
下一篇:
3 扇出模式
文档导航