假设我们的MQ使用都没有问题,但是如果消费者系统的数据库挂了呢?因为我们一直都是假设了一个场景,就是生产者在处理完自己的逻辑之后会推消息到MQ,然后下游消费者系统从MQ里获取消息去执行后续的处理。
那么如果这个时候,消费者系统的数据库宕机了,同样会使消费者从MQ里获取到消息之后,消费线程就会挂掉,没办法继续进行处理。
所以针对这样的场景,消费者系统要怎么处理?应该如何重试?
在下面代码片段中,我们注册了一个***器回调函数,当consumer获取到消息之后,就会调用这个函数进行处理
consumer/registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage( List</MessageExt>/ msgs/ ConsumeConcurrentlyContext context) { // 在这里对获取到的msgs订单消息进行处理 // 比如增加积分、发送优惠券、通知发货,等等 return ConsumeConcurrentlyStatus/CONSUME_SUCCESS/ } })/我们可以在这个回调函数中对消息进行处理,处理完之后,就可以告诉RocketMQ Consumer这批消息的处理结果。
比如,如果返回的是CONSUME_SUCCESS,那么Consumer就知道这批消息处理完成了,就会提交这批消息的offset到broker上去,然后下次就会继续从broker上获取下一批消息来处理。
但是如果此时我们在上面的回调函数中,对一批消息进行处理的时候,因为数据库宕机了,导致处理逻辑无法完成,此时我们还能返回CONSUME_SUCCESS吗?如果你返回的话,下次就会处理下一批消息,但是这批消息其实没有处理成功,此时必然就导致这批消息丢失了。
如果因为数据库宕机,导致对这批消息处理是异常的,就应该返回一个RECONSUME_LATER状态。
告诉RocketMQ这批消息处理有异常,过段时间再次给我这批消息让我重新试一下。
所以我们的代码应该改成下面这样:
consumer/registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage( List</MessageExt>/ msgs/ ConsumeConcurrentlyContext context) { try { // 在这里对获取到的msgs订单消息进行处理 // 比如增加积分、发送优惠券、通知发货,等等 return ConsumeConcurrentlyStatus/CONSUME_SUCCESS/ } catch (Exception e) { // 如果因为数据库宕机等问题,对消息处理失败了 // 此时返回一个稍后重试的状态 return ConsumeConcurrentlyStatus/RECONSUME_LATER/ } } })/那么RocketMQ在收到你返回的RECONSUME_LATER状态之后,是如何让你进行消费重试的呢?
RocketMQ有一个针对这个ConsumerGroup的重试队列,如果返回了RECONSUME_LATER状态,他会把你这批消息放到这个消费组的重试队列中去。
比如你的消费者组的名称是 VoucherConsumerGroup ,那么他会有一个“%RETRY%VoucherConsumerGroup”这个名字的重试队列。
重试队列中的消息会按照配置的时间再次给消费者,让消费者进行处理,如果再次失败,那么会再过一段时间让消费者进行处理,默认最多是重试16次,每次重试之间的间隔时间是可以配置的:
messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h那么如果在16次重试范围内消息处理成功了,自然就没问题了,但是如果你对一批消息重试了16次还是无法处理成功呢?这个时候会把消息放到死信队列中。
其实就是一批消息交给消费者去处理,消费者重试了16次还一直没有处理成功,就不要继续重试这批消息了,就可以认为他们死掉了就可以了,然后这批消息会自动进入死信队列。
死信队列的名字是 %DLQ%VoucherConsumerGroup
免责声明:
1. 《RocketMQ - 如何用死信队列解决消费者异常》内容来源于互联网,版权归原著者或相关公司所有。
2. 若《86561825文库网》收录的文本内容侵犯了您的权益或隐私,请立即通知我们删除。