消息队列之RabbitMQ工作模式

news/2024/6/18 21:34:17 标签: rabbitmq, 工作模式, 消息队列, MQ

系列文章目录

提示:这里可以添加系列文章的所有文章的目录,目录需要自己手动添加
消息队列之RabbitMQ工作模式


提示:写完文章后,目录可以自动生成,如何生成可参考右边的帮助文档

文章目录

  • 系列文章目录
  • 前言
  • 一、RabbitMQ工作模式介绍
  • 二、工作模式的使用
    • 环境准备
    • 简单模式
      • 编写生产者
      • 编写消费者
    • 工作队列模式
      • 编写生产者
      • 编写消费者
    • 发布订阅模式
      • 编写生产者
      • 编写消费者
    • 路由模式
      • 编写生产者
      • 编写消费者
    • 通配符模式
      • 编写生产者
      • 编写消费者
  • 三、SpringBoot整合RabbitMQ
    • 项目搭建
    • 创建对列和交换机
    • 编写生产者
    • 编写消费者
  • 总结


前言

提示:这里可以添加本文要记录的大概内容:

在这篇博客中,我将深入探讨 RabbitMQ工作模式,带你领略它的强大之处。我们将了解到 RabbitMQ 如何通过队列、交换机、绑定等核心概念实现消息的路由和传递。你将学习到不同类型的交换机,以及它们如何根据消息的路由键将消息路由到相应的队列。
无论你是刚刚接触消息队列,还是已经有一定经验的开发者,这篇博客都将为你提供深入了解 RabbitMQ 工作模式的机会。通过实际的案例和代码示例,我将帮助你更好地理解如何使用 RabbitMQ 来解决实际的业务问题。
让我们一起探索生成消息队列之 RabbitMQ 的世界,掌握其工作模式,提升我们的应用架构和开发能力!


提示:以下是本篇文章正文内容,下面案例可供参考

MQ_24">一、RabbitMQ工作模式介绍

  • 简单模式(Simple):在简单模式下,生产者将消息发送到队列,消费者从队列中获取消息并进行处理。生产者和消费者之间没有直接的通信,而是通过 RabbitMQ 进行消息的传递和处理。这种模式适用于简单的消息传递场景。
  • 工作队列模式(Work Queue):在工作队列模式下,多个消费者可以同时从同一个队列中获取消息并进行处理。这种模式适用于需要并发处理消息的场景,例如后台处理任务或分布式计算。
  • 发布/订阅模式(Publish/Subscribe):在发布/订阅模式下,生产者将消息发布到一个特定的主题(Topic),多个消费者可以订阅该主题并接收所有发布的消息。这种模式适用于消息广播和消息分发,例如通知系统或消息推送。
  • 路由模式(Routing):在路由模式下,生产者将消息发送到一个交换机,交换机根据消息的路由键(Routing Key)将消息转发到多个队列。消费者可以从不同的队列中获取消息,并根据消息的内容进行处理。这种模式适用于复杂的消息路由和处理需求,例如根据不同的业务规则将消息转发到不同的队列。
  • 通配符模式(Topic)是在路由模式的基础上,给队列绑定带通配符的路由关键字,只要消息的RoutingKey能实现通配符匹配,就会将消息转发到该队列。通配符模式比路由模式更灵活,使用topic交换机。

二、工作模式的使用

环境准备

1.启动RabbitMQ

# 开启管控台插件
MQ.html" title=rabbitmq>rabbitmq-plugins enable MQ.html" title=rabbitmq>rabbitmq_management
# 启动MQ.html" title=rabbitmq>rabbitmq
MQ.html" title=rabbitmq>rabbitmq-server -detached

2.创建普通maven项目,添加RabbitMQ依赖

<dependencies>
  <dependency>
    <groupId>com.MQ.html" title=rabbitmq>rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.14.0</version>
  </dependency>
</dependencies>

简单模式

简单模式是RabbitMQ工作模式中的一种,是指一个生产者发送消息,一个消费者消费消息。消息的消费者监听消息队列,如果队列中有消息,就消费掉,消息被拿走后,自动从队列中删除。这种模式下不需要将Exchange进行任何绑定操作。
简单模式常用于一个生产者,一个消费者的情况。例如,在电子商务应用中,生产者可以是一个订单处理程序,消费者可以是一个物流跟踪程序,生产者将订单信息发送到队列,消费者从队列中获取订单信息并进行处理。

编写生产者

// 生产者
public class Producer {
  public static void main(String[] args) throws IOException, TimeoutException {
    // 1.创建连接工厂
    ConnectionFactory connectionFactory = new ConnectionFactory();
    connectionFactory.setHost("192.168.0.162");
    connectionFactory.setPort(5672);
    connectionFactory.setUsername("zhangsan");
    connectionFactory.setPassword("zhangsan");
    connectionFactory.setVirtualHost("/");
    // 2.创建连接
    Connection connection = connectionFactory.newConnection();
    // 3.建立信道
    Channel channel = connection.createChannel();
    // 4.创建队列,如果队列已存在,则使用该队列
    /**
     * 参数1:队列名
     * 参数2:是否持久化,true表示MQ重启后队列还在。
     * 参数3:是否私有化,false表示所有消费者都可以访问,true表示只有第一次拥有它的消费者才能访问
     * 参数4:是否自动删除,true表示不再使用队列时自动删除队列
     * 参数5:其他额外参数
     */
    channel.queueDeclare("simple_queue",false,false,false,null);
    // 5.发送消息
    String message = "hello!MQ.html" title=rabbitmq>rabbitmq!";
    /**
     * 参数1:交换机名,""表示默认交换机
     * 参数2:路由键,简单模式就是队列名
     * 参数3:其他额外参数
     * 参数4:要传递的消息字节数组
     */
    channel.basicPublish("","simple_queue",null,message.getBytes());
    // 6.关闭信道和连接
    channel.close();
    connection.close();
    System.out.println("===发送成功===");
   }
}

编写消费者

// 消费者
public class Consumer {
  public static void main(String[] args) throws IOException, TimeoutException {
    // 1.创建连接工厂
    ConnectionFactory connectionFactory = new ConnectionFactory();
    connectionFactory.setHost("192.168.0.162");
    connectionFactory.setPort(5672);
    connectionFactory.setUsername("itbaizhan");
    connectionFactory.setPassword("itbaizhan");
    connectionFactory.setVirtualHost("/");
    // 2.创建连接
    Connection connection = connectionFactory.newConnection();
    // 3.建立信道
    Channel channel = connection.createChannel();
    // 4.监听队列
    /**
     * 参数1:监听的队列名
     * 参数2:是否自动签收,如果设置为false,则需要手动确认消息已收到,否则MQ会一直发送消息
     * 参数3:Consumer的实现类,重写该类方法表示接受到消息后如何消费
     */
    channel.basicConsume("simple_queue",true,new DefaultConsumer(channel){
      @Override
      public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        String message = new String(body, "UTF-8");
        System.out.println("接受消息,消息为:"+message);
       }
     });
   }
}

工作队列模式

工作队列模式是RabbitMQ工作模式的一种,由一个生产者和多个消费者组成。生产者发送消息到消息队列,消费者轮询进行消费。
与简单模式相比,工作队列模式的区别在于可以有多个消费者共同消费一个队列中的消息。工作队列模式使用默认交换机,可以避免消费积压。有两种消息分配策略:循环调度和消息确认。循环调度是默认策略,消息队列会循环依次给每个消费者发送消息,每个消费者都会收到相同数量的消息,但可能导致丢失消息。消息确认策略下,消费者在消费完某个消息,并发送确认消息后,消息队列会重新将新消息推送给该消费者进行消费,同时队列可以自由删除消费过的消息。如果消费者没有发送确认消息的情况下死亡(如通道关闭、链接关闭、TCP链接丢失等),队列了解到消息未被完全处理,会将消息重新排队。如果同时有其它消费者在线消费,那么这条消息会被快速传递给另一个消费者进行消费,这样可以确保不丢失任何消息。消费者消费确认的超时时间默认是30分钟。

编写生产者

public class Producer {
  public static void main(String[] args) throws IOException, TimeoutException {
    // 1.创建连接工厂
    ConnectionFactory connectionFactory = new ConnectionFactory();
    connectionFactory.setHost("192.168.0.162");
    connectionFactory.setPort(5672);
    connectionFactory.setUsername("zhangsan");
    connectionFactory.setPassword("zhangsan");
    connectionFactory.setVirtualHost("/");
    // 2.创建连接
    Connection connection = connectionFactory.newConnection();
    // 3.建立信道
    Channel channel = connection.createChannel();
    // 4.创建队列,持久化队列
    channel.queueDeclare("work_queue",true,false,false,null);
    // 5.发送大量消息,参数3表示该消息为持久化消息,即除了保存到内存还会保存到磁盘中
    for (int i = 1; i <= 100; i++) {
      channel.basicPublish("","work_queue", MessageProperties.PERSISTENT_TEXT_PLAIN,
           ("你好,这是今天的第"+i+"条消息").getBytes());
     }
    // 6.关闭资源
    channel.close();
    connection.close();
   }
}

编写消费者

// 消费者1
public class Consumer1 {
  public static void main(String[] args) throws IOException, TimeoutException {
    // 1.创建连接工厂
    ConnectionFactory connectionFactory = new ConnectionFactory();
    connectionFactory.setHost("192.168.0.162");
    connectionFactory.setPort(5672);
    connectionFactory.setUsername("zhangsan");
    connectionFactory.setPassword("zhangsan");
    connectionFactory.setVirtualHost("/");
    // 2.创建连接
    Connection connection = connectionFactory.newConnection();
    // 3.建立信道
    Channel channel = connection.createChannel();
    // 4.监听队列,处理消息
    channel.basicConsume("work_queue",true,new DefaultConsumer(channel){
      @Override
      public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        String message = new String(body, "UTF-8");
        System.out.println("消费者1消费消息,消息为:"+message);
       }
     });
   }
}


// 消费者2
public class Consumer2 {
  public static void main(String[] args) throws IOException, TimeoutException {
    // 1.创建连接工厂
    ConnectionFactory connectionFactory = new ConnectionFactory();
    connectionFactory.setHost("192.168.0.162");
    connectionFactory.setPort(5672);
    connectionFactory.setUsername("zhangsan");
    connectionFactory.setPassword("zhangsan");
    connectionFactory.setVirtualHost("/");
    // 2.创建连接
    Connection connection = connectionFactory.newConnection();
    // 3.建立信道
    Channel channel = connection.createChannel();
    // 4.监听队列,处理消息
    channel.basicConsume("work_queue",true,new DefaultConsumer(channel){
      @Override
      public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        String message = new String(body, "UTF-8");
        System.out.println("消费者2消费消息,消息为:"+message);
       }
     });
   }
}


// 消费者3
public class Consumer3 {
  public static void main(String[] args) throws IOException, TimeoutException {
    // 1.创建连接工厂
    ConnectionFactory connectionFactory = new ConnectionFactory();
    connectionFactory.setHost("192.168.0.162");
    connectionFactory.setPort(5672);
    connectionFactory.setUsername("zhangsan");
    connectionFactory.setPassword("zhangsan");
    connectionFactory.setVirtualHost("/");
    // 2.创建连接
    Connection connection = connectionFactory.newConnection();
    // 3.建立信道
    Channel channel = connection.createChannel();
    // 4.监听队列,处理消息
    channel.basicConsume("work_queue",true,new DefaultConsumer(channel){
      @Override
      public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        String message = new String(body, "UTF-8");
        System.out.println("消费者3消费消息,消息为:"+message);
       }
     });
   }
}

发布订阅模式

发布订阅模式是RabbitMQ工作模式的一种,由一个生产者和多个消费者组成。生产者将消息发送到交换机,交换机将消息转发到多个队列,每个队列都有一个相应的消费者进行消费。
发布订阅模式的特点是每个消费者都可以订阅一个或多个主题,并接收所有与该主题相关的消息。消费者可以通过订阅不同的主题来接收不同类型的消息,并根据消息的内容进行处理。
发布订阅模式适用于需要向多个消费者发送相同类型的消息的场景,例如通知系统或消息推送。

编写生产者

// 生产者
public class Producer {
  public static void main(String[] args) throws IOException, TimeoutException {
    // 1.创建连接工厂
    ConnectionFactory connectionFactory = new ConnectionFactory();
    connectionFactory.setHost("192.168.0.162");
    connectionFactory.setPort(5672);
    connectionFactory.setUsername("zhangsan");
    connectionFactory.setPassword("zhangsan");
    connectionFactory.setVirtualHost("/");
    // 2.创建连接
    Connection connection = connectionFactory.newConnection();
    // 3.建立信道
    Channel channel = connection.createChannel();
    // 4.创建交换机
    /**
     * 参数1:交换机名
     * 参数2:交换机类型
     * 参数3:交换机持久化
     */
    channel.exchangeDeclare("exchange_fanout", BuiltinExchangeType.FANOUT,true);
    // 5.创建队列
    channel.queueDeclare("SEND_MAIL",true,false,false,null);
    channel.queueDeclare("SEND_MESSAGE",true,false,false,null);
    channel.queueDeclare("SEND_STATION",true,false,false,null);
    // 6.交换机绑定队列
    /**
     * 参数1:队列名
     * 参数2:交换机名
     * 参数3:路由关键字,发布订阅模式写""即可
     */
    channel.queueBind("SEND_MAIL","exchange_fanout","");
    channel.queueBind("SEND_MESSAGE","exchange_fanout","");
    channel.queueBind("SEND_STATION","exchange_fanout","");
    // 7.发送消息
    for (int i = 1; i <= 10 ; i++) {
      channel.basicPublish("exchange_fanout","",null,
           ("你好,尊敬的用户,秒杀商品开抢了!"+i).getBytes(StandardCharsets.UTF_8));
     }
    // 8.关闭资源
    channel.close();
    connection.close();
   }
}

编写消费者

// 站内信消费者
public class CustomerStation {
  public static void main(String[] args) throws IOException, TimeoutException {
    // 1.创建连接工厂
    ConnectionFactory connectionFactory = new ConnectionFactory();
    connectionFactory.setHost("192.168.0.162");
    connectionFactory.setPort(5672);
    connectionFactory.setUsername("zhangsan");
    connectionFactory.setPassword("zhangsan");
    connectionFactory.setVirtualHost("/");// 默认虚拟机
    //2.创建连接
    Connection conn = connectionFactory.newConnection();
    //3.建立信道
    Channel channel = conn.createChannel();
    // 4.监听队列
    channel.basicConsume("SEND_STATION", true, new DefaultConsumer(channel) {
      @Override
      public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        String message = new String(body, "utf-8");
        System.out.println("发送站内信:"+message);
       }
     });
   }
}


// 邮件消费者
public class Customer_Mail {
  public static void main(String[] args) throws IOException, TimeoutException {
    // 1.创建连接工厂
    ConnectionFactory connectionFactory = new ConnectionFactory();
    connectionFactory.setHost("192.168.0.162");
    connectionFactory.setPort(5672);
    connectionFactory.setUsername("zhangsan");
    connectionFactory.setPassword("zhangsan");
    connectionFactory.setVirtualHost("/");// 默认虚拟机
    //2.创建连接
    Connection conn = connectionFactory.newConnection();
    //3.建立信道
    Channel channel = conn.createChannel();
    // 4.监听队列
    channel.basicConsume("SEND_MAIL", true, new DefaultConsumer(channel) {
      @Override
      public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        String message = new String(body, "utf-8");
        System.out.println("发送邮件:"+message);
       }
     });
   }
}


// 短信消费者
public class Customer_Message {
  public static void main(String[] args) throws IOException, TimeoutException {
    // 1.创建连接工厂
    ConnectionFactory connectionFactory = new ConnectionFactory();
    connectionFactory.setHost("192.168.0.162");
    connectionFactory.setPort(5672);
    connectionFactory.setUsername("zhangsan");
    connectionFactory.setPassword("zhangsan");
    connectionFactory.setVirtualHost("/");// 默认虚拟机
    //2.创建连接
    Connection conn = connectionFactory.newConnection();
    //3.建立信道
    Channel channel = conn.createChannel();
    // 4.监听队列
    channel.basicConsume("SEND_MESSAGE", true, new DefaultConsumer(channel) {
      @Override
      public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        String message = new String(body, "utf-8");
        System.out.println("发送短信:"+message);
       }
     });
   }
}

路由模式

RabbitMQ的路由模式是指,队列在绑定交换机时,需要指定一个RoutingKey(路由键)。消息的发送方在向Exchange发送消息时,也必须指定消息的RoutingKey。Exchange不再将消息交给每一个绑定的队列,而是根据消息的RoutingKey进行判断,只有队列的RoutingKey与消息的RoutingKey完全一致,才会接收到消息。
这种模式可以实现有选择地接收消息,通过指定不同的RoutingKey,将消息发送到不同的队列,从而实现消息的精准投递。例如,一个队列与一个交换器进行绑定时,可以指定多个不同的RoutingKey,当消息的RoutingKey与队列的RoutingKey相匹配时,消息才会被投递到该队列中。
请注意,使用路由模式时,需要确保RoutingKey的设置与实际需求相符,以确保消息能够准确地投递到目标队列中。

编写生产者

// 生产者
public class Producer {
  public static void main(String[] args) throws IOException, TimeoutException {
    // 1.创建连接工厂
    ConnectionFactory connectionFactory = new ConnectionFactory();
    connectionFactory.setHost("192.168.0.162");
    connectionFactory.setPort(5672);
    connectionFactory.setUsername("zhangsan");
    connectionFactory.setPassword("zhangsan");
    connectionFactory.setVirtualHost("/");
    // 2.创建连接
    Connection connection = connectionFactory.newConnection();
    // 3.建立信道
    Channel channel = connection.createChannel();
    // 4.创建交换机
    channel.exchangeDeclare("exchange_routing", BuiltinExchangeType.DIRECT,true);
    // 5.创建队列
    channel.queueDeclare("SEND_MAIL2",true,false,false,null);
    channel.queueDeclare("SEND_MESSAGE2",true,false,false,null);
    channel.queueDeclare("SEND_STATION2",true,false,false,null);
    // 6.交换机绑定队列
    channel.queueBind("SEND_MAIL2","exchange_routing","import");
    channel.queueBind("SEND_MESSAGE2","exchange_routing","import");
    channel.queueBind("SEND_STATION2","exchange_routing","import");
    channel.queueBind("SEND_STATION2","exchange_routing","normal");
    // 7.发送消息
    channel.basicPublish("exchange_routing","import",null,
        "双十一大促活动".getBytes());
    channel.basicPublish("exchange_routing","normal",null,
        "小心促销活动".getBytes());
    // 8.关闭资源
    channel.close();
    connection.close();
   }
}

编写消费者

// 站内信消费者
public class Customer_Station {
  public static void main(String[] args) throws IOException, TimeoutException {
    // 1.创建连接工厂
    ConnectionFactory connectionFactory = new ConnectionFactory();
    connectionFactory.setHost("192.168.0.162");
    connectionFactory.setPort(5672);
    connectionFactory.setUsername("zhangsan");
    connectionFactory.setPassword("zhangsan");
    connectionFactory.setVirtualHost("/");// 默认虚拟机
    //2.创建连接
    Connection conn = connectionFactory.newConnection();
    //3.建立信道
    Channel channel = conn.createChannel();
    // 4.监听队列
    channel.basicConsume("SEND_STATION2", true, new DefaultConsumer(channel) {
      @Override
      public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        String message = new String(body, "utf-8");
        System.out.println("发送站内信:"+message);
       }
     });
   }
}


// 邮件消费者
public class Customer_Mail {
  public static void main(String[] args) throws IOException, TimeoutException {
    // 1.创建连接工厂
    ConnectionFactory connectionFactory = new ConnectionFactory();
    connectionFactory.setHost("192.168.0.162");
    connectionFactory.setPort(5672);
    connectionFactory.setUsername("zhangsan");
    connectionFactory.setPassword("zhangsan");
    connectionFactory.setVirtualHost("/");
    //2.创建连接
    Connection conn = connectionFactory.newConnection();
    //3.建立信道
    Channel channel = conn.createChannel();
    // 4.监听队列
    channel.basicConsume("SEND_MAIL2", true, new DefaultConsumer(channel) {
      @Override
      public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        String message = new String(body, "utf-8");
        System.out.println("发送邮件:"+message);
       }
     });
   }
}


// 短信消费者
public class Customer_Message {
  public static void main(String[] args) throws IOException, TimeoutException {
    // 1.创建连接工厂
    ConnectionFactory connectionFactory = new ConnectionFactory();
    connectionFactory.setHost("192.168.0.162");
    connectionFactory.setPort(5672);
    connectionFactory.setUsername("zhangsan");
    connectionFactory.setPassword("zhangsan");
    connectionFactory.setVirtualHost("/");// 默认虚拟机
    //2.创建连接
    Connection conn = connectionFactory.newConnection();
    //3.建立信道
    Channel channel = conn.createChannel();
    // 4.监听队列
    channel.basicConsume("SEND_MESSAGE2", true, new DefaultConsumer(channel) {
      @Override
      public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        String message = new String(body, "utf-8");
        System.out.println("发送短信:"+message);
       }
     });
   }
}

通配符模式

RabbitMQ的通配符模式是指,队列在绑定交换机时,可以使用通配符来匹配消息的路由键。此时,队列需要绑定到一个模式上,通配符的规则为:

  • 消息设置RoutingKey时,RoutingKey由多个单词构成,中间以.分割。
  • 队列设置RoutingKey时,#可以匹配任意多个单词,*可以匹配任意一个单词。

例如,“audit.#”能够匹配到“audit.irs.corporate”。通配符模式可以实现有选择地接收消息,通过指定不同的通配符,将消息发送到不同的队列,从而实现消息的精准投递。

编写生产者

// 生产者
public class Producer {
  public static void main(String[] args) throws IOException, TimeoutException {
    // 1.创建连接工厂
    ConnectionFactory connectionFactory = new ConnectionFactory();
    connectionFactory.setHost("192.168.0.162");
    connectionFactory.setPort(5672);
    connectionFactory.setUsername("zhangsan");
    connectionFactory.setPassword("zhangsan");
    connectionFactory.setVirtualHost("/");
    // 2.创建连接
    Connection connection = connectionFactory.newConnection();
    // 3.建立信道
    Channel channel = connection.createChannel();
    // 4.创建交换机
    channel.exchangeDeclare("exchange_topic", BuiltinExchangeType.TOPIC,true);
    // 5.创建队列
    channel.queueDeclare("SEND_MAIL3",true,false,false,null);
    channel.queueDeclare("SEND_MESSAGE3",true,false,false,null);
    channel.queueDeclare("SEND_STATION3",true,false,false,null);
    // 6.交换机绑定队列
    channel.queueBind("SEND_MAIL3","exchange_topic","#.mail.#");
    channel.queueBind("SEND_MESSAGE3","exchange_topic","#.message.#");
    channel.queueBind("SEND_STATION3","exchange_topic","#.station.#");
    // 7.发送消息
    channel.basicPublish("exchange_topic","mail.message.station",null,
        "双十一大促活动".getBytes());
    channel.basicPublish("exchange_topic","station",null,
        "小型促销活动".getBytes());
    // 8.关闭资源
    channel.close();
    connection.close();
   }
}

编写消费者

// 站内信消费者
public class Customer_Station {
  public static void main(String[] args) throws IOException, TimeoutException {
    // 1.创建连接工厂
    ConnectionFactory connectionFactory = new ConnectionFactory();
    connectionFactory.setHost("192.168.0.162");
    connectionFactory.setPort(5672);
    connectionFactory.setUsername("zhangsan");
    connectionFactory.setPassword("zhangsan");
    connectionFactory.setVirtualHost("/");// 默认虚拟机


    //2.创建连接
    Connection conn = connectionFactory.newConnection();
  
    //3.建立信道
    Channel channel = conn.createChannel();
  
    // 4.监听队列
    channel.basicConsume("SEND_STATION3", true, new DefaultConsumer(channel) {
      @Override
      public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        String message = new String(body, "utf-8");
        System.out.println("发送站内信:"+message);
       }
     });
   }
}


// 邮件消费者
public class Customer_Mail {
  public static void main(String[] args) throws IOException, TimeoutException {
    // 1.创建连接工厂
    ConnectionFactory connectionFactory = new ConnectionFactory();
    connectionFactory.setHost("192.168.0.162");
    connectionFactory.setPort(5672);
    connectionFactory.setUsername("zhangsan");
    connectionFactory.setPassword("zhangsan");
    connectionFactory.setVirtualHost("/");// 默认虚拟机


    //2.创建连接
    Connection conn = connectionFactory.newConnection();
  
    //3.建立信道
    Channel channel = conn.createChannel();
  
    // 4.监听队列
    channel.basicConsume("SEND_MAIL3", true, new DefaultConsumer(channel) {
      @Override
      public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        String message = new String(body, "utf-8");
        System.out.println("发送邮件:"+message);
       }
     });
   }


}


// 短信消费者
public class Customer_Message {
  public static void main(String[] args) throws IOException, TimeoutException {
    // 1.创建连接工厂
    ConnectionFactory connectionFactory = new ConnectionFactory();
    connectionFactory.setHost("192.168.0.162");
    connectionFactory.setPort(5672);
    connectionFactory.setUsername("zhangsan");
    connectionFactory.setPassword("zhangsan");
    connectionFactory.setVirtualHost("/");// 默认虚拟机


    //2.创建连接
    Connection conn = connectionFactory.newConnection();
  
    //3.建立信道
    Channel channel = conn.createChannel();
  
    // 4.监听队列
    channel.basicConsume("SEND_MESSAGE3", true, new DefaultConsumer(channel) {
      @Override
      public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        String message = new String(body, "utf-8");
        System.out.println("发送短信:"+message);
       }
     });
   }
}

MQ_649">三、SpringBoot整合RabbitMQ

项目搭建

1.创建SpringBoot项目,引入RabbitMQ起步依赖

<!-- RabbitMQ起步依赖 -->
<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

2.编写配置文件

spring:
  MQ.html" title=rabbitmq>rabbitmq:
   host: 192.168.0.162
   port: 5672
   username: zhangsan
   password: zhangsan
   virtual-host: /


#日志格式
logging:
  pattern:
   console: '%d{HH:mm:ss.SSS} %clr(%-5level) ---  [%-15thread] %cyan(%-50logger{50}):%msg%n'

创建对列和交换机

@Configuration
public class RabbitConfig {
  private final String EXCHANGE_NAME = "boot_topic_exchange";
  private final String QUEUE_NAME = "boot_queue";


  // 创建交换机
  @Bean("bootExchange")
  public Exchange getExchange() {
    return ExchangeBuilder
         .topicExchange(EXCHANGE_NAME) // 交换机类型
         .durable(true) // 是否持久化
         .build();
   }


  // 创建队列
  @Bean("bootQueue")
  public Queue getMessageQueue() {
    return new Queue(QUEUE_NAME); // 队列名
   }


  // 交换机绑定队列
  @Bean
  public Binding bindMessageQueue(@Qualifier("bootExchange") Exchange exchange, @Qualifier("bootQueue") Queue queue) {
    return BindingBuilder
         .bind(queue)
         .to(exchange)
         .with("#.message.#")
         .noargs();
   }
}

编写生产者

@SpringBootTest
public class TestProducer {
  // 注入RabbitTemplate工具类
  @Autowired
  private RabbitTemplate rabbitTemplate;


  @Test
  public void testSendMessage(){
    /**
     * 发送消息
     * 参数1:交换机
     * 参数2:路由key
     * 参数3:要发送的消息
     */
    rabbitTemplate.convertAndSend("boot_topic_exchange","message","双十一开始了!");
   }
}

编写消费者

1.创建SpringBoot项目,引入RabbitMQ起步依赖

<!-- MQ.html" title=rabbitmq>rabbitmq起步依赖 -->
<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

2.编写配置文件

spring:
  MQ.html" title=rabbitmq>rabbitmq:
   host: 192.168.0.162
   port: 5672
   username: zhangsan
   password: zhangsan
   virtual-host: /


#日志格式
logging:
  pattern:
   console: '%d{HH:mm:ss.SSS} %clr(%-5level) ---  [%-15thread] %cyan(%-50logger{50}):%msg%n'

3.编写消费者,监听队列

@Component
public class Consumer {
  // 监听队列
  @RabbitListener(queues = "boot_queue")
  public void listen_message(String message){
    System.out.println("发送短信:"+message);
   }
}


总结

提示:这里对文章进行总结:

在这篇博客中,我们深入探讨了 RabbitMQ 的几种工作模式,包括简单模式、工作队列模式、发布/订阅模式、路由模式和通配符模式。
简单模式是消息传递的基本模式,其中生产者将消息发送到队列,消费者从队列中接收消息并进行处理。
工作队列模式允许多个消费者同时处理队列中的消息,提高了消息处理的效率。
发布/订阅模式实现了一对多的消息发布,一个生产者可以将消息发送给多个消费者。
路由模式使得消息能够根据路由键被发送到不同的队列,实现了更灵活的消息路由。
通配符模式通过使用通配符,可以有选择地接收符合特定规则的消息。
了解这些工作模式对于有效利用 RabbitMQ 进行消息传递和构建可靠的分布式系统至关重要。每种模式都有其独特的优势和适用场景,根据具体需求选择合适的工作模式可以提高系统的性能、可扩展性和可靠性。
希望这篇博客对你了解 RabbitMQ工作模式有所帮助!如果你有任何问题或想进一步讨论,欢迎在评论中留言。


http://www.niftyadmin.cn/n/5331903.html

相关文章

SeaTunnel 、DataX 、Sqoop、Flume、Flink CDC 对比

对比 对比项Apache SeaTunnelDataXApache SqoopApache FlumeFlink CDC部署难度容易容易中等,依赖于 Hadoop 生态系统容易中等,依赖于 Hadoop 生态系统运行模式分布式,也支持单机单机本身不是分布式框架,依赖 Hadoop MR 实现分布式分布式,也支持单机分布式,也支持单机健壮…

Java重修第九天—Lambda 表达式和Stream

通过学习本篇文章可以掌握如下知识 Lambda 表达式 Stream Lambda Lambda表达式是JDK 8开始新增的一种语法形式&#xff1b;作用&#xff1a; 用于简化 函数式接口 匿名内部类的代码写法 函数式接口&#xff1a;首先是接口&#xff0c;其次是只有一个抽象方法。 代码实现 p…

010:vue结合el-table实现表格小计总计需求(summary-method)

文章目录 1. 实现效果2. 核心部分3. 完整组件代码4. 注意点 1. 实现效果 2. 核心部分 el-table 添加如下配置&#xff0c;添加 show-summary 属性&#xff0c;配置 summary-method 函数 <el-table.......show-summary:summary-method"getSummaries" >...... …

Linux利用标准c库对文件操作

这里的思路和之前的‘Linux下文件的创建写入读取编程‘的思路是一样的&#xff0c;都是从介绍API的函数功能开始。都是在Linux下操作文件&#xff0c;但是前者是UNIX系统调用函数&#xff0c;后者是ANSIC标准中的C语言库函数。它们有着不同的适用范围&#xff0c;具体有不同&am…

云服务器CVM_弹性云主机_云计算服务器——腾讯云

腾讯云服务器CVM提供安全可靠的弹性计算服务&#xff0c;腾讯云明星级云服务器&#xff0c;弹性计算实时扩展或缩减计算资源&#xff0c;支持包年包月、按量计费和竞价实例计费模式&#xff0c;CVM提供多种CPU、内存、硬盘和带宽可以灵活调整的实例规格&#xff0c;提供9个9的数…

lua使用resty.http做nginx反向代理(https请求,docker容器化部署集群),一个域名多项目转发

下载使用 链接&#xff1a;https://pan.baidu.com/s/1uQ7yCzQsPWsF6xavFTpbZg 提取码&#xff1a;htay –来自百度网盘超级会员V5的分享 在根目录下执行: # 从 github 上下载文件 git clone https://github.com/ledgetech/lua-resty-http.git # 将 lua-resty-http/lib/ 下的 r…

面经-redis

什么是Redis Redis(Remote Dictionary Server)键只能为字符串&#xff0c;值&#xff1a;字符串、列表、集合、散列表、有序集合。Redis 用来做分布式锁。支持事务 、持久化、LUA脚本、LRU驱动事件、多种集群方案。 Redis为什么这么快 完全基于内存&#xff0c;数据结构简单…

Unreal Engine(UE5)中构建离线地图服务

1. 首先需要用到3个软件&#xff0c;Unreal Engine&#xff0c;gis office 和 bigemap离线服务器 Unreal Engine下载地址:点击前往下载页面 Gis office下载地址:点击前往下载页面 Bigemap离线服务器 下载地址: 点击前往下载页面 Unreal Engine用于数字孪生项目开发&#x…