死信队列和延迟队列的使用
死信消息:
消息被拒绝(Basic.Reject或Basic.Nack)并且设置 requeue 参数的值为 false 消息过期了 队列达到最大的长度 过期消息:
在 rabbitmq 中存在2种方可设置消息的过期时间,第一种通过对队列进行设置,这种设置 后,该队列中所有的消息都存在相同的过期时间,第二种通过对消息本身进行设置,那么每条 消息的过期时间都不一样。如果同时使用这2种方法,那么以过期时间小的那个数值为准。当 消息达到过期时间还没有被消费,那么那个消息就成为了一个 死信 消息。
队列设置:在队列申明的时候使用 x-message-ttl 参数,单位为 毫秒
单个消息设置:是设置消息属性的 expiration 参数的值,单位为 毫秒
延时队列:在rabbitmq中不存在延时队列,但是我们可以通过设置消息的过期时间和死信队 列来模拟出延时队列。消费者监听死信交换器绑定的队列,而不要监听消息发送的队列。
有了以上的基础知识,我们完成以下需求:
需求:用户在系统中创建一个订单,如果超过时间用户没有进行支付,那么自动取消订单。
分析:
1、上面这个情况,我们就适合使用延时队列来实现,那么延时队列如何创建
2、延时队列可以由 过期消息+死信队列 来时间 3、过期消息通过队列中设置 x-message-ttl 参数实现 4、死信队列通过在队列申明时,给队列设置 x-dead-letter-exchange 参数,然后另外申明 一个队列绑定x-dead-letter-exchange对应的交换器。
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
factory.setPort(AMQP.PROTOCOL.PORT);
factory.setUsername("guest");
factory.setPassword("guest");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
// 声明一个接收被删除的消息的交换机和队列 String EXCHANGE_DEAD_NAME = "exchange.dead"; String QUEUE_DEAD_NAME = "queue_dead"; channel.exchangeDeclare(EXCHANGE_DEAD_NAME, BuiltinExchangeType.DIRECT); channel.queueDeclare(QUEUE_DEAD_NAME, false, false, false, null); channel.queueBind(QUEUE_DEAD_NAME, EXCHANGE_DEAD_NAME, "routingkey.dead");
String EXCHANGE_NAME = "exchange.fanout";
String QUEUE_NAME = "queue_name";
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
Map<String, Object> arguments = new HashMap<String, Object>();
// 统一设置队列中的所有消息的过期时间 arguments.put("x-message-ttl", 30000); // 设置超过多少毫秒没有消费者来访问队列,就删除队列的时间 arguments.put("x-expires", 20000); // 设置队列的最新的N条消息,如果超过N条,前面的消息将从队列中移除掉 arguments.put("x-max-length", 4); // 设置队列的内容的最大空间,超过该阈值就删除之前的消息 arguments.put("x-max-length-bytes", 1024); // 将删除的消息推送到指定的交换机,一般x-dead-letter-exchange和x-dead-letterrouting-key需要同时设置 arguments.put("x-dead-letter-exchange", "exchange.dead"); // 将删除的消息推送到指定的交换机对应的路由键 arguments.put("x-dead-letter-routing-key", "routingkey.dead"); // 设置消息的优先级,优先级大的优先被消费 arguments.put("x-max-priority", 10); channel.queueDeclare(QUEUE_NAME, false, false, false, arguments); channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ""); String message = "Hello RabbitMQ: ";
for (int i = 1; i <= 5; i++) {
// expiration: 设置单条消息的过期时间 AMQP.BasicProperties.Builder properties = new AMQP.BasicProperties().builder() .priority(i).expiration( i * 1000 + "");
channel.basicPublish(EXCHANGE_NAME, "", properties.build(), (message + i).getBytes("UTF-8"));
}
channel.close();
connection.close();