Spring 项目集成 RabbitMQ
Spring 项目集成 RabbitMQ
pom依赖
<!-- rabbitMQ -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
控制是否启动RabbitMQ(可选)
下面通过配置文件开启或关闭RabbitMQ
在Spring Cloud项目中,RabbitMQ可能作为一个工具模块被引用,而有些服务不需要使用,或开发环境中,在一些情况下无法连接RabbitMQ服务器,但是引入后 RabbitMQ 会进行无线尝试重连服务器,可能会导致一些错误。
在启动类中中修改@SpringBootApplication注解
//@SpringBootApplication
@SpringBootApplication(exclude = {RabbitAutoConfiguration.class})
public class DemoApplication
{
public static void main(String[] args)
{
SpringApplication.run(DemoApplication.class, args);
}
}
自定义RabbitMQ启动配置类
import org.springframework.boot.autoconfigure.amqp.RabbitAutoConfiguration;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Configuration;
/**
* @desccription 自定义RabbitMQ的启动配置类,可以通过配置变量来控制启用、禁用
* @auth
* @date
*/
@Configuration
@ConditionalOnProperty("spring.rabbitmq.enable")
public class MyRabbitAutoConfiguration extends RabbitAutoConfiguration {
}
在项目的yml配置文件中增加 spring.rabbitmq.enable 属性
spring:
rabbitmq:
#配置rabbitMq启用开关
enable: false
yml文件配置 (基础配置)
# 单机基本配置
spring:
rabbitmq:
#配置rabbitMq启用开关
enable: true
host: 192.168.0.99
port: 5672
username: admin
password: 123456
virtual-host: /
# 集群基本配置
spring:
rabbitmq:
# 配置rabbitMq启用开关
enable: true
# 这里我配置了服务运行服务器的host,所以写的不是IP
# 集群配置:addressed: ip:port,ip:port,ip:port
addresses: rabbitmq01:5672,rabbitmq02:5673,rabbitmq03:5674
username: admin
password: 123456
virtual-host: /
yml文件详细配置(根据实际需求调整)
spring:
rabbitmq:
# 指定客户端连接到的RabbitMQ服务器地址列表,这里都是本地地址的不同端口
addresses: rabbitmq01:5672,rabbitmq02:5673,rabbitmq03:5674
password: admin
username: 123456
# 连接到RabbitMQ的虚拟主机
virtual-host: /
# 指定心跳超时时间,单位为秒,这里设置为60秒,若为0则表示不指定心跳超时
requested-heartbeat: 60
# 启用发布确认机制,即生产者发送消息后会等待RabbitMQ服务器的确认响应
publisher-confirms: true
# 启用发布返回功能,当消息无法正确路由时,RabbitMQ将消息返回给生产者
publisher-returns: true
# 连接池相关配置
# 设置连接超时时间,单位毫秒,这里设置为30秒(30000毫秒),0表示不超时
connection-timeout: 30000
cache:
# 设置缓存中维护的Channel数量
channel.size: 25
# 设置从缓存中获取Channel的超时时间,单位毫秒,0表示总是创建新的Channel
channel.checkout-timeout: 1000
# 设置缓存的连接数,仅在CONNECTION模式下生效
connection.size: 10
# 设置连接工厂缓存模式,可选CHANNEL或CONNECTION
connection.mode: CHANNEL
listener:
simple:
# 控制容器是否在启动时自动启动
auto-startup: true
# 设置消费者确认消息的方式,可选none、manual和auto,默认为auto
acknowledge-mode: auto
# 设置最小消费者数量
concurrency: 5
# 设置最大消费者数量
max-concurrency: 10
# 指定每个消费者每次请求能够处理的消息数量
prefetch: 10
# 设置在一个事务中处理的消息数量,建议小于等于prefetch值
transaction-size: 5
# 若设置为false,当消息被拒绝时,不会重新入队
default-requeue-rejected: false
# 设置多久发布一次空闲容器事件,单位毫秒
idle-event-interval: 10000
retry:
# 是否启用监听器重试机制
enabled: true
# 最大重试次数
max-attempts: 3
# 第一次和第二次尝试发布或传递消息之间的间隔,单位毫秒
initial-interval: 1000
# 应用于上一次重试间隔的乘数,每次失败后重试间隔按此比例增加
multiplier: 2
# 最大重试间隔,防止因重试间隔无限增长导致的长时间等待
max-interval: 60000
Spring 项目中使用 RabbitMQ Demo
RabbitConfig(配置类)
位于项目config文件夹下,用于初始化创建交换器于队列并确定绑定关系。
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitConfig {
// fanout交换机
public static final String FANOUT_EXCHANGE_NAME = "fanout_exchange_name";
// direct交换机
public static final String DIRECT_EXCHANGE_NAME = "direct_exchange_name";
// topic交换机
public static final String TOPIC_EXCHANGE_NAME = "topic_exchange_name";
// 设置direct路由键
public static final String DIRECT_ROUTING_KEY = "abc";
// 设置topic路由键
public static final String TOPIC_ROUTING_KEY = "*.*.c";
// queue1队列
public static final String QUEUE1_NAME = "queue1";
// queue2队列
public static final String QUEUE2_NAME = "queue2";
/**
* 创建fanout交换机
*/
@Bean
public FanoutExchange fanoutExchange(){
return new FanoutExchange(FANOUT_EXCHANGE_NAME, true, false);
}
/**
* 创建direct交换机
*/
@Bean
public DirectExchange directExchange(){
return new DirectExchange(DIRECT_EXCHANGE_NAME, true, false);
}
/**
* 创建topic交换机
*/
@Bean
public TopicExchange topicExchange(){
return new TopicExchange(TOPIC_EXCHANGE_NAME, true, false);
}
/**
* 创建队列 queue1
*/
@Bean
public Queue queue1(){
return new Queue(QUEUE1_NAME, true);
}
/**
* 创建队列 queue2
*/
@Bean
public Queue queue2(){
return new Queue(QUEUE2_NAME, true);
}
/**
* 绑定queue1到fanout交换机
*/
@Bean
public Binding queue1BindingFanoutExchange(@Qualifier("queue1") Queue queue1,
@Qualifier("fanoutExchange") FanoutExchange fanoutExchange){
return BindingBuilder.bind(queue1).to(fanoutExchange);
}
/**
* 绑定queue2到fanout交换机
*/
@Bean
public Binding queue2BindingFanoutExchange(@Qualifier("queue2") Queue queue2,
@Qualifier("fanoutExchange") FanoutExchange fanoutExchange){
return BindingBuilder.bind(queue2).to(fanoutExchange);
}
/**
* 绑定queue1到direct交换机,with是设置了路由键
*/
@Bean
public Binding queue1BindingDirectExchange(@Qualifier("queue1") Queue queue1,
@Qualifier("directExchange") DirectExchange directExchange){
return BindingBuilder.bind(queue1).to(directExchange).with(DIRECT_ROUTING_KEY);
}
/**
* 绑定queue2到topic交换机,with是设置了路由键
*/
@Bean
public Binding queue1BindingTopicExchange(@Qualifier("queue1") Queue queue1,
@Qualifier("topicExchange") TopicExchange topicExchange){
return BindingBuilder.bind(queue1).to(topicExchange).with(TOPIC_ROUTING_KEY);
}
}
RabbitLister(消费者)
位于项目listener文件夹下,用于消费队列中的消息。
import com.ruoyi.test.config.RabbitConfig;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;
import java.util.Map;
@Component
public class RabbitLister {
/**
* 接收queue1队列中的消息
* 通过 @Payload String msg 这个参数指定了消息体的类型,此方法不会接收到不是String消息体的消息
* 这里的String也可以换成其他的对象
*
* @param msg 消息体,通过注解@Payload指定,表示接收到的消息内容。
* @param header 消息头信息
* @param message RabbitMQ的消息对象,包含消息体和消息头等信息。
*/
@RabbitListener(queues = RabbitConfig.QUEUE1_NAME)
public void receiveQueue1Msg(@Payload String msg, @Headers Map<String, Object> header, Message message){
System.out.println("接收到queue1队列中的消息:" + msg);
}
/**
* 接收queue2队列中的消息
*/
@RabbitListener(queues = {RabbitConfig.QUEUE2_NAME})
public void receiveQueue2Msg(@Payload String msg) {
System.out.println("接收到queue2队列中的消息:" + msg);
}
}
发送消息方法(生产者)
一般位于业务方法中,下面只是个示例
@Resource
private RabbitTemplate rabbitTemplate;
void setMessage() {
// 发送消息到fanout交换器上,这里因为是fanout类型交换机,所以routingKey可以不指定
rabbitTemplate.convertAndSend(RabbitConfig.FANOUT_EXCHANGE_NAME, null, "测试fanout交换器");
// 发送消息到direct交换器上,direct类型交换机,消息的routingKey与路由键必须一致,所以直接引用了常量
rabbitTemplate.convertAndSend(RabbitConfig.DIRECT_EXCHANGE_NAME, RabbitConfig.DIRECT_ROUTING_KEY, "测试direct交换器");
// 发送消息到topic交换器上,topic类型交换机,配置的路由键为*.*.c,此消息routingKey为a.b.c,可以匹配上
rabbitTemplate.convertAndSend(RabbitConfig.TOPIC_EXCHANGE_NAME, "a.b.c", "测试topic交换器");
}
现延迟队列在 Spring 项目中的实现
RabbitMQ实现延迟队列主要有两种方式:
方式1:TTL + DLX (过期消息+死性队列) 配置繁琐
方式2:使用 rabbitmq_delayed_message_exchange 延迟插件(因为方式1步骤繁琐,所以写了一个这样的插件来实现)
实现原理简述
生产者发送消息给Delay Exchange 这个交换机,它会将消息持久化到 Mnesia 数据库中,然后它会检查这个消息的时间(x-delay),等时间到了才会投递给队列。
此时消费者才可以从队列中消费消息。
安装插件
延迟插件在RabbitMQ 3.5.7 及以上的版本才支持。
在官方插件列表https://www.rabbitmq.com/docs/community-plugins 中可以找到rabbitmq_delayed_message_exchange的 github 地址,选择与自己服务中 RabbitMQ 相近的版本。
比如RabbitMQ版本为3.11.1,选择下载rabbitmq_delayed_message_exchange-3.11.1.ez
将插件包拷贝到 RabbitMQ 服务器的 plugins 目录下,使用以下命令进行安装
如果是通过Docker安装的RabbitMQ,那么容器的/下会生成一个plugins -> /opt/rabbitmq/plugins
将插件拷贝到 /plugins 即可
# 解压缩
unzip rabbitmq_delayed_message_exchange-3.11.1.ez
# 开启插件
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
# 查询安装的所有插件
rabbitmq-plugins list
# 重启rabbitmq使其生效
安装完后在 RabbitMQ 管理界面的 Exchanges 中新增时类型可以选择 x-delayed-message
DelayRabbitConfig(配置类)
创建交换机,队列,并进行绑定。
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.CustomExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class DelayRabbitConfig {
public static final String DELAY_EXECHANGE = "delay.exchange";
public static final String DELAY_QUEUE = "delay.queue";
public static final String DELAY_ROUTINGKEY = "delay.routingkey";
@Bean
public CustomExchange delayExchange() {
Map<String, Object> args = new HashMap<>();
// 设置延迟类型 这里时固定的
args.put("x-delayed-type", "direct");
// 声明自定义的交换机
// 参数1 交换机名称
// 参数2 交换机的类型 这里是固定的类型,安装此插件后才会有此类型的交换机
// 参数3 是否需要持久化,这里涉及到实现原理,延迟的消息会被持久化到一个数据库中,等到超时才会被交换机投递到队列中。
// 参数4 是否自动删除
// 参数5 其他参数
return new CustomExchange(DELAY_EXECHANGE, "x-delayed-message", true, false, args);
}
@Bean
public Queue delayQueue() {
return new Queue(DELAY_QUEUE);
}
@Bean
public Binding delayBinding(@Qualifier("delayExchange") CustomExchange delayExchange, @Qualifier("delayQueue") Queue delayQueue) {
return BindingBuilder.bind(delayQueue).to(delayExchange).with(DELAY_ROUTINGKEY).noargs();
}
}
DelayListener(消费者)
import com.ruoyi.test.config.DelayRabbitConfig;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;
import java.time.LocalDateTime;
@Component
public class DelayListener {
/**
* 接收延迟消息
* @param msg
*/
@RabbitListener(queues = {DelayRabbitConfig.DELAY_QUEUE})
public void receiveMsg(@Payload String msg){
System.out.println(msg + "\n" + "接收时间为:" + LocalDateTime.now());
}
}
发送消息方法(生产者)
public void sendDelayMsg(){
rabbitTemplate.convertAndSend(DelayRabbitConfig.DELAY_EXECHANGE,
DelayRabbitConfig.DELAY_ROUTINGKEY,
"发送时间为:" + LocalDateTime.now(),
message -> {
message.getMessageProperties().setDelay(5000); //延迟5秒
return message;
}
);
}