common-rabbitmq-starter
1. 功能介绍
1.自定义消息转换器
- 发送消息时根据数据类型动态设置
Content-Type
(application/json
或text/plain
)。 - 接收消息时根据
Content-Type
自动反序列化为 Java 对象或字符串。
2.增强的可扩展性
- 自定义消息转换器覆盖了默认的 RabbitMQ 消息转换器,实现了更加灵活的消息处理逻辑。
- 自动处理不支持的消息类型,抛出清晰的异常提示。
3.与 Spring Boot 高度集成
- 在
RabbitAutoConfiguration
之前加载配置,确保自定义逻辑优先生效。 - 将自定义消息转换器应用到
RabbitTemplate
,方便开发者直接使用。
2. 配置示例
yaml
spring:
rabbitmq:
host: ${RABBITMQ_HOST} # RabbitMQ 服务地址
username: ${RABBITMQ_USERNAME} # 登录用户名
password: ${RABBITMQ_PASSWORD} # 登录密码
virtual-host: / # 虚拟主机路径
port: ${RABBITMQ_PORT} # RabbitMQ 服务端口
3.案例演示
1.创建模块
1.父模块
2.生产者 publisher
3.消费者 consumer
2.目录结构
3.父pom.xml
1.统一管理子模块
xml
<!-- 统一管理子模块 -->
<packaging>pom</packaging>
<modules>
<module>publisher</module>
<module>consumer</module>
</modules>
2.基本配置
xml
<!-- 通过properties来指定版本号 -->
<properties>
<!-- 指定编译版本 -->
<java.version>1.8</java.version>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<!-- 指定Sunrays-Framework的版本 -->
<sunrays.version>1.0.0</sunrays.version>
</properties>
<dependencyManagement>
<!-- 使用sunrays-dependencies来管理依赖,则依赖无需加版本号 -->
<dependencies>
<dependency>
<groupId>cn.sunxiansheng</groupId>
<artifactId>sunrays-dependencies</artifactId>
<version>${sunrays.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
3.引入依赖
xml
<dependencies>
<!-- common-rabbitmq-starter -->
<dependency>
<groupId>cn.sunxiansheng</groupId>
<artifactId>common-rabbitmq-starter</artifactId>
</dependency>
<!-- common-log4j2-starter 是必须引入的!!! -->
<dependency>
<groupId>cn.sunxiansheng</groupId>
<artifactId>common-log4j2-starter</artifactId>
</dependency>
<!-- 用来测试的Web模块 -->
<dependency>
<groupId>cn.sunxiansheng</groupId>
<artifactId>common-web-starter</artifactId>
</dependency>
<!-- env模块确保数据安全,可以不引入 -->
<dependency>
<groupId>cn.sunxiansheng</groupId>
<artifactId>common-env-starter</artifactId>
</dependency>
</dependencies>
4.publisher
1.application.yml 配置日志根目录、.env文件的绝对路径以及RabbitMQ
yaml
sun-rays:
log4j2:
home: /Users/sunxiansheng/IdeaProjects/sunrays-framework-demo/common-rabbitmq-starter-demo/publisher/logs # 日志根目录(默认./logs)
env:
path: /Users/sunxiansheng/IdeaProjects/sunrays-framework-demo/common-rabbitmq-starter-demo/publisher # .env文件的绝对路径
spring:
# RabbitMQ 配置
rabbitmq:
# 服务器地址 ip
host: ${RABBITMQ_HOST}
# 用户名
username: ${RABBITMQ_USERNAME}
# 密码
password: ${RABBITMQ_PASSWORD}
# 虚拟主机
virtual-host: /
# 端口
port: ${RABBITMQ_PORT}
server:
port: 8081
2..env 填写RabbitMQ的配置
properties
RABBITMQ_HOST=host
RABBITMQ_USERNAME=用户名
RABBITMQ_PASSWORD=密码
RABBITMQ_PORT=端口
3.TestConfig.java 创建fanout类型的交换机和队列
java
package cn.sunxiansheng.publisher.config;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* Description: 测试配置类
*
* @Author sun
* @Create 2024/12/31 19:00
* @Version 1.0
*/
@Configuration
public class TestConfig {
/**
* 创建一个fanout类型的交换机
*
* @return
*/
@Bean
public FanoutExchange fanoutExchange() {
return new FanoutExchange("fanout.exchange.testExchange");
}
/**
* 创建一个队列
*
* @return
*/
@Bean
public Queue fanoutQueue() {
return QueueBuilder.durable("testQueue") // 持久化队列
.lazy() // 惰性队列
.build();
}
/**
* 交换机和队列绑定
*/
@Bean
public Binding binding() {
return BindingBuilder.bind(fanoutQueue()).to(fanoutExchange());
}
}
4.TestConfigPublisher.java 发布对象类型的消息
java
package cn.sunxiansheng.publisher.pub;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
/**
* Description: 测试发布者
*
* @Author sun
* @Create 2024/12/31 19:05
* @Version 1.0
*/
@RestController
@Slf4j
public class TestConfigPublisher {
@Resource
private RabbitTemplate rabbitTemplate;
/**
* bean对象
*/
@Data
static class Person {
private String name;
private int age;
}
/**
* 发布对象类型的消息
*/
@RequestMapping("/send")
public void send() {
log.info("发送消息");
Person person = new Person();
person.setName("sun");
person.setAge(18);
rabbitTemplate.convertAndSend("fanout.exchange.testExchange", "", person);
}
}
5.PublisherApplication.java 启动类
java
package cn.sunxiansheng.publisher;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
/**
* Description: PublisherApplication
*
* @Author sun
* @Create 2025/1/20 14:55
* @Version 1.0
*/
@SpringBootApplication
public class PublisherApplication {
public static void main(String[] args) {
SpringApplication.run(PublisherApplication.class, args);
}
}
5.consumer
1.application.yml 配置日志根目录、.env文件的绝对路径以及RabbitMQ
yaml
sun-rays:
log4j2:
home: /Users/sunxiansheng/IdeaProjects/sunrays-framework-demo/common-rabbitmq-starter-demo/consumer/logs # 日志根目录(默认./logs)
env:
path: /Users/sunxiansheng/IdeaProjects/sunrays-framework-demo/common-rabbitmq-starter-demo/consumer # .env文件的绝对路径
spring:
# RabbitMQ 配置
rabbitmq:
# 主机
host: ${RABBITMQ_HOST}
# 用户名
username: ${RABBITMQ_USERNAME}
# 密码
password: ${RABBITMQ_PASSWORD}
# 虚拟主机
virtual-host: /
# 端口
port: ${RABBITMQ_PORT}
server:
port: 8082
2..env 填写RabbitMQ的配置
properties
RABBITMQ_HOST=host
RABBITMQ_USERNAME=用户名
RABBITMQ_PASSWORD=密码
RABBITMQ_PORT=端口
3.TestConfigConsumer.java 监听队列中的消息
java
package cn.sunxiansheng.consumer.con;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
* Description: 测试消费者
*
* @Author sun
* @Create 2024/12/31 19:03
* @Version 1.0
*/
@Component
@Slf4j
public class TestConfigConsumer {
/**
* bean对象
*/
@Data
static class Person {
private String name;
private int age;
}
@RabbitListener(queues = "testQueue")
public void receive(Person person) {
String name = person.getName();
int age = person.getAge();
log.info("name:{}, age:{}", name, age);
}
}
4.ConsumerApplication.java 启动类
java
package cn.sunxiansheng.consumer;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
/**
* Description: ConsumerApplication
*
* @Author sun
* @Create 2025/1/20 14:55
* @Version 1.0
*/
@SpringBootApplication
public class ConsumerApplication {
public static void main(String[] args) {
SpringApplication.run(ConsumerApplication.class, args);
}
}