您好,欢迎来到榕意旅游网。
搜索
您的当前位置:首页RabbitMQ高级应用(二)return消息机制—ReturnListener.handleReturn()

RabbitMQ高级应用(二)return消息机制—ReturnListener.handleReturn()

来源:榕意旅游网

应用场景

我们在发送消息的时候,指定的交换机不存在,或者指定的路由key不存在,这时候我们需要监听这种不可达的消息,使用 return机制 Return Listener 进行监听

return机制实现

开启return机制的设置

编写生产者

// 生产者
public class Producer {
    private static final String EXCHANGE_NAME="exchange_return_1";
    // 可以路由的key
    private static final String ROUTING_KEY = "return.save";
    // 不能路由的key
    private static final String ROUTING_KEY_ERROR = "km.save";

    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtils.getConnection();
        Channel channel = connection.createChannel();
        channel.exchangeDeclare(EXCHANGE_NAME, "topic");
        // 添加return监听机制
        channel.addReturnListener(new ReturnListener() {
            /**
             *  监听的回调方法
             * @param replyCode 队列响应给浏览器的状态码
             * @param replyText 状态码对应的文本信息
             * @param exchange  交换机的名称
             * @param routingKey 路由的key
             * @param properties 消息的相关属性
             * @param body  消息体的内容
             */
            @Override
            public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("监听到不可达消息..." + new String(body));
                System.out.println("状态码:" + replyCode + "  文本信息为:" + replyText);
                System.out.println("交换机的名字为:"+ exchange +"  路由的key为:" + routingKey);
                System.out.println("相关属性:" + properties);
            }
        });
        for (int i = 0; i < 10; i++) {
            if (i % 2 == 0) {
                // mandatory 属性设置为true表示:要监听不可达的消息
                channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, true,null, ("return监听机制" + i).getBytes());
            } else {
                channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY_ERROR, true, null, ("return监听机制" + i).getBytes());
            }
        }
    }
}

编写消费者

// 消费者
public class Consumer {
    private static final String EXCHANGE_NAME="exchange_return_1";
    private static final String ROUTING_KEY = "return.save";
    private static final String QUEUE_NAME="queue_return_1";

    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtils.getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        channel.exchangeDeclare(EXCHANGE_NAME, "topic");
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY);
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消费者收到消息:" + new String(body));
            }
        };
        channel.basicConsume(QUEUE_NAME, true, defaultConsumer);
    }
}

测试结果

可以看出,路由可以正常映射的,消息成功发送至队列,return回调不会触发。

路由映射不到的,队列接收不到信息,return机制监听到了不可达信息,并触发了回调函数

小结

  • 如果将 Mandatory 属性设置为false,对于不可达的消息会被Broker直接删除,那么生产端就不会进行任何打印,也就是 ReturnListener 的监听回调会无效。所以,Mandatory 必须设置为 true。
  • return机制可应用于,确认消息已经成功发送到队列中,如果未发送成功至队列,可进行消息的重新发送等操作。

ReturnListener.handleReturn()

// 添加return监听机制
channel.addReturnListener(new ReturnListener() {
            @Override
            public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("监听到不可达消息...");
            }
        });

该方法是监听到不可达信息之后的回调方法,在此方法中书写对于不可达消息的后续处理逻辑

  • param1:replyCode 队列响应给浏览器的状态码
  • param2:replyText 状态码对应的文本信息
  • param3:exchange 交换机的名称
  • param4:routingKey 路由的key
  • param5:properties 消息的相关属性
  • param6:body 消息体的内容

因篇幅问题不能全部显示,请点此查看更多更全内容

Copyright © 2019- nryq.cn 版权所有 赣ICP备2024042798号-6

违法及侵权请联系:TEL:199 1889 7713 E-MAIL:2724546146@qq.com

本站由北京市万商天勤律师事务所王兴未律师提供法律服务