死信队列
死信交换机
死信交换机,Dead-Letter-Exchange 即 DLX。
死信交换机用来接收死信消息(Dead Message)的,那什么是死信消息呢?一般消息变成死信消息有如下几种情况:
- 消息被拒绝(Basic.Reject/Basic.Nack),并且设置 queue 参数为false
- 消息过期
- 队列到达最大长度
当消息在一个队列中变成死信消息后,此时就会被发送到 DLX,绑定 DLX 的消息队列则成为死信队列。
DLX 本质上也是一个普普通通的交换机,我们可以为任意队列指定 DLX,当该队列中存在死信时,RabbitMQ 就会自动的将这个死信发布到 DLX 上去,进而被路由到另一个绑定了 DLX 的队列上(即死信队列)。
死信队列
这个好理解,绑定了死信交换机的队列就是死信队列。
实践
我们来看一个简单的例子。
首先我们来创建一个死信交换机,接着创建一个死信队列,再将死信交换机和死信队列绑定到一起:
@Configuration
public class RabbitConfig {
public static final String DLX_EXCHANGE = "dlx_exchange";
public static final String DLX_QUEUE = "dlx_queue";
public static final String DLX_ROUTING_KEY = "dlx_routing_key";
public static final String MSG_EXCHANGE = "msg_exchange";
public static final String MSG_QUEUE = "msg_queue";
public static final String MSG_ROUTING_KEY = "msg_routing_key";
@Bean
DirectExchange dlxDirectExchange(){
return new DirectExchange(DLX_EXCHANGE,true,false);
}
@Bean
Queue dlxQueue(){
return new Queue(DLX_QUEUE,true,false,false);
}
@Bean
Binding dlxBinding(){
return BindingBuilder.bind(dlxQueue())
.to(dlxDirectExchange())
.with(DLX_ROUTING_KEY);
}
@Bean
DirectExchange msgDirectExchange(){
return new DirectExchange(MSG_EXCHANGE,true,false);
}
@Bean
Queue msgQueue(){
Map<String, Object> args = new HashMap<>();
// 设置超时时间
args.put("x-message-ttl",0);
// 死信交换机
args.put("x-dead-letter-exchange",DLX_EXCHANGE);
// 死信 routingkey
args.put("x-dead-letter-routing-key",DLX_ROUTING_KEY);
return new Queue(MSG_QUEUE,true,false,false,args);
}
@Bean
Binding msgBinding(){
return BindingBuilder.bind(msgQueue())
.to(msgDirectExchange())
.with(MSG_ROUTING_KEY);
}
}
配置死信交换机 dlxDirectExchange 和 死信队列 dlxQueue,然后再配置普通的交换机和队列,在普通队列中配置消息过期到达的交换机也就是死信交换机。
配置死信交换机就需要两个参数:
- x-dead-letter-exchange:死信交换机名称。
- x-dead-letter-routing-key:配置死信 routing_key。
将来发送到这个消息队列上的消息,如果发生了 nack、reject 或者过期等问题,就会被发送到 DLX 上,进而进入到与 DLX 绑定的消息队列上。
@RestController
public class TestController {
@Autowired
RabbitTemplate rabbitTemplate;
@GetMapping("/send")
public void test(){
Message msg = MessageBuilder.withBody(("hello rabbitmq"+new Date()).getBytes()).build();
rabbitTemplate.send(RabbitConfig.MSG_EXCHANGE,RabbitConfig.MSG_ROUTING_KEY,msg);
}
}
然后监听死信队列,死信消息队列的消费和普通消息队列的消费并无二致:
@Component
public class Consumer {
private static final Logger logger = LoggerFactory.getLogger(Consumer.class);
@RabbitListener(queues = RabbitConfig.DLX_QUEUE)
public void consumer(String message){
logger.info("message ====> {}",message);
}
}
流程: 消息发送到队列MSG_QUEUE中,由于过期时间是0,并且没有消费者,所以消息会直接进入死信交换机,然后到死信队列,最后由监听死信队列的消费者消费消息。