博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
(转) RabbitMQ学习之延时队列
阅读量:6271 次
发布时间:2019-06-22

本文共 6520 字,大约阅读时间需要 21 分钟。

http://blog.csdn.net/zhu_tianwei/article/details/53563311

在实际的业务中我们会遇见生产者产生的消息,不立即消费,而是延时一段时间在消费。RabbitMQ本身没有直接支持延迟队列功能,但是我们可以根据其特性Per-Queue Message TTL和 Dead Letter Exchanges实现延时队列。也可以通过改特性设置消息的优先级。

1.Per-Queue Message TTL

RabbitMQ可以针对消息和队列设置TTL(过期时间)。队列中的消息过期时间(Time To Live, TTL)有两种方法可以设置。第一种方法是通过队列属性设置,队列中所有消息都有相同的过期时间。第二种方法是对消息进行单独设置,每条消息TTL可以不同。如果上述两种方法同时使用,则消息的过期时间以两者之间TTL较小的那个数值为准。消息在队列的生存时间一旦超过设置的TTL值,就成为dead message,消费者将无法再收到该消息。
2.Dead Letter Exchanges
当消息在一个队列中变成死信后,它能被重新publish到另一个Exchange。消息变成Dead Letter一向有以下几种情况:
消息被拒绝(basic.reject or basic.nack)并且requeue=false
消息TTL过期
队列达到最大长度
实际上就是设置某个队列的属性,当这个队列中有Dead Letter时,RabbitMQ就会自动的将这个消息重新发布到设置的Exchange中去,进而被路由到另一个队列,publish可以监听这个队列中消息做相应的处理,这个特性可以弥补RabbitMQ 3.0.0以前支持的immediate参数中的向publish确认的功能。

一、在队列上设置TTL

1.建立delay.exchange

这里Internal设置为NO,否则将无法接受dead letter,YES表示这个exchange不可以被client用来推送消息,仅用来进行exchange和exchange之间的绑定。

2.建立延时队列(delay queue)

如上配置延时5min队列(x-message-ttl=300000)

x-max-length:最大积压的消息个数,可以根据自己的实际情况设置,超过限制消息不会丢失,会立即转向delay.exchange进行投递

x-dead-letter-exchange:设置为刚刚配置好的delay.exchange,消息过期后会通过delay.exchange进行投递

这里不需要配置"dead letter routing key"否则会覆盖掉消息发送时携带的routingkey,导致后面无法路由为刚才配置的delay.exchange

3.配置延时路由规则

需要延时的消息到exchange后先路由到指定的延时队列

1)创建delaysync.exchange通过Routing key将消息路由到延时队列

 

2.配置delay.exchange 将消息投递到正常的消费队列

 

配置完成。

下面使用代码一下:

生产者:

 

[java]   
 
 
  1. package cn.slimsmart.study.rabbitmq.delayqueue.queue;  
  2.   
  3. import java.io.IOException;  
  4.   
  5. import com.rabbitmq.client.Channel;  
  6. import com.rabbitmq.client.Connection;  
  7. import com.rabbitmq.client.ConnectionFactory;  
  8.   
  9. public class Producer {  
  10.   
  11.     private static String queue_name = "test.queue";  
  12.   
  13.     public static void main(String[] args) throws IOException {  
  14.         ConnectionFactory factory = new ConnectionFactory();  
  15.         factory.setHost("10.1.199.169");  
  16.         factory.setUsername("admin");  
  17.         factory.setPassword("123456");  
  18.         Connection connection = factory.newConnection();  
  19.         Channel channel = connection.createChannel();  
  20.         // 声明队列  
  21.         channel.queueDeclare(queue_name, true, false, false, null);  
  22.         String message = "hello world!" + System.currentTimeMillis();  
  23.         channel.basicPublish("delaysync.exchange", "deal.message", null, message.getBytes());  
  24.         System.out.println("sent message: " + message + ",date:" + System.currentTimeMillis());  
  25.         // 关闭频道和连接  
  26.         channel.close();  
  27.         connection.close();  
  28.     }  
  29. }  

消费者:

 

 

[java]   
 
 
  1. package cn.slimsmart.study.rabbitmq.delayqueue.queue;  
  2.   
  3. import com.rabbitmq.client.Channel;  
  4. import com.rabbitmq.client.Connection;  
  5. import com.rabbitmq.client.ConnectionFactory;  
  6. import com.rabbitmq.client.QueueingConsumer;  
  7.   
  8. public class Consumer {  
  9.   
  10.     private static String queue_name = "test.queue";  
  11.   
  12.     public static void main(String[] args) throws Exception {  
  13.         ConnectionFactory factory = new ConnectionFactory();  
  14.         factory.setHost("10.1.199.169");  
  15.         factory.setUsername("admin");  
  16.         factory.setPassword("123456");  
  17.         Connection connection = factory.newConnection();  
  18.         Channel channel = connection.createChannel();  
  19.         // 声明队列  
  20.         channel.queueDeclare(queue_name, true, false, false, null);  
  21.         QueueingConsumer consumer = new QueueingConsumer(channel);  
  22.         // 指定消费队列  
  23.         channel.basicConsume(queue_name, true, consumer);  
  24.         while (true) {  
  25.             // nextDelivery是一个阻塞方法(内部实现其实是阻塞队列的take方法)  
  26.             QueueingConsumer.Delivery delivery = consumer.nextDelivery();  
  27.             String message = new String(delivery.getBody());  
  28.             System.out.println("received message:" + message + ",date:" + System.currentTimeMillis());  
  29.         }  
  30.     }  
  31.   
  32. }  

二、在消息上设置TTL

 

实现代码:

生产者:

 

[java]   
 
 
  1. package cn.slimsmart.study.rabbitmq.delayqueue.message;  
  2.   
  3. import java.io.IOException;  
  4. import java.util.HashMap;  
  5.   
  6. import com.rabbitmq.client.AMQP;  
  7. import com.rabbitmq.client.Channel;  
  8. import com.rabbitmq.client.Connection;  
  9. import com.rabbitmq.client.ConnectionFactory;  
  10.   
  11. public class Producer {  
  12.   
  13.     private static String queue_name = "message_ttl_queue";  
  14.   
  15.     public static void main(String[] args) throws IOException {  
  16.         ConnectionFactory factory = new ConnectionFactory();  
  17.         factory.setHost("10.1.199.169");  
  18.         factory.setUsername("admin");  
  19.         factory.setPassword("123456");  
  20.         Connection connection = factory.newConnection();  
  21.         Channel channel = connection.createChannel();  
  22.         HashMap<String, Object> arguments = new HashMap<String, Object>();  
  23.         arguments.put("x-dead-letter-exchange", "amq.direct");  
  24.         arguments.put("x-dead-letter-routing-key", "message_ttl_routingKey");  
  25.         channel.queueDeclare("delay_queue", true, false, false, arguments);  
  26.   
  27.         // 声明队列  
  28.         channel.queueDeclare(queue_name, true, false, false, null);  
  29.         // 绑定路由  
  30.         channel.queueBind(queue_name, "amq.direct", "message_ttl_routingKey");  
  31.   
  32.         String message = "hello world!" + System.currentTimeMillis();  
  33.         // 设置延时属性  
  34.         AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();  
  35.         // 持久性 non-persistent (1) or persistent (2)  
  36.         AMQP.BasicProperties properties = builder.expiration("300000").deliveryMode(2).build();  
  37.         // routingKey =delay_queue 进行转发  
  38.         channel.basicPublish("", "delay_queue", properties, message.getBytes());  
  39.         System.out.println("sent message: " + message + ",date:" + System.currentTimeMillis());  
  40.         // 关闭频道和连接  
  41.         channel.close();  
  42.         connection.close();  
  43.     }  
  44. }  

消费者:

 

 

[java]   
 
 
  1. package cn.slimsmart.study.rabbitmq.delayqueue.message;  
  2.   
  3. import java.util.HashMap;  
  4.   
  5. import com.rabbitmq.client.Channel;  
  6. import com.rabbitmq.client.Connection;  
  7. import com.rabbitmq.client.ConnectionFactory;  
  8. import com.rabbitmq.client.QueueingConsumer;  
  9.   
  10. public class Consumer {  
  11.   
  12.     private static String queue_name = "message_ttl_queue";  
  13.   
  14.     public static void main(String[] args) throws Exception {  
  15.         ConnectionFactory factory = new ConnectionFactory();  
  16.         factory.setHost("10.1.199.169");  
  17.         factory.setUsername("admin");  
  18.         factory.setPassword("123456");  
  19.         Connection connection = factory.newConnection();  
  20.         Channel channel = connection.createChannel();  
  21.         HashMap<String, Object> arguments = new HashMap<String, Object>();  
  22.         arguments.put("x-dead-letter-exchange", "amq.direct");  
  23.         arguments.put("x-dead-letter-routing-key", "message_ttl_routingKey");  
  24.         channel.queueDeclare("delay_queue", true, false, false, arguments);  
  25.   
  26.         // 声明队列  
  27.         channel.queueDeclare(queue_name, true, false, false, null);  
  28.         // 绑定路由  
  29.         channel.queueBind(queue_name, "amq.direct", "message_ttl_routingKey");  
  30.   
  31.         QueueingConsumer consumer = new QueueingConsumer(channel);  
  32.         // 指定消费队列  
  33.         channel.basicConsume(queue_name, true, consumer);  
  34.         while (true) {  
  35.             // nextDelivery是一个阻塞方法(内部实现其实是阻塞队列的take方法)  
  36.             QueueingConsumer.Delivery delivery = consumer.nextDelivery();  
  37.             String message = new String(delivery.getBody());  
  38.             System.out.println("received message:" + message + ",date:" + System.currentTimeMillis());  
  39.         }  
  40.     }  
  41.   
  42. }  

查看资料:

http://www.rabbitmq.com/ttl.html 

http://www.rabbitmq.com/dlx.html 
http://www.rabbitmq.com/maxlength.html
https://www.cloudamqp.com/docs/delayed-messages.html 

你可能感兴趣的文章
RSuite 一个基于 React.js 的 Web 组件库
查看>>
技术博客网址收藏
查看>>
python 金融分析学习
查看>>
授人以渔不如授人以鱼
查看>>
matlab练习程序(图像Haar小波变换)
查看>>
【Java】从域名得到ip
查看>>
Mysql索引会失效的几种情况分析
查看>>
LVM逻辑卷
查看>>
zoj3591 Nim(Nim博弈)
查看>>
canvas绘图
查看>>
poj - 3039 Margaritas on the River Walk
查看>>
bootstrap(5)关于导航
查看>>
Aptana插件在eclipse中安装
查看>>
jQuery-数据管理-删除事件
查看>>
下载器简单实例
查看>>
java实现分页工具类(JDBC)
查看>>
欧几里德算法与扩展欧几里德算法
查看>>
Tinkoff Internship Warmup Round 2018 and Codeforces Round #475 (Div. 2)
查看>>
通过kafka提供的命令来查看offset消费情况
查看>>
oracle数据库从入门到精通之四
查看>>