博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
RabbitMQ 消息确认
阅读量:7103 次
发布时间:2019-06-28

本文共 3694 字,大约阅读时间需要 12 分钟。

hot3.png

RabbitMQ的消息确认类型

  • RabbitMQ给生产者返回消息确认
  • 消费端给RabbitMQ返回消息确认
  • 消费端给RabbitMQ返回消息拒绝

RabbitMQ返回消息确认

  • 生产者端将消息发送出去,消息到达RabbitMQ之后,会返回一个到达确认。
  • 这个确认实际上就是官方常说的ConfirmCallback,我们通过在生产者端使用一个回调类来监听RabbiMQ返回的消息确认。
  • Spring AMQP中我们通过设置RabbitTemplate的ConfirmCallback属性来实现消息确认回调,通过一个实现了ConfirmCallback的类来实现回调逻辑。

举例说明

@Bean(name = "smsRabbitTemplate")public RabbitTemplate smsRabbitTemplate() {	RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory());	rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());	rabbitTemplate.setRoutingKey(smsRoutingKey);	rabbitTemplate.setQueue(QUEUE_SMS);	rabbitTemplate.setExchange("smsExchange");	rabbitTemplate.setConfirmCallback(smsConfirmCallBack());	rabbitTemplate.setRetryTemplate(retryTemplate());	return rabbitTemplate;}
/** * Email队列回调 * @author Xiaoyang.Li * */public class EmailConfirmCallBack implements ConfirmCallback{	private Logger logger = LoggerFactory.getLogger(EmailConfirmCallBack.class);		@Override	public void confirm(CorrelationData correlationData, boolean ack, String cause) {		if(ack){			logger.info(correlationData.getId() + "--邮件已发送到RabbitMQ队列中.");		}else{			logger.info(correlationData.getId() + "--邮件发送到RabbitMQ队列失败。{}", cause);		}	}}

消费端消息确认

  • 默认的rabbitmq消费端是开启了自动确认的。
  • 实际项目中往往我们会对消息进行一系列的处理,然后再给出消费确认,也就是我们需要关闭自动确认,使用手动确认,通过设置AcknowledgeMode为MANUAL可以开启手动确认。
  • Spring AMQP中我们可以通过设置SimpleMessageListenerContainer.setAcknowledgeMode(AcknowledgeMode.MANUAL)来进行开启手动确认

###举例说明

@Beanpublic SimpleMessageListenerContainer messageListenerContainer() {       SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();       container.setConnectionFactory(connectionFactory());       container.setMessageConverter(new Jackson2JsonMessageConverter());       //如果设置手动消费,那么需要使用Channel.basicAck()进行数据返回,       //所以MessageListener需要实现ChannelAwareMessageListener接口       container.setAcknowledgeMode(AcknowledgeMode.MANUAL);//设置手动确认消息被消费       container.setQueues(mailQueue());//添加监听队列       container.setMessageListener(mailListenerUseChannel());//如果设置AcknowledgeMode为手动,那么需要使用这个              return container;}

消费端消息拒绝

  • 消费端如果多次消费失败,我们可以将这条消息拒绝,通过死信设置,rabbitmq会将拒绝的消息存放到死信队列中去。
  • 消费端通过使用basic.reject来进行拒绝操作。
  • Spring AMQP中我们通过调用channel.basicReject(deliveryTag, false)来进行消费拒绝

举例说明

public class MailListenerUseChannel implements ChannelAwareMessageListener {	private Logger logger = LoggerFactory.getLogger(MailListenerUseChannel.class);		@Autowired	private MimeMailSender mimeMailSender;		private ByteArrayToObject byteArrayToObj;		@PostConstruct	public void init(){		byteArrayToObj = new ByteArrayToObject();	}	/**	 * 从mq取出数据然后发送	 */	public void onMessage(Message message, Channel channel) {		boolean isComplete = true;		if(message == null){			logger.error("接收到的消息为空");			isComplete = false;		}		logger.info("消息格式"+message.getMessageProperties().getContentType());		EmailMessage emailMessage = byteArrayToObj.parseMessage(message, EmailMessage.class);		if(emailMessage.getHeader() == null){			logger.error("Email header is null. Please check message.");			isComplete = false;		}				if(isComplete){			logger.info("邮件类型>>>>>"+emailMessage.getHeader().getMsgType());						try{				//发送邮件				mimeMailSender.sendEmail(emailMessage);				//发送成功后返回响应给mq				channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);			}catch(Exception e){				logger.error("sendEmail error.", e);				e.printStackTrace();			}		}else{			try {				//如果消息不合法,直接丢弃				channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);				logger.error("Discarded a message!");			} catch (IOException e) {				logger.error("Reject/Nack message error.");				e.printStackTrace();			}		}	}}

转载于:https://my.oschina.net/huaxian8812/blog/779474

你可能感兴趣的文章
saltstack自动部署apache实例
查看>>
电信技术培训AAA介绍
查看>>
docker_基础_2
查看>>
Zabbix系统数据采集方法总结
查看>>
手工编译LAMP
查看>>
HTML --段落
查看>>
广义表
查看>>
输出一个整数的每一位
查看>>
学习Spark的入门教程——《Spark核心源码分析与开发实战》
查看>>
MYSQL函数
查看>>
CentOS7.4安装部署KVM虚拟机
查看>>
2015年度总结
查看>>
3.c#-练习_编程实现计算几天_如46天_是几周零几_天._6周零4天
查看>>
yum install redis安装的配置文件
查看>>
Linux LVM逻辑卷
查看>>
layoutSubviews总结
查看>>
Lintcode93 Balanced Binary Tree solution 题解
查看>>
用Python从零开始创建区块链
查看>>
企业应当建立安全保障机制,以随时应对各种***事件
查看>>
校验左右括号是否匹配,包含数量和左右括号是否匹配
查看>>