RabbitMQ 安装
在centos7上安装rabbitmq
- 安装erlang语言库
RabbitMQ使用了Erlang开发语言,Erlang是为电话交换机开发的语言,天生自带高并发光环,和高可用特性
下载&安装# 下载 wget https://github.com/rabbitmq/erlang-rpm/releases/download/v21.2.6/erlang-21.2.6-1.el7.x86_64.rpm # 安装 rpm -ivh erlang-21.2.6-1.el7.x86_64.rpm --force --nodeps
- 安装socat依赖
# 下载 socat rpm wget http://mirror.centos.org/centos/7/os/x86_64/Packages/socat-1.7.3.2-2.el7.x86_64.rpm # 安装 socat 依赖包 rpm -ivh socat-1.7.3.2-2.el7.x86_64.rpm
- 安装rabbitmq
# 下载 rpm 包 wget https://github.com/rabbitmq/rabbitmq-server/releases/download/v3.7.13/rabbitmq-server-3.7.13-1.el7.noarch.rpm # 安装 rpm 包 rpm -ivh rabbitmq-server-3.7.13-1.el7.noarch.rpm
- rabbitmq 开机启动&启动&停止
# 设置服务,开机自动启动 chkconfig rabbitmq-server on # 启动服务 service rabbitmq-server start # 停止服务 service rabbitmq-server stop
- 启用UI管理界面
# 开启管理界面插件 rabbitmq-plugins enable rabbitmq_management # 防火墙打开 15672 管理端口 firewall-cmd --zone=public --add-port=15672/tcp --permanent firewall-cmd --reload
访问:http://XXX:15672
登录 分配权限 - 添加用户 并赋予权限
账号:admin
密码:admin
# 添加用户 rabbitmqctl add_user admin admin # 新用户设置用户为超级管理员 rabbitmqctl set_user_tags admin administrator
- 开放连接端口
# 打开客户端连接端口 firewall-cmd --zone=public --add-port=5672/tcp --permanent firewall-cmd --reload
RabbitMQ 主要作用
服务解耦
- 一般调用
例如有以下服务,服务B、C需要从服务A取数据:
看似合理,但是如果我的需求一旦变更。
例如:需要从新的服务器获取数据,或者服务器增加、规模扩大等情况,那么就需要进行代码的修改等操作,这就是因为服务之间的高耦合性导致。
- 消息中间件实现服务之间解耦(RabbitMQ)
同样拿上面例子说明:
这样我们两边数据只需要考虑数据存取即可,不必关心从谁取数据
&谁从我取数据
服务A向RabbitMQ
发送数据 服务B、C从 RabbitMQ
取数据,两边服务解耦
流量削峰
- 例如:网站每秒约有300个请求
例如这样我们的一台服务器每秒可以接受500
请求,这样可以轻松应对
- 如果是这样呢
网站每秒访问量不变,服务器不变,但是总有那么一两个小时访问量每秒3000
怎么办?
解决方案一:增加服务器数量,但是为了一两个小时的流量高峰增加大量服务器显然不合适。
解决方案二:消息中间件流量削峰,例如:
RabbitMQ 的瞬时吞吐量非常大 将请求放在队列中,由服务器慢慢进行处理
异步调用
- 例如:点外卖,用寻找骑手模拟耗时操作
支付成功是立即完成,但是寻找骑手可能是个耗时操作,这样就造成整条调用链路响应非常缓慢,这样整条链路耗时 30.2s
- RabbitMQ 异步调用
这样的话支付系统支付完成后,无需等待耗时操作,链路响应时长 205ms
RabbitMQ 基本概念
RabbitMQ是一种消息中间件,用于处理来自客户端的异步消息
。服务端将要发送的消息放入到队列池
中。接收端可以根据RabbitMQ配置的转发机制接收服务端发来的消息
。RabbitMQ依据指定的转发规则进行消息的转发、缓冲和持久化操作,主要用在多服务器间或单服务器的子系统间进行通信
,是分布式系统标准的配置。
Exchange(交换机)
接受生产者发送的消息,并根据Binding规则将消息路由给服务器中的队列。ExchangeType决定了Exchange路由消息的行为。在RabbitMQ中,ExchangeType常用的有direct、Fanout和Topic三种。
还有一种 Headers交换机允许你匹配AMQP消息的header而非路由键,除此之外headers交换器和direct交换器完全一致,但性能却很差,几乎用不到,不做了解了就。
RabbitMQ六种工作模式(官网已经更新了第七种)
1. 简单模式
- 一个生产者,一个消费者
- 新建maven项目 添加依赖,对这几种工作模式进行测试
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.4.3</version>
</dependency>
- 生产者
public class Test1 {
public static void main(String[] args) throws Exception {
//创建连接工厂,并设置连接信息
ConnectionFactory f = new ConnectionFactory();
f.setHost("192.168.64.5");
f.setPort(5672);//可选,5672是默认端口缺省值
f.setUsername("admin");
f.setPassword("admin");
/*
* 与rabbitmq服务器建立连接,
* rabbitmq服务器端使用的是nio,会复用tcp连接,
* 并开辟多个信道与客户端通信
* 以减轻服务器端建立连接的开销
*/
Connection c = f.newConnection();
//建立信道
Channel ch = c.createChannel();
/*
* 声明队列,会在rabbitmq中创建一个队列
* 如果已经创建过该队列,就不能再使用其他参数来创建
*
* 参数含义:
* -queue: 队列名称
* -durable: 队列持久化,true表示RabbitMQ重启后队列仍存在
* -exclusive: 排他,true表示限制仅当前连接可用
* -autoDelete: 当最后一个消费者断开后,是否删除队列
* -arguments: 其他参数
*/
ch.queueDeclare("helloworld", false,false,false,null);
/*
* 发布消息
* 这里把消息向默认交换机发送.
* 默认交换机隐含与所有队列绑定,routing key即为队列名称
*
* 参数含义:
* -exchange: 交换机名称,空串表示默认交换机"(AMQP default)",不能用 null
* -routingKey: 对于默认交换机,路由键就是目标队列名称
* -props: 其他参数,例如头信息
* -body: 消息内容byte[]数组
*/
ch.basicPublish("", "helloworld", null, "Hello world!".getBytes());
System.out.println("消息已发送");
c.close();
}
}
- 接收者
public class Test2 {
public static void main(String[] args) throws Exception {
//连接工厂
ConnectionFactory f = new ConnectionFactory();
f.setHost("192.168.64.5");
f.setUsername("admin");
f.setPassword("admin");
//建立连接
Connection c = f.newConnection();
//建立信道
Channel ch = c.createChannel();
//声明队列,如果该队列已经创建过,则不会重复创建
ch.queueDeclare("helloworld",false,false,false,null);
System.out.println("等待接收数据");
//收到消息后用来处理消息的回调对象
DeliverCallback callback = new DeliverCallback() {
@Override
public void handle(String consumerTag, Delivery message) throws IOException {
String msg = new String(message.getBody(), "UTF-8");
System.out.println("收到: "+msg);
}
};
//消费者取消时的回调对象
CancelCallback cancel = new CancelCallback() {
@Override
public void handle(String consumerTag) throws IOException {
}
};
//处理消息
//参数:要接收的队列名,是否自动确认,处理消息回调,不处理回调
ch.basicConsume("helloworld", true, callback, cancel);
}
}
2. 工作模式 work
- 一个生产者,多个消费者,每个消费者获取到的消息唯一,轮询策略
他的工作策略为,每个服务每次接收一条消息,进行分发,并不能做到负载均衡,需要进行额外的配置
- 将自动确认处理消息改为 false,处理完成后手动确认 在自动确认的情况下RabbitMQ并不能知晓当前服务器是否遇忙,只有确认了处理完成才会对服务重新分发
关闭自动确认: channel.basicConsume("队列名",false
, deliverCallback, cancelCallback);
//autoAck设置为false,则需要手动确认发送回执
channel.basicConsume("task_queue", false, deliverCallback, cancelCallback);
发送确认回执:channel.basicAck(message.getEnvelope().getDeliveryTag(), false)
;
//手动确认处理完成 给mq服务器发送回执
channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
- 设置接收者每次接收的消息数量 1 每次处理一条配合手动确认使用
设置每次接收的数据量:channel.basicQos(1);
- 如果为订单等敏感数据可以开启队列持久化策略,
创建&获取持久化队列,channel.queueDeclare("队列名",true
,false,false,null);
channel.queueDeclare("队列名",true,false,false,null);
发送信息时头信息添加 MessageProperties.PERSISTENT_TEXT_PLAIN
:
channel.basicPublish("","task_queue",MessageProperties.PERSISTENT_TEXT_PLAIN,s.getBytes())
发送者
public class Test1 {
public static void main(String[] args) throws Exception {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.64.5");
connectionFactory.setPort(5672);
connectionFactory.setUsername("admin");
connectionFactory.setPassword("admin");
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
/**
* 参数说明:
* 1. queue: 队列名称
* 2. durable: 是否持久化队列 true持久化 服务重启后仍存在
* 3. exclusive: 是否为独占队列 排他 true表示此队列仅当前连接可用
* 4. autoDelete: 当最后一个连接断开时是否删除队列
* 5. arguments: 其他参数
*/
channel.queueDeclare("task_queue",true,false,false,null);
while (true){
System.out.println("输入:");
String s = new Scanner(System.in).nextLine();
/**
* 参数说明:
* 1. exchange: 交换机名称,空串表示默认交换机"(AMQP default)",不能用 null
* 2. routingKey: 需要发送到那个队列
* 3. props: 其他消息 例如头信息
* 4. body: 消息内容数据 byte[] 类型
*/
channel.basicPublish("","task_queue", MessageProperties.PERSISTENT_TEXT_PLAIN,s.getBytes());
System.out.println("消息已发送");
}
}
}
接收者:每遇到一个 .
停一秒模拟耗时操作
public class Test2 {
public static void main(String[] args) throws Exception {
//初始化连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.64.5");
connectionFactory.setPort(5672);
connectionFactory.setUsername("admin");
connectionFactory.setPassword("admin");
//建立连接
Connection connection = connectionFactory.newConnection();
//建立信道
Channel channel = connection.createChannel();
//创建队列 如果有则不创建
channel.queueDeclare("task_queue", true, false, false, null);
System.out.println("等待接收消息");
//每次只处理一条数据
channel.basicQos(1);
//收到消息后处理消息的回调对象 lambda 表达式
DeliverCallback deliverCallback = (consumerTag, message) -> {
String msg = new String(message.getBody(), StandardCharsets.UTF_8);
System.out.println("收到:" + msg + "\n正在处理...");
if (msg.endsWith(".")) {
for (int i = 0; i < msg.length(); i++) {
if (msg.charAt(i) == '.') {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
//手动确认处理完成 给mq服务器发送回执 false关闭统一确认,逐条确认
channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
System.out.println("处理完成!\n-------------------------------------------------");
};
//消费者取消时的回调对象 lambda 表达式
CancelCallback cancelCallback = consumerTag -> {
};
//autoAck设置为false,则需要手动确认发送回执
channel.basicConsume("task_queue", false, deliverCallback, cancelCallback);
}
}
- 打开 IDEA 允许 编辑运行设置 勾选
Allow parallel run
,一个main方法运行多个
消息确认(回执)
如果一个消费者没有返回消息确认就挂掉了(信道关闭,连接关闭或者TCP链接丢失),rabbitmq就会明白,这个消息没有被处理完成,rebbitmq就会把这条消息重新放入队列,如果在这时有其他的消费者在线,那么rabbitmq就会迅速的把这条消息传递给其他的消费者,这样就确保了没有消息会丢失。
这里不存在消息超时, rabbitmq只在消费者挂掉时重新分派消息, 即使消费者花非常久的时间来处理消息也可以
手动消息确认默认是开启的;
3.发布订阅模式(扇形交换机 fanout exchange)
一个生产者发送的消息会被多个消费者获取,由服务生产者
发送给交换机
交换机群发给订阅者
服务提供者
public class Test1 {
public static void main(String[] args) throws Exception {
//创建连接工厂并初始化
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.64.5");
connectionFactory.setPort(5672);
connectionFactory.setUsername("admin");
connectionFactory.setPassword("admin");
//获取连接&新建
Connection connection = connectionFactory.newConnection();
//打开信道
Channel channel = connection.createChannel();
//创建名为 logs 的 扇形交换机 fanout 可以直接传交换机名称
channel.exchangeDeclare("logs",BuiltinExchangeType.FANOUT);
//循环输入
while (true){
System.err.println("请输入:");
String s = new Scanner(System.in).nextLine();
//发送数据 参数: 发送的交换机名,需要发送的路由键 || 队列,字节数组(消息)
channel.basicPublish("logs","",null,s.getBytes());
System.out.println("\n");
}
}
}
服务消费者
public class Test2 {
public static void main(String[] args) throws Exception{
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.64.5");
connectionFactory.setPort(5672);
connectionFactory.setUsername("admin");
connectionFactory.setPassword("admin");
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
//创建交换机 有则跳过
channel.exchangeDeclare("logs","fanout");
//创建队列
//参数不指定默认为: 随机队列名,非持久,独占,无连接即删除,其他参数null
//channel.queueDeclare("随机名",false,true,true,null);
//.getQueue() 可以获取随机的队列名
String queueName = channel.queueDeclare().getQueue();
//绑定交换机和队列
channel.queueBind(queueName,"logs","");
DeliverCallback deliverCallback = (consumerTag, message) -> {
String string = new String(message.getBody(), "UTF-8");
System.out.println(string);
};
CancelCallback cancelCallback = consumerTag -> {
};
channel.basicConsume(queueName,true,deliverCallback,cancelCallback);
}
}
4. 路由模式(直连交换机 direct exchange)
发送消息到交换机并且要指定路由key ,消费者将队列绑定到交换机时需要指定绑定key 当 路由key == 绑定key 可以将两个都理解为 绑定key
,一个队列可以和多个路由key绑定,路由模式使用 direct exchange
直连交换机
服务提供者
public class Test1 {
public static void main(String[] args) throws Exception{
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.64.5");
connectionFactory.setPort(5672);
connectionFactory.setUsername("admin");
connectionFactory.setPassword("admin");
//创建、获取连接
Connection connection = connectionFactory.newConnection();
//创建信道
Channel channel = connection.createChannel();
//创建交换机 direct 直连交换机
channel.exchangeDeclare("direct_logs", BuiltinExchangeType.DIRECT);
while (true) {
System.err.println("输入路由键:");
String key = new Scanner(System.in).nextLine();
System.err.println("请输入内容:");
String msg = new Scanner(System.in).nextLine();
//发送数据 参数: 发送的交换机名,需要发送的路由键 || 队列名,字节数组(消息)
channel.basicPublish("direct_logs",key,null,msg.getBytes());
}
}
}
服务消费者
public class Test2 {
public static void main(String[] args) throws Exception {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.64.5");
connectionFactory.setPort(5672);
connectionFactory.setUsername("admin");
connectionFactory.setPassword("admin");
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
//创建交换机
channel.exchangeDeclare("direct_logs", BuiltinExchangeType.DIRECT);
//创建队列并获取队列名
String queueName = channel.queueDeclare().getQueue();
//获取字符串并以逗号隔开
System.err.println("请输入绑定键(多个请以逗号隔开):");
String[] keys = new Scanner(System.in).nextLine().split(",");
//绑定键 路由模式可以绑定多个
for (String i : keys) {
channel.queueBind(queueName, "direct_logs", i);
}
System.err.println("绑定完成,等待接收消息!");
//lambda 表达式
DeliverCallback deliverCallback = (consumerTag, message) -> System.out.println(message.getEnvelope().getRoutingKey() + "绑定键收到消息:\n" + new String(message.getBody(), StandardCharsets.UTF_8) + "\n");
CancelCallback cancelCallback = consumerTag -> {
};
channel.basicConsume(queueName, true, deliverCallback, cancelCallback);
}
}
5. 主题模式(主题交换机 topic exchange)
将路由键和某模式进行匹配,此时队列需要绑定在一个模式上,#
匹配一个词或多个词,*
只匹配一个词,可以看作为增强版 路由模式 可以进行通配,当路由key匹配到了一个队列的两个或多个绑定key,消息也只会发送一次
上图所示情况为:三个绑定键
都可以匹配上 路由键
两者都会接受消息,但是 消费者2
只会接受到一条数据
服务提供者
public class Test1 {
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.64.5");
factory.setPort(5672);
factory.setUsername("admin");
factory.setPassword("admin");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare("topic_logs", BuiltinExchangeType.TOPIC);
while (true) {
System.err.println("请输入路由键:");
String key = new Scanner(System.in).nextLine();
System.err.println("请输入内容:");
String msg = new Scanner(System.in).nextLine();
channel.basicPublish("topic_logs",key,null,msg.getBytes());
System.out.println("发送成功!\n");
}
}
}
服务消费者
public class Test2 {
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.64.5");
factory.setPort(5672);
factory.setUsername("admin");
factory.setPassword("admin");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare("topic_logs", BuiltinExchangeType.TOPIC);
String queueName = channel.queueDeclare().getQueue();
System.err.println("请输入绑定键(多个请以逗号隔开):");
String[] keys = new Scanner(System.in).nextLine().split(",");
for (String key:keys) {
//绑定
channel.queueBind(queueName,"topic_logs",key);
}
DeliverCallback deliverCallback = (consumerTag, message) -> System.out.println(message.getEnvelope().getRoutingKey() + "绑定键接收到消息:\n" + new String(message.getBody(), StandardCharsets.UTF_8));
CancelCallback cancelCallback = consumerTag -> {
};
channel.basicConsume(queueName, true, deliverCallback, cancelCallback);
}
}
6.RPC 模式
RPC 模式工作方式:
- 对于RPC请求,设置为仅为请求创建的匿名独占队列,客户端发送一条带有两个属性的消息:replyTo,和correlationId,设置为每个请求的惟一id值。
- 请求被发送到rpc_queue队列。
RPC工作进程(即:服务器)在队列上等待请求。当一个请求出现时,它执行任务,并使用replyTo字段中的队列将结果发回客户机。 - 客户机在回应消息队列上等待数据。当消息出现时,它检查correlationId属性。如果匹配请求中的值,则向程序返回该响应数据
消息属性 Message Properties
- AMQP 0-9-1协议定义了消息的14个属性。大部分属性很少使用,下面是比较常用的4个:
deliveryMode
:将消息标记为持久化(值为2)或非持久化(任何其他值)。
contentType
:用于描述mime类型。例如,对于经常使用的JSON格式,将此属性设置为:application/json。
replyTo
:通常用于指定回调队列。
correlationId
:将RPC响应与请求关联起来非常有用。
服务端
public class Test2 {
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.64.5");
factory.setPort(5672);
factory.setUsername("admin");
factory.setPassword("admin");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare("rpc_queue", false, false, false, null);
//清除队列内容
channel.queuePurge("rpc_queue");
//每次接受一条
channel.basicQos(1);
DeliverCallback deliverCallback = (consumerTag, message) -> {
//获取传来的参数
String s = new String(message.getBody(), StandardCharsets.UTF_8);
//转换为int
int i = Integer.parseInt(s);
//计算结果并转回字符串
String res = String.valueOf(fbnq(i));
//构建 correlationId 标识 将发送来的标识获取到并进行构建
BasicProperties build = new BasicProperties.Builder().correlationId(message.getProperties().getCorrelationId()).build();
//向客户端发送结果
channel.basicPublish("", message.getProperties().getReplyTo(), build, res.getBytes(StandardCharsets.UTF_8));
//执行完毕确认
channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
};
CancelCallback cancelCallback = consumerTag -> {
};
//处理接收参数 不自动确认
channel.basicConsume("rpc_queue", false, deliverCallback, cancelCallback);
}
/**
* 递归求斐波那契数 模拟耗时操作
*
* @param i 参数求第几个
* @return 结果
*/
private static long fbnq(int i) {
if (i == 1 || i == 2) return 1;
System.err.println(i);
return fbnq(i - 1) + fbnq(i - 2);
}
}
服务消费者
public class Test1 {
public static void main(String[] args) throws Exception {
System.out.println("请问求第几个斐波那契数列:");
String call = call(new Scanner(System.in).nextInt() + "");
System.out.println("结果:" + call);
}
private static String call(String msg) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.64.5");
factory.setPort(5672);
factory.setUsername("admin");
factory.setPassword("admin");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
//获取随机队列 不持久化,独占,无连接即删除 并返回队列名
String queueName = channel.queueDeclare().getQueue();
//生成随机的关联id
String uuid = UUID.randomUUID().toString();
//设置两个参数:
//构建所需内容
//1. 请求和响应的关联id correlationId
//2. 传递响应数据的queue replyTo
BasicProperties props = new BasicProperties.Builder().correlationId(uuid).replyTo(queueName).build();
//向rpc_queue队列发送数据 默认交换机
channel.basicPublish("", "rpc_queue", props, msg.getBytes());
//ArrayBlockingQueue 阻塞队列,先进先出,存放收到的结果保证发送和接收的顺序
BlockingQueue<String> queue = new ArrayBlockingQueue<String>(1);
DeliverCallback deliverCallback = (consumerTag, message) -> {
//判断返回的uuid是否为发送的uuid 如果是将他添加进 queue 这个阻塞队列
if (message.getProperties().getCorrelationId().equals(uuid)) {
//获取结果
String string = new String(message.getBody(), StandardCharsets.UTF_8);
//将结果存入阻塞队列
queue.offer(string);
}
};
CancelCallback cancelCallback = consumerTag -> {
};
//调用两个匿名内部类处理响应结果
channel.basicConsume(queueName, true, deliverCallback, cancelCallback);
//从阻塞队列中取出
String res = queue.take();
//关闭链接
channel.close();
connection.close();
//返回结果
return res;
}
}
virtual host
在RabbitMQ中叫做虚拟消息服务器VirtualHost,每个VirtualHost相当于一个相对独立的RabbitMQ服务器,每个VirtualHost之间是相互隔离的。exchange、queue、message不能互通
创建虚拟主机
将虚拟机授权用户
SpringBoot 使用RabbitMQ 完成数据存取 Demo
添加依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
配置连接信息
spring:
rabbitmq:
host: 192.168.64.5
port: 5672
virtualHost: /abc
username: admin
password: admin
# 开启手动确认
listener:
simple:
acknowledge-mode: manual
编辑启动类
/**
* 新建SpringBoot封装的一个Queue实例,包含队列的参数
* 并把Queue实例访入Spring容器
*
* RabbitMQ的自动配置类,会从Spring容日获取所有的Queue实例
* 并创建这些队列
* @return
*/
@Bean
public Queue getQueue(){
//创建一个队列 队列名:orderQueue 是否为持久队列:true
return new Queue("队列名",true);
}
编辑需要使用的类
//RabbitAutoConfiguration中创建了AmpqTemplate实例
@Autowired
private AmqpTemplate amqpTemplate;
//存放数据
public void demo(Demo demo){
//在rabbitmq队列中放入数据 数据不必转化为二进制 底层会自动转换
amqpTemplate.convertAndSend("队列名",demo);
//从队列中获取数据
}
//获取数据 方法参数为要接受的数据类型
//添加该注解后,会从指定的orderQueue接收消息,
//并把数据转为 PdOrder 实例传递到此方法
@RabbitListener(queues="队列名")
public void demo(Demo demo){
//从队列中获取数据
System.out.print(demo);
}
开启手动确认
配置文件
spring:
rabbitmq:
host: 192.168.64.5
port: 5672
virtualHost: /abc
username: admin
password: admin
# 开启手动确认
listener:
simple:
acknowledge-mode: manual
Java demo
//获取数据 方法参数为要接受的数据类型
//添加该注解后,会从指定的orderQueue接收消息,
//并把数据转为 PdOrder 实例传递到此方法
@RabbitListener(queues="队列名")
public void demo(Demo demo, Channel channel, Message message){
//从队列中获取数据
System.out.print(demo);
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}