Big Data Learning
5 Topic mode
2023-06-21 20:17:43
bigdata
## Topic mode

In topic mode, the producer publishes only to a topic exchange and attaches a routing key to every message; it never addresses a queue directly. A queue is bound to the exchange with one or more binding keys, so several routing keys can lead to the same queue (a many-to-one relationship).

### producer

    # coding=utf-8
    # Producer: publishes one message to the topic exchange.
    import sys

    import pika

    # RabbitMQ username and password
    credentials = pika.PlainCredentials('admin', '123')
    # A non-default virtual host must be passed as virtual_host; the default '/' can be omitted.
    connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='123.60.216.176', port=5673, virtual_host='/', credentials=credentials))
    channel = connection.channel()

    # The exchange name must match the one used in basic_publish and in the consumer.
    channel.exchange_declare(exchange='topic_logs', exchange_type='topic', durable=True)

    # argv[1] is the routing key and the remaining arguments form the message;
    # with fewer than two arguments the defaults below are used.
    routing_key = sys.argv[1] if len(sys.argv) > 2 else 'anonymous.info'
    message = ' '.join(sys.argv[2:]) or 'Hello World!'
    channel.basic_publish(exchange='topic_logs', routing_key=routing_key, body=message)
    print(" [x] Sent %r:%r" % (routing_key, message))
    connection.close()

### consumer

    # coding=utf-8
    # Consumer: binds a temporary queue with the given binding keys and prints what arrives.
    import sys

    import pika

    # This must point at the same broker the producer publishes to.
    connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
    channel = connection.channel()

    # Keep in sync with the producer: same name, exchange_type='topic', same durable flag.
    channel.exchange_declare(exchange='topic_logs', exchange_type='topic', durable=True)

    # An empty name lets the broker generate one; exclusive=True removes the queue
    # when this connection closes.
    result = channel.queue_declare(queue='', exclusive=True)
    queue_name = result.method.queue

    binding_keys = sys.argv[1:]
    if not binding_keys:
        sys.stderr.write("Usage: %s [binding_key]...\n" % sys.argv[0])
        sys.exit(1)

    for binding_key in binding_keys:
        channel.queue_bind(exchange='topic_logs', queue=queue_name, routing_key=binding_key)

    print(' [*] Waiting for logs. To exit press CTRL+C')


    def callback(ch, method, properties, body):
        print(" [x] %r:%r" % (method.routing_key, body))


    # The callback never acknowledges explicitly, so deliveries are auto-acknowledged.
    channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=True)
    channel.start_consuming()
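To illustrate why several routing keys can end up in the same queue, here is a minimal sketch of topic matching in plain Python. It assumes the standard AMQP topic rules: keys are dot-separated words, `*` in a binding key matches exactly one word, and `#` matches zero or more words. `topic_matches` and `delivered` are hypothetical helpers written for this note; they are not part of pika, and the broker's real matching code is implemented differently.

```python
def topic_matches(binding_key: str, routing_key: str) -> bool:
    """Return True if routing_key satisfies the topic pattern binding_key."""
    pattern = binding_key.split(".")
    words = routing_key.split(".")

    def match(p: int, w: int) -> bool:
        # Pattern exhausted: succeed only if every word was consumed too.
        if p == len(pattern):
            return w == len(words)
        if pattern[p] == "#":
            # '#' may absorb zero or more of the remaining words.
            return any(match(p + 1, k) for k in range(w, len(words) + 1))
        # Any other pattern element needs one word to consume.
        if w == len(words):
            return False
        if pattern[p] == "*" or pattern[p] == words[w]:
            return match(p + 1, w + 1)
        return False

    return match(0, 0)


def delivered(binding_keys, routing_keys):
    """Routing keys that would reach a queue bound with binding_keys."""
    return [rk for rk in routing_keys
            if any(topic_matches(bk, rk) for bk in binding_keys)]


if __name__ == "__main__":
    # One queue, two bindings: different routing keys land in the same queue.
    print(delivered(["kern.*", "*.critical"],
                    ["kern.info", "auth.critical", "auth.info"]))
```

Under these assumptions, starting the consumer above with the binding keys `kern.*` and `*.critical` would receive messages published with `kern.info` or `auth.critical`, but not `auth.info`.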