搜索
您的当前位置:首页正文

rabbitmq的延迟消息队列实现

来源:榕意旅游网
rabbitmq的延迟消息队列实现

第⼀部分:延迟消息的实现原理和知识点

使⽤RabbitMQ来实现延迟任务必须先了解RabbitMQ的两个概念:消息的TTL和死信Exchange,通过这两者的组合来实现上述需求。

消息的TTL(Time To Live)

消息的TTL就是消息的存活时间。RabbitMQ可以对队列和消息分别设置TTL。对队列设置就是队列没有消费者连着的保留时间,也可以对每⼀个单独的消息做单独的设置。超过了这个时间,我们认为这个消息就死了,称之为死信。如果队列设置了,消息也设置了,那么会取⼩的。所以⼀个消息如果被路由到不同的队列中,这个消息死亡的时间有可能不⼀样(不同的队列设置)。这⾥单讲单个消息的TTL,因为它才是实现延迟任务的关键。

可以通过设置消息的expiration字段或者x-message-ttl属性来设置时间,两者是⼀样的效果。只是expiration字段是字符串参数,所以要写个int类型的字符串:

当上⾯的消息扔到队列中后,过了3分钟,如果没有被消费,它就死了。不会被消费者消费到。这个消息后⾯的,没有“死掉”的消息对顶上来,被消费者消费。死信在队列中并不会被删除和释放,它会被统计到队列的消息数中去。单靠死信还不能实现延迟任务,还要靠Dead Letter Exchange。

Dead Letter Exchanges

Exchage的概念在这⾥就不在赘述。⼀个消息在满⾜如下条件下,会进死信路由,记住这⾥是路由⽽不是队列,⼀个路由可以对应很多队列。1. ⼀个消息被Consumer拒收了,并且reject⽅法的参数⾥requeue是false。也就是说不会被再次放在队列⾥,被其他消费者使⽤。2. 上⾯的消息的TTL到了,消息过期了。

3. 队列的长度限制满了。排在前⾯的消息会被丢弃或者扔到死信路由上。

Dead Letter Exchange其实就是⼀种普通的exchange,和创建其他exchange没有两样。只是在某⼀个设置Dead Letter Exchange的队列中有消息过期了,会⾃动触发消息的转发,发送到Dead Letter Exchange中去。

实现延迟队列

延迟任务通过消息的TTL和Dead Letter Exchange来实现。我们需要建⽴2个队列,⼀个⽤于发送消息,⼀个⽤于消息过期后的转发⽬标队列。

⽣产者输出消息到Queue1,并且这个消息是设置有有效时间的,⽐如3分钟。消息会在Queue1中等待3分钟,如果没有消费者收掉的话,它就是被转发到Queue2,Queue2有消费者,收到,处理延迟任务。完成延迟任务的实现。

第⼆部分:具体实现例⼦

1、新建⽴消息队列配置⽂件rabbitmq.properties

1 #rabbitmq消息队列的属性配置⽂件properties 2 rabbitmq.study.host=192.168.56.101 3 rabbitmq.study.username=duanml

4 rabbitmq.study.password=1qaz@WSX 5 rabbitmq.study.port=5672

6 rabbitmq.study.vhost=studymq 7

8 #Mail 消息队列的相关变量值 9 mail.exchange=mailExchange

10 mail.exchange.key=mail_queue_key11 12

13 #Phone 消息队列的相关变量值14 phone.topic.key=phone.one

15 phone.topic.key.more=phone.one.more16

17 #delay 延迟消息队列的相关变量值

18 delay.directQueue.key=TradePayNotify_delay_2m19 delay.directMessage.key=TradePayNotify_delay_3m

2、新建⽴配置⽂件,申明延迟队列相关的配置信息如:spring-rabbigmq-dlx.xml

1

2 5 xsi:schemaLocation=\"http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-ra 6

7 14

15 16

17 18

19

20 21

22 23

24 25

26

27 28

29 30

31 32

33 34 35 36 37

38 39

40

41 43 confirm-callback=\"notifyConfirmCallBackListener\"44 return-callback=\"notifyFailedCallBackListener\"45 message-converter=\"jsonMessageConverter\"/>46

47

48 49 50 51

52

58 60

61 62

63 64 65 66

67

73 75

76

77 78 79 80

81

82 83

84 85 86

87 88

89

3、新建⽴延迟队列测试Controller

1 package org.seckill.web; 2

3 import org.seckill.dto.SeckillResult; 4 import org.seckill.entity.Seckill;

5 import org.seckill.utils.rabbitmq.Impl.MQProducerImpl; 6 import org.seckill.utils.rabbitmq.MQProducer; 7 import org.slf4j.Logger;

8 import org.slf4j.LoggerFactory;

9 import org.springframework.amqp.core.Message;

10 import org.springframework.beans.factory.annotation.Autowired; 11 import org.springframework.beans.factory.annotation.Value; 12 import org.springframework.stereotype.Controller;

13 import org.springframework.web.bind.annotation.RequestMapping; 14 import org.springframework.web.bind.annotation.ResponseBody; 15

16 import java.util.Date; 17 18 /**

19 *

Title: org.seckill.web

20 *

Company:东软集团(neusoft)

21 *

Copyright:Copyright(c)2018

22 * User: 段美林

23 * Date: 2018/5/30 17:33 24 * Description: 消息队列测试 25 */

26 @Controller

27 @RequestMapping(\"/rabbitmq\") 28 public class RabbitmqController { 29

30 private final Logger logger = LoggerFactory.getLogger(this.getClass()); 31 40

41 @Value(\"${delay.directQueue.key}\") 42 private String delay_directQueue_key; 43

44 @Value(\"${delay.directMessage.key}\") 45 private String delay_directMessage_key; 46 52

53 @Autowired

54 private MQProducerImpl delayMQProducerImpl;111 112 /**

113 * @Description: 消息队列114 * @Author:

115 * @CreateTime:116 */

117 @ResponseBody

118 @RequestMapping(\"/sendDelayQueue\")

119 public SeckillResult testDelayQueue() {120 SeckillResult result = null;121 Date now = new Date();122 try {

123 Seckill seckill = new Seckill();

124        //第⼀种情况,给队列设置消息ttl,详情见配置⽂件125 for (int i = 0; i < 2; i++) {

126 seckill.setSeckillId(1922339387 + i);127 seckill.setName(\"delay_queue_ttl_\" + i);

128 String msgId = delayMQProducerImpl.getMsgId();

129 Message message = delayMQProducerImpl.messageBuil(seckill,msgId);

130 delayMQProducerImpl.sendDataToRabbitMQ(delay_directQueue_key, message);131 }

132         //第⼆种情况,给消息设置ttl133 for (int i = 0; i < 2; i++) {

134 seckill.setSeckillId(1922339287 + i);

135 seckill.setName(\"delay_message_ttl_\" + i);

136 String msgId = delayMQProducerImpl.getMsgId();

137 Message message = delayMQProducerImpl.messageBuil(seckill,msgId);138 if (message != null) {

139 //给消息设置过期时间ttl,为3分钟

140 message.getMessageProperties().setExpiration(\"180000\");

141 delayMQProducerImpl.sendDataToRabbitMQ(delay_directMessage_key, message);142 }143 }

144 result = new SeckillResult(true, now.getTime());145 } catch (Exception e) {

146 logger.error(e.getMessage(), e);147 }

148 return result;149 }

150 151 }

4、编写延迟消息确认类和监听类:

NotifyConfirmCallBackListener.java

1 package org.seckill.rabbitmqListener.notify; 2

3 import org.slf4j.Logger;

4 import org.slf4j.LoggerFactory;

5 import org.springframework.amqp.rabbit.core.RabbitTemplate.ConfirmCallback; 6 import org.springframework.amqp.rabbit.support.CorrelationData; 7 8 /**

9 *

Title: org.seckill.rabbitmqListener.notify

10 *

Company:东软集团(neusoft)

11 *

Copyright:Copyright(c)2018

12 * User: 段美林

13 * Date: 2018/6/3 0:27

14 * Description: 延迟任务测试--->消息确认回调类15 */

16 public class NotifyConfirmCallBackListener implements ConfirmCallback {17

18 private final Logger logger = LoggerFactory.getLogger(this.getClass());19

20 /**

21 * Confirmation callback.22 *

23 * @param correlationData correlation data for the callback.24 * @param ack true for ack, false for nack

25 * @param cause An optional cause, for nack, when available, otherwise null.26 */

27 public void confirm(CorrelationData correlationData, boolean ack, String cause) {

28 logger.info(\"延迟测试---确认消息完成-------->confirm--:correlationData:\" + correlationData.getId() + \29 }30 }

NotifyConsumerListener.java

1 package org.seckill.rabbitmqListener.notify; 2

3 import com.alibaba.fastjson.JSONObject; 4 import com.rabbitmq.client.Channel; 5 import org.slf4j.Logger;

6 import org.slf4j.LoggerFactory;

7 import org.springframework.amqp.core.Message;

8 import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener; 9 10 /**

11 *

Title: org.seckill.rabbitmqListener.notify

12 *

Company:东软集团(neusoft)

13 *

Copyright:Copyright(c)2018

14 * User: 段美林

15 * Date: 2018/6/3 0:27

16 * Description: 订单通知队列监听服务17 * 实现延迟任务的功能18 */

19 public class NotifyConsumerListener implements ChannelAwareMessageListener {20 21

22 private final Logger logger = LoggerFactory.getLogger(this.getClass());23

24 /**

25 * Callback for processing a received Rabbit message.

26 *

Implementors are supposed to process the given Message,27 * typically sending reply messages through the given Session.28 *

29 * @param message the received AMQP message (never null)30 * @param channel the underlying Rabbit Channel (never null)31 * @throws Exception Any.32 */

33 public void onMessage(Message message, Channel channel) throws Exception {34 try {

35 //将字节流对象转换成Java对象

36 // Seckill seckill=(Seckill) new ObjectInputStream(new ByteArrayInputStream(message.getBody())).readObject();37

38 String returnStr = new String(message.getBody(),\"UTF-8\");39 JSONObject jsStr = JSONObject.parseObject(returnStr);40

41 logger.info(\"延迟测试--消费开始:名称为--===>\" + jsStr.getString(\"name\") + \"----->返回消息:\" + returnStr + \"||||消息的Properties:--》\" + message.getMessageProperties());42

43 //TODO 进⾏相关业务操作44

45 //成功处理业务,那么返回消息确认机制,这个消息成功处理OK

46 channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);47

48 } catch (Exception e) {

49 if (message.getMessageProperties().getRedelivered()) {

50 //消息已经进⾏过⼀次轮询操作,还是失败,将拒绝再次接收本消息51 logger.info(\"消息已重复处理失败,拒绝再次接收...\");

52 channel.basicReject(message.getMessageProperties().getDeliveryTag(), true); // 拒绝消息53

54 //TODO 进⾏相关业务操作55

56 } else {

57 //消息第⼀次接收处理失败后,将再此回到队列中进⾏ 再⼀次轮询操作58 logger.info(\"消息即将再次返回队列处理...\");

59 //处理失败,那么返回消息确认机制,这个消息没有成功处理,返回到队列中

60 channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);61 }62 }63 }64 }

NotifyFailedCallBackListener.java

1 package org.seckill.rabbitmqListener.notify; 2

3 import org.slf4j.Logger;

4 import org.slf4j.LoggerFactory;

5 import org.springframework.amqp.core.Message;

6 import org.springframework.amqp.rabbit.core.RabbitTemplate.ReturnCallback; 7 8 /**

9 *

Title: org.seckill.rabbitmqListener.notify

10 *

Company:东软集团(neusoft)

11 *

Copyright:Copyright(c)2018

12 * User: 段美林

13 * Date: 2018/6/3 0:28

14 * Description: 延迟任务测试----> 消息发送失败回调类15 */

16 public class NotifyFailedCallBackListener implements ReturnCallback {17

18 private final Logger logger = LoggerFactory.getLogger(this.getClass());19

20 /**

21 * Returned message callback.22 *

23 * @param message the returned message.24 * @param replyCode the reply code.25 * @param replyText the reply text.26 * @param exchange the exchange.27 * @param routingKey the routing key.28 */

29 public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {30 logger.info(\"延迟测试------------->return--message:\" +31 new String(message.getBody()) +

32 \33 \34 }35 }

5、编写消息队列的操作类和接⼝:

MQProducer.java

1 package org.seckill.utils.rabbitmq; 2

3 import org.springframework.amqp.core.Message;

4 import org.springframework.amqp.core.MessagePostProcessor; 5 import org.springframework.amqp.rabbit.support.CorrelationData; 6 7 /**

8 *

Title: org.seckill.utils.rabbitmq

9 *

Company:东软集团(neusoft)

10 *

Copyright:Copyright(c)2018

11 * User: 段美林

12 * Date: 2018/5/30 11:49

13 * Description: No Description 14 */

15 public interface MQProducer { 16

17 /**

18 * Convert a Java object to an Amqp Message and send it to a default exchange with a default routing key. 19 * 由于配置了JSON转换,这⾥是将Java对象转换成JSON字符串的形式。 20 * @param message 21 */

22 void sendDataToRabbitMQ(java.lang.Object message); 23

24 /**

25 * Convert a Java object to an Amqp Message and send it to a default exchange with a default routing key. 26 * 由于配置了JSON转换,这⾥是将Java对象转换成JSON字符串的形式。 27 * @param message

28 * @param messagePostProcessor 29 */

30 void sendDataToRabbitMQ(java.lang.Object message, MessagePostProcessor messagePostProcessor); 31

32 /**

33 * Convert a Java object to an Amqp Message and send it to a default exchange with a default routing key. 34 * 由于配置了JSON转换,这⾥是将Java对象转换成JSON字符串的形式。 35 * @param message

36 * @param messagePostProcessor 37 * @param correlationData 38 */

39 void sendDataToRabbitMQ(java.lang.Object message, MessagePostProcessor messagePostProcessor, CorrelationData correlationData); 40

41 /**

42 * Convert a Java object to an Amqp Message and send it to a default exchange with a specific routing key. 43 * 由于配置了JSON转换,这⾥是将Java对象转换成JSON字符串的形式。 44 * @param routingKey 45 * @param message 46 */

47 void sendDataToRabbitMQ(java.lang.String routingKey, java.lang.Object message); 48

49 /**

50 * Convert a Java object to an Amqp Message and send it to a default exchange with a specific routing key. 51 * 由于配置了JSON转换,这⾥是将Java对象转换成JSON字符串的形式。 52 * @param routingKey 53 * @param message

54 * @param correlationData 55 */

56 void sendDataToRabbitMQ(java.lang.String routingKey, java.lang.Object message, CorrelationData correlationData); 57

58 /**

59 * Convert a Java object to an Amqp Message and send it to a default exchange with a specific routing key. 60 * 由于配置了JSON转换,这⾥是将Java对象转换成JSON字符串的形式。 61 * @param routingKey 62 * @param message

63 * @param messagePostProcessor 64 */

65 void sendDataToRabbitMQ(java.lang.String routingKey, java.lang.Object message, MessagePostProcessor messagePostProcessor); 66

67 /**

68 * Convert a Java object to an Amqp Message and send it to a default exchange with a specific routing key. 69 * 由于配置了JSON转换,这⾥是将Java对象转换成JSON字符串的形式。 70 * @param routingKey 71 * @param message

72 * @param messagePostProcessor 73 * @param correlationData 74 */

75 void sendDataToRabbitMQ(java.lang.String routingKey, java.lang.Object message, MessagePostProcessor messagePostProcessor, CorrelationData correlationData); 76

77 /**

78 * Convert a Java object to an Amqp Message and send it to a specific exchange with a specific routing key. 79 * 由于配置了JSON转换,这⾥是将Java对象转换成JSON字符串的形式。 80 * @param exchange 81 * @param routingKey 82 * @param message 83 */

84 void sendDataToRabbitMQ(java.lang.String exchange, java.lang.String routingKey, java.lang.Object message); 85

86 /**

87 * Convert a Java object to an Amqp Message and send it to a specific exchange with a specific routing key. 88 * 由于配置了JSON转换,这⾥是将Java对象转换成JSON字符串的形式。 89 * @param exchange 90 * @param routingKey 91 * @param message

92 * @param correlationData 93 */

94 void sendDataToRabbitMQ(java.lang.String exchange, java.lang.String routingKey, java.lang.Object message, CorrelationData correlationData); 95

96 /**

97 * Convert a Java object to an Amqp Message and send it to a specific exchange with a specific routing key. 98 * 由于配置了JSON转换,这⾥是将Java对象转换成JSON字符串的形式。 99 * @param exchange100 * @param routingKey

101 * @param message

102 * @param messagePostProcessor103 */

104 void sendDataToRabbitMQ(java.lang.String exchange, java.lang.String routingKey, java.lang.Object message, MessagePostProcessor messagePostProcessor);105

106 /**

107 * Convert a Java object to an Amqp Message and send it to a specific exchange with a specific routing key.108 * 由于配置了JSON转换,这⾥是将Java对象转换成JSON字符串的形式。109 * @param exchange110 * @param routingKey111 * @param message

112 * @param messagePostProcessor113 * @param correlationData114 */

115 void sendDataToRabbitMQ(java.lang.String exchange, java.lang.String routingKey, java.lang.Object message, MessagePostProcessor messagePostProcessor, CorrelationData correlationData);116

117 Message messageBuil(Object handleObject, String msgId);118

119 String getMsgId();120 }

MQProducerImpl.java

1 package org.seckill.utils.rabbitmq.Impl; 2

3 import com.alibaba.fastjson.JSONObject; 4 import org.seckill.utils.rabbitmq.MQProducer; 5 import org.slf4j.Logger;

6 import org.slf4j.LoggerFactory;

7 import org.springframework.amqp.AmqpException; 8 import org.springframework.amqp.core.Message;

9 import org.springframework.amqp.core.MessageBuilder;

10 import org.springframework.amqp.core.MessagePostProcessor; 11 import org.springframework.amqp.core.MessageProperties; 12 import org.springframework.amqp.rabbit.core.RabbitTemplate; 13 import org.springframework.amqp.rabbit.support.CorrelationData; 14 import org.springframework.stereotype.Component; 15

16 import java.io.UnsupportedEncodingException; 17 import java.util.UUID; 18 19 /**

20 *

Title: org.seckill.utils.rabbitmq.Impl

21 *

Company:东软集团(neusoft)

22 *

Copyright:Copyright(c)2018

23 * User: 段美林

24 * Date: 2018/6/2 22:54

25 * Description: 消息⽣产者操作主体类 26 */

27 @Component

28 public class MQProducerImpl implements MQProducer{ 29

30 private static final Logger logger = LoggerFactory.getLogger(MQProducerImpl.class); 31

32 private RabbitTemplate rabbitTemplate; 33

34 /**

35 * Sets the rabbitTemplate. 36 *

37 *

You can use getRabbitTemplate() to get the value of rabbitTemplate

38 *

39 * @param rabbitTemplate rabbitTemplate 40 */

41 public void setRabbitTemplate(RabbitTemplate rabbitTemplate) { 42 this.rabbitTemplate = rabbitTemplate; 43 } 44

45 /**

46 * Convert a Java object to an Amqp Message and send it to a default exchange with a default routing key. 47 * 由于配置了JSON转换,这⾥是将Java对象转换成JSON字符串的形式。 48 *

49 * @param message 50 */

51 public void sendDataToRabbitMQ(Object message) { 52 try {

53 if (message instanceof Message){

54 Message messageSend = (Message) message;

55 String msgId = messageSend.getMessageProperties().getCorrelationId(); 56 CorrelationData correlationData = new CorrelationData(msgId);

57 rabbitTemplate.convertAndSend(rabbitTemplate.getRoutingKey(),message,correlationData); 58 }else {

59 rabbitTemplate.convertAndSend(message); 60 }

61 } catch (AmqpException e) {

62 logger.error(e.getMessage(), e); 63 } 64 } 65

66 /**

67 * Convert a Java object to an Amqp Message and send it to a default exchange with a default routing key. 68 * 由于配置了JSON转换,这⾥是将Java对象转换成JSON字符串的形式。 69 *

70 * @param message

71 * @param messagePostProcessor 72 */

73 public void sendDataToRabbitMQ(Object message, MessagePostProcessor messagePostProcessor) { 74 try {

75 if (message instanceof Message){

76 Message messageSend = (Message) message;

77 String msgId = messageSend.getMessageProperties().getCorrelationId(); 78 CorrelationData correlationData = new CorrelationData(msgId);

79 rabbitTemplate.convertAndSend(rabbitTemplate.getRoutingKey(),message,messagePostProcessor,correlationData); 80 }else {

81 rabbitTemplate.convertAndSend(message, messagePostProcessor); 82 }

83 } catch (AmqpException e) {

84 logger.error(e.getMessage(), e); 85 } 86 } 87

88 /**

89 * Convert a Java object to an Amqp Message and send it to a default exchange with a default routing key. 90 * 由于配置了JSON转换,这⾥是将Java对象转换成JSON字符串的形式。 91 *

92 * @param message

93 * @param messagePostProcessor 94 * @param correlationData 95 */

96 public void sendDataToRabbitMQ(Object message, MessagePostProcessor messagePostProcessor, CorrelationData correlationData) { 97 try {

98 rabbitTemplate.convertAndSend(message, messagePostProcessor, correlationData); 99 } catch (AmqpException e) {

100 logger.error(e.getMessage(), e);

101 }102 }103

104 /**

105 * Convert a Java object to an Amqp Message and send it to a default exchange with a specific routing key.106 * 由于配置了JSON转换,这⾥是将Java对象转换成JSON字符串的形式。107 *

108 * @param routingKey109 * @param message110 */

111 public void sendDataToRabbitMQ(String routingKey, Object message) {112 try {

113 if (message instanceof Message){

114 Message messageSend = (Message) message;

115 String msgId = messageSend.getMessageProperties().getCorrelationId();116 CorrelationData correlationData = new CorrelationData(msgId);

117 rabbitTemplate.convertAndSend(routingKey,message,correlationData);118 }else {

119 rabbitTemplate.convertAndSend(routingKey, message);120 }

121 } catch (AmqpException e) {

122 logger.error(e.getMessage(), e);123 }124 }125

126 /**

127 * Convert a Java object to an Amqp Message and send it to a default exchange with a specific routing key.128 * 由于配置了JSON转换,这⾥是将Java对象转换成JSON字符串的形式。129 *

130 * @param routingKey131 * @param message

132 * @param correlationData133 */

134 public void sendDataToRabbitMQ(String routingKey, Object message, CorrelationData correlationData) {135 try {

136 rabbitTemplate.convertAndSend(routingKey, message, correlationData);137 } catch (AmqpException e) {

138 logger.error(e.getMessage(), e);139 }140 }141

142 /**

143 * Convert a Java object to an Amqp Message and send it to a default exchange with a specific routing key.144 * 由于配置了JSON转换,这⾥是将Java对象转换成JSON字符串的形式。145 *

146 * @param routingKey147 * @param message

148 * @param messagePostProcessor149 */

150 public void sendDataToRabbitMQ(String routingKey, Object message, MessagePostProcessor messagePostProcessor) {151 try {

152 if (message instanceof Message){

153 Message messageSend = (Message) message;

154 String msgId = messageSend.getMessageProperties().getCorrelationId();155 CorrelationData correlationData = new CorrelationData(msgId);

156 rabbitTemplate.convertAndSend(routingKey,message,messagePostProcessor,correlationData);157 }else {

158 rabbitTemplate.convertAndSend(routingKey, message, messagePostProcessor);159 }

160 } catch (AmqpException e) {

161 logger.error(e.getMessage(), e);162 }163 }164

165 /**

166 * Convert a Java object to an Amqp Message and send it to a default exchange with a specific routing key.167 * 由于配置了JSON转换,这⾥是将Java对象转换成JSON字符串的形式。168 *

169 * @param routingKey170 * @param message

171 * @param messagePostProcessor172 * @param correlationData173 */

174 public void sendDataToRabbitMQ(String routingKey, Object message, MessagePostProcessor messagePostProcessor, CorrelationData correlationData) {175 try {

176 rabbitTemplate.convertAndSend(routingKey, message, messagePostProcessor, correlationData);177 } catch (AmqpException e) {

178 logger.error(e.getMessage(), e);179 }180 }181

182 /**

183 * Convert a Java object to an Amqp Message and send it to a specific exchange with a specific routing key.184 * 由于配置了JSON转换,这⾥是将Java对象转换成JSON字符串的形式。185 *

186 * @param exchange187 * @param routingKey188 * @param message189 */

190 public void sendDataToRabbitMQ(String exchange, String routingKey, Object message) {191 try {

192 if (message instanceof Message){

193 Message messageSend = (Message) message;

194 String msgId = messageSend.getMessageProperties().getCorrelationId();195 CorrelationData correlationData = new CorrelationData(msgId);

196 rabbitTemplate.convertAndSend(routingKey,message,correlationData);197 }else {

198 rabbitTemplate.convertAndSend(exchange, routingKey, message);199 }

200 } catch (AmqpException e) {

201 logger.error(e.getMessage(), e);202 }203 }204

205 /**

206 * Convert a Java object to an Amqp Message and send it to a specific exchange with a specific routing key.207 * 由于配置了JSON转换,这⾥是将Java对象转换成JSON字符串的形式。208 *

209 * @param exchange210 * @param routingKey211 * @param message

212 * @param correlationData213 */

214 public void sendDataToRabbitMQ(String exchange, String routingKey, Object message, CorrelationData correlationData) {215 try {

216 rabbitTemplate.convertAndSend(exchange, routingKey, message, correlationData);217 } catch (AmqpException e) {

218 logger.error(e.getMessage(), e);219 }220 }221

222 /**

223 * Convert a Java object to an Amqp Message and send it to a specific exchange with a specific routing key.

224 * 由于配置了JSON转换,这⾥是将Java对象转换成JSON字符串的形式。225 *

226 * @param exchange227 * @param routingKey228 * @param message

229 * @param messagePostProcessor230 */

231 public void sendDataToRabbitMQ(String exchange, String routingKey, Object message, MessagePostProcessor messagePostProcessor) {232 try {

233 rabbitTemplate.convertAndSend(exchange, routingKey, message, messagePostProcessor);234 } catch (AmqpException e) {

235 logger.error(e.getMessage(), e);236 }237 }238

239 /**

240 * Convert a Java object to an Amqp Message and send it to a specific exchange with a specific routing key.241 * 由于配置了JSON转换,这⾥是将Java对象转换成JSON字符串的形式。242 *

243 * @param exchange244 * @param routingKey245 * @param message

246 * @param messagePostProcessor247 * @param correlationData248 */

249 public void sendDataToRabbitMQ(String exchange, String routingKey, Object message, MessagePostProcessor messagePostProcessor, CorrelationData correlationData) {250 try {

251 rabbitTemplate.convertAndSend(exchange, routingKey, message, messagePostProcessor, correlationData);252 } catch (AmqpException e) {

253 logger.error(e.getMessage(), e);254 }255 }256

257 /**

258 * 构建Message对象,进⾏消息发送259 * @param handleObject260 * @param msgId261 * @return262 */

263 public Message messageBuil(Object handleObject, String msgId) {264 try {

265 //先转成JSON

266 String objectJSON = JSONObject.toJSONString(handleObject);267 //再构建Message对象

268 Message messageBuil = MessageBuilder.withBody(objectJSON.getBytes(\"UTF-8\")).setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN)269 .setCorrelationId(msgId).build();270 return messageBuil;

271 } catch (UnsupportedEncodingException e) {

272 logger.error(\"构建Message出错:\" + e.getMessage(),e);273 return null;274 }275 }276

277 /**

278 * ⽣成唯⼀的消息操作id279 * @return280 */

281 public String getMsgId() {

282 return UUID.randomUUID().toString();283 }284 285 }

⾄此就完成了延迟消息队列的所有代码实现,

因篇幅问题不能全部显示,请点此查看更多更全内容

Top