Skip to content

common-rabbitmq-starter

1. 功能介绍

1.自定义消息转换器

  • 发送消息时根据数据类型动态设置 Content-Typeapplication/jsontext/plain)。
  • 接收消息时根据 Content-Type 自动反序列化为 Java 对象或字符串。

2.增强的可扩展性

  • 自定义消息转换器覆盖了默认的 RabbitMQ 消息转换器,实现了更加灵活的消息处理逻辑。
  • 自动处理不支持的消息类型,抛出清晰的异常提示。

3.与 Spring Boot 高度集成

  • RabbitAutoConfiguration 之前加载配置,确保自定义逻辑优先生效。
  • 将自定义消息转换器应用到 RabbitTemplate,方便开发者直接使用。

2. 配置示例

yaml
spring:
  rabbitmq:
    host: ${RABBITMQ_HOST} # RabbitMQ 服务地址
    username: ${RABBITMQ_USERNAME} # 登录用户名
    password: ${RABBITMQ_PASSWORD} # 登录密码
    virtual-host: / # 虚拟主机路径
    port: ${RABBITMQ_PORT} # RabbitMQ 服务端口

3.案例演示

1.创建模块

1.父模块

CleanShot 2025-01-20 at 14.49.43@2x

2.生产者 publisher

3.消费者 consumer

CleanShot 2025-01-20 at 14.53.41@2x

2.目录结构

CleanShot 2025-01-20 at 15.12.16@2x

3.父pom.xml

1.统一管理子模块
xml
    <!-- 统一管理子模块 -->
    <packaging>pom</packaging>
    <modules>
        <module>publisher</module>
        <module>consumer</module>
    </modules>
2.基本配置
xml
<!-- 通过properties来指定版本号 -->
<properties>
    <!-- 指定编译版本 -->
    <java.version>1.8</java.version>
    <maven.compiler.source>8</maven.compiler.source>
    <maven.compiler.target>8</maven.compiler.target>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
    <!-- 指定Sunrays-Framework的版本 -->
    <sunrays.version>1.0.0</sunrays.version>
</properties>

<dependencyManagement>
    <!-- 使用sunrays-dependencies来管理依赖,则依赖无需加版本号 -->
    <dependencies>
        <dependency>
            <groupId>cn.sunxiansheng</groupId>
            <artifactId>sunrays-dependencies</artifactId>
            <version>${sunrays.version}</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>
3.引入依赖
xml
<dependencies>
    <!-- common-rabbitmq-starter -->
    <dependency>
        <groupId>cn.sunxiansheng</groupId>
        <artifactId>common-rabbitmq-starter</artifactId>
    </dependency>
    <!-- common-log4j2-starter 是必须引入的!!! -->
    <dependency>
        <groupId>cn.sunxiansheng</groupId>
        <artifactId>common-log4j2-starter</artifactId>
    </dependency>

    <!-- 用来测试的Web模块 -->
    <dependency>
        <groupId>cn.sunxiansheng</groupId>
        <artifactId>common-web-starter</artifactId>
    </dependency>
    <!-- env模块确保数据安全,可以不引入 -->
    <dependency>
        <groupId>cn.sunxiansheng</groupId>
        <artifactId>common-env-starter</artifactId>
    </dependency>
</dependencies>

4.publisher

1.application.yml 配置日志根目录、.env文件的绝对路径以及RabbitMQ
yaml
sun-rays:
  log4j2:
    home: /Users/sunxiansheng/IdeaProjects/sunrays-framework-demo/common-rabbitmq-starter-demo/publisher/logs # 日志根目录(默认./logs)
  env:
    path: /Users/sunxiansheng/IdeaProjects/sunrays-framework-demo/common-rabbitmq-starter-demo/publisher # .env文件的绝对路径
spring:
  # RabbitMQ 配置
  rabbitmq:
    # 服务器地址 ip
    host: ${RABBITMQ_HOST}
    # 用户名
    username: ${RABBITMQ_USERNAME}
    # 密码
    password: ${RABBITMQ_PASSWORD}
    # 虚拟主机
    virtual-host: /
    # 端口
    port: ${RABBITMQ_PORT}
server:
  port: 8081
2..env 填写RabbitMQ的配置
properties
RABBITMQ_HOST=host
RABBITMQ_USERNAME=用户名
RABBITMQ_PASSWORD=密码
RABBITMQ_PORT=端口
3.TestConfig.java 创建fanout类型的交换机和队列
java
package cn.sunxiansheng.publisher.config;

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * Description: 测试配置类
 *
 * @Author sun
 * @Create 2024/12/31 19:00
 * @Version 1.0
 */
@Configuration
public class TestConfig {

    /**
     * 创建一个fanout类型的交换机
     *
     * @return
     */
    @Bean
    public FanoutExchange fanoutExchange() {
        return new FanoutExchange("fanout.exchange.testExchange");
    }

    /**
     * 创建一个队列
     *
     * @return
     */
    @Bean
    public Queue fanoutQueue() {
        return QueueBuilder.durable("testQueue") // 持久化队列
                .lazy()               // 惰性队列
                .build();
    }

    /**
     * 交换机和队列绑定
     */
    @Bean
    public Binding binding() {
        return BindingBuilder.bind(fanoutQueue()).to(fanoutExchange());
    }
}
4.TestConfigPublisher.java 发布对象类型的消息
java
package cn.sunxiansheng.publisher.pub;

import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.Resource;

/**
 * Description: 测试发布者
 *
 * @Author sun
 * @Create 2024/12/31 19:05
 * @Version 1.0
 */
@RestController
@Slf4j
public class TestConfigPublisher {

    @Resource
    private RabbitTemplate rabbitTemplate;

    /**
     * bean对象
     */
    @Data
    static class Person {
        private String name;
        private int age;
    }

    /**
     * 发布对象类型的消息
     */
    @RequestMapping("/send")
    public void send() {
        log.info("发送消息");
        Person person = new Person();
        person.setName("sun");
        person.setAge(18);
        rabbitTemplate.convertAndSend("fanout.exchange.testExchange", "", person);
    }
}
5.PublisherApplication.java 启动类
java
package cn.sunxiansheng.publisher;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

/**
 * Description: PublisherApplication
 *
 * @Author sun
 * @Create 2025/1/20 14:55
 * @Version 1.0
 */
@SpringBootApplication
public class PublisherApplication {

    public static void main(String[] args) {
        SpringApplication.run(PublisherApplication.class, args);
    }
}

5.consumer

1.application.yml 配置日志根目录、.env文件的绝对路径以及RabbitMQ
yaml
sun-rays:
  log4j2:
    home: /Users/sunxiansheng/IdeaProjects/sunrays-framework-demo/common-rabbitmq-starter-demo/consumer/logs # 日志根目录(默认./logs)
  env:
    path: /Users/sunxiansheng/IdeaProjects/sunrays-framework-demo/common-rabbitmq-starter-demo/consumer # .env文件的绝对路径
spring:
  # RabbitMQ 配置
  rabbitmq:
    # 主机
    host: ${RABBITMQ_HOST}
    # 用户名
    username: ${RABBITMQ_USERNAME}
    # 密码
    password: ${RABBITMQ_PASSWORD}
    # 虚拟主机
    virtual-host: /
    # 端口
    port: ${RABBITMQ_PORT}
server:
  port: 8082
2..env 填写RabbitMQ的配置
properties
RABBITMQ_HOST=host
RABBITMQ_USERNAME=用户名
RABBITMQ_PASSWORD=密码
RABBITMQ_PORT=端口
3.TestConfigConsumer.java 监听队列中的消息
java
package cn.sunxiansheng.consumer.con;

import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * Description: 测试消费者
 *
 * @Author sun
 * @Create 2024/12/31 19:03
 * @Version 1.0
 */
@Component
@Slf4j
public class TestConfigConsumer {

    /**
     * bean对象
     */
    @Data
    static class Person {
        private String name;
        private int age;
    }

    @RabbitListener(queues = "testQueue")
    public void receive(Person person) {
        String name = person.getName();
        int age = person.getAge();
        log.info("name:{}, age:{}", name, age);
    }
}
4.ConsumerApplication.java 启动类
java
package cn.sunxiansheng.consumer;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

/**
 * Description: ConsumerApplication
 *
 * @Author sun
 * @Create 2025/1/20 14:55
 * @Version 1.0
 */
@SpringBootApplication
public class ConsumerApplication {

    public static void main(String[] args) {
        SpringApplication.run(ConsumerApplication.class, args);
    }
}

6.测试

1.发送消息

CleanShot 2025-01-20 at 15.32.20@2x

CleanShot 2025-01-20 at 15.32.12@2x

2.接受消息

CleanShot 2025-01-20 at 15.32.38@2x