引入RabbitMQ

前置条件

docker 安装 mq

docker run \
 -e RABBITMQ_DEFAULT_USER=dudu \
 -e RABBITMQ_DEFAULT_PASS=123456 \
 -v mq-plugins:/plugins \
 --name mq \
 --hostname mq \
 -p 15672:15672 \
 -p 5672:5672 \
 --network hmall \
 -d \
 rabbitmq:3.8-management

可能会出现:docker: Error response from daemon: network hmall not found.
原因是在容器启动时,所需的网络环境没有正确配置。

检查网络列表

docker network ls

创建所需网络

docker network create hmall

运行容器时指定网络

docker run -d --net=hmall rabbitmq:3.8-management

重新启动容器

docker restart mq

新建初始工程

父工程引入依赖

  		<!--AMQP依赖,包含RabbitMQ-->
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-amqp</artifactId>
		</dependency>

publisher 和 consumer 引入 yml 配置

spring:
  rabbitmq:
    host: 192.168.64.100 # 你的虚拟机IP
    port: 5672 # 端口
    virtual-host: /dudu # 虚拟主机
    username: dudu # 用户名
    password: 123456 # 密码

###基本消息模型

新建虚拟主机

新建 base.queue 队列

publisher 测试类发送消息

@SpringBootTest
public class BaseTest {
	@Autowired
	private RabbitTemplate rabbitTemplate;
	@Test
	public void testSendMessage(){
		//队列名称
		String queueName = "base.queue";
		//消息
		String message = "基本消息模型测试";
		//发送消息
		rabbitTemplate.convertAndSend(queueName,message);
	}
}

consumer 配置监听消息

@Component
public class RabbitMQListener {
	// 监听基本消息模型 base.queue队列
	@RabbitListener(queues = "base.queue")
	public void baseListener(String msg) {
		System.out.println("base.queue接收到消息:" + msg);
	}
}

work 消息模型

  • 多个消费者绑定到一个队列,同一条消息只会被一个消费者处理
  • 通过设置 prefetch 来控制消费者预取的消息数量

同基本消息模型一样新建队列:work.queue

publisher 测试类发送消息

@SpringBootTest
public class BaseTest {
	@Autowired
	private RabbitTemplate rabbitTemplate;
	@Test
	public void testSendWorkMessage() {
		for (int i = 1; i <= 10; i++) {
			String message = "测试message" + i;
			rabbitTemplate.convertAndSend("work.queue",message);
		}
	}
}

consumer 配置监听消息

@Component
public class RabbitMQListener {
	// 监听 work 消息模型 work.queue队列
	@RabbitListener(queues = "work.queue")
	public void workListener1(String msg) {
		System.out.println("消费者一接收到work.queue的消息:"+ msg);
	}
	@RabbitListener(queues = "work.queue")
	public void workListener2(String msg) {
		System.err.println("消费者二接收到work.queue的消息:"+ msg);
	}
}

测试


默认是消费者平分消息,并没有考虑到消费者的处理能力。可能会存在一个消费者空闲,一个消费者忙,没有充分的利用消费者。

在 spring 中有一个简单的配置,可以解决这个问题。我们修改 consumer 服务的 application.yml 文件,修改配置:

spring:
  rabbitmq:
    host: 192.168.64.100 # 你的虚拟机IP
    port: 5672 # 端口
    virtual-host: /dudu # 虚拟主机
    username: dudu # 用户名
    password: 123456 # 密码
    listener:
      simple:
        prefetch: 1 # 每次只能获取一条消息,处理完成才能获取下一个消息

再次测试

Fanout 交换机消息模型(广播)

  • 接收 publisher 发送的消息
  • 将消息按照规则路由到与之绑定的队列
  • 不能缓存消息,路由失败,消息丢失
  • FanoutExchange 的会将消息路由到每个绑定的队列

新建交换机和队列

将队列绑定到交换机

publisher 测试类发送消息

@SpringBootTest
public class BaseTest {
	@Autowired
	private RabbitTemplate rabbitTemplate;
	@Test
	public void testSendFanoutMessage() {
		String exchangeName = "dudu.fanout";
		String message = "测试Fanout消息模型";
		rabbitTemplate.convertAndSend(exchangeName,"",message);
  }
}

consumer 配置监听消息

@Component
public class RabbitMQListener {
	// 监听 fanout 交换机消息模型(广播) work.queue队列
	@RabbitListener(queues = "fanout.queue1")
	public void fanoutListener1(String msg) {
		System.out.println("消费者一接收到 fanout.queue1 的消息:"+ msg);
	}
	@RabbitListener(queues = "fanout.queue2")
	public void fanoutListener2(String msg) {
		System.err.println("消费者二接收到 fanout.queue2 的消息:"+ msg);
	}

测试

Direct 交换机消息模型(发布-订阅)

  • Fanout 交换机将消息路由给每一个与之绑定的队列
  • Direct 交换机根据 RoutingKey 判断路由给哪个队列
  • 如果多个队列具有相同的 RoutingKey,则与 Fanout 功能类似

新建交换机,队列,绑定路由 key

publisher 测试类发送消息

@SpringBootTest
public class BaseTest {
	@Autowired
	private RabbitTemplate rabbitTemplate;
	@Test
	public void testSendDirectMessage() {
		String exchangeName = "dudu.direct";
		String message = "测试Direct消息模型";
		rabbitTemplate.convertAndSend(exchangeName,"red",message+"红色消息");
		rabbitTemplate.convertAndSend(exchangeName,"blue",message+"蓝色消息");
		rabbitTemplate.convertAndSend(exchangeName,"yellow",message+"黄色消息");
	}
}

consumer 配置监听消息

@Component
public class RabbitMQListener {
	// 监听 direct 交换机消息模型
	@RabbitListener(queues = "direct.queue1")
	public void directListener1(String msg) {
		System.out.println("消费者一接收到 direct.queue1 的消息:"+ msg);
	}
	@RabbitListener(queues = "direct.queue2")
	public void directListener2(String msg) {
		System.err.println("消费者二接收到 direct.queue2 的消息:"+ msg);
}

测试

Topic 交换机消息模型


Topic类型的ExchangeDirect相比,都是可以根据RoutingKey把消息路由到不同的队列。
只不过Topic类型Exchange可以让队列在绑定BindingKey 的时候使用通配符!- #:匹配一个或多个词 -*:匹配不多不少恰好 1 个词

举例:

  • item.#:能够匹配item.spu.insert 或者 item.spu
  • item.*:只能匹配item.spu

新建交换机,队列,绑定路由 key

publisher 测试类发送消息

@SpringBootTest
public class BaseTest {
	@Autowired
	private RabbitTemplate rabbitTemplate;
	@Test
	public void testSendTopicMessage() {
		String exchangeName = "dudu.topic";
		String message = "测试Topic消息模型";
		rabbitTemplate.convertAndSend(exchangeName,"china.message",message+"中国消息");
		rabbitTemplate.convertAndSend(exchangeName,"blue.news",message+"蓝色新闻");
		rabbitTemplate.convertAndSend(exchangeName,"yellow.news",message+"新闻");
	}
}

consumer 配置监听消息

@Component
public class RabbitMQListener {
	// 监听 topic 交换机消息模型 work.queue队列
	@RabbitListener(queues = "topic.queue1")
	public void topictListener1(String msg) {
		System.out.println("消费者一接收到 topic.queue1 的消息:"+ msg);
	}
	@RabbitListener(queues = "topic.queue2")
	public void topicListener2(String msg) {
		System.err.println("消费者二接收到 topic.queue2 的消息:"+ msg);
	}
}

测试

声明队列和交换机

若 mq 没有以方法名的交换机或队列, 则根据方法中 return 的新建交换机和队列

DirectConfig

@Configuration
public class DirectConfig {
	/**
	 * 声明交换机		若mq没有名为 fanoutExchange 的交换机, 则创建名为 hmall.direct 的交换机
	 * @return Direct类型交换机
	 */
	@Bean
	public DirectExchange directExchange(){
		return ExchangeBuilder.directExchange("hmall.direct").build();
	}

	/**
	 * 第1个队列
	 */
	@Bean
	public Queue directQueue1(){
		return new Queue("direct.queue1");
	}

	/**
	 * 绑定队列和交换机
	 */
	@Bean
	public Binding bindingQueue1WithRed(Queue directQueue1, DirectExchange directExchange){
		return BindingBuilder.bind(directQueue1).to(directExchange).with("red");
	}
	/**
	 * 绑定队列和交换机
	 */
	@Bean
	public Binding bindingQueue1WithBlue(Queue directQueue1, DirectExchange directExchange){
		return BindingBuilder.bind(directQueue1).to(directExchange).with("blue");
	}

	/**
	 * 第2个队列
	 */
	@Bean
	public Queue directQueue2(){
		return new Queue("direct.queue2");
	}

	/**
	 * 绑定队列和交换机
	 */
	@Bean
	public Binding bindingQueue2WithRed(Queue directQueue2, DirectExchange directExchange){
		return BindingBuilder.bind(directQueue2).to(directExchange).with("red");
	}
	/**
	 * 绑定队列和交换机
	 */
	@Bean
	public Binding bindingQueue2WithYellow(Queue directQueue2, DirectExchange directExchange){
		return BindingBuilder.bind(directQueue2).to(directExchange).with("yellow");
	}

FanoutConfig

@Configuration
public class FanoutConfig {
	/**
	 * 声明交换机 若mq没有名为 fanoutExchange 的交换机,则创建名为 dddddddddddddddddddddddddddddddddddddddddddddddddddddd.fanout 的交换机
	 * @return Fanout类型交换机
	 */
	@Bean
	public FanoutExchange fanoutExchange(){
		//ExchangeBuilder.fanoutExchange("").build();
		return new FanoutExchange("dddddddddddddddddddddddddddddddddddddddddddddddddddddd.fanout");
	}
	/**
	 * 第1个队列
	 */
	@Bean
	public Queue fanoutQueue1(){
		//QueueBuilder.durable("").build();
		return new Queue("fanoutdddddddddddddddddddddddddddddddddddddddddddddddddddddd.queue1");
	}
	/**
	 * 绑定队列和交换机
	 */
	@Bean
	public Binding bindingQueue1(Queue fanoutQueue1, FanoutExchange fanoutExchange){
		return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);
	}
	/**
	 * 第2个队列
	 */
	@Bean
	public Queue fanoutQueue2(){
		return new Queue("fanoutdddddddddddddddddddddddddddddddddddddddddddddddddddddd.queue2");
	}
	/**
	 * 绑定队列和交换机
	 */
	@Bean
	public Binding bindingQueue2(Queue fanoutQueue2, FanoutExchange fanoutExchange){
		return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);
	}

声明队列和交换机(注解)

AnnotationDirect

@Configuration
public class AnnotationDirect {
	@RabbitListener(bindings = @QueueBinding(
			value = @Queue(name = "direct.queue1"),
			exchange = @Exchange(name = "dudu.direct", type = ExchangeTypes.DIRECT),
			key = {"red", "blue"}
	))
	public void listenDirectQueue1(String msg){
		System.out.println("消费者1接收到direct.queue1的消息:【" + msg + "】");
	}

	@RabbitListener(bindings = @QueueBinding(
			value = @Queue(name = "direct.queue2"),
			exchange = @Exchange(name = "dudu.direct", type = ExchangeTypes.DIRECT),
			key = {"red", "yellow"}
	))
	public void listenDirectQueue2(String msg){
		System.out.println("消费者2接收到direct.queue2的消息:【" + msg + "】");
	}
}

AnnotationTopic

@Configuration
public class AnnotationTopic {
	@RabbitListener(bindings = @QueueBinding(
			value = @Queue(name = "topic.queue1"),
			exchange = @Exchange(name = "dudu.topic", type = ExchangeTypes.TOPIC),
			key = "china.#"
	))
	public void listenTopicQueue1(String msg){
		System.out.println("消费者1接收到topic.queue1的消息:【" + msg + "】");
	}

	@RabbitListener(bindings = @QueueBinding(
			value = @Queue(name = "topic.queue2"),
			exchange = @Exchange(name = "dudu.topic", type = ExchangeTypes.TOPIC),
			key = "#.news"
	))
	public void listenTopicQueue2(String msg){
		System.out.println("消费者2接收到topic.queue2的消息:【" + msg + "】");
	}
}

消息转换器

默认情况下 Spring 采用的序列化方式是 JDK 序列化,JDK 序列化存在下列问题:数据体积过大、有安全漏洞、可读性差

publisher 测试类 发一个 map 消息

@SpringBootTest
public class BaseTest {
	@Autowired
	private RabbitTemplate rabbitTemplate;
	@Test
	public void testSendMessageMap() {
		Map<String, Object> map = new HashMap<>();
		map.put("name", "张三");
		map.put("age", 18);
		rabbitTemplate.convertAndSend("object.queue",map);
	}
}

队列就手动在 mq 创建一个 object.queue
这时候消息监听服务开着的话就会报错


在 mq 上查看发送的消息

配置 JSON 转换器

publisherconsumer两个服务中都引入依赖:

<dependency>
    <groupId>com.fasterxml.jackson.dataformat</groupId>
    <artifactId>jackson-dataformat-xml</artifactId>
</dependency>

publisherconsumer两个服务的启动类中添加一个 Bean 即可或者写一个配置类把 bean 注入

@Configuration
public class MessageConverterConfig {
	@Bean
	public MessageConverter messageConverter(){
		// 1.定义消息转换器
		Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();
		// 2.配置自动创建消息id,用于识别不同消息,也可以在业务中基于ID判断是否是重复消息
		jackson2JsonMessageConverter.setCreateMessageIds(true);
		return jackson2JsonMessageConverter;
	}
}

consumer 监听 object.queue

@Component
public class RabbitMQListener {
	// 监听 object.queue 队列
	@RabbitListener(queues = "object.queue")
	public void listenSimpleQueueMessage(Map<String, Object> msg) throws InterruptedException {
		System.out.println("消费者接收到object.queue消息:【" + msg + "】");
	}
}

测试

发送者的可靠性

修改 publisher 配置问价

spring:
  rabbitmq:
    host: 192.168.64.100 # 你的虚拟机IP
    port: 5672 # 端口
    virtual-host: /dudu # 虚拟主机
    username: dudu # 用户名
    password: 123456 # 密码
    # 生产者重试机制
    connection-timeout: 1s #设置MQ的连接超时时间
    template:
      retry:
        enabled: true # 开启超时重试机制  SpringAMQP提供的重试机制是阻塞式的重试,也就是说多次重试等待的过程中,当前线程是被阻塞的。
        # 如果对于业务性能有要求,建议禁用重试机制。如果一定要使用,请合理配置等待时长和重试次数,当然也可以考虑使用异步线程来执行发送消息的代码。
        initial-interval: 1000ms # 失败后的初始等待时间
        multiplier: 1 # 失败后下次的等待时长倍数,下次等待时长 = initial-interval * multiplier
        max-attempts: 3 # 最大重试次数
    # 生产者确认机制
    # publisher-confirm-type`有三种模式可选:-
    # `none`:关闭confirm机制,simple`:同步阻塞等待MQ的回执,correlated`:MQ异步回调返回回执
    #一般我们推荐使用`correlated`,回调机制。
    publisher-confirm-type: correlated # 开启publisher confirm机制,并设置confirm类型
    publisher-returns: true # 开启publisher return机制

定义 ReturnCallback

每个RabbitTemplate只能配置一个ReturnCallback,因此我们可以在配置类中统一设置。我们在 publisher 模块定义一个配置类:MqConfig

@AllArgsConstructor
@Configuration
public class MqConfig {
	private final RabbitTemplate rabbitTemplate;

	@PostConstruct
	public void init(){
		rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
			@Override
			public void returnedMessage(@Nonnull ReturnedMessage returnedMessage) {
				System.out.println("收到ReturnsCallback===========================");
				System.out.println("消息未进入队列"+returnedMessage.getMessage());
				System.out.println("交换机:"+returnedMessage.getExchange());
				System.out.println("路由键:"+returnedMessage.getRoutingKey());
				System.out.println("replyCode:"+returnedMessage.getReplyCode());
				System.out.println("replyText:"+returnedMessage.getReplyText());
			}
		});
		rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
			System.out.println("收到ConfirmCallback===========================");
			System.out.println("是否到交换机:"+correlationData);
			System.out.println("ack:"+ack);
			System.out.println("原因:"+cause);
			if (!ack){
				System.out.println("消息发送失败"+cause);
			}
		});
	}
}

也可以这样写

@Configuration
@AllArgsConstructor
public class MqConfig {
	@Bean
	public RabbitTemplate rabbitTemplate(CachingConnectionFactory factory) {
		// 开启消息进入Broker确认
		factory.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.CORRELATED);
		// 开启消息未进入队列确认
		factory.setPublisherReturns(true);

		RabbitTemplate rabbitTemplate = new RabbitTemplate(factory);
		// 进入Broker时触发回调
		rabbitTemplate.setConfirmCallback((correlationData, b, s) -> {
			System.out.println("是否到交换机:"+correlationData);
			System.out.println("ack:"+b);
			System.out.println("原因:"+s);
			if (b) {
				System.out.println("消息进入Broker成功");
			} else {
				System.out.println("消息进入Broker失败");
			}
		});

		// Mandatory:为true时,消息通过交换器无法匹配到队列会返回给生产者 并触发MessageReturn,为false时,匹配不到会直接被丢弃
		rabbitTemplate.setMandatory(true);
		// 消息未进入队列时触发回调
		rabbitTemplate.setReturnsCallback(returnedMessage -> {
			System.out.println("消息未进入队列"+returnedMessage.getMessage());
			System.out.println("交换机:"+returnedMessage.getExchange());
			System.out.println("路由键:"+returnedMessage.getRoutingKey());
			System.out.println("replyCode:"+returnedMessage.getReplyCode());
			System.out.println("replyText:"+returnedMessage.getReplyText());
		});
		return rabbitTemplate;
	}
}

新建测试、并且添加 ConfirmCallback

@Test
void testPublisherConfirm() {
    // 1.创建CorrelationData
    CorrelationData cd = new CorrelationData();
    // 2.给Future添加ConfirmCallback
    cd.getFuture().addCallback(new ListenableFutureCallback<CorrelationData.Confirm>() {
        @Override
        public void onFailure(Throwable ex) {
            // 2.1.Future发生异常时的处理逻辑,基本不会触发
            log.error("send message fail", ex);
        }
        @Override
        public void onSuccess(CorrelationData.Confirm result) {
            // 2.2.Future接收到回执的处理逻辑,参数中的result就是回执内容
            if(result.isAck()){ // result.isAck(),boolean类型,true代表ack回执,false 代表 nack回执
                log.debug("发送消息成功,收到 ack!");
            }else{ // result.getReason(),String类型,返回nack时的异常描述
                log.error("发送消息失败,收到 nack, reason : {}", result.getReason());
            }
        }
    });
    // 3.发送消息
    rabbitTemplate.convertAndSend("hmall.direct", "q", "hello", cd);
}

新的版本好像没有这个 addCallback()方法了下面这个倒是可能也可以吧

@Test
	void testPublisherConfirm() throws InterruptedException {
		// 1.创建CorrelationData
		CorrelationData cd = new CorrelationData();
		cd.getFuture().whenComplete((confirm, throwable) -> {
			System.out.println("confirm: " + confirm + " throwable: " + throwable);
			if (confirm.isAck()) {
				System.out.println("消息发送成功,收到ack"+confirm.getReason());
			}else {
				System.out.println("消息发送失败,收到nack"+confirm.getReason());
			}
		});
		rabbitTemplate.convertAndSend("hmall.11direct", "blu1e", "hello",cd);
		Thread.sleep(2000);
	}

测试

总结

开启生产者确认比较消耗 MQ 性能,一般不建议开启。而且大家思考一下触发确认的几种情况:

  • 路由失败:一般是因为 RoutingKey 错误导致,往往是编程导致
  • 交换机名称错误:同样是编程错误导致
  • MQ 内部故障:这种需要处理,但概率往往较低。因此只有对消息可靠性要求非常高的业务才需要开启,而且仅仅需要开启 ConfirmCallback 处理 nack 就可以了。

MQ 的可靠性

说明:在开启持久化机制以后,如果同时还开启了生产者确认,那么 MQ 会在消息持久化以后才发送 ACK 回执,进一步确保消息的可靠性。

不过出于性能考虑,为了减少 IO 次数,发送到 MQ 的消息并不是逐条持久化到数据库的,而是每隔一段时间批量持久化。一般间隔在 100 毫秒左右,这就会导致 ACK 有一定的延迟,因此建议生产者确认全部采用异步方式。

交换机持久化

队列持久化

消息持久化

在控制台发消息时可以指定参数

代码实现

	@Test
	public void testSendMessage(){
		//队列名称
		String queueName = "base.queue";
		//消息
		String message = "基本消息模型测试";
		//发送消息
		//设置消息持久化
		rabbitTemplate.setMandatory(true);
		rabbitTemplate.convertAndSend(queueName,message);
	}

LazyQueue

在 3.12 版本之后,LazyQueue 已经成为所有队列的默认格式。因此官方推荐升级 MQ 为 3.12 版本或者所有队列都设置为 LazyQueue 模式。
在添加队列的时候,添加x-queue-mod=lazy参数即可设置队列为 Lazy 模式:

代码

@Bean
public Queue lazyQueue(){
    return QueueBuilder
            .durable("lazy.queue")
            .lazy() // 开启Lazy模式
            .build();
}

注解方式

@RabbitListener(queuesToDeclare = @Queue(
        name = "lazy.queue",
        durable = "true",
        arguments = @Argument(name = "x-queue-mode", value = "lazy")
))
public void listenLazyQueue(String msg){
    log.info("接收到 lazy.queue的消息:{}", msg);
}

更新已有队列为 lazy 模式

命令

rabbitmqctl set_policy Lazy "^lazy-queue$" '{"queue-mode":"lazy"}' --apply-to queues

命令解读:

  • rabbitmqctl :RabbitMQ 的命令行工具
  • set_policy :添加一个策略
  • Lazy :策略名称,可以自定义
  • "^lazy-queue$" :用正则表达式匹配队列的名字
  • '{"queue-mode":"lazy"}' :设置队列模式为 lazy 模式
  • --apply-to queues:策略的作用对象,是所有的队列

消费者的可靠性

消费者确认机制

当消费者处理消息结束后,应该向 RabbitMQ 发送一个回执,告知 RabbitMQ 自己消息处理状态。回执有三种可选值:

  • ack:成功处理消息,RabbitMQ 从队列中删除该消息
  • nack:消息处理失败,RabbitMQ 需要再次投递消息
  • reject:消息处理失败并拒绝该消息,RabbitMQ 从队列中删除该消息

一般 reject 方式用的较少,除非是消息格式有问题,那就是开发问题了。因此大多数情况下我们需要将消息处理的代码通过try catch机制捕获,消息处理成功时返回 ack,处理失败时返回 nack.

由于消息回执的处理代码比较统一,因此 SpringAMQP 帮我们实现了消息确认。并允许我们通过配置文件设置 ACK 处理方式,有三种模式:

  • none:不处理。即消息投递给消费者后立刻 ack,消息会立刻从 MQ 删除。非常不安全,不建议使用

  • manual:手动模式。需要自己在业务代码中调用 api,发送ackreject,存在业务入侵,但更灵活

  • auto:自动模式。SpringAMQP 利用 AOP 对我们的消息处理逻辑做了环绕增强,当业务正常执行时则自动返回ack. 当业务出现异常时,根据异常判断返回不同结果:

    • 如果是业务异常,会自动返回nack
    • 如果是消息处理或校验异常,自动返回reject;

修改 consumer 的 yml 文件

spring:
  rabbitmq:
    host: 192.168.64.100 # 你的虚拟机IP
    port: 5672 # 端口
    virtual-host: /dudu # 虚拟主机
    username: dudu # 用户名
    password: 123456 # 密码
    listener:
      simple:
        prefetch: 1 # 每次只能获取一条消息,处理完成才能获取下一个消息
        # 确认模式
        # none:不处理。即消息投递给消费者后立刻ack,消息会立刻从MQ删除。非常不安全,不建议使用
        # manual:手动确认。即消费者处理完消息后,需要手动ack。
        # auto:自动确认。即消费者处理完消息后,自动ack,消息会从MQ删除。如果是业务异常,会自动返回`nack` 消息处理或校验异常,自动返回`reject`消息不会从MQ删除
        acknowledge-mode: auto

失败重试机制

修改 consumer 的配置

spring:
  rabbitmq:
    host: 192.168.64.100 # 你的虚拟机IP
    port: 5672 # 端口
    virtual-host: /dudu # 虚拟主机
    username: dudu # 用户名
    password: 123456 # 密码
    listener:
      simple:
        prefetch: 1 # 每次只能获取一条消息,处理完成才能获取下一个消息
        # 确认模式
        # none:不处理。即消息投递给消费者后立刻ack,消息会立刻从MQ删除。非常不安全,不建议使用
        # manual:手动确认。即消费者处理完消息后,需要手动ack,消息不会从MQ删除。
        # auto:自动确认。即消费者处理完消息后,自动ack,消息会从MQ删除。如果是业务异常,会自动返回`nack` 消息处理或校验异常,自动返回`reject`
        acknowledge-mode: auto
        # 失败重试机制
        retry:
          enabled: true # 开启消费者失败重试
          initial-interval: 1000ms # 初识的失败等待时长为1秒
          multiplier: 1 # 失败的等待时长倍数,下次等待时长 = multiplier * last-interval
          max-attempts: 3 # 最大重试次数
          stateless: true # true无状态;false有状态。如果业务中包含事务,这里改为false
  • 消费者在失败后消息没有重新回到 MQ 无限重新投递,而是在本地重试了 3 次
  • 本地重试 3 次以后,抛出了AmqpRejectAndDontRequeueException异常。查看 RabbitMQ 控制台,发现消息被删除了,说明最后 SpringAMQP 返回的是reject

结论:

  • 开启本地重试时,消息处理过程中抛出异常,不会 requeue 到队列,而是在消费者本地重试
  • 重试达到最大次数后,Spring 会返回 reject,消息会被丢弃

失败处理策略

本地测试达到最大重试次数后,消息会被丢弃。这在某些对于消息可靠性要求较高的业务场景下,显然不太合适了。
因此 Spring 允许我们自定义重试次数耗尽后的消息处理策略,这个策略是由MessageRecovery接口来定义的,它有 3 个不同实现:

  • RejectAndDontRequeueRecoverer:重试耗尽后,直接reject,丢弃消息。默认就是这种方式
  • ImmediateRequeueMessageRecoverer:重试耗尽后,返回nack,消息重新入队
  • RepublishMessageRecoverer:重试耗尽后,将失败消息投递到指定的交换机

比较优雅的一种处理方案是RepublishMessageRecoverer,失败后将消息投递到一个指定的,专门存放异常消息的队列,后续由人工集中处理。

ErrorMessageConfig 配置类

@Configuration
// 开启重试机制 这个配置类才会生效
@ConditionalOnProperty(name = "spring.rabbitmq.listener.simple.retry.enabled", havingValue = "true")
public class ErrorMessageConfig {
	@Bean
	public DirectExchange errorMessageExchange(){
		return new DirectExchange("error.direct");
	}
	@Bean
	public Queue errorQueue(){
		return new Queue("error.queue", true);
	}
	@Bean
	public Binding errorBinding(Queue errorQueue, DirectExchange errorMessageExchange){
		return BindingBuilder.bind(errorQueue).to(errorMessageExchange).with("error");
	}

	@Bean
	public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){
		return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");
	}
}

延迟消息

死信交换机

当一个队列中的消息满足下列情况之一时,可以成为死信(dead letter):

  • 消费者使用basic.rejectbasic.nack声明消费失败,并且消息的requeue参数设置为 false
  • 消息是一个过期消息,超时无人消费
  • 要投递的队列消息满了,无法投递
    如果一个队列中的消息已经成为死信,并且这个队列通过**dead-letter-exchange**属性指定了一个交换机,那么队列中的死信就会投递到这个交换机中,而这个交换机就称为死信交换机(Dead Letter Exchange)。而此时加入有队列与死信交换机绑定,则最终死信就会被投递到这个队列中。

死信交换机有什么作用呢?

  1. 收集那些因处理失败而被拒绝的消息
  2. 收集那些因队列满了而被拒绝的消息
  3. 收集因 TTL(有效期)到期的消息

注意:
RabbitMQ 的消息过期是基于追溯方式来实现的,也就是说当一个消息的 TTL 到期以后不一定会被移除或投递到死信交换机,而是在消息恰好处于队首时才会被处理。
当队列中消息堆积很多的时候,过期消息可能不会被按时处理,因此你设置的 TTL 时间不一定准确。

DelayExchange 插件

安装
基于 Docker 安装,所以需要先查看 RabbitMQ 的插件目录对应的数据卷

docker volume inspect mq-plugins

结果如下

[
    {
        "CreatedAt": "2024-06-19T09:22:59+08:00",
        "Driver": "local",
        "Labels": null,
        "Mountpoint": "/var/lib/docker/volumes/mq-plugins/_data",
        "Name": "mq-plugins",
        "Options": null,
        "Scope": "local"
    }
]

插件目录被挂载到了/var/lib/docker/volumes/mq-plugins/_data这个目录,我们上传插件到该目录下。

接下来执行命令,安装插件:

docker exec -it mq rabbitmq-plugins enable rabbitmq_delayed_message_exchange

声明延迟交换机

基于注解方式:

@RabbitListener(bindings = @QueueBinding(
        value = @Queue(name = "delay.queue", durable = "true"),
        exchange = @Exchange(name = "delay.direct", delayed = "true"),
        key = "delay"
))
public void listenDelayMessage(String msg){
    log.info("接收到delay.queue的延迟消息:{}", msg);
}

基于@Bean的方式:

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class DelayExchangeConfig {
    @Bean
    public DirectExchange delayExchange(){
        return ExchangeBuilder
                .directExchange("delay.direct") // 指定交换机类型和名称
                .delayed() // 设置delay的属性为true
                .durable(true) // 持久化
                .build();
    }
    @Bean
    public Queue delayedQueue(){
        return new Queue("delay.queue");
    }
    @Bean
    public Binding delayQueueBinding(){
        return BindingBuilder.bind(delayedQueue()).to(delayExchange()).with("delay");
    }
}

发送延迟消息

@SpringBootTest
public class BaseTest {
	@Autowired
	private RabbitTemplate rabbitTemplate;
	@Test
	void testPublisherDelayMessage() {
		// 1.创建消息
		String message = "hello, delayed message";
		// 2.发送消息,利用消息后置处理器添加消息头
		rabbitTemplate.convertAndSend("delay.direct", "delay", message, new MessagePostProcessor() {
			@Override
			public Message postProcessMessage(Message message) throws AmqpException {
				// 添加延迟消息属性  这里的.setHeader("x-delay", 10000)替代了setDelay(10000)
				message.getMessageProperties().setHeader("x-delay", 10000);
				return message;
			}
		});
		System.out.println("消息发送成功"+ LocalDateTime.now());
	}
}

消息发送十秒后,消费者接收到消息

注意: 延迟消息插件内部会维护一个本地数据库表,同时使用 Elang Timers 功能实现计时。如果消息的延迟时间设置较长,可能会导致堆积的延迟消息非常多,会带来较大的 CPU 开销,同时延迟消息的时间会存在误差。
因此,不建议设置延迟时间过长的延迟消息

假如订单超时支付时间为 30 分钟,理论上说我们应该在下单时发送一条延迟消息,延迟时间为 30 分钟。这样就可以在接收到消息时检验订单支付状态,关闭未支付订单。
但是大多数情况下用户支付都会在 1 分钟内完成,我们发送的消息却要在 MQ 中停留 30 分钟,额外消耗了 MQ 的资源。因此,我们最好多检测几次订单支付状态,而不是在最后第 30 分钟才检测。
例如:我们在用户下单后的第 10 秒、20 秒、30 秒、45 秒、60 秒、1 分 30 秒、2 分、…30 分分别设置延迟消息,如果提前发现订单已经支付,则后续的检测取消即可。
这样就可以有效避免对 MQ 资源的浪费了。

整体视图

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/610834.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

福昕PDF阅读器取消手型工具鼠标点击翻页

前言&#xff1a; 本文介绍如何关闭福昕PDF阅读器取消手型工具鼠标点击翻页&#xff0c;因为这样真的很容易误触发PDF翻页&#xff0c;使用起来让人窝火。 引用&#xff1a; NA 正文&#xff1a; 新版的福昕PDF阅读器默认打开了“使用手型工具阅读文章”这个勾选项&#x…

五、Redis五种常用数据结构-SET

Redis的Set结构存储的数据和Java中的HashSet类似&#xff0c;都是无序且不重复的。其底层的数据结构有两种&#xff0c;一是当value为整数时&#xff0c;且数据量不大时采用intset来存储。其他情况使用dict字典存储。集合中最多存储232-1(40多亿)个数据。 1、常用命令 sadd k…

Amesim基础篇-热仿真常用模型库-Air Conditioning-Pipes

前言 基于上文对空调库各个元件的介绍&#xff0c;本文进一步将其中的管路展开。 管路介绍 1 摩擦阻力管&#xff08;R&#xff09;&#xff1a; 具有阻力特性的管路&#xff0c;通过管长以及管截面计算阻力。 2 可调节阻力管&#xff08;R&#xff09;&#xff1a; 只具有…

字节薪资解密,张一鸣啥等级?

大家好&#xff0c;我是白露啊。 之前说BAT&#xff0c;可能是指百度、阿里、腾讯&#xff0c;但是现在&#xff0c;这个 B&#xff0c;大多数时候指的是字节跳动了。 随着抖音系产品的流量持续升温&#xff0c;字节跳动已经是一个毋庸置疑的互联网大厂了&#xff0c;不管是想…

小阳的戒S笔记

文章目录 写在前面2024年5月8日21:12:172024年5月9日21:48:242024年5月10日08:04:141、记录昨夜之身体变化2、自身制定之计划1.此亦乃要事&#xff0c;特定问了度娘与GPT&#xff0c;找时间还得咨询专业医师。2.通过跑步宣泄&#xff0c;同时锻炼身体3.我不会有压力&#xff0c…

HFSS学习-day4-建模操作

通过昨天的学习&#xff0c;我们已经熟悉了HFSS的工作环境&#xff1b;今天我们来讲解HFSS中创建物体模型的县体步骤和相关操作。物体建模是HFSS仿真设计工作的第一步&#xff0c;HFSS中提供了诸如矩形、圆面、长方体圆柱体和球体等多种基本模型(Primitive)&#xff0c;这些基本…

Docker学习二(Centos):Docker安装并运行redis(成功运行)

文章目录 前言一、下载并挂载1. 拉取镜像2. 创建挂载目录3. 下载redis.conf文件4. 赋予权限5. 修改redis.conf 默认配置 二、docker运行redis三、检查redis运行状态四、navicat链接redis 前言 一、下载并挂载 1. 拉取镜像 docker pull redis2. 创建挂载目录 fengfanli是我自…

Sarcasm detection论文解析 |基于混合自动编码器的模型对社交媒体平台进行讽刺检

论文地址 论文地址&#xff1a;Electronics | Free Full-Text | Sarcasm Detection over Social Media Platforms Using Hybrid Auto-Encoder-Based Model (mdpi.com) 论文首页 笔记框架 基于混合自动编码器的模型对社交媒体平台进行讽刺检 &#x1f4c5;出版年份:2022 &#x…

5.08.7 CMT: Convolutional Neural Networks Meet Vision Transformers

1. 介绍 将基于 Transformer 的架构应用于视觉领域&#xff0c;并在图像分类、目标检测和语义分割等各种任务中取得了有希望的结果。 Vision Transformer (ViT)是第一个用纯 Transformer 替代传统 CNN 主干的工作。输入图像&#xff08;2242243&#xff09;首先被分割成196个不…

系统架构设计师 - 计算机组成与体系结构(1)

计算机组成与体系结构 计算机组成与体系结构计算机结构 ★CPU 组成结构运算器组成控制器组成 计算机体系结构冯诺依曼结构哈弗结构 嵌入式芯片&#xff08;了解&#xff09; 存储系统 ★★★★概述Cache主存编址磁盘管理磁盘基本结构与存取过程磁盘优化分布存储磁盘管理 大家好…

绝地求生:杜卡迪联动下架,兰博基尼联动预计在下半年上线!

杜卡迪联名活动即将在5月8日上午八点下架&#xff0c;届时商城内购买-升阶活动将不可用。 杜卡迪下架 本次杜卡迪联名是蓝洞首次以非通行证方式进行的载具联名活动&#xff0c;玩家认为有利有弊。 多数玩家表示非通行证-仅抽奖获取的方式成本太高&#xff0c;部分脸黑玩家本次…

c++ poencv Project2 - Document Scanner

惯例先上结果图&#xff1a; 本文提供一种文本提取思路&#xff1a; 1、首先图像预处理&#xff1a;灰度转换、高斯模糊、边缘提取&#xff0c;膨胀。 Mat preProcessing(Mat img) {cvtColor(img, imgGray, COLOR_BGR2GRAY);GaussianBlur(imgGray, imgBlur, Size(3, 3), 3, …

基于鸢尾花数据集的四种聚类算法(kmeans,层次聚类,DBSCAN,FCM)和学习向量量化对比

基于鸢尾花数据集的四种聚类算法&#xff08;kmeans&#xff0c;层次聚类&#xff0c;DBSCAN,FCM&#xff09;和学习向量量化对比 注&#xff1a;下面的代码可能需要做一点参数调整&#xff0c;才得到所有我的运行结果。 kmeans算法&#xff1a; import matplotlib.pyplot a…

从面试官视角出发,聊聊产品经理的面试攻略

一、请进行自我介绍 这题基本是面试的开胃菜了&#xff0c;估计面试多的&#xff0c;自己答案都能倒背如流啦。 其实自我介绍还是蛮重要的&#xff0c;对我来说主要有 3 个作用&#xff1a;面试准备、能力预估、思维评估。 面试准备&#xff1a;面试官每天都要面 3 ~6 人&am…

嵌入式C语言高级教程:实现基于STM32的智能水质监测系统

智能水质监测系统可以实时监控水体的质量&#xff0c;对于环境保护和水资源管理具有重要意义。本教程将指导您如何在STM32微控制器上实现一个基本的智能水质监测系统。 一、开发环境准备 硬件要求 微控制器&#xff1a;STM32F303K8&#xff0c;因其高精度模拟特性而被选用。…

嵌入式C语言高级教程:实现基于STM32的智能照明系统

智能照明系统不仅可以自动调节光源的亮度和色温&#xff0c;还可以通过感应用户的行为模式来优化能源消耗。本教程将指导您如何在STM32微控制器上实现一个基本的智能照明系统。 一、开发环境准备 硬件要求 微控制器&#xff1a;STM32F103RET6&#xff0c;具有足够的处理能力…

苹果再失资深设计师,Jony Ive 团队基本离开;OpenAI 或于下周发布 AI 搜索丨 RTE 开发者日报 Vol.201

开发者朋友们大家好&#xff1a; 这里是 「RTE 开发者日报」 &#xff0c;每天和大家一起看新闻、聊八卦。我们的社区编辑团队会整理分享 RTE&#xff08;Real Time Engagement&#xff09; 领域内「有话题的 新闻 」、「有态度的 观点 」、「有意思的 数据 」、「有思考的 文…

测试环境搭建整套大数据系统(十六:超级大文件处理遇到的问题)

一&#xff1a;yarn出现损坏的nodemanger 报错现象 日志&#xff1a;1/1 local-dirs usable space is below configured utilization percentage/no more usable space [ /opt/hadoop-3.2.4/data/nm-local-dir : used space above threshold of 90.0% ] ; 1/1 log-dirs usabl…

【SRC实战】合成类小游戏外挂漏洞

挖个洞先 https://mp.weixin.qq.com/s/ZnaRn222xJU0MQxWoRaiJg “以下漏洞均为实验靶场&#xff0c;如有雷同&#xff0c;纯属巧合” 合成类小游戏三个特点&#xff1a; 1、一关比一关难&#xff0c;可以参考“羊了个羊” 2、无限关卡无限奖励&#xff0c;可以参考“消灭星星…

【Qt 学习笔记】Qt常用控件 | 多元素控件 | List Widget的说明及介绍

博客主页&#xff1a;Duck Bro 博客主页系列专栏&#xff1a;Qt 专栏关注博主&#xff0c;后期持续更新系列文章如果有错误感谢请大家批评指出&#xff0c;及时修改感谢大家点赞&#x1f44d;收藏⭐评论✍ Qt常用控件 | 多元素控件 | List Widget的说明及介绍 文章编号&#x…
最新文章