Spring 项目集成 RabbitMQ

pom依赖

<!-- rabbitMQ -->
<dependency>
	<groupId>org.springframework.boot</groupId>
	<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

控制是否启动RabbitMQ(可选)

下面通过配置文件开启或关闭RabbitMQ

在Spring Cloud项目中,RabbitMQ可能作为一个工具模块被引用,而有些服务不需要使用,或开发环境中,在一些情况下无法连接RabbitMQ服务器,但是引入后 RabbitMQ 会进行无线尝试重连服务器,可能会导致一些错误。

在启动类中中修改@SpringBootApplication注解

//@SpringBootApplication
@SpringBootApplication(exclude = {RabbitAutoConfiguration.class})
public class  DemoApplication
{
    public static void main(String[] args)
    {
        SpringApplication.run(DemoApplication.class, args);
    }
}

自定义RabbitMQ启动配置类

import org.springframework.boot.autoconfigure.amqp.RabbitAutoConfiguration;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Configuration;

/**
 * @desccription 自定义RabbitMQ的启动配置类,可以通过配置变量来控制启用、禁用
 * @auth
 * @date
 */
@Configuration
@ConditionalOnProperty("spring.rabbitmq.enable")
public class MyRabbitAutoConfiguration extends RabbitAutoConfiguration {
}

在项目的yml配置文件中增加 spring.rabbitmq.enable 属性

spring:
  rabbitmq:
    #配置rabbitMq启用开关
    enable: false

yml文件配置 (基础配置)

# 单机基本配置
spring:
  rabbitmq:
    #配置rabbitMq启用开关
    enable: true
    host: 192.168.0.99
    port: 5672
    username: admin
    password: 123456
    virtual-host: /

# 集群基本配置
spring:
  rabbitmq:
    # 配置rabbitMq启用开关
    enable: true
	# 这里我配置了服务运行服务器的host,所以写的不是IP
	# 集群配置:addressed: ip:port,ip:port,ip:port
    addresses: rabbitmq01:5672,rabbitmq02:5673,rabbitmq03:5674
    username: admin
    password: 123456
    virtual-host: /

yml文件详细配置(根据实际需求调整)

spring:
  rabbitmq:
	# 指定客户端连接到的RabbitMQ服务器地址列表,这里都是本地地址的不同端口
    addresses: rabbitmq01:5672,rabbitmq02:5673,rabbitmq03:5674
    password: admin
    username: 123456
	# 连接到RabbitMQ的虚拟主机
    virtual-host: /
	# 指定心跳超时时间,单位为秒,这里设置为60秒,若为0则表示不指定心跳超时
    requested-heartbeat: 60
	# 启用发布确认机制,即生产者发送消息后会等待RabbitMQ服务器的确认响应
    publisher-confirms: true
	# 启用发布返回功能,当消息无法正确路由时,RabbitMQ将消息返回给生产者
    publisher-returns: true 

    # 连接池相关配置
	# 设置连接超时时间,单位毫秒,这里设置为30秒(30000毫秒),0表示不超时
    connection-timeout: 30000
    cache:
	  # 设置缓存中维护的Channel数量
      channel.size: 25
	  # 设置从缓存中获取Channel的超时时间,单位毫秒,0表示总是创建新的Channel
      channel.checkout-timeout: 1000
	  # 设置缓存的连接数,仅在CONNECTION模式下生效
      connection.size: 10
	  # 设置连接工厂缓存模式,可选CHANNEL或CONNECTION
      connection.mode: CHANNEL

    listener:
      simple:
		# 控制容器是否在启动时自动启动
        auto-startup: true
		# 设置消费者确认消息的方式,可选none、manual和auto,默认为auto
        acknowledge-mode: auto
 		# 设置最小消费者数量
        concurrency: 5
		# 设置最大消费者数量
        max-concurrency: 10
		# 指定每个消费者每次请求能够处理的消息数量
        prefetch: 10
		# 设置在一个事务中处理的消息数量,建议小于等于prefetch值
        transaction-size: 5
		# 若设置为false,当消息被拒绝时,不会重新入队
        default-requeue-rejected: false
		# 设置多久发布一次空闲容器事件,单位毫秒
        idle-event-interval: 10000
        retry:
		  # 是否启用监听器重试机制
          enabled: true
		  # 最大重试次数
          max-attempts: 3
		  # 第一次和第二次尝试发布或传递消息之间的间隔,单位毫秒
          initial-interval: 1000
		  # 应用于上一次重试间隔的乘数,每次失败后重试间隔按此比例增加
          multiplier: 2
		  # 最大重试间隔,防止因重试间隔无限增长导致的长时间等待
          max-interval: 60000

Spring 项目中使用 RabbitMQ Demo

RabbitConfig(配置类)

位于项目config文件夹下,用于初始化创建交换器于队列并确定绑定关系。

import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitConfig {

    // fanout交换机
    public static final String FANOUT_EXCHANGE_NAME = "fanout_exchange_name";

    // direct交换机
    public static final String DIRECT_EXCHANGE_NAME = "direct_exchange_name";

    // topic交换机
    public static final String TOPIC_EXCHANGE_NAME = "topic_exchange_name";

    // 设置direct路由键
    public static final String DIRECT_ROUTING_KEY = "abc";

    // 设置topic路由键
    public static final String TOPIC_ROUTING_KEY = "*.*.c";

    // queue1队列
    public static final String QUEUE1_NAME = "queue1";

    // queue2队列
    public static final String QUEUE2_NAME = "queue2";

    /**
     * 创建fanout交换机
      */
    @Bean
    public FanoutExchange fanoutExchange(){
        return new FanoutExchange(FANOUT_EXCHANGE_NAME, true, false);
    }

    /**
     * 创建direct交换机
      */
    @Bean
    public DirectExchange directExchange(){
        return new DirectExchange(DIRECT_EXCHANGE_NAME, true, false);
    }

    /**
     * 创建topic交换机
     */
    @Bean
    public TopicExchange topicExchange(){
        return new TopicExchange(TOPIC_EXCHANGE_NAME, true, false);
    }

    /**
     * 创建队列 queue1
      */
    @Bean
    public Queue queue1(){
        return new Queue(QUEUE1_NAME, true);
    }

    /**
     * 创建队列 queue2
     */
    @Bean
    public Queue queue2(){
        return new Queue(QUEUE2_NAME, true);
    }

    /**
     * 绑定queue1到fanout交换机
      */
    @Bean
    public Binding queue1BindingFanoutExchange(@Qualifier("queue1") Queue queue1,
                                        @Qualifier("fanoutExchange") FanoutExchange fanoutExchange){
        return BindingBuilder.bind(queue1).to(fanoutExchange);
    }

    /**
     * 绑定queue2到fanout交换机
      */
    @Bean
    public Binding queue2BindingFanoutExchange(@Qualifier("queue2") Queue queue2,
                                              @Qualifier("fanoutExchange") FanoutExchange fanoutExchange){
        return BindingBuilder.bind(queue2).to(fanoutExchange);
    }

    /**
     * 绑定queue1到direct交换机,with是设置了路由键
     */
    @Bean
    public Binding queue1BindingDirectExchange(@Qualifier("queue1") Queue queue1,
                                               @Qualifier("directExchange") DirectExchange directExchange){
        return BindingBuilder.bind(queue1).to(directExchange).with(DIRECT_ROUTING_KEY);
    }

    /**
     * 绑定queue2到topic交换机,with是设置了路由键
     */
    @Bean
    public Binding queue1BindingTopicExchange(@Qualifier("queue1") Queue queue1,
                                               @Qualifier("topicExchange") TopicExchange topicExchange){
        return BindingBuilder.bind(queue1).to(topicExchange).with(TOPIC_ROUTING_KEY);
    }

}

RabbitLister(消费者)

位于项目listener文件夹下,用于消费队列中的消息。

import com.ruoyi.test.config.RabbitConfig;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;

import java.util.Map;


@Component
public class RabbitLister {

    /**
     * 接收queue1队列中的消息
     * 通过 @Payload String msg 这个参数指定了消息体的类型,此方法不会接收到不是String消息体的消息
     * 这里的String也可以换成其他的对象
     *
     * @param msg 消息体,通过注解@Payload指定,表示接收到的消息内容。
     * @param header 消息头信息
     * @param message RabbitMQ的消息对象,包含消息体和消息头等信息。
     */
    @RabbitListener(queues = RabbitConfig.QUEUE1_NAME)
    public void receiveQueue1Msg(@Payload String msg, @Headers Map<String, Object> header, Message message){
        System.out.println("接收到queue1队列中的消息:" + msg);
    }

    /**
     * 接收queue2队列中的消息
     */
    @RabbitListener(queues = {RabbitConfig.QUEUE2_NAME})
    public void receiveQueue2Msg(@Payload String msg) {
        System.out.println("接收到queue2队列中的消息:" + msg);
    }

}

发送消息方法(生产者)

一般位于业务方法中,下面只是个示例

@Resource
private RabbitTemplate rabbitTemplate;

void setMessage() {
    // 发送消息到fanout交换器上,这里因为是fanout类型交换机,所以routingKey可以不指定
    rabbitTemplate.convertAndSend(RabbitConfig.FANOUT_EXCHANGE_NAME, null, "测试fanout交换器");
    // 发送消息到direct交换器上,direct类型交换机,消息的routingKey与路由键必须一致,所以直接引用了常量
    rabbitTemplate.convertAndSend(RabbitConfig.DIRECT_EXCHANGE_NAME, RabbitConfig.DIRECT_ROUTING_KEY, "测试direct交换器");
    // 发送消息到topic交换器上,topic类型交换机,配置的路由键为*.*.c,此消息routingKey为a.b.c,可以匹配上
    rabbitTemplate.convertAndSend(RabbitConfig.TOPIC_EXCHANGE_NAME, "a.b.c", "测试topic交换器");
}

现延迟队列在 Spring 项目中的实现

RabbitMQ实现延迟队列主要有两种方式:

方式1:TTL + DLX (过期消息+死性队列) 配置繁琐

方式2:使用 rabbitmq_delayed_message_exchange 延迟插件(因为方式1步骤繁琐,所以写了一个这样的插件来实现)

实现原理简述

生产者发送消息给Delay Exchange 这个交换机,它会将消息持久化到 Mnesia 数据库中,然后它会检查这个消息的时间(x-delay),等时间到了才会投递给队列。

此时消费者才可以从队列中消费消息。

安装插件

延迟插件在RabbitMQ 3.5.7 及以上的版本才支持。

在官方插件列表https://www.rabbitmq.com/docs/community-plugins 中可以找到rabbitmq_delayed_message_exchange的 github 地址,选择与自己服务中 RabbitMQ 相近的版本。

比如RabbitMQ版本为3.11.1,选择下载rabbitmq_delayed_message_exchange-3.11.1.ez
将插件包拷贝到 RabbitMQ 服务器的 plugins 目录下,使用以下命令进行安装

如果是通过Docker安装的RabbitMQ,那么容器的/下会生成一个plugins -> /opt/rabbitmq/plugins

将插件拷贝到 /plugins 即可

# 解压缩
unzip rabbitmq_delayed_message_exchange-3.11.1.ez
# 开启插件
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
# 查询安装的所有插件
rabbitmq-plugins list
# 重启rabbitmq使其生效

安装完后在 RabbitMQ 管理界面的 Exchanges 中新增时类型可以选择 x-delayed-message

DelayRabbitConfig(配置类)

创建交换机,队列,并进行绑定。

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.CustomExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class DelayRabbitConfig {

    public static final String DELAY_EXECHANGE = "delay.exchange";

    public static final String DELAY_QUEUE = "delay.queue";

    public static final String DELAY_ROUTINGKEY = "delay.routingkey";

    @Bean
    public CustomExchange delayExchange() {
        Map<String, Object> args = new HashMap<>();
        // 设置延迟类型 这里时固定的
        args.put("x-delayed-type", "direct");
        // 声明自定义的交换机
        // 参数1 交换机名称
        // 参数2 交换机的类型 这里是固定的类型,安装此插件后才会有此类型的交换机
        // 参数3 是否需要持久化,这里涉及到实现原理,延迟的消息会被持久化到一个数据库中,等到超时才会被交换机投递到队列中。
        // 参数4 是否自动删除
        // 参数5 其他参数
        return new CustomExchange(DELAY_EXECHANGE, "x-delayed-message", true, false, args);
    }

    @Bean
    public Queue delayQueue() {
        return new Queue(DELAY_QUEUE);
    }

    @Bean
    public Binding delayBinding(@Qualifier("delayExchange") CustomExchange delayExchange, @Qualifier("delayQueue") Queue delayQueue) {
        return BindingBuilder.bind(delayQueue).to(delayExchange).with(DELAY_ROUTINGKEY).noargs();
    }
}

DelayListener(消费者)

import com.ruoyi.test.config.DelayRabbitConfig;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;

import java.time.LocalDateTime;

@Component
public class DelayListener {

    /**
     *  接收延迟消息
     * @param msg
     */
    @RabbitListener(queues = {DelayRabbitConfig.DELAY_QUEUE})
    public void receiveMsg(@Payload String msg){
        System.out.println(msg + "\n" + "接收时间为:" + LocalDateTime.now());
    }

}

发送消息方法(生产者)

public void sendDelayMsg(){
    rabbitTemplate.convertAndSend(DelayRabbitConfig.DELAY_EXECHANGE,
            DelayRabbitConfig.DELAY_ROUTINGKEY,
            "发送时间为:" + LocalDateTime.now(),
            message -> {
                message.getMessageProperties().setDelay(5000); //延迟5秒
                return message;
            }
    );
}