RabbitMQ – 基础使用说明

简介

RabbitMQ 是一个消息中间件:它接受并转发消息。你可以把它当做一个快递站点,当你要发送一个包裹时,你把你的包裹放到快递站,快递员最终会把你的快递送到收件人那里,按照这种逻辑 RabbitMQ 是一个快递站,一个快递员帮你传递快件。RabbitMQ 与快递站的主要区别在于,它不处理快件而是接收, 存储和转发消息数据。

 

案例代码下载:https://www.tzming.com/wp-content/uploads/2023/filesdown/RabbitMQDemo.rar

 

安装

erlang 运行时及 RabbitMQ

因为RabbitMQ 是基于erlang 语言开发的,因此要安装RabbitMQ之前需要安装erlang运行库软件

CentOS 7.9:
rpm -ivh erlang-21.3.1.el7.x86_64.rpm
yum install socat -y  // 安装 erlang 的依赖项
rpm -ivh rabbitmq-server-3.8.8-1.el7.noarch.rpm  // 安装rabbitmq包

添加开机启动 RabbitMQ 服务
chkconfig rabbitmq-server on

Ubuntu 20.04 
yum install socat -y  // 安装 erlang 的依赖项
安装 新版 erlang

# 1. 在/etc/apt/sources.list.d 下新建erlang-solution.list文件
sudo touch /etc/apt/sources.list.d/erlang-solution.list

# 2. 加入如下内容: deb https://packages.erlang-solutions.com/ubuntu trusty contrib
echo "deb https://packages.erlang-solutions.com/ubuntu trusty contrib" | sudo tee /etc/apt/sources.list.d/erlang-solution.list

# 3 安装key
wget -O- https://packages.erlang-solutions.com/ubuntu/erlang_solutions.asc | sudo apt-key add -

# 4 安装方式
sudo apt-get update
sudo apt-get install erlang

安装最新版 RabbitMQ-server
sudo apt-get -y install wget
wget https://github.com/rabbitmq/rabbitmq-server/releases/download/v3.12.4/rabbitmq-server_3.12.4-1_all.deb
sudo dpkg -i rabbitmq-server_3.12.4-1_all.deb

启动服务
/sbin/service rabbitmq-server start

查看服务状态
/sbin/service rabbitmq-server status

停止服务
/sbin/service rabbitmq-server stop

开启web管理插件(先停止运行再执行)
rabbitmq-plugins enable rabbitmq_management
用默认账号密码(guest)访问地址 http://xxxx:15672/出现权限问题

添加账户
rabbitmqctl add_user admin 123

设置用户角色
rabbitmqctl set_user_tags admin administrator

设置用户权限
set_permissions [-p <vhostpath>] <user> <conf> <write> <read>  // 设置权限规则
rabbitmqctl set_permissions -p "/" admin ",*" ",*" ",*"
用户user admin 具有/vhost1 这个 virtual host 中所有资源的配置、写、读权限

查看当前用户和角色
rabbitmqctl list_users

 

引入Maven依赖

    <!--指定 jdk 编译版本-->
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>8</source>
                    <target>8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>
    <dependencies>
        <!--rabbitmq 依赖客户端-->
        <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>5.8.0</version>
        </dependency>
        <!--操作文件流的一个依赖-->
        <dependency>
            <groupId>commons-io</groupId>
            <artifactId>commons-io</artifactId>
            <version>2.6</version>
        </dependency>
    </dependencies>

 

基本操作

 

生产者开发

生产者主要为生产消息,并把消息发送到MQ中的过程。

public class Producer {
    // 定义要使用的队列
    private final static String QUEUE_NAME = "hello";
    public static void main(String[] args) throws Exception {
        //创建一个连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("127.0.0.1");
        factory.setUsername("admin");
        factory.setPassword("123");
        //channel 实现了自动 close 接口 自动关闭 不需要显示关闭
        //1.创建一个连接
        //2.获取一个默认的信道
        try(Connection connection = factory.newConnection();
            Channel channel = connection.createChannel()){
            /**
             * 生成一个队列
             * 1.队列名称
             * 2.队列里面的消息是否持久化 默认消息存储在内存中,默认不持久化
             * 3.该队列是否只供一个消费者进行消费 是否进行共享 true 可以多个消费者消费
             * 4.是否自动删除 最后一个消费者端开连接以后 该队列是否自动删除 true 自动删除
             * 5.队列的其他参数
             */
            channel.queueDeclare(QUEUE_NAME,false,false,false,null);
            String message="hello world";

            /**
             * 发送一个消息
             * 1.发送到那个交换机(本次不考虑使用默认交换机,所以不写)
             * 2.路由的 key 是哪个(发到哪个队列中)
             * 3.其他的参数信息
             * 4.发送消息的消息体
             */
            channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
            System.out.println("消息发送完毕");
        }
    }
}

 

 

消费者开发

public class Consumer {
    private final static String QUEUE_NAME = "hello";

    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("182.92.234.71");
        factory.setUsername("admin");
        factory.setPassword("123");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        System.out.println("等待接收消息	");
        //推送的消息如何进行消费的接口回调
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody());
            System.out.println(message);
        };
        //取消消费的一个回调接口 如在消费的时候队列被删除掉了
        CancelCallback cancelCallback = (consumerTag) -> {
            System.out.println("消息消费被中断");
        }
        
        /**
         *	消费者消费消息
         *	1.消费哪个队列
         *	2.消费成功之后是否要自动应答 true 代表自动应答 false 手动应答
         *	3.消费者未成功消费的回调
         */
        channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);

    }
}

 

 

轮询队列处理方式

当生产者产生非常多的消息需要被消费时,凭单个消费者无法大量处理消息
因此我们可以创建多个线程的消费者,通过轮询的方式向多个线程消费者发送消息
轮询方式是指,轮流派送任务给所在的线程,假如现在有两个线程处理消息,则轮询就是,你一个我一个你一个我一个这样的方式派发任务处理。

Tasker 产生大时生产数据的类

package cn.unsoft.two;

import cn.unsoft.utils.RabbitMQUtil;
import com.rabbitmq.client.Channel;

import java.io.IOException;
import java.util.Scanner;
import java.util.concurrent.TimeoutException;

/**
 * 生产数据类
 */
public class Tasker {
    public static void main(String[] args) throws IOException, TimeoutException {
        Channel channel = RabbitMQUtil.getChannel();
        // 声明队列 
        channel.queueDeclare(RabbitMQUtil.QUEUE_NAME,false,false,false,null);
        Scanner scanner = new Scanner(System.in);
        while (scanner.hasNext()) {
            String message = scanner.next();

            // 通过输入来生产处理消息
            channel.basicPublish("", RabbitMQUtil.QUEUE_NAME, null, message.getBytes());
        }
    }
}

 

Worker 能产生多个线程的消费者的类

package cn.unsoft.two;

import cn.unsoft.utils.RabbitMQUtil;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.rabbitmq.client.Delivery;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * 多线程消费者
 */
public class Worker {
    public static void main(String[] args) throws IOException, TimeoutException {
        Channel channel = RabbitMQUtil.getChannel();
        DeliverCallback deliverCallback = (String consumerTag, Delivery message) -> {
            System.out.println("接收并处理消息:" + new String(message.getBody()));
        };

        CancelCallback cancelCallback = (String consumerTag) -> {
            System.out.println("已取消消费数据");
        };

        System.out.println("当前是线程A");
//        System.out.println("当前是线程B");
        channel.basicConsume(RabbitMQUtil.QUEUE_NAME, true, deliverCallback, cancelCallback);
    }
}

本类需要运行两个以上的,这样就可以做到轮询处理生产数据了。

 

消息应答

假如消费者完成一个任务可能需要一段时间,如果其中一个消费者处理一个长的任务并仅只完成了部分突然它挂掉了,会发生什么情况。RabbitMQ 一旦向消费者传递了一条消息,便立即将该消息标记为删除。在这种情况下,突然有个消费者挂掉了,我们将丢失正在处理的消息。以及后续发送给该消费这的消息,因为它无法接收到。

为了保证消息在发送过程中不丢失,rabbitmq 引入消息应答机制,消息应答就是:消费者在接收到消息并且处理该消息之后,告诉 rabbitmq 它已经处理了,rabbitmq 可以把该消息删除了。

 

自动应答

消息发送后立即被认为已经传送成功,这种模式需要在高吞吐量和数据传输安全性方面做权衡,因为这种模式如果消息在接收到之前,消费者那边出现连接或者 channel 关闭,那么消息就丢失了,当然另一方面这种模式消费者那边可以传递过载的消息,没有对传递的消息数量进行限制, 当然这样有可能使得消费者这边由于接收太多还来不及处理的消息,导致这些消息的积压,最终使得内存耗尽,最终这些消费者线程被操作系统杀死,所以这种模式仅适用在消费者可以高效并以某种速率能够处理这些消息的情况下使用

通俗的讲,自动应答是:选答应收到,再做事情,若在做事情的过程中出了问题,消息就被丢失了。

 

手动应答

手动应答可以避免自动应答带来的消息丢失问题,我们可以把消息处理完成后,再进行应答,当在处理消息的过程中发生错误时,应答自然就不会提交,这时RabbitMQ会认为消息未被处理,会重新把消息分配给其它能处理的消费者线程上。

手动应答有三种情况:

Channel.basicAck(deliveryTag,multiple)
用户处理完之后需要手动提交处理成功应答

Channel.basicNack(deliveryTag,multiple)
用户处理完之后,明确提交处理失败应签

Channel.basicReject(deliveryTag)
用户明确的提交拒绝处理应答

basicNack 与 basicReject 的区别在于,basicNack是处理后明确提交失败的应答,而basicReject是明确拒绝处理应答。

multiple 是指是否批量应答,如果为 true 则表示,该 Channel 上的所有消息会被一次性完成应答,提高性能。

比如说 channel 上有传送 tag 的消息 5,6,7,8 当前 tag 是 8 那么此时5-8 的这些还未应答的消息都会被确认收到消息应答。

建议不要使用批量应答。

 

消息持久化

我们已经看到了如何处理任务不丢失的情况,但是如何保障当 RabbitMQ 服务停掉以后消

息生产者发送过来的消息不丢失。默认情况下 RabbitMQ 退出或由于某种原因崩溃时,它忽视队列和消息,除非告知它不要这样做。确保消息不会丢失需要做两件事:我们需要将队列和消息都标记为持久化

队列持久化

之前我们创建的队列都是非持久化的,rabbitmq 如果重启的化,该队列就会被删除掉,如果要队列实现持久化 需要在声明队列的时候把 durable 参数设置为持久化

但是需要注意的就是如果之前声明的队列不是持久化的,需要把原先队列先删除,或者重新创建一个持久化的队列,不然就会出现错误

当使用了队列持久化后,在管理页面中会出现一个 D 的标示

这个时候即使重启 rabbitmq 队列也依然存在

 

消息持久化

要想让消息实现持久化需要在消息生产者修改代码,MessageProperties.PERSISTENT_TEXT_PLAIN 添加这个属性

将消息标记为持久化并不能完全保证不会丢失消息。尽管它告诉 RabbitMQ 将消息保存到磁盘,但是这里依然存在当消息刚准备存储在磁盘的时候 但是还没有存储完,消息还在缓存的一个间隔点。此时并没有真正写入磁盘。持久性保证并不强,但是对于我们的简单任务队列而言,这已经绰绰有余了。如果需要更强有力的持久化策略,参考发布确认章节

 

不公平分发

在最开始的时候我们学习到 RabbitMQ 分发消息采用的轮训分发,但是在某种场景下这种策略并不是很好,比方说有两个消费者在处理任务,其中有个消费者 1 处理任务的速度非常快,而另外一个消费者 2 处理速度却很慢,这个时候我们还是采用轮训分发的化就会到这处理速度快的这个消费者很大一部分时间处于空闲状态,而处理慢的那个消费者一直在干活,这种分配方式在这种情况下其实就不太好,但是RabbitMQ 并不知道这种情况它依然很公平的进行分发。

为了避免这种情况,我们可以设置参数 channel.basicQos(1);

不设置或设置为0时,则表示通过轮询分发。

意思就是如果这个任务我还没有处理完或者我还没有应答你,你先别分配给我,我目前只能处理一个任务,然后 rabbitmq 就会把该任务分配给没有那么忙的那个空闲消费者,当然如果所有的消费者都没有完成手上任务,队列还在不停的添加新任务,队列有可能就会遇到队列被撑满的情况,这个时候就只能添加新的 worker 或者改变其他存储任务的策略

 

预取值

当 channel.basicQos 的值设为0或不设值时,MQ会默认为是轮询方式进行任务分发

当 channel.basicQos 的值设为1时,MQ会默认对该线程设置不公平分发,其线程每次只会存储1条任务

当 channel.basicQos 的值设大于1时,MQ会对该线程预存储相应数量的任务,该线程所预存储的数量上限值为basicQos 设置的值。

假如 A 线程设置basicQos 为 2,B线程设置basicQos 为 5 时,则表示,A线程最多能预存储2条任务,B线程最多能预存5条任务,当A线程处理比较慢时,A中会最多被分配2条任务,这2条任务被分配给A线程后,B线程是不会接收到的。

basicQos 细节:

假设A线程处理速度比较快,当MQ在向A线程发送第三条任务之前,A就把前两条任务处理完了,则A线程预存池中任务线程数为0,那么MQ会再次给A线程发送任务,并非A线程就只执行2条任务,basicQos 指的是MQ向A线程发送任务到任务池时最多只能放2条。

 

发布确认

以往的生产者发送消息到队列,都只是单向的由生产者发到队列,至于生产者是否真正的发到队列里了,生产者是不知道的,如果当生产者在发送过程中,RabbitMQ宕机了,这时这条消息将被丢失,为了解决生产者发送的消息能确保发送成功,需要使用发布确认机制。

发布确认原理是,生产者对队列发送消息,队列接收到数据后需要对生产者发送确认信息,当生产者收到队列发来的确认信息后,才会被认定该消息被直接的发布。

开启发布确认

发布确认默认是没有开启的,如果要开启需要调用方法 confirmSelect,每当你要想使用发布确认,都需要在 channel 上调用该方法

 

单个确认发布

发布确认机制有三种方式,方式一是单个确认发布

单个确认发布指的是,每一条消息被发送到队列后,队列都会做一次发布确认给到生产者,优点是每一条消息都会明确知道发布是否成功,缺点是性能低下,吞吐量不高。

案例如下:

    // 单个确认案例
    public static void ConfirmSimple() throws IOException, TimeoutException, InterruptedException {
        String queue_name = UUID.randomUUID().toString();

        Channel channel = RabbitMQUtil.getChannel();
        // 声明队列
        channel.queueDeclare(queue_name,false,false,false,null);
        // 声明发布确认
        channel.confirmSelect();
        int confirmCount = 1000;
        for (int i = 0; i < confirmCount; i++) {
            channel.basicPublish("",queue_name,null,String.valueOf(i).getBytes());
            // 对每一次后消息发布后都要等待发布确认
            boolean flag = channel.waitForConfirms();
            if (!flag){
                System.out.println("发布未得到确认");
            }
        }

    }

 

 

批量确认发布

因为批量确认的操作有点不太理解,暂时不写。

 

异步确认发布

原理,生产者发布消息给队列时,不需要考虑确认问题,尽管发布就可。当队列接收到消息后,会主动的向生产者发送确认回调,生产者可在lambda中接收到确认回调

案例:

    // 异步发布确认
    public static void ConfirmAsync() throws IOException, TimeoutException {
        String queue_name = UUID.randomUUID().toString();
        Channel channel = RabbitMQUtil.getChannel();
        channel.queueDeclare(queue_name, false, false, false, null);
        channel.confirmSelect();


        // 定义确认完成的异步回调
        ConfirmCallback confirmCallback = (long deliveryTag, boolean multiple) -> {
            if (multiple) {
                System.out.println("批量异步确认成功,最后的 tag 为 " + deliveryTag);
            } else {
                System.out.println("异步确认成功,当前确认的 tag 为 " + deliveryTag);
            }
        };

        // 定义未确认的异步回调
        ConfirmCallback nackCallback = (long deliveryTag, boolean multiple) -> {
            if (multiple) {
                System.out.println("批量异步确认失败,最后的 tag 为 " + deliveryTag);
            } else {
                System.out.println("异步确认失败,当前确认失败的 tag 为 " + deliveryTag);
            }
        };

        // 新增一个异步确认发布的回调
        channel.addConfirmListener(confirmCallback, nackCallback);

        int confirmCount = 1000;
        for (int i = 0; i < confirmCount; i++) {
            channel.basicPublish("", queue_name, null, (i + "").getBytes());
        }

    }

 

 

关于异步确认判断未成功确认的消息tag

在使用异步确认时,当出现在某些发布存在失败的情况(在失败回调中会收到失败通知),我们需要知道到底是那个消息发布失败了,同时对消息进行补发操作,这时因为回调和发布消息不在同一个线程上,可能会出现线程安全的问题,我们可以使用线程安全的Map对象,在发布时记录发布的消息,当发布确认的消息我们可以删除Map对象中对应的tag,当发布完成后,我们可以通过线程安全的Map中取得未被删除的tag,我们就知道那些消息未发布成功(可在失败回调中知道是否有发布未成功)

ConcurrentSkipListMap 为线程安全的Map

    // 异步发布确认
    public static void ConfirmAsync() throws IOException, TimeoutException {
        String queue_name = UUID.randomUUID().toString();
        Channel channel = RabbitMQUtil.getChannel();
        channel.queueDeclare(queue_name, false, false, false, null);
        channel.confirmSelect();

        // 创建一个线程安全的 Map 用于记录发布的消息,在发布成功时删除成员,那么我们就能得到发布不成功的成员了
        ConcurrentSkipListMap<Long,String> confirmMap = new ConcurrentSkipListMap<>();

        // 定义确认完成的异步回调
        ConfirmCallback confirmCallback = (long deliveryTag, boolean multiple) -> {
            if (multiple) {
                // 取出这个消息tag之前的所有map成员
                ConcurrentNavigableMap<Long, String> longStringConcurrentNavigableMap = confirmMap.headMap(deliveryTag);
                longStringConcurrentNavigableMap.clear();
                System.out.println("批量异步确认成功,最后的 tag 为 " + deliveryTag);
            } else {
                confirmMap.remove(deliveryTag);
                System.out.println("异步确认成功,当前确认的 tag 为 " + deliveryTag);
            }
        };

        // 定义未确认的异步回调
        ConfirmCallback nackCallback = (long deliveryTag, boolean multiple) -> {
            if (multiple) {
                System.out.println("批量异步确认失败,最后的 tag 为 " + deliveryTag);
            } else {
                System.out.println("异步确认失败,当前确认失败的 tag 为 " + deliveryTag);
            }
        };

        // 新增一个异步确认发布的回调
        channel.addConfirmListener(confirmCallback, nackCallback);

        int confirmCount = 1000;
        for (int i = 0; i < confirmCount; i++) {
            channel.basicPublish("", queue_name, null, (i + "").getBytes());

            // 对每一次发布的消息都存到线程安全的Map中
            confirmMap.put(channel.getNextPublishSeqNo(),(i + ""));
        }

    }

 

交换机

在上面章节中,我们知道,通过RabbirMQ创建队列,向队列中发送消息,就能使多个消费者轮流或不公平分发任务等方式,去消费队列中的消息,但这种消费消息模式是基于不重复原则,即每个消费者拿到的消息都是队列中唯一的,当A消费者拿到队列中的某个消息时,B消费者是绝对拿不到这个消息的,避免了重复消费。

但有时我们在某些业务逻辑上,希望它能让我们有规律的指定消息的分发模式,这时单只有队列是无法做到的。这时我们需要在中间加设一个交换机,我们控制交换机,由交换机决定这条消息是以广播的形式发布,还是把消息单独发布给谁去处理。

通过接入交换机后,我们所发布的消息将不会直接发布到队列中,而是先发布到交换机中,由交换机去管理队列和消息发布模式。

无名交换机(无名 Exchange)

在RabbitMQ中,存在着一个默认的无名交换,当我们发布消息时,在交换机处设置为空时,则表示我们的消息会被发布到无名交换机中,这类消息就如上面章节使用那样,消息会直接发布到队列中去。

 

绑定队列

如同网络交换机设备一样,一个交换机可以连接多个网口。也就是说,MQ的一个交换机对应多个队列,而这些队列需要绑定到交换机中,当这些队列绑定到交换机后,这些队列就受到了该交换机的管理。

我们可以通过以下方法绑定交换机与队列的关系:

 

临时队列

在上面的章节我们都是属于声明长久化的队列,即使消费者断开连接了,队列依然还在。所谓的临时队列是指当消费者连接断开后,队列就会自动删除。

获取临时的队列可以通过以下代码获得

String queueName = channel.queueDeclare().getQueue();

返回值是该临时队列的名称。

 

 

Fanout 发布、订阅模式

Fanout 模式是交换机中的某中一种模式,它类似于广播模式,也就是说,当生产者发送消息给交换机后,该交换机中所绑定的所有队列,都会被收到同样的消息,类似于聊天群,当一个人发消息时,群里所有成员都会收到同样的消息。同理,所有队列都收到同样消息,意味着连接到该交换机的所有消费者都会处理同样的消息。

消费者1和消费者2分别创建不同的临时队列,并绑定交换机,方式为Fanout,这时是不需要加 routerKey 的,生产者只要发布消息,消费者1和消费者2都会同时收到消息。

案例代码如下:

生产者创建交换机 logs , 模式为 fanout:

/**
 * 生产者类
 * 生产者产生消息后把消息发送给交换机
 */
public class EmitLogs {
    // 声明交换机名称
    public static final String exchangeDeclare = "logs";

    public static void main(String[] args) throws IOException, TimeoutException {
        Channel channel = RabbitMQUtil.getChannel();
        // 使用发布、订阅模式
        channel.exchangeDeclare(exchangeDeclare, BuiltinExchangeType.FANOUT);

        Scanner scanner = new Scanner(System.in);
        while (scanner.hasNext()) {
            String message = scanner.next();
            // 向交换机中所有绑定了的队列发送同样的消息,因为交换机的绑定key为空
            channel.basicPublish(exchangeDeclare, "", null, message.getBytes("UTF-8"));
            System.out.println("向交换机中发送消息:" + message);
        }
    }
}

 

消费者1和消费者2创建临时队列,并把队列绑定到交换机 logs 中

/**
 * 消费者01
 *
 */
public class ReceiveLogs01 {
    public static final String exchange_name = "logs";

    public static void main(String[] args) throws IOException, TimeoutException {

        Channel channel = RabbitMQUtil.getChannel();
        // 消费者1创建一个临时队列
        String queueName = channel.queueDeclare().getQueue();
        // 消费者1创建的队列绑定到交换机 logs 中
        channel.queueBind(queueName,exchange_name,"");
        channel.basicConsume(queueName, (String consumerTag, Delivery message)->{
            System.out.println("消费者1收到消息:"+new String(message.getBody()));
        },consumerTag -> {});

    }
}

 

/**
 * 消费者02
 */
public class ReceiveLogs02 {
    public static final String exchange_name = "logs";

    public static void main(String[] args) throws IOException, TimeoutException {

        Channel channel = RabbitMQUtil.getChannel();
        // 消费者2创建一个临时队列
        String queueName = channel.queueDeclare().getQueue();
        // 消费者2创建的队列绑定到交换机 logs 中
        channel.queueBind(queueName, exchange_name, "");
        channel.basicConsume(queueName, (String consumerTag, Delivery message) -> {
            System.out.println("消费者2收到消息:" + new String(message.getBody()));
        }, consumerTag -> {
        });

    }
}

 

注意,当使用 fanout 模式时,队列中不需要加入 routerKey. 因为 routerKey 是队列绑定交换机的别名,如果我们的队列中设置了 routerKey 则在发布消息时,就接收不到广播了,如果需要接收,就变成了路由模式,即 Direct 模式。

注:如果使用 Fanout 模式的同时,也设置 RouterKey 时。RouterKey 会失效,这时,会采用 Fanout 订阅模式。

 

Direct 直接模式(路由模式)

在某些情况下,我们希望把消息发布给指定的队列中,由指定的消费者进行处理,这时我们就可以设置交换机,并绑定队列,在绑定队列时,需要设置绑定的队列别名RouterKey,这样只要指定交换机中的某个RouterKey时,交换机就会把消息发送给指定RouterKey的队列中,实现准确控制消息流向。

案例如下:

生产者使用 Direct 模式

/**
 * 生产者
 * 使用 Direct 模式发送消息
 */
public class EmitLogs {
    public static final String exchangeName = "logs_direct";

    public static void main(String[] args) throws IOException, TimeoutException {
        Channel channel = RabbitMQUtil.getChannel();

        // 声明交换机,并把交换机的模式设置为 Direct 直接模式
        channel.exchangeDeclare(exchangeName, BuiltinExchangeType.DIRECT);

        Scanner scanner = new Scanner(System.in);

        while (scanner.hasNext()) {
            String message = scanner.next();
            // 分别使用不同的 routingKey 发布消息,只有绑定了 logs 交换机下的别名为 c1 或 c2 的队列才会接收到对应的消息
            channel.basicPublish(exchangeName, "c1", null, ("针对c1的消息" + message + "c1").getBytes());
            channel.basicPublish(exchangeName, "c2", null, ("针对c2的消息" + message + "c2").getBytes());
        }
    }
}

 

消费者1和消费者2把队列绑定到交换机中,并使用不同的别名key

/**
 * 消费者01
 *
 */
public class ReceiveLogs01 {
    public static final String exchange_name = "logs_direct";

    public static void main(String[] args) throws IOException, TimeoutException {

        Channel channel = RabbitMQUtil.getChannel();
        // 消费者1创建一个临时队列
        String queueName = channel.queueDeclare().getQueue();
        // 消费者1创建的队列绑定到交换机 logs 并设置队列的别名c1,此时这个消费者只能接收来自 logs 交换机的 c1 的消息
        channel.queueBind(queueName,exchange_name,"c1");
        channel.basicConsume(queueName, (String consumerTag, Delivery message)->{
            System.out.println("消费者1收到消息:"+new String(message.getBody()));
        },consumerTag -> {});

    }
}

 

/**
 * 消费者02
 */
public class ReceiveLogs02 {
    public static final String exchange_name = "logs_direct";

    public static void main(String[] args) throws IOException, TimeoutException {

        Channel channel = RabbitMQUtil.getChannel();
        // 消费者2创建一个临时队列
        String queueName = channel.queueDeclare().getQueue();
        // 消费者2创建的队列绑定到交换机 logs 并设置队列的别名c1,此时这个消费者只能接收来自 logs 交换机的 c2 的消息
        channel.queueBind(queueName, exchange_name, "c2");
        channel.basicConsume(queueName, (String consumerTag, Delivery message) -> {
            System.out.println("消费者2收到消息:" + new String(message.getBody()));
        }, consumerTag -> {
        });

    }
}

 

此时当生产者发送消息到 logs 的 c1 时,只有绑定了队列别名为 c1 的消费者能接收到消息

 

注:如果使用 Fanout 模式的同时,也设置 RouterKey 时。RouterKey 会失效,这时,会采用 Fanout 订阅模式。

 

Topis 主题模式

与 direct 模式的区别

  • direct 模式是指定向某个带有 RoutingKey 的队列发送消息,但仅能对一个RoutingKey队列发送

与 fanout 模式的区别

  • fanout 模式是不管队列中的 RoutingKey ,反正只有在交换机中的队列都统一发送

topic 模式

  • topic 模式是对 direct 模式的升级版,它会对 RoutingKey 作通配符,所有匹配成功的 RoutingKey 的队列都会收到消息

通配符单词使用 .(点) 做隔开,如 RoutingKey = aaa.bbb.ccc

  • * (星号)通配符是匹配 单个单词,如 RoutingKey = *.bbb.* 可以匹配 aaa.bbb.ccc 111.bbb.ccc 等等
  • # (井号)通配符是匹配 0个或多个单词,如 RoutingKey = aaa.# 可以匹配 aaa 、aaa.bbb.ccc 、aaa.bbb.ccc.ddd 、aaa.ccc 等等

案例代码如下:

生产者中使用 topic 模式

/**
 * 使用 Topic 模式的生产者
 */
public class TopicLogs {
    public static final String exchange_name = "topic_logs";

    public static void main(String[] args) throws IOException, TimeoutException {

        Channel channel = RabbitMQUtil.getChannel();
        // 设置为 topic 模式
        channel.exchangeDeclare(exchange_name, BuiltinExchangeType.TOPIC);

        Scanner scanner = new Scanner(System.in);
        while (scanner.hasNext()) {
            String message = scanner.next();
            // 发送的消息中RoutingKey为 cn.unsoft.rabbit 那么,以下的通配符会被通配
            /**
             * cn.#
             * cn.*.*
             * *.unsoft.*
             * *.*.rabbit
             * #.rabbit
             * *.unsoft.#
             */
            // 以下通配符不会被通配
            /**
             * cn.*
             * cn.*.*.*
             * unsoft.*
             */
            channel.basicPublish(exchange_name, "cn.unsoft.rabbit", null, message.getBytes());
            // 以下只能匹配到c1
            channel.basicPublish(exchange_name,"cn.unsoft.rabbit.tzming",null,message.getBytes());
            // 以下只能匹配到c2
            channel.basicPublish(exchange_name,"com.tzming.rabbit",null,message.getBytes());
            System.out.println("发送消息成功");
        }
    }
}

 

消费者1和消费者2通过设置通配符接收特定 routerKey 的消息

/**
 * 消费者01
 * 使用 topic 模式
 */
public class ReceiveLogs01 {
    public static final String exchange_name = "topic_logs";

    public static void main(String[] args) throws IOException, TimeoutException {

        Channel channel = RabbitMQUtil.getChannel();
        // 消费者1创建一个临时队列
        String queueName = channel.queueDeclare().getQueue();
        // 消费者1创建的队列绑定到交换机 logs 中
        channel.queueBind(queueName,exchange_name,"cn.unsoft.#");
        channel.basicConsume(queueName, (String consumerTag, Delivery message)->{
            System.out.println("消费者1收到消息:"+new String(message.getBody())+ "当前的routingKey:" + message.getEnvelope().getRoutingKey());
        },consumerTag -> {});

    }
}

 

/**
 * 消费者02
 * 使用 topic 模式
 */
public class ReceiveLogs02 {
    public static final String exchange_name = "topic_logs";

    public static void main(String[] args) throws IOException, TimeoutException {

        Channel channel = RabbitMQUtil.getChannel();
        // 消费者2创建一个临时队列
        String queueName = channel.queueDeclare().getQueue();
        // 消费者2创建的队列绑定到交换机 logs 中
        channel.queueBind(queueName, exchange_name, "#.rabbit");
        channel.basicConsume(queueName, (String consumerTag, Delivery message) -> {
            System.out.println("消费者2收到消息:" + new String(message.getBody()) + "当前的routingKey:" + message.getEnvelope().getRoutingKey());
        }, consumerTag -> {
        });

    }
}

 

死信队列

我们平时在生产者中生产消息发给交换机,由交换机发给队列,由消费者在队列中取出消息并进行消费。

但有几种情况,可能会让队列中的某些或某个消息并不能正确的地消费,其原因有如下三大类

1.消息TTL存活时间超时而未被消费的

2.消息被消费者明确拒绝的

3.队列存储达到上限的

这类不能被消费的消息,就被称为死信消息,这些死信消息我们应该把它们转传到死信队死加以后续处理。

死信队列实际是一个普通的队列,只是这个队列用于存储来自其它队列所产生的死信消息,然后配备一个消费者专问消费死信队列中的消息。

上图中的解析:

1.Producer 是生产者,它负责生产消息给交换机 normal_exchange,而队列 normal_queue 与 交换机 normal_exchange 绑定,别名为 zhangsan,队列 normal_queue 的消息由 C1 进行消费。

2.其中若某些原因,C1没有成功消费来自生产者的消息,因此这些消息应当转存给 死信交换机 dead_exchange,而死信队列 dead_queue 与 死信交换机 dead_exchange 进行绑定,别名为 lisi,死信队列 dead_queue 中的消息由 C2 进行消费

针对以上的图,我们来创建这三者的关系代码:

首先创建消费者1,当它出现以上三大类情况下,应该把死信消息转发至死信交指机中的死信队列中去,详细代码如下:

/**
 * 消费者1
 * 消费者1对正常接收数据,和当出现问题时,做死信转发工作
 */
public class Consumer01 {

    // 创建正常交换机和死信交换机
    public static final String normal_exchange = "normal_exchange";
    public static final String dead_exchange = "dead_exchange";


    // 创建正常的 RoutingKey 和死信 RoutingKey
    public static final String normal_key = "zhangsan";
    public static final String dead_key = "lisi";

    // 创建正常的队列,和死信队列
    public static final String normal_queue = "normal_queue";
    public static final String dead_queue = "dead_queue";


    public static void main(String[] args) throws IOException, TimeoutException {

        Channel channel = RabbitMQUtil.getChannel();

        // 声明正常交换机和死信交换机
        channel.exchangeDeclare(normal_exchange, BuiltinExchangeType.DIRECT);
        channel.exchangeDeclare(dead_exchange, BuiltinExchangeType.DIRECT);

        /**
         * 声明正常的队列和死信队列
         * 重点:
         * 当队列中的消息无法被消费时,我们应该让正常的队列中的死信消息转到死信队列中
         */
        Map<String, Object> arguments = new HashMap<String, Object>() {{
            // 声明出现死信时,把死信传给那个交换机中
            put("x-dead-letter-exchange", dead_exchange);
            // 声明死信传到那个 RoutingKey 中。如果不设置,死信到达死信交换机后,就不知道该传到那个队列中了。
            put("x-dead-letter-routing-key", dead_key);
            // 声明队列中最多存储的消息数
            put("x-max-length", 10);
            // 声明队列中的消息最多存储多长时间,单位为毫秒,当然也不一定非要在队列中设置,可以在生产者发布消息处定义消息的生存时间
            put("x-message-ttl", 10000);
        }};
        channel.queueDeclare(normal_queue, false, false, false, arguments);
        channel.queueDeclare(dead_queue, false, false, false, null);

        // 绑定正常的key和死信的key
        channel.queueBind(normal_queue, normal_exchange, normal_key);
        channel.queueBind(dead_queue, dead_exchange, dead_key);

        // 接收到消息成功后的回调
        DeliverCallback deliverCallback = (String consumerTag, Delivery message) -> {
            System.out.println("消息被消费:" + new String(message.getBody()));
            // 当消费者明确拒绝消费消息,且不再重新回到原来队列时,也会被判为死信
            if (new String(message.getBody()).equals("info5")){
                channel.basicReject(message.getEnvelope().getDeliveryTag(),false);
            }else {
                channel.basicAck(message.getEnvelope().getDeliveryTag(),false);
            }
        };

        CancelCallback cancelCallback = consumeTag -> {
        };
        // 设置为不自动应答
        channel.basicConsume(normal_queue, false, deliverCallback, cancelCallback);
    }
}

 

生产者和正常的生产者没区别:

/**
 * 生产者
 */
public class Producer {
    public static final String exchange_name = "normal_exchange";
    public static final String routingKey = "zhangsan";

    public static void main(String[] args) throws IOException, TimeoutException {
        Channel channel = RabbitMQUtil.getChannel();

        // 创建发布消息的属性对象
        AMQP.BasicProperties properties = new AMQP.BasicProperties();
        // 设置超时时间TTL为10秒
        properties.builder().expiration("10000").build();

        // 发送消息给交换机,其key为zhangsan
        for (int i = 1; i < 11; i++) {
            String message = "info" + i;
            channel.basicPublish(exchange_name, routingKey, properties, message.getBytes());
        }

    }
}

 

用于处理死信的消费者:

/**
 * 消费者2
 * 用于处理死信队列的消费者
 */
public class Consumer02 {
    public static final String dead_queue = "dead_queue";

    public static void main(String[] args) throws IOException, TimeoutException {
        Channel channel = RabbitMQUtil.getChannel();

        DeliverCallback deliverCallback = (String consumerTag, Delivery message) -> {
            System.out.println("消费了死信消息:" + new String(message.getBody()));
        };
        channel.basicConsume(dead_queue,true,deliverCallback,consumerTag -> {});
    }
}

 

延迟队列

从上面的死信队列章节我们可以知道,当有一些特殊原因引起的消息没有被消费的情况,可以被转储到死信队列中。利用这样的特性,我们可以在某些业务需求上使用到这种特性,比如我们可以把这种特性应用在定时取消定单之类的业务中,如

  1. 订单在十分钟之内未支付则自动取消
  2. 新创建的店铺,如果在十天内都没有上传过商品,则自动发送消息提醒。
  3. 用户注册成功后,如果三天内没有登陆则进行短信提醒。
  4. 用户发起退款,如果三天内没有得到处理则通知相关运营人员。
  5. 预定会议后,需要在预定的时间点前十分钟通知各个与会人员参加会议

要实现延迟队列的功能,我们可以把设置消息的TTL存活时间,当时间到了,消息会被转储到死信队列,由死信队列处理超时消息。

 

在 SpringBoot 中使用延迟队列

1.引入依赖

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

2.在yaml文件中配置RabbitMQ服务器信息

spring:
  rabbitmq:
    host: 127.0.0.1
    username: admin
    password: tzminglove
    port: 5672

 

3.创建配置项,分别创建普通交换机、死信交指机、普通队列、死信队列,以及他们的绑定关系。

package cn.unsoft.rabbitmq.config;

import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class RabbitMQConfig {

    // 普通交换机名称
    public static final String XexChange_Name = "X";

    // 死信交换机名称
    public static final String Y_Dead_exChange_Name="Y";

    public static final String queueAName = "QA";
    public static final String queueBName = "QB";
    public static final String queueDeadName = "QD";

    // 普通队列key一:消息10秒后到期
    public static final String queueAKey = "XA";

    // 普通队列key二:消息40秒后到期
    public static final String queueBKey = "XB";

    // 死信队列key:用于处理到期消息数据
    public static final String deadQueueKey = "YD";

    /**
     * 创建普通交换机 XexChange
     * 普通交换机需要接收来自生产者发过来消息
     * 普通交换机没有消费者,因为在普通交换机中的队列中存在的消息会有时间限制
     * 一旦消息时间到期,就会成为死信传到死信交换机中,由死信队列消费者进行消费
     */
    @Bean("XexChange")
    public DirectExchange XexChange(){
        return new DirectExchange(XexChange_Name);
    }

    /**
     * 创建死信交换机 YexChange
     * 死信交换机接收来自普通队列QA、QB中已过期的消息
     * @return
     */
    @Bean("YexChange")
    public DirectExchange YexChange(){
        return new DirectExchange(Y_Dead_exChange_Name);
    }

    // -------------
    /**
     * 声明队列QA,并把QB的消息存活时间设置为10秒,10秒后成为死信会传给死信交换机
     * 在QA队列中,我们需要设置队列中的消息过期时间,并把过期后的消息死信转储到死信交换机中
     * @return
     */
    @Bean("queueA")
    public Queue queueA(){
        Map<String, Object> arguments = new HashMap<>();
        arguments.put("x-message-ttl",10000);
        arguments.put("x-dead-letter-exchange",Y_Dead_exChange_Name);
        arguments.put("x-dead-letter-routing-key",deadQueueKey);

        //QueueBuilder.durable(queueAKey).withArguments(arguments).build();

        // 可以使用 Map 来存储多个队列属性,也可以使用 .xxx 来指定
        return QueueBuilder
                .durable(queueAName)
                .ttl(10000)
                .deadLetterExchange(Y_Dead_exChange_Name)
                .deadLetterRoutingKey(deadQueueKey)
                .build();
    }

    /**
     * 绑定QA到普通交换机中
     * @return
     */
    @Bean
    public Binding queueABindX(@Qualifier("queueA") Queue qa,
                               @Qualifier("XexChange") DirectExchange XexChange){
        return BindingBuilder.bind(qa).to(XexChange).with(queueAKey);
    }


    //-------------------

    /**
     * 声明队列QB,并把QB的消息存活时间设置为40秒,40秒后成为死信会传给死信交换机
     * @return
     */
    @Bean("queueB")
    public Queue queueB(){
        return QueueBuilder
                .durable(queueBName)
                .ttl(40000)
                .deadLetterExchange(Y_Dead_exChange_Name)
                .deadLetterRoutingKey(deadQueueKey)
                .build();
    }

    /**
     * 把队列绑定到普通交换机XexChange中
     * @param qb
     * @param XexChange
     * @return
     */
    @Bean
    public Binding queueBBindX(
            @Qualifier("queueB") Queue qb,
            @Qualifier("XexChange") DirectExchange XexChange
    ){
        return BindingBuilder.bind(qb).to(XexChange).with(queueBKey);
    }


}

 

4.死信队列中需要有一个消费者处理死信消息,我们可以创建一个消费者,监听队列中的消息并进行消费死信消息

package cn.unsoft.rabbitmq.consumer;

import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.util.Date;

@Component
@Slf4j
public class DeadLetterQueueConsumer {

    /**
     * 接收队列YD的消息并处理。
     * @param message
     * @param channel
     */
    @RabbitListener(queues = "YD")
    public void receiveDeadMessage(Message message, Channel channel){
        log.info("时间:{},接收到死信消息:{}",new Date().toString(),new String(message.getBody()));
    }
}

 

5.创建生产者,本次使用的是来自controller中的请求接收消息,并把消息发到普通交换机中

/**
 * 测试RabbitMQ延迟队列
 */
@RestController
@RequestMapping("/ttl")
@Slf4j
public class RabbitMQController {

    @Resource
    private RabbitTemplate rabbitTemplate;
    
    @GetMapping("/sendMsg/{msg}")
    public void receiveMsg(@PathVariable("msg") String msg) {
        log.info("时间:{},接收到请求信息:{}", new Date().toString(), msg);
        // 发送消息到普通交换机的XA(也就是QA队列中的RoutingKey别名)中
        rabbitTemplate.convertAndSend("XexChange","XA",msg.getBytes());

        // 发送消息到普通交换机的XB(也就是QB队列中的RoutingKey别名)中
        rabbitTemplate.convertAndSend("XexChange","XB",msg.getBytes());

        /**
         * 当发到XA中后,因为QA中没有消费者,且设置消息有效时间为10秒,则在QA队列中的消息会在10秒后转储到死集交换机中
         * 同理,XB有效时间为40秒,所以死信消费者会在40秒后处理XB中的死信消息
         */
    }

}

 

自定义存活时间消息

从上个章节中我们可以看到,当需要使用某些过期处理的业务时,我们可以通过设置队列的TTL过期时间来实现消息过期处理。

但是这样做有一个很明显的问题,就是TTL过期处理的细粒度局限在队列中,也就是说,当我们设置了过期时间为10秒的队列,那么这个队列只能存放处理只有10秒过期的消息

而在实际项目中,我们的消息并不一定固定是10秒或20秒,可能会出现动态设置消息的TTL过期时间,这对于队列来说,是无法做到的,因此我们需要把TTL过期时间细粒度应用到消息本身。

由于除了队列可以设置TTL外,发送消息时我们也可以设置TTL,因此我们可以尝试通过在发消息时设置单个消息的TTL过期时间。

案例如下:

1.通过创建一个交换机XexChange,和队列QC,绑定RoutingKey为XC,当生产者对XC发送消息时,单独对不同的消息设置不同的TTL时间。

@Component
public class RabbitMQConfig {

    // 普通交换机
    public static final String XexChange_Name = "XexChange";
    // 死信交换机
    public static final String DeadExChange_Name = "YexChange";

    // 普通队列
    public static final String queueC_Name = "QC";
    public static final String queueC_Routing_Name = "XC";

    // 死信队列
    public static final String queueDead_Name = "QD";
    public static final String queueDead_Routing_Name = "YD";

    // 声明普通交换机
    @Bean("XexChange")
    public DirectExchange XexChange(){
        return new DirectExchange(XexChange_Name);
    }

    // 声明普通队列
    @Bean("QC")
    public Queue queueC(){
        // 声明普能队列中,没有设置TTL过期
        return QueueBuilder.durable(queueC_Name).build();
    }


    // 绑定QC到普通交换机
    @Bean
    public Binding QCBindiingXexChange(
            @Qualifier(XexChange_Name) DirectExchange XexChange,
            @Qualifier(queueC_Name) Queue qc
    ){
        return BindingBuilder.bind(qc).to(XexChange).with(queueC_Routing_Name);
    }

    // 声明死信交换机
    @Bean("YexChange")
    public DirectExchange YexChange(){
        return new DirectExchange(DeadExChange_Name);
    }

    // 声明死信队列
    @Bean("QD")
    public Queue queueD(){
        return QueueBuilder.durable(queueDead_Name).build();
    }


    // 绑定QD到死信交换机
    @Bean
    public Binding QDBindiingYexChange(
            @Qualifier(DeadExChange_Name) DirectExchange YexChange,
            @Qualifier(queueDead_Name) Queue qd
    ){
        return BindingBuilder.bind(qd).to(YexChange).with(queueDead_Routing_Name);
    }

}

 

2.在生产者中发消息前,设置不同消息不同的过期时间。

@RestController
@RequestMapping("/sendMsg")
public class RabbitMQConftoller {

    @Resource
    private RabbitTemplate rabbitTemplate;

    @GetMapping("/sendTTLMsg/{msg}/{ttl}")
    public void sendMsg(
            @PathVariable String msg,
            @PathVariable String ttl
    ) {
        rabbitTemplate.convertAndSend("XexChange", "XC", msg, 
                
                // MessagePostProcessor 是一个接口,提供一个 Message 对象,要求返回一个 Message 对象
                // 也就是说,提供一个 message 让你往消息里面添加该消息的属性,如TTL过期时间等
                message -> {
            message.getMessageProperties().setExpiration("10000");
            return message;
        });
    }

}

4.发送不同时间的消息即可

http://xxxx/sendMsg/hello/100000
http://xxxx/sendMsg/hello/200000

 

无法实现消息TTL问题发现

上一节章我们使用消息中设置过期TTL,但当你测试后会发现,队列中并不能真正的实现不同的消息设置不同的TTL过期时间。

这是因为,队列是单线处理消息的,当上一条消息TTL没有过期时,即使后面的消息的TTL已经过期了,只要上一条还没被过期,后面的消息都要等待。直到上一条消息TTL过期后,队列把上一条消息处理完后,再发现后面的消息也过期了,就会一并处理

这种情况直接影响到消息的自定义TTL过期需求无法实现!

 

使用插件实现消息自定义TTL过期

我们可以使用插件 rabbitmq_delayed_message_exchange 来实现TTL消息延迟

解压放置到 RabbitMQ 的插件目录。进入 RabbitMQ 的安装目录下的 plgins 目录,执行下面命令让该插件生效,然后重启 RabbitMQ

/usr/lib/rabbitmq/lib/rabbitmq_server-xxx/plugins

rabbitmq-plugins enable rabbitmq_delayed_message_exchange

 

实现基本概念

rabbitmq_delayed_message_exchange 的实现基本概念是,它实际上自己创立了一种新的交换机模式,

当我们的消息被设置了TTL后,消息发到交换机后,是由交换机帮我们计算消息的过期时间,而非在队列中计算过期时间。当交换机中发现消息已经过期后,就会把消息发给队列,而队列即转发给消费者,没有了死信队列的概念了,如下图

 

 

实现代码案例:

1.因为我们需要创建插件的交换机,我们需要创建一个【自定义交换机】,并把队列绑定到自定义交换机中

/**
 * 本例为使用RabbitMQ延迟消息插件的使用过程
 */
@Configuration
public class RabbitDelayConfig {
    // 延迟交换机的名称
    public static final String DelayExChange_Name = "DelayExChange";

    // 延迟队列的名称
    public static final String DelayQueue_Name = "DelayQueue";

    // 绑定RountingKey
    public static final String DelayQueue_RoutingKey = "DelayQueueRoutingKey";

    /**
     * 创建自定义交换机
     * @return
     */
    @Bean(DelayExChange_Name)
    public CustomExchange DelayExChange() {
        /**
         * 参数说明:String name, String type, boolean durable, boolean autoDelete, Map<String, Object> arguments
         * 1.交换机名称
         * 2.交换机类型(就是自定义交换机的类型名称,如插件提供的交换机为“x-delay-message”)
         * 3.是否自动删除
         * 4.是否持久化
         * 5.交换机的属性配置
         */
        Map<String, Object> arguments = new HashMap<String, Object>() {{
            // 虽然设置了插件交换机,但是还是要配置该交换机的派发类型,为直接类型
            put("x-delayed-type", "direct");
        }};
        return new CustomExchange(DelayExChange_Name, "x-delay-message", false, false, arguments);
    }

    /**
     * 创建队列
     * @return
     */
    @Bean(DelayQueue_Name)
    public Queue delayQueue(){
        return QueueBuilder.durable(DelayQueue_Name).build();
    }

    /**
     * 绑定队列与交换机
     */
    @Bean
    public Binding DelayQueueBingingDelayExChange(
            @Qualifier(DelayExChange_Name) CustomExchange delayExChange,
            @Qualifier(DelayQueue_Name) Queue delayQueue
    ){
        return BindingBuilder.bind(delayQueue).to(delayExChange).with(DelayQueue_RoutingKey).noargs();
    }
}

 

2.在生产者中,加入 setDelay() 属性并发送给插件交换机即可

    @GetMapping("/sendTTLMsg/{msg}/{delay}")
    public void sendDelayMsg(
            @PathVariable String msg,
            @PathVariable Integer delay
    ){
        rabbitTemplate.convertAndSend(RabbitDelayConfig.DelayExChange_Name, RabbitDelayConfig.DelayQueue_RoutingKey, msg,

                // MessagePostProcessor 是一个接口,提供一个 Message 对象,要求返回一个 Message 对象
                // 也就是说,提供一个 message 让你往消息里面添加该消息的属性,如TTL过期时间等
                message -> {
                    message.getMessageProperties().setDelay(delay);
                    return message;
                });
    }

 

 

SpringBoot 中使用发布确认功能

在前面章节中我们尝试过在普通情况下对交换机接收消息的确认功能,该功能可以查找章节【发布确认】

本章节中是把发布确认功能在SpringBoot中实现。

交换机接收确认

当生产者发送消息给交换机后,我们可以设置交换机是否接收到消息,然后给我们一个回调函数告诉我们它接收到消息了。

1.在配置项中开启回调功能:

spring:
  rabbitmq:
    host: 127.0.0.1
    username: admin
    password: tzminglove
    port: 5672
    publisher-confirm-type: correlated

其中 publisher-confirm-type 有三种选择,分别如下:

none : 交换机收到消息后不进行确认

correlated : 交指机收到消息后进行确认(收到和不收到都会发送回调确认)

simple : 经测试有两种效果,其一效果和 CORRELATED 值一样会触发回调方法,交换机收到消息后进行确认,但是如果收不到消息,就会关闭通道(类似 waitForConfirmsOrDie)

其二在发布消息成功后使用 rabbitTemplate 调用 waitForConfirms 或 waitForConfirmsOrDie 方法等待 broker 节点返回发送结果,根据返回结果来判定下一步的逻辑,要注意的点是waitForConfirmsOrDie 方法如果返回 false 则会关闭 channel,则接下来无法发送消息到 broker

 

2.创建一个类,用于实现Rabbit交换机接收到数据后的确认回调函数

/**
 * 用于实现交换机与消息确认功能的回调类
 */
@Component
@Slf4j
public class RabbitMQConfirmConfig implements RabbitTemplate.ConfirmCallback {
    
    
    @Resource
    private RabbitTemplate rabbitTemplate;

    /**
     * 注意:增加了确认回调实现后,Rabbit 并不能知道
     * 需要在创建后,对确认回调的实现类进行设置,否则Rabbit不会调用这个回调实现
     */
    @PostConstruct
    public void init(){
        rabbitTemplate.setConfirmCallback(this);
    }


    /**
     * 实现消息发送到交换机时的确认回调
     *
     * @param correlationData 来自生产者在发送消息时主动打的标记对象(需要自定义传递)
     * @param ack             该消息是否被交换机所接收,接收了返回 true 否则返回 false
     * @param cause           接收出错的原因。如果接收成功,为null,如果接收失败,会包含失败原因
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        if (ack) {
            log.info("交换机接收到消息,其id为:{}",correlationData.getId());
        } else {
            log.info("交换机接收不到消息,原因是:{}", cause);
        }
    }
}

要注意的是,回调实现方法的类,一定要设置给Rabbit中,否则Rabbit是不知道回调实现方法在那里的,从而不会执行我们的实现回调方法

 

方法二:使用 ApplicationContextAware 拿到 IoC 容器,并获取 RabbitTemplate 对象,在 RabbitTemplate 对象中添加 ConfirmCallback 回调

@Slf4j
@Configuration
public class CommonConfig implements ApplicationContextAware {

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        // 获取RabbitTemplate
        RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);
        // 设置ReturnCallback
        rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
            log.info("消息发送失败,应答码{},原因{},交换机{},路由键{},消息{}", 
                    replyCode, replyText, exchange, routingKey, message.toString());
        });
    }
}

 

 

3.在生产者发送消息的过程中,我们需要自定义CorrelationData对象,当交换机接收并回调时,我们就可以通过CorrelationData对象来获取到交换机接收成功、失败的是那个消息了,从而实现重发功能

@RestController
@RequestMapping("/sendConfirmMsg")
@Slf4j
public class RabbitMQConfirmController {

    @Resource
    private RabbitTemplate rabbitTemplate;

    @GetMapping("/{msg}")
    public void sendConfirmMsg(@PathVariable String msg) {
        log.info("发送消息到交换机:{}", msg);

        /**
         * 创建一个CorrelationData对象,用于记录当前发送的消息数据,用于回调时获取追踪
         */
        CorrelationData correlationData = new CorrelationData();
        correlationData.setId("1");

        rabbitTemplate.convertAndSend(RabbitDelayConfig.DelayExChange_Name, RabbitDelayConfig.DelayQueue_RoutingKey, msg, postprocessor -> {
            postprocessor.getMessageProperties().setDelay(10000);
            return postprocessor;
        }, correlationData);
    }

 

 

队列接收确认

除了交换机是否能回调确认函数外,队列也可以做到同样的功能,当队列接收到消息后,也可以让队列给我们发送确认回调函数。

1.在配置项中开启队列确认功能

spring:
  rabbitmq:
    host: 127.0.0.1
    username: admin
    password: tzminglove
    port: 5672
    publisher-confirm-type: correlated
    publisher-returns: true   # 开启队列消息确认机制
    template:
        mandatory: true       # 定义消息路由失败时的策略,设为true则调用ReturnCallback,false则直接丢弃

 

2.创建一个类,用于实现Rabbit队列接收到数据后的确认回调函数

@Component
@Slf4j
public class RabbitMQReturnConfig implements RabbitTemplate.ReturnsCallback {
    
    @Resource
    private RabbitTemplate rabbitTemplate;

    /**
     * 要先对实现类进行设置到Rabbit中
     * 否则Rabbit回调时不知道执行那个回调方法
     */
    @PostConstruct
    public void init(){
        rabbitTemplate.setReturnsCallback(this);
    }


    /**
     * 实现队列确认回调函数
     * @param returned the returned message and metadata.
     *                 ReturnedMessage 对象中包含了消息的各功信息
     *                 Message message -> 消息对象
     *                 int replyCode -> 队列接收错误时的错误码
     *                 String replyText -> 队列接收错误时的错误原因
     *                 String exchange -> 交换机的名称
     *                 String routingKey -> 队列绑定别名
     */
    @Override
    public void returnedMessage(ReturnedMessage returned) {
        log.info("队列接收到消息确认回调,原因:{}",returned.getReplyText());
    }
}

要注意的是,回调实现方法的类,一定要设置给Rabbit中,否则Rabbit是不知道回调实现方法在那里的,从而不会执行我们的实现回调方法

 

消费者消费确认

针对生产者发消息给RabbitMQ,可以接收到来自Rabbit的确认信息,但同样,消费者也可以接收消息后,自定义确认信息。

消费者确认信息有三种模式,可以在yaml文件中进行配置:

spring:
  rabbitmq:
    listener:
      simple:
        prefetch: 1
        acknowledge-mode: auto # none,关闭ack;manual,手动ack;auto:自动ack
  • manual:手动ack,需要在业务代码结束后,调用api发送ack。
  • auto:自动ack,由spring监测listener代码是否出现异常,没有异常则返回ack;抛出异常则返回nack
  • none:关闭ack,MQ假定消费者获取消息后会成功处理,因此消息投递后立即被删除

 

消息持久化

MQ默认是内存存储消息,开启持久化功能可以确保缓存在MQ中的消息不丢失。

交换机持久化

@Bean
public DirectExchange simpleExchange(){
    // 三个参数:交换机名称、是否持久化、当没有queue与其绑定时是否自动删除 
    return new DirectExchange("simple.direct", true, false);
}

注意:通常默认创建交换机,Rabbit都是默认持久化的。

 

队列持久化

@Bean
public Queue simpleQueue(){
    // 使用QueueBuilder构建队列,durable就是持久化的
    return QueueBuilder.durable("simple.queue").build();
}

注意:使用 new Queue("") 创建的队列,默认也是持久化的

但是队列的持久化,不是存放在队列中的消息也持久化,默认消息是不持久化的,所以重启Rabbit后,队列还存在,但消息会丢失。

 

消息持久化

使用构建 Message 时设置消息持久化

Message msg = MessageBuilder
        .withBody(message.getBytes(StandardCharsets.UTF_8)) // 消息体
        .setDeliveryMode(MessageDeliveryMode.PERSISTENT) // 持久化 
        .build();

 

 

SpringBoot 中使用注解接收处理队列任务

我们可以在SpringBoot中创建任务的消费者,对我们存在队列中的任务进行消费,通常我们可以使用@Bean注解队列、交换机、绑定等到IoC中,同时我们也可以使用注解 @RabbitTemplate  进行定义队列、交换机、绑定功能。

通过 @RabbitTemplate 一次实现定义队列、交换机、绑定三个操作:

    /**
     * 使用 @RabbitListener 直接定义绑定队列和交换机操作
     * 使用 bindings 属性,bindings 属性要求提供一个 QueueBinding 数组,可以使用 @QueueBinding 定义
     * 其中 @QueueBinding 中包含了value、exchange、key属性值
     * value 指定队列,可以使用 @Queue 注解定义
     * exchange 指定交换机,可以使用 @Exchange 注解定义
     * key 指定 routingKey
     * 直接代替了 xxxExchange,Queue,Binding 的Bean方法定义
     */
    @RabbitListener(bindings = @QueueBinding(
            value = @org.springframework.amqp.rabbit.annotation.Queue("queueName"),
            exchange = @org.springframework.amqp.rabbit.annotation.Exchange(value = "unsoft.exchange",type = ExchangeTypes.DIRECT),
            key = "routingKey"
    ))
    public void handlerAnno(Message message, Channel channel) {
        System.out.println(message.getBody().toString());
    }

注解下的方法就是消费者处理方法。

 

消费者消费失败的重试机制

在一般情况下,我们的消息,都会被消费者正常消费,但当消费者因某种原因,发生消费失败,返回 nack 确认时,应该要如何应对消费失败的情况?

在默认情况下,当消费者发送 nack 确认时,RabbitMQ 会认为消息未被正确的消费,此时RabbitMQ会对消息重新入列,并重新发送消息给消费者,若消费者依然无法正确消费该消息时,又会返回 nack 确认,这样就会出现无休止的循环入列发送,严重影响性能。

为了能处理死循环入列消费问题,我们可以在Spring中配置重试机制:

spring:
  rabbitmq:
    listener:
      simple:
        prefetch: 1
        retry:
          enabled: true # 开启消费者失败重试
          initial-interval: 1000 # 初始的失败等待时长为1秒
          multiplier: 1 # 下次失败的等待时长倍数,下次等待时长 = multiplier * last-interval
          max-attempts: 3 # 最大重试次数
          max-interval: 10  # 最大的重试等待时长
          stateless: true # true无状态;false有状态。如果业务中包含事务,这里改为false

配置失败重试机制之后,Spring会使用AOP对我们的业务进行监控,当我们的业务发生异常抛出时,会自定为我们发出 nack 确认,而无需我们自己主动发送 nack 确认。

否则,我们需要在业务中,手动设置 ack 的确认。

 

对重试超过次数的消息处理

如果我们的消息,在重试多次后,超出了我们设置的重试机制后,这个消息该如何处理?

RabbitMQ 默认情况下,对超出重试次数的消息,会实行直接丢弃的处理

RabbitMQ 提供了以下三种后期消息处理机制:

RejectAndDontRequeueRecoverer:重试耗尽后,直接reject,丢弃消息。默认就是这种方式
ImmediateRequeueMessageRecoverer:重试耗尽后,返回nack,消息重新入队
RepublishMessageRecoverer:重试耗尽后,将失败消息投递到指定的交换机

在Spring中,RabbitMQ 默认初始化了 RejectAndDontRequeueRecoverer 类作为后期处理。

为了防止重要的消息发生丢失,我们对重要的消息使用 RepublishMessageRecoverer 处理类,即当消息超过重试次数后,应当把它存放到指定的交换机中,做其它处理(如人工处理)。

具体操作:

1.创建用于存放失败消息的队列和交换机以久绑定:

@Bean
public DirectExchange errorMessageExchange(){
    return new DirectExchange("error.direct");
}
@Bean
public Queue errorQueue(){
    return new Queue("error.queue", true); 
}
@Bean
public Binding errorBinding(){
    return BindingBuilder.bind(errorQueue()).to(errorMessageExchange()).with("error");
}

 

2.覆盖原有RabbitMQ设置的失败消息处理机制:

@Bean
public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){
    return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error"); 
}

 

 

RabbitMQ 中的消息转换器

RabbitMQ 除了可以直接传递字符串类数据,其实RabbitMQ也支持把一整个Java对象存到队列中。

但是RabbitMQ在把Java对象存到队列之前,会把Java对象进行序列化,并把序列化的数据以BASE64方式转换为字符串,并存储到队列中,所使用的序列化工具是JDK默认的序列化类。

使用序列化并Base64转换字符串有两个缺点:

1.序列化Java对象和反序列化会使用大量的计算资源,影响性能。

2.Base64序列化的对象后会占用相对较大的队列存储空间

因此我们可以使用jackson对java对象进行json化,并存到队列中,这样就可以节省很多序列化时间,同时也减少非常多的存储空间。

具体操作如下:

1.引入依赖,引入jackson的依赖,用于创建转换对象,替代RabbitMQ默认的序列化转换对象

<dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-databind</artifactId>
</dependency>

2.在配置项中,配置消息转换对象,使用Jackson2JsonMessageConverter实现覆盖原有的转换对象,当RabbitMQ转换对象时会使用jackson进行转化

@Bean
public MessageConverter jsonMessageConverter(){
    return new Jackson2JsonMessageConverter(); 
}

3.当发布者发布消息所使用的什么对象类型,在消费者中也应接收相同的对象类型,如发布者使用了 Map<String,Object> 类型发布数据,消费者在接收数据时,也应把接收类型设置为 Map<String,Object> 类型。

 

Spring 代码中创建仲裁队列

@Bean
public Queue quorumQueue(){
    return QueueBuilder.durable("queueName")
           .quorum()
           .build();
}

 

 

 

备份交换机

有了队列确认回调,我们获得了对无法投递消息的感知能力,有机会在生产者的消息无法被投递时发现并处理。但有时候,我们并不知道该如何处理这些无法路由的消息,最多打个日志,然后触发报警,再来手动处理。而通过日志来处理这些无法路由的消息是很不优雅的做法,特别是当生产者所在的服务有多台机器的时候,手动复制日志会更加麻烦而且容易出错。而且设置 队列确认 参数会增加生产者的复杂性,需要添加处理这些被退回的消息的逻辑。如果既不想丢失消息,又不想增加生产者的复杂性,该怎么做呢?前面在设置死信队列的文章中,我们提到,可以为队列设置死信交换机来存储那些处理失败的消息,可是这些不可路由消息根本没有机会进入到队列,因此无法使用死信队列来保存消息。在 RabbitMQ 中,有一种备份交换机的机制存在,可以很好的应对这个问题。什么是备份交换机呢?备份交换机可以理解为 RabbitMQ 中交换机的“备胎”,当我们为某一个交换机声明一个对应的备份交换机时, 就是为它创建一个备胎,当交换机接收到一条不可路由消息时,将会把这条消息转发到备份交换机中,由备份交换机来进行转发和处理,通常备份交换机的类型为 Fanout ,这样就能把所有消息都投递到与其绑定的队列中,然后我们在备份交换机下绑定一个队列,这样所有那些原交换机无法被路由的消息,就会都进入这个队列了。当然,我们还可以建立一个报警队列,用独立的消费者来进行监测和报警。

 

解析:当普通交换机接收到消息但无法对消息进行投递到指定的队列时,通常可以执行消息确认来回调知道哪些消息没有被队列接收

但我们也可以创建一个备用交换机,当普通交换机收到消息但无法投递时,就可以把消息转发给备份交换机进行处理,报警等。

1.创建一个备份交换机

    /**
     * 创建一个备份交换机,当普通交换机接收到的消息不能送达指定的队列时会把消息转发到备份交换机中
     * @return
     */
    @Bean("backupExChange")
    public FanoutExchange BackupExChange(){
        return ExchangeBuilder.fanoutExchange("backupExChange").build();
    }

   ...其它设置队列和绑定队列等略

2.在普通交换机中设置指定备份交换机

    // 声明普通交换机,并设定备份交换机,两种方法都可以
    @Bean("XexChange")
    public DirectExchange XexChange(){
        // return ExchangeBuilder.directExchange(XexChange_Name).withArgument("alternate-exchange","backupExChange").build();
        return ExchangeBuilder.directExchange(XexChange_Name).alternate("backupExChange").build();
    }

注意:当使用备份交换机后,队列消息确认回调将不生效,在这两者中,备份交换机的优先级更高。

 

队列优先级

使用场景

在我们系统中有一个订单催付的场景,我们的客户在天猫下的订单,淘宝会及时将订单推送给我们,如果在用户设定的时间内未付款那么就会给用户推送一条短信提醒,很简单的一个功能对吧,但是,tmall 商家对我们来说,肯定是要分大客户和小客户的对吧,比如像苹果,小米这样大商家一年起码能给我们创造很大的利润,所以理应当然,他们的订单必须得到优先处理,而曾经我们的后端系统是使用 redis 来存
放的定时轮询,大家都知道 redis 只能用 List 做一个简简单单的消息队列,并不能实现一个优先级的场景,

所以订单量大了后采用 RabbitMQ 进行改造和优化,如果发现是大客户的订单给一个相对比较高的优先级, 否则就是默认优先级。

 

设置具有优先级的队列

在后台中我们可以创建一个具有优先级的队列

 

在代码中我们也可以声明一个具有优先级的队列

    /**
     * 创建具有优先级功能的队列
     * @return
     */
    @Bean("priority")
    public Queue pirority(){
        // return QueueBuilder.durable("priority").withArgument("x-max-priority",10).build();
        return QueueBuilder.durable("priority").maxPriority(10).build();
    }

注意:队列需要设置优先级,且需要设置最大优先级值,RabbitMQ默认可以设置优先级为0~256,通常我们只设置最大为10的优先级就够了

 

在消息发送时,我们就可以设定该消息的优先级,消息的优先级必须在队列最大优先级的范围内。

@RestController
@RequestMapping("/sendConfirmMsg")
@Slf4j
public class RabbitMQConfirmController {

    @Resource
    private RabbitTemplate rabbitTemplate;

    @GetMapping("/{msg}")
    public void sendConfirmMsg(@PathVariable String msg) {
        log.info("发送消息到交换机:{}", msg);

        /**
         * 创建一个CorrelationData对象,用于记录当前发送的消息数据,用于回调时获取追踪
         */
        CorrelationData correlationData = new CorrelationData();
        correlationData.setId("1");

        rabbitTemplate.convertAndSend(RabbitDelayConfig.DelayExChange_Name, RabbitDelayConfig.DelayQueue_RoutingKey, msg, postprocessor -> {
            postprocessor.getMessageProperties().setDelay(10000);
            postprocessor.getMessageProperties().setPriority(5);
            return postprocessor;
        }, correlationData);
    }
}

 

惰性队列

消息堆积问题

 

惰性队列是指该队列应该存放在内存中,还是存放在磁盘中。

RabbitMQ 从 3.6.0 版本开始引入了惰性队列的概念。惰性队列会尽可能的将消息存入磁盘中,而在消费者消费到相应的消息时才会被加载到内存中,它的一个重要的设计目标是能够支持更长的队列,即支持更多的消息存储。当消费者由于各种各样的原因(比如消费者下线、宕机亦或者是由于维护而关闭等)而致使长时间内不能消费消息造成堆积时,惰性队列就很有必要了。

默认情况下,当生产者将消息发送到 RabbitMQ 的时候,队列中的消息会尽可能的存储在内存之中, 这样可以更加快速的将消息发送给消费者。即使是持久化的消息,在被写入磁盘的同时也会在内存中驻留一份备份。当 RabbitMQ 需要释放内存的时候,会将内存中的消息换页至磁盘中,这个操作会耗费较长的时间,也会阻塞队列的操作,进而无法接收新的消息。虽然 RabbitMQ 的开发者们一直在升级相关的算法, 但是效果始终不太理想,尤其是在消息量特别大的时候。

队列具备两种模式:default 和 lazy。默认的为 default 模式,在 3.6.0 之前的版本无需做任何变更。lazy 模式即为惰性队列的模式,可以通过调用 channel.queueDeclare 方法的时候在参数中设置,也可以通过Policy 的方式设置,如果一个队列同时使用这两种方式设置的话,那么 Policy 的方式具备更高的优先级。如果要通过声明的方式改变已有队列的模式的话,那么只能先删除队列,然后再重新声明一个新的。

在队列声明的时候可以通过“x-queue-mode”参数来设置队列的模式,取值为“default”和“lazy”。下面示例中演示了一个惰性队列的声明细节:

Map<String, Object> args = new HashMap<String, Object>(); 
args.put("x-queue-mode", "lazy"); 
channel.queueDeclare("myqueue", false, false, false, args);

 

Bean 方式声明惰性队列

@bean
public Queue lazyQueue(){
    return QueueBuilder
               .durable("lazy.queue")
               .lazy()
               .build();
}

 

注解声明方式惰性队列

@RabbitTemplate(
  queuesToDeclare = @Queue(
       name = "lazy.queue",
       durable = "true",
       arguments = @Argument( name = "x-queue-mode", value = "lazy" )
   )
)
public void liatenLazyQueue(Message msg){
    
}

 

 

RabbitMQ 集群

当Rabbit集群加入后,这些节点将成为一个整体,在任意一个节点上做任意操作,都会影响全部节点

详情:https://www.rabbitmq.com/ha.html

 

RabbitMQ 搭建集群

1.修改 3 台机器的主机名称

vim /etc/hostname

2.配置各个节点的 hosts 文件,让各个节点都能互相识别对方

 

vim /etc/hostnamevim /etc/hosts
10.211.55.74	node1
10.211.55.75	node2
10.211.55.76	node3

 

3.Erlang是的底层是分布式的,RabbitMQ是使用cookie作为通讯,只有相同的cookie才能互相通讯,所以以确保各个节点的 cookie 文件使用的是同一个值

在 node1 上执行远程操作命令
scp /var/lib/rabbitmq/.erlang.cookie root@node2:/var/lib/rabbitmq/.erlang.cookie 
scp /var/lib/rabbitmq/.erlang.cookie root@node3:/var/lib/rabbitmq/.erlang.cookie

 

3.1 配置RabbitMQ的服务器配置:

loopback_users.guest = false  # 关闭 guest 账号
listeners.tcp.default = 5672
cluster_formation.peer_discovery_backend = rabbit_peer_discovery_classic_config
cluster_formation.classic_config.nodes.1 = rabbit@mq1 # 定义集群的服务地址,如果用了docker net 可以直接使用实例名称
cluster_formation.classic_config.nodes.2 = rabbit@mq2
cluster_formation.classic_config.nodes.3 = rabbit@mq3

 

 

 

4. 启动 RabbitMQ 服务,顺带启动 Erlang 虚拟机和 RbbitMQ 应用服务(在三台节点上分别执行以下命令)

rabbitmq-server -detached

5. 在节点 2 执行

rabbitmqctl stop_app
(rabbitmqctl stop 会将Erlang 虚拟机关闭,rabbitmqctl stop_app 只关闭 RabbitMQ 服务) 
rabbitmqctl reset
rabbitmqctl join_cluster rabbit@node1 
rabbitmqctl start_app(只启动应用服务)

6.在节点 3 执行

rabbitmqctl stop_app
(rabbitmqctl stop 会将Erlang 虚拟机关闭,rabbitmqctl stop_app 只关闭 RabbitMQ 服务) 
rabbitmqctl reset
rabbitmqctl join_cluster rabbit@node1 
rabbitmqctl start_app(只启动应用服务)

7.查看当前集群状态

rabbitmqctl cluster_status 

8.需要重新设置用户

创建账号
rabbitmqctl add_user admin 123
设置用户角色
rabbitmqctl set_user_tags admin administrator
设置用户权限
rabbitmqctl set_permissions -p "/" admin ".*" ".*" ".*"

9.解除集群节点

(node2 和 node3 机器分别执行)
rabbitmqctl stop_app
rabbitmqctl reset 
rabbitmqctl start_app 
rabbitmqctl cluster_status

rabbitmqctl forget_cluster_node rabbit@node2 (node1 机器上执行)

 

使用Docker运行RabbitMQ

1.创建Docker网络

docker network create mq-net

 

 

2.运行RabbitMQ的Docker命令

docker run -d --net mq-net \
-v ${PWD}/mq1/rabbitmq.conf:/etc/rabbitmq/rabbitmq.conf \    # docker 设置配置文件数据卷
-v ${PWD}/.erlang.cookie:/var/lib/rabbitmq/.erlang.cookie \   # docker 设置cookie数据卷
-e RABBITMQ_DEFAULT_USER=admin \
-e RABBITMQ_DEFAULT_PASS=123321 \
--name mq1 \     # docker 实例名
--hostname mq1 \ # 服务器名称,与实例名同名的话,RabbitMQ可以使用 rabbit@mq1 来访问
-p 8071:5672 \
-p 8081:15672 \
rabbitmq:3.8-management

 

Spring Boot 中连接集群的RabbitMQ

SpringBoot 中的配置和单点连接有一些不同,使用 addresses 作为连接,把所有的服务地址都写上即可。

spring:
  rabbitmq:
    addresses: 192.168.150.105:8071, 192.168.150.105:8072, 192.168.150.105:8073
    username: itcast 
    password: 123321
    virtual-host: /

 

 

 

普通集群队列

普通集群,或者叫标准集群(classic cluster),具备下列特征:

  • 会在集群的各个节点间共享部分数据,包括:交换机、队列元信息。不包含队列中的消息。
  • 当访问集群某节点时,如果队列不在该节点,会从数据所在节点传递到当前节点并返回
  • 队列所在节点宕机,队列中的消息就会丢失

普通集群队列在创建完集群后就能正常使用,无需特定配置。

 

 

镜像队列

在集群中的RabbitMQ,虽然多个节点成为一体,但它们的消息和队列依然是分开的。假如程序向node1节点发送消息,发送完后node1发生宕机等情况,如不做特殊配置,重启后的node1中的队外消息将会丢失!

而且其它node节点也没有备份,这对于消息的存储可靠性是有风险的。

所以我们需要对集群中的节点做镜像队列策略,具体操作如下:

1.在任意节点登陆RabbitMQ后台,在用户管理中选择“Policies”策略设置

2.增加一个复制副本的策略

说明:

Pattern: 要匹配策略的关键词,会对交换机和队列都会匹配,如 ^mirror 是指所有 mirror 开头的交换机或队列都会被匹配上。

Definition: 定义策略

ha-mode : 定义副本的方式,分别可选 [all ,exactly ,nodes ].

all: 所有交换机和节点都会复制一份副本

exactly : 精确到某队列或交换机进行副本,这个选项还需要使用 ha-param 来定义需要多少份

node: 精确到整个节点进行副本,这个选项还需要使用 ha-param 来定义需要多少份

ha-param : 设置需要复制多少个副本。

ha-sync-mode: 设置复制副本手动manual 或自动automatic

此后带有 mirror 开头的交换机或队列,都会在其它任意节点中创建多一份,如果刚好有一个节点出现宕机,那么RabbitMQ会动态的保持两个副本,在其它节点上再创建副本。

 

镜像模式的配置

镜像模式的配置有3种模式:

ha-mode ha-params 效果
准确模式exactly 队列的副本量count 集群中队列副本(主服务器和镜像服务器之和)的数量。count如果为1意味着单个副本:即队列主节点。count值为2表示2个副本:1个队列主和1个队列镜像。换句话说:count = 镜像数量 + 1。如果群集中的节点数少于count,则该队列将镜像到所有节点。如果有集群总数大于count+1,并且包含镜像的节点出现故障,则将在另一个节点上创建一个新的镜像。
all (none) 队列在群集中的所有节点之间进行镜像。队列将镜像到任何新加入的节点。镜像到所有节点将对所有群集节点施加额外的压力,包括网络I / O,磁盘I / O和磁盘空间使用情况。推荐使用exactly,设置副本数为(N / 2 +1)。
nodes node names 指定队列创建到哪些节点,如果指定的节点全部不存在,则会出现异常。如果指定的节点在集群中存在,但是暂时不可用,会创建节点到当前客户端连接到的节点。

 

exactly模式

rabbitmqctl set_policy ha-two "^two\." '{"ha-mode":"exactly","ha-params":2,"ha-sync-mode":"automatic"}'
解析:
1.设置准备模式,名称为 ha-two
2.把所有队列为 two 开头的都生效
3.设置为exactly模式,节点为 2
4.设置自动同步,由RabbitMQ管理集群同步问题
  • rabbitmqctl set_policy:固定写法
  • ha-two:策略名称,自定义
  • "^two\.":匹配队列的正则表达式,符合命名规则的队列才生效,这里是任何以two.开头的队列名称
  • '{"ha-mode":"exactly","ha-params":2,"ha-sync-mode":"automatic"}': 策略内容
    • "ha-mode":"exactly":策略模式,此处是exactly模式,指定副本数量
    • "ha-params":2:策略参数,这里是2,就是副本数量为2,1主1镜像
    • "ha-sync-mode":"automatic":同步策略,默认是manual,即新加入的镜像节点不会同步旧的消息。如果设置为automatic,则新加入的镜像节点会把主节点中所有消息都同步,会带来额外的网络开销

 

all模式

rabbitmqctl set_policy ha-all "^all\." '{"ha-mode":"all"}'
  • ha-all:策略名称,自定义
  • "^all\.":匹配所有以all.开头的队列名
  • '{"ha-mode":"all"}':策略内容
    • "ha-mode":"all":策略模式,此处是all模式,即所有节点都会称为镜像节点

 

nodes模式

rabbitmqctl set_policy ha-nodes "^nodes\." '{"ha-mode":"nodes","ha-params":["rabbit@nodeA", "rabbit@nodeB"]}'
  • rabbitmqctl set_policy:固定写法
  • ha-nodes:策略名称,自定义
  • "^nodes\.":匹配队列的正则表达式,符合命名规则的队列才生效,这里是任何以nodes.开头的队列名称
  • '{"ha-mode":"nodes","ha-params":["rabbit@nodeA", "rabbit@nodeB"]}': 策略内容
    • "ha-mode":"nodes":策略模式,此处是nodes模式
    • "ha-params":["rabbit@mq1", "rabbit@mq2"]:策略参数,这里指定副本所在节点名称

 

 

仲裁队列

从RabbitMQ 3.8版本开始,引入了新的仲裁队列,他具备与镜像队里类似的功能,但使用更加方便。

因为镜像队列使用的是【最终一致】模式,而不是强一致模式,所以镜像队列有可能会出现数据不一致的情况,而仲裁队列则是使用Raft协议实现强一致性的队列

步骤

在任意控制台添加一个队列,一定要选择队列类型为Quorum类型。

在任意控制台查看队列:

可以看到,仲裁队列的 + 2字样。代表这个队列有2个镜像节点。

因为仲裁队列默认的镜像数为5。如果你的集群有7个节点,那么镜像数肯定是5;而我们集群只有3个节点,因此镜像数量就是3.

集群扩容

加入集群

启动一个新的MQ容器:

docker run -d --net mq-net \
-v ${PWD}/.erlang.cookie:/var/lib/rabbitmq/.erlang.cookie \
-e RABBITMQ_DEFAULT_USER=itcast \
-e RABBITMQ_DEFAULT_PASS=123321 \
--name mq4 \
--hostname mq5 \
-p 8074:15672 \
-p 8084:15672 \
rabbitmq:3.8-management

 

进入容器控制台:

docker exec -it mq4 bash

 

停止mq进程

rabbitmqctl stop_app

 

重置RabbitMQ中的数据:

rabbitmqctl reset

 

加入mq1:

rabbitmqctl join_cluster rabbit@mq1

 

再次启动mq进程

rabbitmqctl start_app

 

增加仲裁队列副本

我们先查看下quorum.queue这个队列目前的副本情况,进入mq1容器

docker exec -it mq1 bash

 

rabbitmq-queues quorum_status "quorum.queue"

现在,我们让mq4也加入进来:

rabbitmq-queues add_member "quorum.queue" "rabbit@mq4"

 

再次查看:

rabbitmq-queues quorum_status "quorum.queue"

 

联合交换机 Federation Exchange

(broker 北京),(broker 深圳)彼此之间相距甚远,网络延迟是一个不得不面对的问题。有一个在北京的业务(Client 北京) 需要连接(broker 北京),向其中的交换器 exchangeA 发送消息,此时的网络延迟很小, (Client 北京)可以迅速将消息发送至 exchangeA 中,就算在开启了 publisherconfirm 机制或者事务机制的情况下,也可以迅速收到确认信息。此时又有个在深圳的业务(Client 深圳)需要向 exchangeA 发送消息, 那么(Client 深圳) (broker 北京)之间有很大的网络延迟,(Client 深圳) 将发送消息至 exchangeA 会经历一定的延迟,尤其是在开启了 publisherconfirm 机制或者事务机制的情况下,(Client 深圳) 会等待很长的延迟时间来接收(broker 北京)的确认信息,进而必然造成这条发送线程的性能降低,甚至造成一定程度上的阻塞。

将业务(Client 深圳)部署到北京的机房可以解决这个问题,但是如果(Client 深圳)调用的另些服务都部署在深圳,那么又会引发新的时延问题,总不见得将所有业务全部部署在一个机房,那么容灾又何以实现? 这里使用 Federation 插件就可以很好地解决这个问题.

搭建步骤:

需要保证每台节点单独运行
在每台机器上开启 federation 相关插件

rabbitmq-plugins enable rabbitmq_federation
rabbitmq-plugins enable rabbitmq_federation_management

 

然后在Admin选项中就能找到以下两个菜单

原理图(先运行 consumer 在 node2 创建 fed_exchange)

 

再创建一个策略

 

 

联合队列 Federation Queue

联邦队列可以在多个 Broker 节点(或者集群)之间为单个队列提供均衡负载的功能。一个联邦队列可以连接一个或者多个上游队列(upstream queue),并从这些上游队列中获取消息以满足本地消费者消费消息的需求。

搭建步骤

1.原理图

2.创建上游节点的队列

 

Shovel

Federation 具备的数据转发功能类似,Shovel 够可靠、持续地从一个 Broker 中的队列(作为源端,即source)拉取数据并转发至另一个 Broker 中的交换器(作为目的端,即 destination)。作为源端的队列和作为目的端的交换器可以同时位于同一个 Broker,也可以位于不同的 Broker 上。Shovel 可以翻译为"铲子", 是一种比较形象的比喻,这个"铲子"可以将消息从一方"铲子"另一方。Shovel 行为就像优秀的客户端应用程序能够负责连接源和目的地、负责消息的读写及负责连接失败问题的处理。

搭建步骤

1.开启插件

rabbitmq-plugins enable rabbitmq_shovel
rabbitmq-plugins enable rabbitmq_shovel_management

开启后可在Admin选项框中看到以下菜单

2.原理

3.添加 shovel 源和目的地

 

如果您喜欢本站,点击这儿不花一分钱捐赠本站

这些信息可能会帮助到你: 下载帮助 | 报毒说明 | 进站必看

修改版本安卓软件,加群提示为修改者自留,非本站信息,注意鉴别

THE END
分享
二维码
打赏
海报
RabbitMQ – 基础使用说明
简介 RabbitMQ 是一个消息中间件:它接受并转发消息。你可以把它当做一个快递站点,当你要发送一个包裹时,你把你的包裹放到快递站,快递员最终会把你的快递送到收件人那里,按照这种逻辑 Rab……
<<上一篇
下一篇>>