-->
侧边栏壁纸
博主头像
断钩鱼 博主等级

行动起来,活在当下

  • 累计撰写 28 篇文章
  • 累计创建 34 个标签
  • 累计收到 0 条评论

目 录CONTENT

文章目录

RabbitMQ

halt
2020-01-13 / 0 评论 / 4 点赞 / 1933 阅读 / 0 字

RabbitMQ 安装

在centos7上安装rabbitmq

  1. 安装erlang语言库
    RabbitMQ使用了Erlang开发语言,Erlang是为电话交换机开发的语言,天生自带高并发光环,和高可用特性
    下载&安装
    # 下载
    wget https://github.com/rabbitmq/erlang-rpm/releases/download/v21.2.6/erlang-21.2.6-1.el7.x86_64.rpm
    # 安装
    rpm -ivh erlang-21.2.6-1.el7.x86_64.rpm --force --nodeps
    
  2. 安装socat依赖
    	# 下载 socat rpm
    	wget http://mirror.centos.org/centos/7/os/x86_64/Packages/socat-1.7.3.2-2.el7.x86_64.rpm
    	# 安装 socat 依赖包
    	rpm -ivh socat-1.7.3.2-2.el7.x86_64.rpm
    
  3. 安装rabbitmq
    	# 下载 rpm 包
    	wget https://github.com/rabbitmq/rabbitmq-server/releases/download/v3.7.13/rabbitmq-server-3.7.13-1.el7.noarch.rpm
    	# 安装 rpm 包
    	rpm -ivh rabbitmq-server-3.7.13-1.el7.noarch.rpm
    
  4. rabbitmq 开机启动&启动&停止
    	# 设置服务,开机自动启动
    	chkconfig rabbitmq-server on
    
    	# 启动服务
    	service rabbitmq-server start
    
    	# 停止服务
    	service rabbitmq-server stop
    
  5. 启用UI管理界面
    	# 开启管理界面插件
    	rabbitmq-plugins enable rabbitmq_management
    	# 防火墙打开 15672 管理端口
    	firewall-cmd --zone=public --add-port=15672/tcp --permanent
    	firewall-cmd --reload
    

    访问:http://XXX:15672 登录 分配权限

  6. 添加用户 并赋予权限
    账号:admin 密码:admin
    	# 添加用户
    	rabbitmqctl add_user admin admin
    
    	# 新用户设置用户为超级管理员
    	rabbitmqctl set_user_tags admin administrator
    
  7. 开放连接端口
    	# 打开客户端连接端口
    	firewall-cmd --zone=public --add-port=5672/tcp --permanent
    	firewall-cmd --reload
    

RabbitMQ 主要作用

服务解耦

  • 一般调用
    例如有以下服务,服务B、C需要从服务A取数据:
graph LR A[服务A] --数据--> B[服务B] A --数据--> C[服务C]

看似合理,但是如果我的需求一旦变更。
例如:需要从新的服务器获取数据,或者服务器增加、规模扩大等情况,那么就需要进行代码的修改等操作,这就是因为服务之间的高耦合性导致。

  • 消息中间件实现服务之间解耦(RabbitMQ)
    同样拿上面例子说明:
graph LR A[服务A] --数据-->B((RabbitMQ)) B --数据--> D[服务B] B --数据--> C[服务C]

这样我们两边数据只需要考虑数据存取即可,不必关心从谁取数据 &谁从我取数据
服务A向RabbitMQ发送数据 服务B、C从 RabbitMQ取数据,两边服务解耦

流量削峰

  • 例如:网站每秒约有300个请求
graph LR A[访问量] -- 300/s --> B((服务器))

例如这样我们的一台服务器每秒可以接受500请求,这样可以轻松应对

  • 如果是这样呢
    网站每秒访问量不变,服务器不变,但是总有那么一两个小时访问量每秒3000怎么办?
    解决方案一:增加服务器数量,但是为了一两个小时的流量高峰增加大量服务器显然不合适。
    解决方案二:消息中间件流量削峰,例如:
    RabbitMQ 的瞬时吞吐量非常大 将请求放在队列中,由服务器慢慢进行处理
graph LR A[访问量] --3000/s--> B[RabbitMQ] B-- 300/s --> C((服务器))

异步调用

  • 例如:点外卖,用寻找骑手模拟耗时操作
graph LR A[用户] --支付请求 耗时200ms--> B[支付成功 未找到骑手] B-- 寻找骑手 耗时30s --> C[骑手接单] C-- 业务执行完成返回提示 支付成功 --> A

支付成功是立即完成,但是寻找骑手可能是个耗时操作,这样就造成整条调用链路响应非常缓慢,这样整条链路耗时 30.2s

  • RabbitMQ 异步调用
graph LR A[用户] --支付请求 耗时200ms--> B[支付成功 未找到骑手] B-- 支付成功等待骑手接单-->A B--将寻找骑手操作放入队列 耗时 5ms-->C[RabbitMQ] C -- 寻找骑手 耗时30s --> D[骑手接单] D -- 骑手已接单 --> A

这样的话支付系统支付完成后,无需等待耗时操作,链路响应时长 205ms

RabbitMQ 基本概念

RabbitMQ是一种消息中间件,用于处理来自客户端的异步消息服务端将要发送的消息放入到队列池中。接收端可以根据RabbitMQ配置的转发机制接收服务端发来的消息。RabbitMQ依据指定的转发规则进行消息的转发、缓冲和持久化操作,主要用在多服务器间或单服务器的子系统间进行通信,是分布式系统标准的配置。

Exchange(交换机)

接受生产者发送的消息,并根据Binding规则将消息路由给服务器中的队列。ExchangeType决定了Exchange路由消息的行为。在RabbitMQ中,ExchangeType常用的有direct、Fanout和Topic三种。

还有一种 Headers交换机允许你匹配AMQP消息的header而非路由键,除此之外headers交换器和direct交换器完全一致,但性能却很差,几乎用不到,不做了解了就。
交换机

RabbitMQ六种工作模式(官网已经更新了第七种)

1. 简单模式

  • 一个生产者,一个消费者
graph LR A((生产者)) -- 发送 --> B[消息队列] B--接收-->C((消费者))
  • 新建maven项目 添加依赖,对这几种工作模式进行测试
<dependency>
	<groupId>com.rabbitmq</groupId>
	<artifactId>amqp-client</artifactId>
	<version>5.4.3</version>
</dependency>
  • 生产者
public class Test1 {
	public static void main(String[] args) throws Exception {
		//创建连接工厂,并设置连接信息
		ConnectionFactory f = new ConnectionFactory();
		f.setHost("192.168.64.5");
		f.setPort(5672);//可选,5672是默认端口缺省值
		f.setUsername("admin");
		f.setPassword("admin");

		/*
		 * 与rabbitmq服务器建立连接,
		 * rabbitmq服务器端使用的是nio,会复用tcp连接,
		 * 并开辟多个信道与客户端通信
		 * 以减轻服务器端建立连接的开销
		 */
		Connection c = f.newConnection();
		//建立信道
		Channel ch = c.createChannel();

		/*
		 * 声明队列,会在rabbitmq中创建一个队列
		 * 如果已经创建过该队列,就不能再使用其他参数来创建
		 * 
		 * 参数含义:
		 *   -queue: 队列名称
		 *   -durable: 队列持久化,true表示RabbitMQ重启后队列仍存在
		 *   -exclusive: 排他,true表示限制仅当前连接可用
		 *   -autoDelete: 当最后一个消费者断开后,是否删除队列
		 *   -arguments: 其他参数
		 */
		ch.queueDeclare("helloworld", false,false,false,null);

		/*
		 * 发布消息
		 * 这里把消息向默认交换机发送.
		 * 默认交换机隐含与所有队列绑定,routing key即为队列名称
		 * 
		 * 参数含义:
		 * 	-exchange: 交换机名称,空串表示默认交换机"(AMQP default)",不能用 null 
		 * 	-routingKey: 对于默认交换机,路由键就是目标队列名称
		 * 	-props: 其他参数,例如头信息
		 * 	-body: 消息内容byte[]数组
		 */
		ch.basicPublish("", "helloworld", null, "Hello world!".getBytes());

		System.out.println("消息已发送");
		c.close();
	}
}
  • 接收者
public class Test2 {
	public static void main(String[] args) throws Exception {
		//连接工厂
		ConnectionFactory f = new ConnectionFactory();
		f.setHost("192.168.64.5");
		f.setUsername("admin");
		f.setPassword("admin");
		//建立连接
		Connection c = f.newConnection();
		//建立信道
		Channel ch = c.createChannel();
		//声明队列,如果该队列已经创建过,则不会重复创建
		ch.queueDeclare("helloworld",false,false,false,null);
		System.out.println("等待接收数据");
		
		//收到消息后用来处理消息的回调对象
		DeliverCallback callback = new DeliverCallback() {
			@Override
			public void handle(String consumerTag, Delivery message) throws IOException {
				String msg = new String(message.getBody(), "UTF-8");
				System.out.println("收到: "+msg);
			}
		};
		
		//消费者取消时的回调对象
		CancelCallback cancel = new CancelCallback() {
			@Override
			public void handle(String consumerTag) throws IOException {
			}
		};
		//处理消息 
		//参数:要接收的队列名,是否自动确认,处理消息回调,不处理回调
		ch.basicConsume("helloworld", true, callback, cancel);
	}
}

2. 工作模式 work

  • 一个生产者,多个消费者,每个消费者获取到的消息唯一,轮询策略
    他的工作策略为,每个服务每次接收一条消息,进行分发,并不能做到负载均衡,需要进行额外的配置
  1. 将自动确认处理消息改为 false,处理完成后手动确认 在自动确认的情况下RabbitMQ并不能知晓当前服务器是否遇忙,只有确认了处理完成才会对服务重新分发
    关闭自动确认: channel.basicConsume("队列名", false, deliverCallback, cancelCallback);
//autoAck设置为false,则需要手动确认发送回执
channel.basicConsume("task_queue", false, deliverCallback, cancelCallback);

发送确认回执:channel.basicAck(message.getEnvelope().getDeliveryTag(), false);

//手动确认处理完成 给mq服务器发送回执
channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
  1. 设置接收者每次接收的消息数量 1 每次处理一条配合手动确认使用
    设置每次接收的数据量: channel.basicQos(1);
  2. 如果为订单等敏感数据可以开启队列持久化策略,
    创建&获取持久化队列,channel.queueDeclare("队列名",true,false,false,null);
channel.queueDeclare("队列名",true,false,false,null);

发送信息时头信息添加 MessageProperties.PERSISTENT_TEXT_PLAIN

channel.basicPublish("","task_queue",MessageProperties.PERSISTENT_TEXT_PLAIN,s.getBytes())

发送者

public class Test1 {
    public static void main(String[] args) throws Exception {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.64.5");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("admin");
        Connection connection =  connectionFactory.newConnection();

        Channel channel = connection.createChannel();
        /**
         * 参数说明:
         *  1. queue: 队列名称
         *  2. durable: 是否持久化队列 true持久化 服务重启后仍存在
         *  3. exclusive: 是否为独占队列 排他 true表示此队列仅当前连接可用
         *  4. autoDelete: 当最后一个连接断开时是否删除队列
         *  5. arguments: 其他参数
         */
        channel.queueDeclare("task_queue",true,false,false,null);

        while (true){
            System.out.println("输入:");
            String s = new Scanner(System.in).nextLine();
            /**
             * 参数说明:
             *   1. exchange: 交换机名称,空串表示默认交换机"(AMQP default)",不能用 null
             *   2. routingKey: 需要发送到那个队列
             *   3. props: 其他消息 例如头信息
             *   4. body: 消息内容数据 byte[] 类型
             */
            channel.basicPublish("","task_queue", MessageProperties.PERSISTENT_TEXT_PLAIN,s.getBytes());

            System.out.println("消息已发送");
        }
    }
}

接收者:每遇到一个 . 停一秒模拟耗时操作

public class Test2 {
    public static void main(String[] args) throws Exception {
        //初始化连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.64.5");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("admin");

        //建立连接
        Connection connection = connectionFactory.newConnection();
        //建立信道
        Channel channel = connection.createChannel();
        //创建队列 如果有则不创建
        channel.queueDeclare("task_queue", true, false, false, null);
        System.out.println("等待接收消息");
        //每次只处理一条数据
        channel.basicQos(1);
        //收到消息后处理消息的回调对象 lambda 表达式
        DeliverCallback deliverCallback = (consumerTag, message) -> {
            String msg = new String(message.getBody(), StandardCharsets.UTF_8);
            System.out.println("收到:" + msg + "\n正在处理...");
            if (msg.endsWith(".")) {
                for (int i = 0; i < msg.length(); i++) {
                    if (msg.charAt(i) == '.') {
                        try {
                            Thread.sleep(1000);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                }
            }
            //手动确认处理完成 给mq服务器发送回执 false关闭统一确认,逐条确认
            channel.basicAck(message.getEnvelope().getDeliveryTag(), false);

            System.out.println("处理完成!\n-------------------------------------------------");
        };

        //消费者取消时的回调对象 lambda 表达式
        CancelCallback cancelCallback = consumerTag -> {

        };

        //autoAck设置为false,则需要手动确认发送回执
        channel.basicConsume("task_queue", false, deliverCallback, cancelCallback);

    }
}
  • 打开 IDEA 允许 编辑运行设置 勾选 Allow parallel run,一个main方法运行多个
    勾选 Allow parallel run

消息确认(回执)

如果一个消费者没有返回消息确认就挂掉了(信道关闭,连接关闭或者TCP链接丢失),rabbitmq就会明白,这个消息没有被处理完成,rebbitmq就会把这条消息重新放入队列,如果在这时有其他的消费者在线,那么rabbitmq就会迅速的把这条消息传递给其他的消费者,这样就确保了没有消息会丢失。

这里不存在消息超时, rabbitmq只在消费者挂掉时重新分派消息, 即使消费者花非常久的时间来处理消息也可以

手动消息确认默认是开启的;

3.发布订阅模式(扇形交换机 fanout exchange)

一个生产者发送的消息会被多个消费者获取,由服务生产者发送给交换机交换机群发给订阅者

graph LR A[服务生产者] -- 消息 --> B((fanout exchange)) B --订阅--> C(服务消费者) B --订阅--> D(服务消费者)

服务提供者

public class Test1 {
    public static void main(String[] args) throws Exception {
    	//创建连接工厂并初始化
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.64.5");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("admin");
		//获取连接&新建
        Connection connection = connectionFactory.newConnection();
        //打开信道
        Channel channel = connection.createChannel();
        //创建名为 logs 的 扇形交换机 fanout 可以直接传交换机名称
        channel.exchangeDeclare("logs",BuiltinExchangeType.FANOUT);
		//循环输入
        while (true){
            System.err.println("请输入:");
            String s = new Scanner(System.in).nextLine();
            //发送数据 参数: 发送的交换机名,需要发送的路由键 || 队列,字节数组(消息)
            channel.basicPublish("logs","",null,s.getBytes());
            System.out.println("\n");
        }
    }
}

服务消费者

public class Test2 {
    public static void main(String[] args) throws Exception{
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.64.5");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("admin");

        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();
        //创建交换机 有则跳过
        channel.exchangeDeclare("logs","fanout");
        //创建队列
        //参数不指定默认为: 随机队列名,非持久,独占,无连接即删除,其他参数null
        //channel.queueDeclare("随机名",false,true,true,null);
        //.getQueue() 可以获取随机的队列名
        String queueName = channel.queueDeclare().getQueue();

        //绑定交换机和队列
        channel.queueBind(queueName,"logs","");

        DeliverCallback deliverCallback = (consumerTag, message) -> {
            String string = new String(message.getBody(), "UTF-8");
            System.out.println(string);
        };
        CancelCallback cancelCallback = consumerTag -> {

        };
        
        channel.basicConsume(queueName,true,deliverCallback,cancelCallback);
    }
}

4. 路由模式(直连交换机 direct exchange)

发送消息到交换机并且要指定路由key ,消费者将队列绑定到交换机时需要指定绑定key 当 路由key == 绑定key 可以将两个都理解为 绑定key,一个队列可以和多个路由key绑定,路由模式使用 direct exchange 直连交换机

graph LR A[服务生产者] -- 消息+key --> B((direct exchange)) B --key 绑定--> C(服务消费者) B --key 绑定--> D(服务消费者) B --key 绑定--> D

服务提供者

public class Test1 {
    public static void main(String[] args) throws Exception{
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.64.5");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("admin");
        //创建、获取连接
        Connection connection = connectionFactory.newConnection();
        //创建信道
        Channel channel = connection.createChannel();
        //创建交换机 direct 直连交换机
        channel.exchangeDeclare("direct_logs", BuiltinExchangeType.DIRECT);
        while (true) {
            System.err.println("输入路由键:");
            String key = new Scanner(System.in).nextLine();
            System.err.println("请输入内容:");
            String msg = new Scanner(System.in).nextLine();
			//发送数据 参数: 发送的交换机名,需要发送的路由键 || 队列名,字节数组(消息)
            channel.basicPublish("direct_logs",key,null,msg.getBytes());
        }
    }
}

服务消费者

public class Test2 {
    public static void main(String[] args) throws Exception {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.64.5");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("admin");

        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();
        //创建交换机
        channel.exchangeDeclare("direct_logs", BuiltinExchangeType.DIRECT);
        //创建队列并获取队列名
        String queueName = channel.queueDeclare().getQueue();
        //获取字符串并以逗号隔开
        System.err.println("请输入绑定键(多个请以逗号隔开):");
        String[] keys = new Scanner(System.in).nextLine().split(",");
        //绑定键 路由模式可以绑定多个
        for (String i : keys) {
            channel.queueBind(queueName, "direct_logs", i);
        }
        System.err.println("绑定完成,等待接收消息!");
        //lambda 表达式
        DeliverCallback deliverCallback = (consumerTag, message) -> System.out.println(message.getEnvelope().getRoutingKey() + "绑定键收到消息:\n" + new String(message.getBody(), StandardCharsets.UTF_8) + "\n");
        
        CancelCallback cancelCallback = consumerTag -> {
        };
        channel.basicConsume(queueName, true, deliverCallback, cancelCallback);
    }
}

5. 主题模式(主题交换机 topic exchange)

将路由键和某模式进行匹配,此时队列需要绑定在一个模式上,#匹配一个词或多个词,*只匹配一个词,可以看作为增强版 路由模式 可以进行通配,当路由key匹配到了一个队列的两个或多个绑定key,消息也只会发送一次

graph LR A[服务生产者] -- 消息+key 例如: aa.bb.cc --> B((topic exchange)) B --key 绑定 例如: aa.# --> C(服务消费者1) B --key 绑定 例如: aa.bb.*--> D(服务消费者2) B --key 绑定 例如: aa.*.cc--> D

上图所示情况为:三个绑定键 都可以匹配上 路由键 两者都会接受消息,但是 消费者2 只会接受到一条数据

服务提供者

public class Test1 {
    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.64.5");
        factory.setPort(5672);
        factory.setUsername("admin");
        factory.setPassword("admin");

        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.exchangeDeclare("topic_logs", BuiltinExchangeType.TOPIC);
        while (true) {
            System.err.println("请输入路由键:");
            String key = new Scanner(System.in).nextLine();
            System.err.println("请输入内容:");
            String msg = new Scanner(System.in).nextLine();
            channel.basicPublish("topic_logs",key,null,msg.getBytes());
            System.out.println("发送成功!\n");
        }
    }
}

服务消费者

public class Test2 {
    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.64.5");
        factory.setPort(5672);
        factory.setUsername("admin");
        factory.setPassword("admin");

        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.exchangeDeclare("topic_logs", BuiltinExchangeType.TOPIC);
        String queueName = channel.queueDeclare().getQueue();
        System.err.println("请输入绑定键(多个请以逗号隔开):");
        String[] keys = new Scanner(System.in).nextLine().split(",");
        for (String key:keys) {
            //绑定
            channel.queueBind(queueName,"topic_logs",key);
        }

        DeliverCallback deliverCallback = (consumerTag, message) -> System.out.println(message.getEnvelope().getRoutingKey() + "绑定键接收到消息:\n" + new String(message.getBody(), StandardCharsets.UTF_8));
        CancelCallback cancelCallback = consumerTag -> {
        };
        channel.basicConsume(queueName, true, deliverCallback, cancelCallback);
    }
}

6.RPC 模式

RPC 模式工作方式:

  • 对于RPC请求,设置为仅为请求创建的匿名独占队列,客户端发送一条带有两个属性的消息:replyTo,和correlationId,设置为每个请求的惟一id值。
  • 请求被发送到rpc_queue队列。
    RPC工作进程(即:服务器)在队列上等待请求。当一个请求出现时,它执行任务,并使用replyTo字段中的队列将结果发回客户机。
  • 客户机在回应消息队列上等待数据。当消息出现时,它检查correlationId属性。如果匹配请求中的值,则向程序返回该响应数据

消息属性 Message Properties

  • AMQP 0-9-1协议定义了消息的14个属性。大部分属性很少使用,下面是比较常用的4个:
    deliveryMode:将消息标记为持久化(值为2)或非持久化(任何其他值)。
    contentType:用于描述mime类型。例如,对于经常使用的JSON格式,将此属性设置为:application/json。
    replyTo:通常用于指定回调队列。
    correlationId:将RPC响应与请求关联起来非常有用。

服务端

public class Test2 {
    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.64.5");
        factory.setPort(5672);
        factory.setUsername("admin");
        factory.setPassword("admin");

        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.queueDeclare("rpc_queue", false, false, false, null);
        //清除队列内容
        channel.queuePurge("rpc_queue");
        //每次接受一条
        channel.basicQos(1);

        DeliverCallback deliverCallback = (consumerTag, message) -> {
            //获取传来的参数
            String s = new String(message.getBody(), StandardCharsets.UTF_8);
            //转换为int
            int i = Integer.parseInt(s);
            //计算结果并转回字符串
            String res = String.valueOf(fbnq(i));
            //构建 correlationId 标识 将发送来的标识获取到并进行构建
            BasicProperties build = new BasicProperties.Builder().correlationId(message.getProperties().getCorrelationId()).build();
            //向客户端发送结果
            channel.basicPublish("", message.getProperties().getReplyTo(), build, res.getBytes(StandardCharsets.UTF_8));
            //执行完毕确认
            channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
        };
        CancelCallback cancelCallback = consumerTag -> {
        };
        //处理接收参数 不自动确认
        channel.basicConsume("rpc_queue", false, deliverCallback, cancelCallback);
    }

    /**
     * 递归求斐波那契数 模拟耗时操作
     *
     * @param i 参数求第几个
     * @return 结果
     */
    private static long fbnq(int i) {
        if (i == 1 || i == 2) return 1;
        System.err.println(i);
        return fbnq(i - 1) + fbnq(i - 2);
    }
}

服务消费者

public class Test1 {

    public static void main(String[] args) throws Exception {
        System.out.println("请问求第几个斐波那契数列:");
        String call = call(new Scanner(System.in).nextInt() + "");
        System.out.println("结果:" + call);
    }

    private static String call(String msg) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.64.5");
        factory.setPort(5672);
        factory.setUsername("admin");
        factory.setPassword("admin");

        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        //获取随机队列 不持久化,独占,无连接即删除 并返回队列名
        String queueName = channel.queueDeclare().getQueue();
        //生成随机的关联id
        String uuid = UUID.randomUUID().toString();

        //设置两个参数:
        //构建所需内容
        //1. 请求和响应的关联id   correlationId
        //2. 传递响应数据的queue  replyTo
        BasicProperties props = new BasicProperties.Builder().correlationId(uuid).replyTo(queueName).build();

        //向rpc_queue队列发送数据 默认交换机
        channel.basicPublish("", "rpc_queue", props, msg.getBytes());
        //ArrayBlockingQueue 阻塞队列,先进先出,存放收到的结果保证发送和接收的顺序
        BlockingQueue<String> queue = new ArrayBlockingQueue<String>(1);


        DeliverCallback deliverCallback = (consumerTag, message) -> {
            //判断返回的uuid是否为发送的uuid 如果是将他添加进 queue 这个阻塞队列
            if (message.getProperties().getCorrelationId().equals(uuid)) {
                //获取结果
                String string = new String(message.getBody(), StandardCharsets.UTF_8);
                //将结果存入阻塞队列
                queue.offer(string);
            }
        };
        CancelCallback cancelCallback = consumerTag -> {

        };

        //调用两个匿名内部类处理响应结果
        channel.basicConsume(queueName, true, deliverCallback, cancelCallback);

        //从阻塞队列中取出
        String res = queue.take();
        //关闭链接
        channel.close();
        connection.close();
        //返回结果
        return res;
    }
}

virtual host

在RabbitMQ中叫做虚拟消息服务器VirtualHost,每个VirtualHost相当于一个相对独立的RabbitMQ服务器,每个VirtualHost之间是相互隔离的。exchange、queue、message不能互通

创建虚拟主机

创建

将虚拟机授权用户

选择新创建的虚拟主机
授权

SpringBoot 使用RabbitMQ 完成数据存取 Demo

添加依赖

<dependency>
	<groupId>org.springframework.boot</groupId>
	<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

配置连接信息

spring:
  rabbitmq:
    host: 192.168.64.5
    port: 5672
    virtualHost: /abc
    username: admin
    password: admin
    # 开启手动确认
    listener:
      simple:
        acknowledge-mode: manual

编辑启动类

	/**
	 * 新建SpringBoot封装的一个Queue实例,包含队列的参数
	 * 并把Queue实例访入Spring容器
	 *
	 * RabbitMQ的自动配置类,会从Spring容日获取所有的Queue实例
	 * 并创建这些队列
	 * @return
	 */
	@Bean
	public Queue getQueue(){
		//创建一个队列 队列名:orderQueue 是否为持久队列:true
		return new Queue("队列名",true);
	}

编辑需要使用的类

	//RabbitAutoConfiguration中创建了AmpqTemplate实例
	@Autowired
	private AmqpTemplate amqpTemplate;
	//存放数据
	public void demo(Demo demo){
		//在rabbitmq队列中放入数据 数据不必转化为二进制 底层会自动转换
		amqpTemplate.convertAndSend("队列名",demo);
		//从队列中获取数据
	}

	//获取数据 方法参数为要接受的数据类型
	//添加该注解后,会从指定的orderQueue接收消息,
    //并把数据转为 PdOrder 实例传递到此方法
	@RabbitListener(queues="队列名")
	public void demo(Demo demo){
		//从队列中获取数据
		System.out.print(demo);
	}

开启手动确认

配置文件

spring:
  rabbitmq:
    host: 192.168.64.5
    port: 5672
    virtualHost: /abc
    username: admin
    password: admin
    # 开启手动确认
    listener:
      simple:
        acknowledge-mode: manual

Java demo

//获取数据 方法参数为要接受的数据类型
	//添加该注解后,会从指定的orderQueue接收消息,
    //并把数据转为 PdOrder 实例传递到此方法
	@RabbitListener(queues="队列名")
	public void demo(Demo demo, Channel channel, Message message){
		//从队列中获取数据
		System.out.print(demo);
		channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
	}
4
博主关闭了所有页面的评论