首页 > 7.RabbitMQ RFC同步调用

7.RabbitMQ RFC同步调用

RabbitMQ RFC同步调用是使用了两个异步调用完成的,生产者调用消费者的同时,自己也作为消费者等待某一队列的返回消息,消费者接受到生产者的消息同时,也作为消息发送者发送一消息给生产者。参考下图:

7.RabbitMQ <wbr>RFC同步调用

调用流程如下:
7.RabbitMQ <wbr>RFC同步调用

其他的消息服务器实现同步调用也是类似的原理,比如ActiveMQ。
下面编写消费者类Server
7.RabbitMQ <wbr>RFC同步调用

7.RabbitMQ <wbr>RFC同步调用

生产者Client代码
7.RabbitMQ <wbr>RFC同步调用

7.RabbitMQ <wbr>RFC同步调用

启动一命令行,将当前目录转移到项目所在的目录
7.RabbitMQ <wbr>RFC同步调用

在Eclipse中运行Client
7.RabbitMQ <wbr>RFC同步调用

代码:
package com.test.rfc;
import java.io.IOException;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
public class Server {
public static void main(String[] argv) {
ConnectionFactory factory = new ConnectionFactory();
factory.setUsername("admin");
factory.setPassword("admin");
factory.setHost("192.168.169.142"); //使用默认端口5672
Connection connection = null;
try {
connection = factory.newConnection();
final Channel channel = connection.createChannel();
String queueName = "queue_rpc";
channel.queueDeclare(queueName, false, false, false, null);
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag,
Envelope envelope, AMQP.BasicProperties properties,
byte[] body) throws IOException 
{
System.out.println("test1" + new String(body));
AMQP.BasicProperties replyProps = new AMQP.BasicProperties.Builder()
.correlationId(properties.getCorrelationId())
.build();
String response = "hello client,I'm rfc server";
channel.basicPublish("", properties.getReplyTo(),
replyProps, response.getBytes());
channel.basicAck(envelope.getDeliveryTag(), false);
}
};
channel.basicConsume(queueName, false, consumer);
} catch (IOException e) {
e.printStackTrace();
}
}
}
package com.test.rfc;
import java.io.IOException;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import com.rabbitmq.client.*;
public class Client {
public static void main(String[] argv) {
try
{
//发送消息的队列,Server在这个队列上接受消息
String queueName = "queue_rpc";
ConnectionFactory factory = new ConnectionFactory();
factory.setUsername("admin");
factory.setPassword("admin");
factory.setHost("192.168.169.142"); //使用默认端口5672
Connection connection = null;
connection = factory.newConnection();
Channel channel = connection.createChannel();
//生成临时的队列,Client在这队列上等待Server返回信息,Server向这个队列发消息
String replyQueueName = channel.queueDeclare().getQueue();
//生成唯一ID
final String corrId = UUID.randomUUID().toString();
AMQP.BasicProperties props = new AMQP.BasicProperties.Builder()
.correlationId(corrId).replyTo(replyQueueName).build();
//客户端发送RFC请求
channel.basicPublish("", queueName, props, "GetUserInfo".getBytes());
//Server返回消息
final BlockingQueue response = new ArrayBlockingQueue(1);
channel.basicConsume(replyQueueName, true,
new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag,
Envelope envelope, AMQP.BasicProperties properties,
byte[] body) throws IOException {
System.out.println(properties.getCorrelationId());
if (properties.getCorrelationId().equals(corrId)) {
response.offer(body);
}
}
});
byte[] b = response.take();
System.out.println(new String(b));
channel.close();
connection.close();
}
catch(Exception e)
{
e.printStackTrace();
}
}
}

转载于:https://www.cnblogs.com/zzpblogs/p/8168805.html

更多相关:

  • 有一天,我写了一个自信满满的自定义组件myComponent,在多个页面import使用了,结果控制台给我来这个 我特么裤子都脱了,你给我来这个提示是几个意思 仔细一看 The Component 'MyComponentComponent' is declared by more than one NgModule...

  • 创建一个带路由的项目,依次执行下面每行代码 ng n RouingApp --routingcd RouingAppng g c components/firstng g c components/secondng g m components/second --routing    代码拷贝: import {NgModul...

  •       cnpm install vue-quill-editor cnpm install quill-image-drop-module cnpm install quill-image-resize-module 执行上面的命令安装,然后在main.js下面加入 //引入quill-editor编辑器import...

  • 首先要理解Vue项目加载顺序: index.html → main.js → App.vue → nav.json→ routes.js → page1.vue index.html建议加入样式

  • 简单记录平时画图用到的python 便捷小脚本 1. 从单个文件输入 绘制坐标系图 #!/usr/bin/python # coding: utf-8 import matplotlib.pyplot as plt import numpy as np import matplotlib as mpl import sysf...

  • importjava.security.SecureRandom;importjavax.crypto.Cipher;importjavax.crypto.SecretKey;importjavax.crypto.SecretKeyFactory;importjavax.crypto.spec.DESKeySpec;//结果与DES算...

  • 题目:替换空格 请实现一个函数,把字符串 s 中的每个空格替换成"%20"。 输入:s = "We are happy." 输出:"We%20are%20happy." 限制: 0 <= s 的长度 <= 10000 解题: 时间复杂度:O(n) 空间复杂度:O(n) class Solution { public:s...

  • 在C++11标准库中,string.h已经添加了to_string方法,方便从其他类型(如整形)快速转换成字面值。 例如: for (size_t i = 0; i < texArrSize; i++)RTX_Shader.SetInt(string("TexArr[") + to_string(i) + "]", 7 + i);...

  • Ubuntu 14.04安装并升级之后,变成楷体字体非常难看,我昨天搞了一晚上,终于理了个头绪,这里整理一下。 经过网上调研,大家的一致看法是,使用开源字体库文泉驿的微黑字体效果比较理想,甚至效果不输windows平台的雅黑字体。下面我打算微黑来美化Ubuntu 14.04. 1.安装文泉驿微黑字体库 sudo aptitude...

  • 使用string时发现了一些坑。 我们知道stl 容器并不是线程安全的,所以在使用它们的过程中往往需要一些同步机制来保证并发场景下的同步更新。 应该踩的坑还是一个不拉的踩了进去,所以还是记录一下吧。 string作为一个容器,随着我们的append 或者 针对string的+ 操作都会让string内部的数据域动态增加,而动态增加的...

  • 首先对微擎的工作原理做简单描述, 微擎使用规则和模块的机制来处理公众平台的请求数据并返回响应的结果.执行流程描述为: 粉丝用户与公众号码进行对话或交互, 而后公众平台将粉丝用户的请求消息(当前包括: 文本, 图片, 位置, 链接, 事件. 请参阅消息类型)传递给微擎系统, 微擎系统按照消息类型和对应的公众号所设定的规则列表匹配到合适的...

  • 消息队列的使用场景以下介绍消息队列在实际应用常用的使用场景。异步处理、应用解耦、流量削锋和消息通讯四个场景。1】异步处理:场景说明:用户注册后,需要发注册邮件和注册短信。引入消息队列后架构如下:用户的响应时间=注册信息写入数据库的时间,例如50毫秒。发注册邮箱、发注册短信写入消息队列后,直接返回客户端,因写入消息队列的速度很快,基...

  • 下面是我凭记忆想到的几个题目,有需要的同学就拿去吧,我也算做了点善事. 中体骏彩C++笔试题 2013-11-18 1.指针的含义是:B A.名字 B.地址 C.名称 D.符号 2.给出下面的程序输出: #include #include #include ...

  • 双端通信描述 利用消息队列针对发送接受消息的类型唯一性 进行多个客户端之间消息传递,而不需要server端进行消息转发。 同时消息队列的读阻塞和写阻塞特性(消息队列中已经写入数据,如果再不读出来,则无法再次写入)让消息队列的实现过程只能如下: 客户端1的父进程用来处理类型1的消息写,子进程处理类型2的消息读客户端2的父进程处理类型...

  • 文章目录基本介绍编程接口代码实例消息队列的发送和接收消息队列中的消息对象的属性控制 基本介绍 支持不同进程之间以消息(messages)的形式进行数据交换,消息能够拥有自己的标识,且内核使用链表方式进行消息管理。进程之间的通信角色为:发送者和接受者 发送者: a. 获取消息队列的ID(key或者msgid) b. 将数据放入...