请配合《Spring Boot使用RabbitMQ》一文使用

可靠投递

加入配置文件的相关配置

spring:
  rabbitmq:
    host: 192.168.5.101
    port: 5672
    virtual-host: /
    # 开启发送端确认
    publisher-confirms: true
    # 开启发送端消息抵达队列的确认
    publisher-returns: true
    template:
      mandatory: true
    listener:
      simple:
        # 消费端的消息手动确认,默认是auto
        acknowledge-mode: manual

修改配置类

@Configuration
public class CustomRabbitConfig {

    // 注入依赖
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Bean
    MessageConverter messageConverter(){
        return new Jackson2JsonMessageConverter();
    }
    
    // CustomRabbitConfig创建完成后执行此方法,类似于在对象实例化后对rabbitTemplate进行拓展。
    @PostConstruct
    public void initRabbitTemplate(){
        // 设置确认回调
        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
            // correlationData:这个消息的唯一ID
            // ack:消息是否成功收到,true为收到
            // cause:失败原因
            @Override
            public void confirm(CorrelationData correlationData, boolean ack, String cause) {

            }
        });
        
        // 设置消息抵达队列的确认回调
        // 只有消息没有投递给指定的队列,才会触发这个回调
        rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
            // message:投递失败的消息的详细信息
            // replyCode:回复的状态码
            // replyText:回复的文本内容
            // exchange:当时这个消息发给了哪个交换机
            // routingKey:当时这个消息发给了哪个路由件
            @Override
            public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {

            }
        });
    }
}

消费端确认机制

@Slf4j
@Service
public class RabbitMessageReciveListener {

    @RabbitListener(queues = {"hello-queue"})
    public void reviceMessageListener(Message message, SysUser sysUser, Channel channel) throws IOException {
        log.info("接收到消息,内容:{},类型:{}", message, sysUser);
        
        // deliveryTag:消息的标记,通过这个标记来签收或者拒收消息
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        
        // deliveryTag:消息的标记
        // multiple:是否批量签收,一般false
        //void basicAck(long deliveryTag, boolean multiple) throws IOException;
        channel.basicAck(deliveryTag, false); // 签收消息
        
        // deliveryTag:消息的标记
        // multiple:是否批量签收,一般false
        // requeue:是否重新返回队列,一般为true,防止消息丢失
        // void basicNack(long deliveryTag, boolean multiple, boolean requeue) throws IOException;
        channel.basicNack(deliveryTag, false, true); // 拒收消息
    }
}

# To Be Continued!😎

Last Updated: 4/7/2021, 11:50:51 PM