RabbitMQ的消息确认类型
- RabbitMQ给生产者返回消息确认
- 消费端给RabbitMQ返回消息确认
- 消费端给RabbitMQ返回消息拒绝
RabbitMQ返回消息确认
- 生产者端将消息发送出去,消息到达RabbitMQ之后,会返回一个到达确认。
- 这个确认实际上就是官方常说的ConfirmCallback,我们通过在生产者端使用一个回调类来监听RabbiMQ返回的消息确认。
- Spring AMQP中我们通过设置RabbitTemplate的ConfirmCallback属性来实现消息确认回调,通过一个实现了ConfirmCallback的类来实现回调逻辑。
举例说明
@Bean(name = "smsRabbitTemplate")public RabbitTemplate smsRabbitTemplate() { RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory()); rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter()); rabbitTemplate.setRoutingKey(smsRoutingKey); rabbitTemplate.setQueue(QUEUE_SMS); rabbitTemplate.setExchange("smsExchange"); rabbitTemplate.setConfirmCallback(smsConfirmCallBack()); rabbitTemplate.setRetryTemplate(retryTemplate()); return rabbitTemplate;}
/** * Email队列回调 * @author Xiaoyang.Li * */public class EmailConfirmCallBack implements ConfirmCallback{ private Logger logger = LoggerFactory.getLogger(EmailConfirmCallBack.class); @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { if(ack){ logger.info(correlationData.getId() + "--邮件已发送到RabbitMQ队列中."); }else{ logger.info(correlationData.getId() + "--邮件发送到RabbitMQ队列失败。{}", cause); } }}
消费端消息确认
- 默认的rabbitmq消费端是开启了自动确认的。
- 实际项目中往往我们会对消息进行一系列的处理,然后再给出消费确认,也就是我们需要关闭自动确认,使用手动确认,通过设置
AcknowledgeMode为MANUAL
可以开启手动确认。- Spring AMQP中我们可以通过设置
SimpleMessageListenerContainer.setAcknowledgeMode(AcknowledgeMode.MANUAL)来进行开启手动确认
。
###举例说明
@Beanpublic SimpleMessageListenerContainer messageListenerContainer() { SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(); container.setConnectionFactory(connectionFactory()); container.setMessageConverter(new Jackson2JsonMessageConverter()); //如果设置手动消费,那么需要使用Channel.basicAck()进行数据返回, //所以MessageListener需要实现ChannelAwareMessageListener接口 container.setAcknowledgeMode(AcknowledgeMode.MANUAL);//设置手动确认消息被消费 container.setQueues(mailQueue());//添加监听队列 container.setMessageListener(mailListenerUseChannel());//如果设置AcknowledgeMode为手动,那么需要使用这个 return container;}
消费端消息拒绝
- 消费端如果多次消费失败,我们可以将这条消息拒绝,通过死信设置,rabbitmq会将拒绝的消息存放到死信队列中去。
- 消费端通过使用
basic.reject
来进行拒绝操作。- Spring AMQP中我们通过调用
channel.basicReject(deliveryTag, false)
来进行消费拒绝
举例说明
public class MailListenerUseChannel implements ChannelAwareMessageListener { private Logger logger = LoggerFactory.getLogger(MailListenerUseChannel.class); @Autowired private MimeMailSender mimeMailSender; private ByteArrayToObject byteArrayToObj; @PostConstruct public void init(){ byteArrayToObj = new ByteArrayToObject(); } /** * 从mq取出数据然后发送 */ public void onMessage(Message message, Channel channel) { boolean isComplete = true; if(message == null){ logger.error("接收到的消息为空"); isComplete = false; } logger.info("消息格式"+message.getMessageProperties().getContentType()); EmailMessage emailMessage = byteArrayToObj.parseMessage(message, EmailMessage.class); if(emailMessage.getHeader() == null){ logger.error("Email header is null. Please check message."); isComplete = false; } if(isComplete){ logger.info("邮件类型>>>>>"+emailMessage.getHeader().getMsgType()); try{ //发送邮件 mimeMailSender.sendEmail(emailMessage); //发送成功后返回响应给mq channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); }catch(Exception e){ logger.error("sendEmail error.", e); e.printStackTrace(); } }else{ try { //如果消息不合法,直接丢弃 channel.basicReject(message.getMessageProperties().getDeliveryTag(), false); logger.error("Discarded a message!"); } catch (IOException e) { logger.error("Reject/Nack message error."); e.printStackTrace(); } } }}