rabbitmq学习第四节--主题Topic模式,路由模式基础上增加模糊匹配功能
文章目录1.主题模式简介2 新建一个topic类型的交换机3 案例演示3.1 发布者--气象局3.2 消费者--百度3.3 消费者--新浪3.4 运行结果3.5 解除消费队列和虚拟机的绑定1.主题模式简介2 新建一个topic类型的交换机3 案例演示3.1 发布者–气象局package com.ruoyi.project.rabbitmq.topic;import com.rab...
·
文章目录
1.主题模式简介
2 新建一个topic类型的交换机
3 案例演示
3.1 发布者–气象局
package com.ruoyi.project.rabbitmq.topic;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.ruoyi.project.rabbitmq.utils.RabbitConstant;
import com.ruoyi.project.rabbitmq.utils.RabbitUtils;
import java.io.IOException;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;
/**
* 消息发布者--气象局
*/
public class WeatherBureau {
public static void main(String[] args) throws IOException, TimeoutException {
Map area=new LinkedHashMap<String,String>();
area.put("china.hebei.shijiazhuang.20991011", "中国河北石家庄20991011天气数据");
area.put("china.shandong.qingdao.20991011", "中国山东青岛20991011天气数据");
area.put("china.henan.zhengzhou.20991011", "中国河南郑州20991011天气数据");
area.put("us.cal.la.20991011", "美国加州洛杉矶20991011天气数据");
area.put("china.hebei.shijiazhuang.20991012", "中国河北石家庄20991012天气数据");
area.put("china.shandong.qingdao.20991012", "中国山东青岛20991012天气数据");
area.put("china.henan.zhengzhou.20991012", "中国河南郑州20991012天气数据");
area.put("us.cal.la.20991012", "美国加州洛杉矶20991012天气数据");
Connection connection = RabbitUtils.getConnection();
Channel channel = connection.createChannel();
// String input = new Scanner(System.in).next();
Iterator<Map.Entry<String, String>> itr = area.entrySet().iterator();
while (itr.hasNext()) {
Map.Entry<String, String> me = itr.next();
//指定对应的交换机
/**
* 四个参数
* exchangeName 交换机名称发布订阅时才会用到
* 队列名称 ,当于数据筛选的条件
* 额外的设置属性
* 要传递的消息字节数组
*/
channel.basicPublish(RabbitConstant.EXCHANGE_WEATHER_TOPIC,me.getKey() , null , me.getValue().getBytes());
}
System.out.println("发送数据成功");
channel.close();
connection.close();
}
}
3.2 消费者–百度
需求:只获取山东省的天气
package com.ruoyi.project.rabbitmq.topic;
import com.rabbitmq.client.*;
import com.ruoyi.project.rabbitmq.utils.RabbitConstant;
import com.ruoyi.project.rabbitmq.utils.RabbitUtils;
import java.io.IOException;
/**
* 主题模式
* 消费者
* 需求:获取美国全境的天气
*/
public class Baidu {
public static void main(String[] args) throws IOException {
Connection connection = RabbitUtils.getConnection();
//创建通信“通道”,相当于TCP中的虚拟连接
Channel channel = connection.createChannel();
//创建一个队列
channel.queueDeclare(RabbitConstant.QUEUE_BAIDU, false, false, false, null);
/**
* 只订阅20991011这一天的数据
* *.*.qingdao.* 只匹配青岛的天气
* *.shandong.*.* 只匹配山东的天气
* 用于队列和交换机的绑定
* 参数1:队列名
* 参数2: 交换机名
* 参数3:路由key
*/
channel.queueBind(RabbitConstant.QUEUE_BAIDU,RabbitConstant.EXCHANGE_WEATHER_TOPIC,"*.shandong.*.*");
/**
* 解除消费队列和虚拟机的绑定
*/
//channel.queueUnbind(RabbitConstant.QUEUE_BAIDU,RabbitConstant.EXCHANGE_WEATHER_TOPIC,"*.*.*.20991011");
channel.basicQos(1);
//放入队列
channel.basicConsume(RabbitConstant.QUEUE_BAIDU, false, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//真正开发时,调用短信包接口就可以发送了
System.out.println("百度收到气象信息:"+new String(body));
/**
* 确认签收
* 参数1:获取这个消息的TagID
* 参数2:false只确认签收当前的消息,设置为true:代表签收该消费这所有未签收的消息
*/
channel.basicAck(envelope.getDeliveryTag(), false);
}
});
}
}
3.3 消费者–新浪
需求:只获取美国全境的天气
package com.ruoyi.project.rabbitmq.topic;
import com.rabbitmq.client.*;
import com.ruoyi.project.rabbitmq.utils.RabbitConstant;
import com.ruoyi.project.rabbitmq.utils.RabbitUtils;
import java.io.IOException;
/**
* 消费者
* 需求:获取美国全境的天气
*/
public class Sina {
public static void main(String[] args) throws IOException {
Connection connection = RabbitUtils.getConnection();
//创建通信“通道”,相当于TCP中的虚拟连接
Channel channel = connection.createChannel();
//创建一个队列
channel.queueDeclare(RabbitConstant.QUEUE_SINA, false, false, false, null);
/**
* 用于队列和交换机的绑定
* 参数1:队列名
* 参数2: 交换机名
* 参数3:路由key
*/
channel.queueBind(RabbitConstant.QUEUE_SINA, RabbitConstant.EXCHANGE_WEATHER_TOPIC, "us.#");
channel.basicQos(1);
channel.basicConsume(RabbitConstant.QUEUE_SINA, false, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String jsonSMS = new String(body);
//真正开发时,调用短信包接口就可以发送了
System.out.println("新浪收到气象信息:" + jsonSMS);
/**
* 确认签收
* 参数1:获取这个消息的TagID
* 参数2:false只确认签收当前的消息,设置为true:代表签收该消费这所有未签收的消息
*/
channel.basicAck(envelope.getDeliveryTag(), false);
}
});
}
}
3.4 运行结果
3.5 解除消费队列和虚拟机的绑定
代码实现
channel.queueUnbind(RabbitConstant.QUEUE_BAIDU,RabbitConstant.EXCHANGE_WEATHER_TOPIC,"*.*.*.20991011");
或者直接在rabbitMQ后台找到该模糊匹配绑定的虚拟机,点击Unbind按钮,即可解除绑定
点击阅读全文
更多推荐
所有评论(0)