大数据学习
bigdata learning
Toggle navigation
大数据学习
主页
openGauss数据库
Flume
MongoDB
Hadoop
数据库实验
Kafka
Zookeeper
Hbase
Manual
Spark
Neo4j
InfluxDB
RabbitMQ
Flink
About Me
归档
标签
6 RPC调用
无
2023-06-21 20:17:43
22
0
0
bigdata
## RPC调用 Rpc是异步调用,client发送请求,同时告诉server处理完后要发送消息给:回调队列的ID:correlation_id=abc,并调用replay_to回调队列对应的回调函数。 不要对消息进行确认,反复确认会导致进入死循环 ### client > `import json` > > `import pika` > > `import uuid` > > `class RpcClient():` > > `def init(self):` > > `self.credentials = pika.PlainCredentials('guest', 'guest') # mq用户名和密码` > `#虚拟队列需要指定参数 virtual_host,如果是默认的可以不填。` > > `self.connection = pika.BlockingConnection(pika.ConnectionParameters(host ='localhost',port = 5673,virtual_host = '/',credentials = self.credentials))` > > `self.channel=self.connection.channel()` > > `result = self.channel.queue_declare(queue='',exclusive=True)` > > `self.callback_queue = result.method.queue` > `# 这里:这个是消息发送方,当要执行回调的时候,它又是接收方` > > `# 使用callback_queue 实现消息接收。即是回调。注意:这里的回调` > > `# 不需要对消息进行确认。反复确认,没玩没了就成了死循环` > > `self.channel.basic_consume(on_message_callback=self.onResponse,` > > `auto_ack=True,` > > `queue=self.callback_queue)` > > > > `def onResponse(self,ch,props,method,body):` > > `if self.corr_id == props.correlation_id:` > > `self.response = body` > > `def call(self,n):` > > `self.response = None` > > `self.corr_id = str(uuid.uuid4())` > `# properties中指定replay_to:表示回调要调用那个函数` > > `# 指定correlation_id:表示回调返回的请求ID是那个` > > `# body:是要交给接收端的参数` > > `self.channel.basic_publish(exchange='',` > > `routing_key='rpc_queue',` > > `properties=pika.BasicProperties(` > > `reply_to=self.callback_queue,` > > `correlation_id=self.corr_id,` > > `),` > > `body = str(n))` > > `while self.response is None:` > > `self.connection.process_data_events()` > > `return int(self.response)` > > > > `rpc = RpcClient()` > > `print(" [x] Requesting 30")` > > `response = rpc.call(30)` > > `print(" [.] Got %r" % response)` ### server > `import pika` > > `connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))` > > `channel = connection.channel()` > > > > `channel.queue_declare(queue='rpc_queue')` > > `\#这里的处理是以进行求斐波那契数列的第n项为例` > > `def fib(n):` > > `if n == 0:` > > `return 0` > > `elif n == 1:` > > `return 1` > > `else:` > > `return fib(n - 1) + fib(n - 2)` > > `response = fib(30)` > > `def on_request(ch, method, props, body):` > > `n = int(body)` > > `response = fib(n)` > `#收到的消息` > > `print(" [.] fib(%s)" % n)` > > `ch.basic_publish(exchange=**''**, routing_key=props.reply_to,properties=pika.BasicProperties(correlation_id= props.correlation_id),body=str(response))` > > `print("routringkey",props.reply_to)` > `#手动响应` > > `ch.basic_ack(delivery_tag=method.delivery_tag)` > `channel.basic_qos(p`enter code here`refetch_count=1)` > > `channel.basic_consume( queue='rpc_queue',on_message_callback=on_request)` > > > > `print(" [x] Awaiting RPC requests")` > > `channel.start_consuming()`
上一篇:
5 topic模式
下一篇:
JDK安装
文档导航