请配合《Spring Boot使用RabbitMQ》一文使用
可靠投递
加入配置文件的相关配置
spring:
rabbitmq:
host: 192.168.5.101
port: 5672
virtual-host: /
# 开启发送端确认
publisher-confirms: true
# 开启发送端消息抵达队列的确认
publisher-returns: true
template:
mandatory: true
listener:
simple:
# 消费端的消息手动确认,默认是auto
acknowledge-mode: manual
修改配置类
@Configuration
public class CustomRabbitConfig {
// 注入依赖
@Autowired
private RabbitTemplate rabbitTemplate;
@Bean
MessageConverter messageConverter(){
return new Jackson2JsonMessageConverter();
}
// CustomRabbitConfig创建完成后执行此方法,类似于在对象实例化后对rabbitTemplate进行拓展。
@PostConstruct
public void initRabbitTemplate(){
// 设置确认回调
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
// correlationData:这个消息的唯一ID
// ack:消息是否成功收到,true为收到
// cause:失败原因
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
}
});
// 设置消息抵达队列的确认回调
// 只有消息没有投递给指定的队列,才会触发这个回调
rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
// message:投递失败的消息的详细信息
// replyCode:回复的状态码
// replyText:回复的文本内容
// exchange:当时这个消息发给了哪个交换机
// routingKey:当时这个消息发给了哪个路由件
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
}
});
}
}
消费端确认机制
@Slf4j
@Service
public class RabbitMessageReciveListener {
@RabbitListener(queues = {"hello-queue"})
public void reviceMessageListener(Message message, SysUser sysUser, Channel channel) throws IOException {
log.info("接收到消息,内容:{},类型:{}", message, sysUser);
// deliveryTag:消息的标记,通过这个标记来签收或者拒收消息
long deliveryTag = message.getMessageProperties().getDeliveryTag();
// deliveryTag:消息的标记
// multiple:是否批量签收,一般false
//void basicAck(long deliveryTag, boolean multiple) throws IOException;
channel.basicAck(deliveryTag, false); // 签收消息
// deliveryTag:消息的标记
// multiple:是否批量签收,一般false
// requeue:是否重新返回队列,一般为true,防止消息丢失
// void basicNack(long deliveryTag, boolean multiple, boolean requeue) throws IOException;
channel.basicNack(deliveryTag, false, true); // 拒收消息
}
}