1.主题模式简介

在这里插入图片描述

2 新建一个topic类型的交换机

在这里插入图片描述

3 案例演示

3.1 发布者–气象局

package com.ruoyi.project.rabbitmq.topic;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.ruoyi.project.rabbitmq.utils.RabbitConstant;
import com.ruoyi.project.rabbitmq.utils.RabbitUtils;

import java.io.IOException;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;

/**
 * 消息发布者--气象局
 */
public class WeatherBureau {

    public static void main(String[] args) throws IOException, TimeoutException {
        Map area=new LinkedHashMap<String,String>();
        area.put("china.hebei.shijiazhuang.20991011", "中国河北石家庄20991011天气数据");
        area.put("china.shandong.qingdao.20991011", "中国山东青岛20991011天气数据");
        area.put("china.henan.zhengzhou.20991011", "中国河南郑州20991011天气数据");
        area.put("us.cal.la.20991011", "美国加州洛杉矶20991011天气数据");

        area.put("china.hebei.shijiazhuang.20991012", "中国河北石家庄20991012天气数据");
        area.put("china.shandong.qingdao.20991012", "中国山东青岛20991012天气数据");
        area.put("china.henan.zhengzhou.20991012", "中国河南郑州20991012天气数据");
        area.put("us.cal.la.20991012", "美国加州洛杉矶20991012天气数据");
        Connection connection = RabbitUtils.getConnection();
        Channel channel = connection.createChannel();
//        String input = new Scanner(System.in).next();
        Iterator<Map.Entry<String, String>> itr = area.entrySet().iterator();
        while (itr.hasNext()) {
            Map.Entry<String, String> me = itr.next();
            //指定对应的交换机
            /**
             * 四个参数
             * exchangeName 交换机名称发布订阅时才会用到
             * 队列名称 ,当于数据筛选的条件
             * 额外的设置属性
             * 要传递的消息字节数组
             */
            channel.basicPublish(RabbitConstant.EXCHANGE_WEATHER_TOPIC,me.getKey() , null , me.getValue().getBytes());
        }
        System.out.println("发送数据成功");
        channel.close();
        connection.close();
    }
}

3.2 消费者–百度

需求:只获取山东省的天气

package com.ruoyi.project.rabbitmq.topic;

import com.rabbitmq.client.*;
import com.ruoyi.project.rabbitmq.utils.RabbitConstant;
import com.ruoyi.project.rabbitmq.utils.RabbitUtils;

import java.io.IOException;

/**
 * 主题模式
 * 消费者
 * 需求:获取美国全境的天气
 */
public class Baidu {
    public static void main(String[] args) throws IOException {
        Connection connection = RabbitUtils.getConnection();
        //创建通信“通道”,相当于TCP中的虚拟连接
        Channel channel = connection.createChannel();
        //创建一个队列
        channel.queueDeclare(RabbitConstant.QUEUE_BAIDU, false, false, false, null);
        /**
         * 只订阅20991011这一天的数据
         * *.*.qingdao.* 只匹配青岛的天气
         * *.shandong.*.* 只匹配山东的天气
         * 用于队列和交换机的绑定
         * 参数1:队列名
         * 参数2: 交换机名
         * 参数3:路由key
         */
        channel.queueBind(RabbitConstant.QUEUE_BAIDU,RabbitConstant.EXCHANGE_WEATHER_TOPIC,"*.shandong.*.*");
        /**
         * 解除消费队列和虚拟机的绑定
         */
        //channel.queueUnbind(RabbitConstant.QUEUE_BAIDU,RabbitConstant.EXCHANGE_WEATHER_TOPIC,"*.*.*.20991011");
        channel.basicQos(1);
        //放入队列
        channel.basicConsume(RabbitConstant.QUEUE_BAIDU, false, new DefaultConsumer(channel) {

            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                //真正开发时,调用短信包接口就可以发送了
                System.out.println("百度收到气象信息:"+new String(body));
                /**
                 * 确认签收
                 * 参数1:获取这个消息的TagID
                 * 参数2:false只确认签收当前的消息,设置为true:代表签收该消费这所有未签收的消息
                 */
                channel.basicAck(envelope.getDeliveryTag(), false);

            }
        });

    }
}

3.3 消费者–新浪

需求:只获取美国全境的天气

package com.ruoyi.project.rabbitmq.topic;

import com.rabbitmq.client.*;
import com.ruoyi.project.rabbitmq.utils.RabbitConstant;
import com.ruoyi.project.rabbitmq.utils.RabbitUtils;

import java.io.IOException;

/**
 * 消费者
 * 需求:获取美国全境的天气
 */
public class Sina {
    public static void main(String[] args) throws IOException {
        Connection connection = RabbitUtils.getConnection();
        //创建通信“通道”,相当于TCP中的虚拟连接
        Channel channel = connection.createChannel();
        //创建一个队列
        channel.queueDeclare(RabbitConstant.QUEUE_SINA, false, false, false, null);
        /**
         * 用于队列和交换机的绑定
         * 参数1:队列名
         * 参数2: 交换机名
         * 参数3:路由key
         */
        channel.queueBind(RabbitConstant.QUEUE_SINA, RabbitConstant.EXCHANGE_WEATHER_TOPIC, "us.#");

        channel.basicQos(1);
        channel.basicConsume(RabbitConstant.QUEUE_SINA, false, new DefaultConsumer(channel) {

            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String jsonSMS = new String(body);
                //真正开发时,调用短信包接口就可以发送了
                System.out.println("新浪收到气象信息:" + jsonSMS);
                /**
                 * 确认签收
                 * 参数1:获取这个消息的TagID
                 * 参数2:false只确认签收当前的消息,设置为true:代表签收该消费这所有未签收的消息
                 */
                channel.basicAck(envelope.getDeliveryTag(), false);

            }
        });

    }
}


3.4 运行结果

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

3.5 解除消费队列和虚拟机的绑定

代码实现

channel.queueUnbind(RabbitConstant.QUEUE_BAIDU,RabbitConstant.EXCHANGE_WEATHER_TOPIC,"*.*.*.20991011");

或者直接在rabbitMQ后台找到该模糊匹配绑定的虚拟机,点击Unbind按钮,即可解除绑定

点击阅读全文
Logo

快速构建 Web 应用程序

更多推荐