我们在发送消息的时候,指定的交换机不存在,或者指定的路由key不存在,这时候我们需要监听这种不可达的消息,使用 return机制 Return Listener
进行监听
// 生产者
public class Producer {
private static final String EXCHANGE_NAME="exchange_return_1";
// 可以路由的key
private static final String ROUTING_KEY = "return.save";
// 不能路由的key
private static final String ROUTING_KEY_ERROR = "km.save";
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtils.getConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "topic");
// 添加return监听机制
channel.addReturnListener(new ReturnListener() {
/**
* 监听的回调方法
* @param replyCode 队列响应给浏览器的状态码
* @param replyText 状态码对应的文本信息
* @param exchange 交换机的名称
* @param routingKey 路由的key
* @param properties 消息的相关属性
* @param body 消息体的内容
*/
@Override
public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("监听到不可达消息..." + new String(body));
System.out.println("状态码:" + replyCode + " 文本信息为:" + replyText);
System.out.println("交换机的名字为:"+ exchange +" 路由的key为:" + routingKey);
System.out.println("相关属性:" + properties);
}
});
for (int i = 0; i < 10; i++) {
if (i % 2 == 0) {
// mandatory 属性设置为true表示:要监听不可达的消息
channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, true,null, ("return监听机制" + i).getBytes());
} else {
channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY_ERROR, true, null, ("return监听机制" + i).getBytes());
}
}
}
}
// 消费者
public class Consumer {
private static final String EXCHANGE_NAME="exchange_return_1";
private static final String ROUTING_KEY = "return.save";
private static final String QUEUE_NAME="queue_return_1";
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtils.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
channel.exchangeDeclare(EXCHANGE_NAME, "topic");
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY);
DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费者收到消息:" + new String(body));
}
};
channel.basicConsume(QUEUE_NAME, true, defaultConsumer);
}
}
可以看出,路由可以正常映射的,消息成功发送至队列,return回调不会触发。
路由映射不到的,队列接收不到信息,return机制监听到了不可达信息,并触发了回调函数
Mandatory
属性设置为false,对于不可达的消息会被Broker直接删除,那么生产端就不会进行任何打印,也就是 ReturnListener 的监听回调会无效。所以,Mandatory 必须设置为 true。// 添加return监听机制
channel.addReturnListener(new ReturnListener() {
@Override
public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("监听到不可达消息...");
}
});
该方法是监听到不可达信息之后的回调方法,在此方法中书写对于不可达消息的后续处理逻辑
因篇幅问题不能全部显示,请点此查看更多更全内容
Copyright © 2019- nryq.cn 版权所有 赣ICP备2024042798号-6
违法及侵权请联系:TEL:199 1889 7713 E-MAIL:2724546146@qq.com
本站由北京市万商天勤律师事务所王兴未律师提供法律服务