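"Heaven does not suspend winter because people dislike the cold; earth does not give up its breadth because people dislike great distances." (Xunzi)
Common terms
Virtual Hosts: a virtual host is an isolated namespace that can contain multiple queues
Exchange: receives messages and routes them to queues
Management UI
Log in to the RabbitMQ management UI with the default account guest and password guest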


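Here we can see the ports and related information
15672: management UI
25672: RabbitMQ inter-node (cluster) communication port
5672: AMQP port that clients (producers and consumers) connect to
Quick start
Add the dependency
| <dependency>
     <groupId>com.rabbitmq</groupId>
     <artifactId>amqp-client</artifactId>
     <version>5.10.0</version>
 </dependency>
 
 | 
Simple queue

Producer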
| package com.ruben.mq.rabbitMQ.simple;
 import com.rabbitmq.client.Channel;
 import com.rabbitmq.client.Connection;
 import com.rabbitmq.client.ConnectionFactory;
 
 import java.nio.charset.StandardCharsets;
 
 public class Send {
     private final static String QUEUE_NAME = "hello";
 
     public static void main(String[] argv) throws Exception {
         ConnectionFactory factory = new ConnectionFactory();
         factory.setHost("localhost");
 
         // try-with-resources closes the channel and the connection when done
         try (Connection connection = factory.newConnection();
              Channel channel = connection.createChannel()) {
             // Arguments: queue name, durable, exclusive, autoDelete, extra arguments
             channel.queueDeclare(QUEUE_NAME, false, false, false, null);
             String message = "Hello World!";
 
             // Publishing to the default exchange ("") routes by queue name
             channel.basicPublish("", QUEUE_NAME, null, message.getBytes(StandardCharsets.UTF_8));
             System.out.println(" [x] Sent '" + message + "'");
         }
     }
 }
 
 | 
Consumer
| package com.ruben.mq.rabbitMQ.simple;
 import com.rabbitmq.client.Channel;
 import com.rabbitmq.client.Connection;
 import com.rabbitmq.client.ConnectionFactory;
 import com.rabbitmq.client.DeliverCallback;
 
 import java.nio.charset.StandardCharsets;
 
 public class Recv {
 
     private final static String QUEUE_NAME = "hello";
 
     public static void main(String[] argv) throws Exception {
         ConnectionFactory factory = new ConnectionFactory();
         factory.setHost("localhost");
         // The connection stays open so the consumer keeps listening
         Connection connection = factory.newConnection();
         Channel channel = connection.createChannel();
 
         channel.queueDeclare(QUEUE_NAME, false, false, false, null);
         System.out.println(" [*] Waiting for messages.");
 
         // Called for every message delivered to this consumer
         DeliverCallback deliverCallback = (consumerTag, delivery) -> {
             String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
             System.out.println(" [x] Received '" + message + "'");
         };
         // autoAck = true; the second callback handles consumer cancellation
         channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> {
         });
     }
 }
 
 | 
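We can see the queues and their messages in the management UI

Next we create another Virtual Host by clicking Add virtual host

Click the new Virtual Host

Click Set permission to grant permissions

Then create a queue

Here Durable means the queue is persisted to disk and survives a broker restart, while Transient means the queue exists only in memory and is gone after a restart

Now we can specify the Virtual Host when creating a connection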
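With the Java client, the usual way to pick the virtual host is `factory.setVirtualHost("/ruben")` on the `ConnectionFactory`. An alternative is to put everything into one AMQP URI and hand it to `ConnectionFactory.setUri(...)`. The sketch below only builds such a URI with plain Java; `AmqpUri` is a made-up helper name, and the credentials and the `/ruben` vhost are the values used elsewhere in this post. The later snippets call a `RabbitMQConnection.getConnection()` helper that is not shown in this post; it presumably wraps a factory configured along these lines.

```java
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;

/** Hypothetical helper: builds an AMQP URI that carries the virtual host as its path segment. */
final class AmqpUri {
    private AmqpUri() {}

    static String build(String user, String password, String host, int port, String virtualHost) {
        // The vhost is a single path segment, so a "/" inside it must be percent-encoded.
        String encodedVhost = URLEncoder.encode(virtualHost, StandardCharsets.UTF_8).replace("+", "%20");
        return "amqp://" + user + ":" + password + "@" + host + ":" + port + "/" + encodedVhost;
    }

    public static void main(String[] args) {
        // prints amqp://guest:guest@localhost:5672/%2Fruben
        System.out.println(build("guest", "guest", "localhost", 5672, "/ruben"));
    }
}
```

The resulting string can be passed to `factory.setUri(...)` in place of separate `setHost`, `setPort` and `setVirtualHost` calls.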
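How MQ ensures messages are not lost
Producer -> MQ
Ack confirmation mechanism (after receiving a message, MQ notifies the producer, which can either wait synchronously or be called back asynchronously)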
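| /**
  * Publisher confirms: the broker confirms each published message,
  * and the producer blocks until the confirmation arrives.
  */
 private static void ACKConfirmDemo() throws Exception {
     try (Connection connection = RabbitMQConnection.getConnection();
          Channel channel = connection.createChannel()) {
         String msg = "Hino Supa";
         // Put the channel into confirm mode
         channel.confirmSelect();
         channel.basicPublish("", QUEUE_NAME, null, msg.getBytes(StandardCharsets.UTF_8));
         // Block until the broker has confirmed (true) or refused (false) the message;
         // success must not be reported before this call returns
         boolean result = channel.waitForConfirms();
         if (result) {
             System.out.println("Message delivered");
         } else {
             System.out.println("Message delivery failed");
         }
     }
 }
 
 | 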
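The demo above is the synchronous variant. For the asynchronous variant the Java client offers `channel.addConfirmListener(ackCallback, nackCallback)`, where each callback receives a sequence number and a `multiple` flag. The bookkeeping that usually sits behind those callbacks can be sketched without a broker. `ConfirmTracker` and its method names are made up for illustration; only the "sequence number plus multiple flag" contract comes from the client.

```java
import java.util.List;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.CopyOnWriteArrayList;

/** Hypothetical helper: tracks messages that were published but not yet confirmed. */
final class ConfirmTracker {
    // Sequence number -> message body, sorted so "everything up to N" is a cheap view
    private final ConcurrentNavigableMap<Long, String> outstanding = new ConcurrentSkipListMap<>();
    private final List<String> failed = new CopyOnWriteArrayList<>();

    /** Call right before basicPublish, with the value of channel.getNextPublishSeqNo(). */
    void onPublish(long seqNo, String body) {
        outstanding.put(seqNo, body);
    }

    /** Broker confirmed seqNo; if multiple is true, everything up to seqNo is confirmed. */
    void onAck(long seqNo, boolean multiple) {
        if (multiple) {
            outstanding.headMap(seqNo, true).clear();
        } else {
            outstanding.remove(seqNo);
        }
    }

    /** Broker refused seqNo (and all earlier ones if multiple): keep the bodies for a resend. */
    void onNack(long seqNo, boolean multiple) {
        if (multiple) {
            ConcurrentNavigableMap<Long, String> refused = outstanding.headMap(seqNo, true);
            failed.addAll(refused.values());
            refused.clear();
        } else {
            String body = outstanding.remove(seqNo);
            if (body != null) {
                failed.add(body);
            }
        }
    }

    int outstandingCount() {
        return outstanding.size();
    }

    List<String> failedMessages() {
        return failed;
    }
}
```

In a real producer one would wire it up roughly as `channel.addConfirmListener(tracker::onAck, tracker::onNack)` after `channel.confirmSelect()`, and call `tracker.onPublish(channel.getNextPublishSeqNo(), msg)` before each publish.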
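Transactions
| /**
  * Transactional publishing: the message only becomes visible
  * in the queue once the transaction is committed.
  */
 private static void transactionDemo() throws Exception {
     try (Connection connection = RabbitMQConnection.getConnection();
          Channel channel = connection.createChannel()) {
         try {
             String msg = "Hino Supa";
             // Open a transaction on the channel
             channel.txSelect();
             channel.basicPublish("", QUEUE_NAME, null, msg.getBytes(StandardCharsets.UTF_8));
             // Commit the transaction
             channel.txCommit();
             System.out.println("Message delivered");
         } catch (IOException e) {
             // Roll back so the uncommitted message is discarded
             channel.txRollback();
             e.printStackTrace();
         }
     }
 }
 
 | 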
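MQ -> Consumer
With manual acknowledgement, RabbitMQ removes a message from the broker only after the consumer has acknowledged it as successfully consumed
Kafka does not remove a message right away whether consumption succeeds or fails; it tracks consumption progress with offsets
Work queues

Consumers can adjust how many messages they take at a time according to their own capacity. With several consumers, each one acknowledges to RabbitMQ after finishing a message and only then receives the next message (or next few messages)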
| Connection connection = RabbitMQConnection.getConnection();
 Channel channel = connection.createChannel();
 // Allow at most 2 unacknowledged messages on this consumer at a time
 channel.basicQos(2);
 DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
     @Override
     public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
         String msg = new String(body, StandardCharsets.UTF_8);
         System.out.println("Consumer received message: " + msg);
         // Manually acknowledge this single message (multiple = false)
         channel.basicAck(envelope.getDeliveryTag(), false);
     }
 };
 // autoAck = false: a message stays on the broker until basicAck is called
 channel.basicConsume(QUEUE_NAME, false, defaultConsumer);
 
 | 
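To see why a prefetch limit with manual acks helps, here is a toy model in plain Java (no broker involved). It compares blind round-robin with "the next message goes to whoever acknowledges first". It is a simplification with a prefetch of 1 rather than the 2 used above, and `DispatchModel` and the processing costs are made up for illustration.

```java
import java.util.Arrays;

/** Toy model: how many messages each consumer handles under two dispatch strategies. */
final class DispatchModel {
    private DispatchModel() {}

    /** Round-robin: message i goes to consumer i % n, regardless of how busy it is. */
    static int[] roundRobin(int messages, int consumers) {
        int[] handled = new int[consumers];
        for (int i = 0; i < messages; i++) {
            handled[i % consumers]++;
        }
        return handled;
    }

    /**
     * Prefetch of 1 with manual ack: the next message goes to whichever consumer
     * becomes free first. costPerMessage[i] is consumer i's processing time.
     */
    static int[] prefetchOne(int messages, int[] costPerMessage) {
        int n = costPerMessage.length;
        int[] handled = new int[n];
        long[] freeAt = new long[n]; // time at which each consumer has acked its message
        for (int m = 0; m < messages; m++) {
            int next = 0;
            for (int c = 1; c < n; c++) {
                if (freeAt[c] < freeAt[next]) {
                    next = c;
                }
            }
            freeAt[next] += costPerMessage[next];
            handled[next]++;
        }
        return handled;
    }

    public static void main(String[] args) {
        // One fast consumer (cost 1) and one slow consumer (cost 3), 8 messages
        System.out.println(Arrays.toString(roundRobin(8, 2)));                  // [4, 4]
        System.out.println(Arrays.toString(prefetchOne(8, new int[]{1, 3})));   // [6, 2]
    }
}
```

In this model the fast consumer ends up with most of the work instead of an even split, which is the effect `basicQos` plus manual `basicAck` aims for.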
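Publish/subscribe

Exchange: receives messages and routes them to queues
There are four exchange types: direct, topic, headers and fanout.
Fanout exchange: every queue bound to the exchange gets a copy of the message, so each of our consumers receives it

Producer code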
| package com.ruben.mq.rabbitMQ.subcrible;
 import com.rabbitmq.client.Channel;
 import com.rabbitmq.client.Connection;
 import com.ruben.mq.rabbitMQ.connection.RabbitMQConnection;
 
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
 import java.util.concurrent.TimeoutException;
 
 public class ProducerFanout {
     // Exchange name
     private static final String EXCHANGE_NAME = "fanout_exchange";
 
     public static void main(String[] args) throws IOException, TimeoutException {
         try (Connection connection = RabbitMQConnection.getConnection();
              Channel channel = connection.createChannel()) {
             // Declare a durable fanout exchange
             channel.exchangeDeclare(EXCHANGE_NAME, "fanout", true);
             String msg = "ruben";
             // A fanout exchange ignores the routing key, so it is left empty
             channel.basicPublish(EXCHANGE_NAME, "", null, msg.getBytes(StandardCharsets.UTF_8));
         }
     }
 }
 
 | 
Consumer 1
| package com.ruben.mq.rabbitMQ.subcrible;
 import com.rabbitmq.client.*;
 import com.ruben.mq.rabbitMQ.connection.RabbitMQConnection;
 
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
 import java.util.concurrent.TimeoutException;
 
 public class MailConsumer {
     // Queue name; the queue must already exist (e.g. created in the management UI)
     private static final String QUEUE_NAME = "fanout_email_queue";
     // Exchange name
     private static final String EXCHANGE_NAME = "fanout_exchange";
 
     public static void main(String[] args) throws IOException, TimeoutException {
         System.out.println("Mail consumer...");
         Connection connection = RabbitMQConnection.getConnection();
         Channel channel = connection.createChannel();
         // Bind the queue to the exchange; a fanout exchange ignores the routing key
         channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
         DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
             @Override
             public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                 String msg = new String(body, StandardCharsets.UTF_8);
                 System.out.println("Mail consumer received message: " + msg);
             }
         };
         // autoAck = true: messages count as acknowledged as soon as they are delivered
         channel.basicConsume(QUEUE_NAME, true, defaultConsumer);
     }
 }
 
 | 
Consumer 2
| package com.ruben.mq.rabbitMQ.subcrible;
 import com.rabbitmq.client.*;
 import com.ruben.mq.rabbitMQ.connection.RabbitMQConnection;
 
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
 import java.util.concurrent.TimeoutException;
 
 public class SmsConsumer {
     // Queue name; the queue must already exist (e.g. created in the management UI)
     private static final String QUEUE_NAME = "fanout_email_sms";
     // Exchange name
     private static final String EXCHANGE_NAME = "fanout_exchange";
 
     public static void main(String[] args) throws IOException, TimeoutException {
         System.out.println("SMS consumer...");
         Connection connection = RabbitMQConnection.getConnection();
         final Channel channel = connection.createChannel();
         // Bind the queue to the exchange; a fanout exchange ignores the routing key
         channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
         DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
             @Override
             public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                 String msg = new String(body, StandardCharsets.UTF_8);
                 System.out.println("SMS consumer received message: " + msg);
             }
         };
         // autoAck = true: messages count as acknowledged as soon as they are delivered
         channel.basicConsume(QUEUE_NAME, true, defaultConsumer);
     }
 }
 
 | 
Direct: direct exchange; a message is delivered to the queues whose binding key equals the message's routingKey
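To make the difference between fanout and direct concrete, here is a toy in-memory model (plain Java, no broker involved). `ExchangeModel` and its methods are made up for illustration and are not part of the RabbitMQ client; the queue names and keys are the ones used in this post.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Toy model of how fanout and direct exchanges pick target queues from their bindings. */
final class ExchangeModel {
    private final String type; // "fanout" or "direct"
    private final Map<String, String> bindings = new LinkedHashMap<>(); // queue -> binding key

    ExchangeModel(String type) {
        this.type = type;
    }

    void bind(String queue, String bindingKey) {
        bindings.put(queue, bindingKey);
    }

    /** Returns the queues that would receive a message published with this routing key. */
    List<String> route(String routingKey) {
        List<String> targets = new ArrayList<>();
        for (Map.Entry<String, String> binding : bindings.entrySet()) {
            // fanout: every bound queue; direct: only exact key matches
            boolean matches = type.equals("fanout") || binding.getValue().equals(routingKey);
            if (matches) {
                targets.add(binding.getKey());
            }
        }
        return targets;
    }

    public static void main(String[] args) {
        ExchangeModel fanout = new ExchangeModel("fanout");
        fanout.bind("fanout_email_queue", "");
        fanout.bind("fanout_email_sms", "");
        System.out.println(fanout.route("")); // [fanout_email_queue, fanout_email_sms]

        ExchangeModel direct = new ExchangeModel("direct");
        direct.bind("direct_email_queue", "email");
        direct.bind("direct_sms_queue", "sms");
        System.out.println(direct.route("sms")); // [direct_sms_queue]
    }
}
```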
Producer
| package com.ruben.mq.rabbitMQ.subcrible.direct;
 import com.rabbitmq.client.Channel;
 import com.rabbitmq.client.Connection;
 import com.ruben.mq.rabbitMQ.connection.RabbitMQConnection;
 
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
 import java.util.concurrent.TimeoutException;
 
 public class ProducerDirect {
     // Exchange name
     private static final String EXCHANGE_NAME = "direct_exchange";
 
     public static void main(String[] args) throws IOException, TimeoutException {
         try (Connection connection = RabbitMQConnection.getConnection();
              Channel channel = connection.createChannel()) {
             // Declare a durable direct exchange
             channel.exchangeDeclare(EXCHANGE_NAME, "direct", true);
             String msg = "ruben";
             // Routing key "sms": only queues bound with "sms" receive the message
             channel.basicPublish(EXCHANGE_NAME, "sms", null, msg.getBytes(StandardCharsets.UTF_8));
         }
     }
 }
 
 | 
Consumer email
| package com.ruben.mq.rabbitMQ.subcrible.direct;
 import com.rabbitmq.client.*;
 import com.ruben.mq.rabbitMQ.connection.RabbitMQConnection;
 
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
 import java.util.concurrent.TimeoutException;
 
 public class MailConsumer {
     // Queue name; the queue must already exist (e.g. created in the management UI)
     private static final String QUEUE_NAME = "direct_email_queue";
     // Exchange name
     private static final String EXCHANGE_NAME = "direct_exchange";
 
     public static void main(String[] args) throws IOException, TimeoutException {
         System.out.println("Mail consumer...");
         Connection connection = RabbitMQConnection.getConnection();
         final Channel channel = connection.createChannel();
         // Bind with key "email": only messages published with routingKey "email" arrive here
         channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "email");
         DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
             @Override
             public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                 String msg = new String(body, StandardCharsets.UTF_8);
                 System.out.println("Mail consumer received message: " + msg);
             }
         };
         // autoAck = true: messages count as acknowledged as soon as they are delivered
         channel.basicConsume(QUEUE_NAME, true, defaultConsumer);
     }
 }
 
 | 
Consumer sms
| package com.ruben.mq.rabbitMQ.subcrible.direct;
 import com.rabbitmq.client.*;
 import com.ruben.mq.rabbitMQ.connection.RabbitMQConnection;
 
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
 import java.util.concurrent.TimeoutException;
 
 public class SmsConsumer {
     // Queue name; the queue must already exist (e.g. created in the management UI)
     private static final String QUEUE_NAME = "direct_sms_queue";
     // Exchange name
     private static final String EXCHANGE_NAME = "direct_exchange";
 
     public static void main(String[] args) throws IOException, TimeoutException {
         System.out.println("SMS consumer...");
         Connection connection = RabbitMQConnection.getConnection();
         final Channel channel = connection.createChannel();
         // Bind with key "sms": only messages published with routingKey "sms" arrive here
         channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "sms");
         DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
             @Override
             public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                 String msg = new String(body, StandardCharsets.UTF_8);
                 System.out.println("SMS consumer received message: " + msg);
             }
         };
         // autoAck = true: messages count as acknowledged as soon as they are delivered
         channel.basicConsume(QUEUE_NAME, true, defaultConsumer);
     }
 }
 
 | 
Topic: topic exchange; a consumer binds with a pattern such as [topic].* to match messages the producer sends with a routingKey of the form [topic].xxx
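In a topic binding key, `*` stands for exactly one dot-separated word and `#` for zero or more words. The matching rule can be sketched in a few lines of plain Java. `TopicMatcher` is a made-up name and this is only a model of the rule, not the broker's actual implementation.

```java
/** Toy matcher for topic-exchange binding keys: '*' = exactly one word, '#' = zero or more words. */
final class TopicMatcher {
    private TopicMatcher() {}

    static boolean matches(String bindingKey, String routingKey) {
        return match(bindingKey.split("\\."), 0, routingKey.split("\\."), 0);
    }

    private static boolean match(String[] pattern, int i, String[] words, int j) {
        if (i == pattern.length) {
            // Pattern exhausted: it matches only if all words are consumed too
            return j == words.length;
        }
        if (pattern[i].equals("#")) {
            // '#' may absorb zero or more of the remaining words
            for (int k = j; k <= words.length; k++) {
                if (match(pattern, i + 1, words, k)) {
                    return true;
                }
            }
            return false;
        }
        if (j == words.length) {
            return false;
        }
        boolean wordMatches = pattern[i].equals("*") || pattern[i].equals(words[j]);
        return wordMatches && match(pattern, i + 1, words, j + 1);
    }

    public static void main(String[] args) {
        System.out.println(matches("supa.*", "supa.sms"));    // true
        System.out.println(matches("ruben.*", "supa.sms"));   // false
        System.out.println(matches("supa.*", "supa.sms.cn")); // false
        System.out.println(matches("supa.#", "supa.sms.cn")); // true
    }
}
```

With the keys used in this post, a message published with `supa.sms` reaches the queue bound with `supa.*` but not the one bound with `ruben.*`.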

Producer, sending a message with routingKey supa.sms
| package com.ruben.mq.rabbitMQ.subcrible.topic;
 import com.rabbitmq.client.Channel;
 import com.rabbitmq.client.Connection;
 import com.ruben.mq.rabbitMQ.connection.RabbitMQConnection;
 
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
 import java.util.concurrent.TimeoutException;
 
 public class ProducerTopic {
     // Exchange name
     private static final String EXCHANGE_NAME = "topic_exchange";
 
     public static void main(String[] args) throws IOException, TimeoutException {
         try (Connection connection = RabbitMQConnection.getConnection();
              Channel channel = connection.createChannel()) {
             // Declare a durable topic exchange
             channel.exchangeDeclare(EXCHANGE_NAME, "topic", true);
             String msg = "ruben";
             // Routing key "supa.sms" is matched against the consumers' binding patterns
             channel.basicPublish(EXCHANGE_NAME, "supa.sms", null, msg.getBytes(StandardCharsets.UTF_8));
         }
     }
 }
 
 | 
Consumer bound with routingKey ruben.* (does not match supa.sms, so it receives nothing)
| package com.ruben.mq.rabbitMQ.subcrible.topic;
 import com.rabbitmq.client.*;
 import com.ruben.mq.rabbitMQ.connection.RabbitMQConnection;
 
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
 import java.util.concurrent.TimeoutException;
 
 public class MailConsumer {
     // Queue name; the queue must already exist (e.g. created in the management UI)
     private static final String QUEUE_NAME = "topic_email_queue";
     // Exchange name
     private static final String EXCHANGE_NAME = "topic_exchange";
 
     public static void main(String[] args) throws IOException, TimeoutException {
         System.out.println("Mail consumer...");
         Connection connection = RabbitMQConnection.getConnection();
         Channel channel = connection.createChannel();
         // Bind with the pattern "ruben.*": matches ruben.xxx only
         channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "ruben.*");
         DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
             @Override
             public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                 String msg = new String(body, StandardCharsets.UTF_8);
                 System.out.println("Mail consumer received message: " + msg);
             }
         };
         // autoAck = true: messages count as acknowledged as soon as they are delivered
         channel.basicConsume(QUEUE_NAME, true, defaultConsumer);
     }
 }
 
 | 
Consumer bound with routingKey supa.* (matches supa.sms)
| package com.ruben.mq.rabbitMQ.subcrible.topic;
 import com.rabbitmq.client.*;
 import com.ruben.mq.rabbitMQ.connection.RabbitMQConnection;
 
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
 import java.util.concurrent.TimeoutException;
 
 public class SmsConsumer {
     // Queue name; the queue must already exist (e.g. created in the management UI)
     private static final String QUEUE_NAME = "topic_sms_queue";
     // Exchange name
     private static final String EXCHANGE_NAME = "topic_exchange";
 
     public static void main(String[] args) throws IOException, TimeoutException {
         System.out.println("SMS consumer...");
         Connection connection = RabbitMQConnection.getConnection();
         Channel channel = connection.createChannel();
         // Bind with the pattern "supa.*": matches supa.xxx, including supa.sms
         channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "supa.*");
         DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
             @Override
             public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                 String msg = new String(body, StandardCharsets.UTF_8);
                 System.out.println("SMS consumer received message: " + msg);
             }
         };
         // autoAck = true: messages count as acknowledged as soon as they are delivered
         channel.basicConsume(QUEUE_NAME, true, defaultConsumer);
     }
 }
 
 | 
Integrating RabbitMQ with Spring Boot
Maven coordinates (GAV)
| <dependency>
     <groupId>org.springframework.boot</groupId>
     <artifactId>spring-boot-starter-amqp</artifactId>
     <version>2.4.2</version>
 </dependency>
 
 | 
Next come the configuration file and the configuration class
| spring:
   rabbitmq:
     # "host" is used here because "port" is ignored when "addresses" is set
     host: localhost
     port: 5672
     username: guest
     password: guest
     virtual-host: /ruben
 
 | 
| package com.ruben.config;
 import org.springframework.amqp.core.Binding;
 import org.springframework.amqp.core.BindingBuilder;
 import org.springframework.amqp.core.FanoutExchange;
 import org.springframework.amqp.core.Queue;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
 
 /**
  * Declares the SMS queue, a fanout exchange and the binding between them.
  */
 @Configuration
 public class RabbitmqConfig {
     // Queue name
     public static final String QUEUE_RUBEN_SMS = "queue_ruben_sms";
     // Exchange name
     public static final String EXCHANGE_RUBEN = "exchange_ruben";
     // Routing key (a fanout exchange ignores it; it is still passed when sending)
     public static final String ROUTING_KEY_RUBEN = "ruben.sms";
 
     @Bean
     public Queue smsQueue() {
         return new Queue(QUEUE_RUBEN_SMS);
     }
 
     @Bean
     public FanoutExchange fanoutExchange() {
         return new FanoutExchange(EXCHANGE_RUBEN);
     }
 
     // Bind the SMS queue to the fanout exchange
     @Bean
     public Binding bindingSmsFanoutExchange(Queue smsQueue, FanoutExchange fanoutExchange) {
         return BindingBuilder.bind(smsQueue).to(fanoutExchange);
     }
 
 }
 
 | 
Finally, send the message
| @Resource
 private AmqpTemplate amqpTemplate;
 
 @GetMapping("sendSms/{number}")
 public AjaxJson sendSms(@PathVariable String number) {
     // Generate a random 6-digit verification code in the range 100000..999999
     String code = String.valueOf(new Random().nextInt(900000) + 100000);
     // Cache the code for 5 minutes, keyed by the phone number
     stringRedisTemplate.opsForValue().set(number, code, 5, TimeUnit.MINUTES);
     // Publish the SMS request as JSON; the consumer sends the actual text message
     byte[] body = JSON.toJSONString(SmsTO.builder().number(number).code(code).build()).getBytes(StandardCharsets.UTF_8);
     amqpTemplate.send(RabbitmqConfig.EXCHANGE_RUBEN, RabbitmqConfig.ROUTING_KEY_RUBEN, MessageBuilder.withBody(body).build());
     return AjaxJson.success("Sent successfully!");
 }
 
 | 
Now the consumer side
First add the same configuration class as above, then
| package com.ruben.rubenproducerdemo.consumer;
 import com.alibaba.fastjson.JSON;
 import com.ruben.rubenproducerdemo.config.RabbitmqConfig;
 import com.ruben.rubenproducerdemo.pojo.to.SmsTO;
 import com.ruben.rubenproducerdemo.utils.SmsUtil;
 import org.springframework.amqp.core.Message;
 import org.springframework.amqp.rabbit.annotation.*;
 import org.springframework.stereotype.Component;
 
 /**
  * Consumes SMS requests from the queue and sends the text message.
  */
 @Component
 public class SmsConsumer {
 
     @RabbitHandler
     @RabbitListener(bindings = @QueueBinding(
             value = @Queue(value = RabbitmqConfig.QUEUE_RUBEN_SMS),
             exchange = @Exchange(value = RabbitmqConfig.EXCHANGE_RUBEN, type = "fanout"),
             key = RabbitmqConfig.ROUTING_KEY_RUBEN))
     public void consume(Message message) {
         // Deserialize the JSON body published by the producer
         SmsTO smsTO = JSON.parseObject(message.getBody(), SmsTO.class);
         SmsUtil.SendSms(smsTO.getNumber(), "SMS_189521312", smsTO.getCode());
     }
 
 }
 
 | 
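The SMS-sending code itself is covered in an earlier post of mine
With this, the request returns its result synchronously and stores the verification code, while the verification SMS is sent asynchronously
Dead-letter queues
When the broker can no longer deliver a message through its normal queue, it moves the message to a dead-letter queue; a dead-letter queue can have its own exchange, routing key and so on
Causes
1. The message expired before it was consumed
2. The queue reached its maximum length
3. The consumer failed repeatedly and finally rejected the message without requeueing it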
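The three causes can be played through with a toy in-memory queue (plain Java, no broker involved). `DeadLetterModel` and its methods are made up for illustration. The model assumes the queue drops its oldest message when full, which is RabbitMQ's default overflow behaviour, and time is passed in explicitly so the run is deterministic.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Toy model of a queue with a message TTL and a length limit that dead-letters what it drops. */
final class DeadLetterModel {
    private static final class Entry {
        final String body;
        final long expiresAt;

        Entry(String body, long expiresAt) {
            this.body = body;
            this.expiresAt = expiresAt;
        }
    }

    private final Deque<Entry> queue = new ArrayDeque<>();
    private final List<String> deadLetters = new ArrayList<>();
    private final long ttlMillis;
    private final int maxLength;

    DeadLetterModel(long ttlMillis, int maxLength) {
        if (maxLength < 1) {
            throw new IllegalArgumentException("maxLength must be at least 1");
        }
        this.ttlMillis = ttlMillis;
        this.maxLength = maxLength;
    }

    /** Cause 1: messages whose TTL has elapsed leave from the head of the queue. */
    void expire(long now) {
        while (!queue.isEmpty() && queue.peekFirst().expiresAt <= now) {
            deadLetters.add(queue.pollFirst().body);
        }
    }

    /** Cause 2: when the queue is full, the oldest message makes room for the new one. */
    void publish(String body, long now) {
        expire(now);
        if (queue.size() == maxLength) {
            deadLetters.add(queue.pollFirst().body);
        }
        queue.addLast(new Entry(body, now + ttlMillis));
    }

    /** Cause 3: the consumer gives up on the head message (reject or nack without requeue). */
    void rejectHead() {
        if (!queue.isEmpty()) {
            deadLetters.add(queue.pollFirst().body);
        }
    }

    List<String> deadLetters() {
        return deadLetters;
    }

    int size() {
        return queue.size();
    }
}
```

For example, with a TTL of 10000 ms and a limit of 2, publishing `a`, `b`, `c` at time 0 dead-letters `a` (overflow), `rejectHead()` dead-letters `b`, and `expire(10000)` dead-letters `c`.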
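Here is the configuration
| package com.ruben.rubenproducerdemo.config;
 import org.springframework.amqp.core.*;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
 
 /**
  * Declares the SMS queue with a TTL and a dead-letter exchange.
  */
 @Configuration
 public class RabbitmqConfig {
     // Queue name
     public static final String QUEUE_RUBEN_SMS = "queue_ruben_sms";
     // Dead-letter queue name
     public static final String QUEUE_DEAD_RUBEN_SMS = "queue_dead_ruben_sms";
     // Exchange name
     public static final String EXCHANGE_RUBEN = "exchange_ruben";
     // Dead-letter exchange name
     public static final String EXCHANGE_DEAD_RUBEN = "exchange_dead_ruben";
     // Routing key
     public static final String ROUTING_KEY_RUBEN = "ruben.sms";
 
     // If queue_ruben_sms already exists without these arguments, delete it first:
     // RabbitMQ refuses to redeclare a queue with different arguments
     @Bean
     public Queue smsQueue() {
         return QueueBuilder
                 // Durable queue
                 .durable(QUEUE_RUBEN_SMS)
                 // Messages expire after 10000 ms (10 seconds)
                 .ttl(10000)
                 // Dead-lettered messages are republished to this exchange
                 .deadLetterExchange(EXCHANGE_DEAD_RUBEN)
                 // Routing key used when dead-lettering
                 .deadLetterRoutingKey(ROUTING_KEY_RUBEN)
                 .build();
     }
 
     @Bean
     public FanoutExchange fanoutExchange() {
         return ExchangeBuilder.fanoutExchange(EXCHANGE_RUBEN).build();
     }
 
     // Bind the SMS queue to the fanout exchange
     @Bean
     public Binding bindingSmsFanoutExchange(Queue smsQueue, FanoutExchange fanoutExchange) {
         return BindingBuilder.bind(smsQueue).to(fanoutExchange);
     }
 
 }
 
 | 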
Then the dead-letter consumer
| package com.ruben.rubenproducerdemo.consumer;
 import com.alibaba.fastjson.JSON;
 import com.rabbitmq.client.Channel;
 import com.ruben.rubenproducerdemo.config.RabbitmqConfig;
 import com.ruben.rubenproducerdemo.pojo.to.SmsTO;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.amqp.core.Message;
 import org.springframework.amqp.rabbit.annotation.*;
 import org.springframework.stereotype.Component;
 
 import java.io.IOException;
 
 /**
  * Consumes messages that were dead-lettered from the SMS queue.
  */
 @Slf4j
 @Component
 public class DeadLetterConsumer {
 
     @RabbitHandler
     @RabbitListener(bindings = @QueueBinding(
             value = @Queue(value = RabbitmqConfig.QUEUE_DEAD_RUBEN_SMS),
             exchange = @Exchange(value = RabbitmqConfig.EXCHANGE_DEAD_RUBEN, type = "fanout"),
             key = RabbitmqConfig.ROUTING_KEY_RUBEN))
     public void deadLetterConsume(Message message, Channel channel) throws IOException {
         SmsTO smsTO = JSON.parseObject(message.getBody(), SmsTO.class);
         // Acknowledge only this message (multiple = false) so other in-flight messages are unaffected
         channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
         log.info("Dead-letter queue received {}", smsTO);
     }
 }
 
 | 
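Configuring retries
| spring:
   rabbitmq:
     # "host" is used here because "port" is ignored when "addresses" is set
     host: localhost
     port: 5672
     username: guest
     password: guest
     virtual-host: /ruben
     listener:
       simple:
         retry:
           enabled: true
           # Maximum number of delivery attempts
           max-attempts: 5
           # Interval between attempts, in milliseconds
           initial-interval: 3000
         # Manual ack mode
         acknowledge-mode: manual
 
 | 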
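To get a feel for what these retry settings mean in time, the waits between attempts can be computed by hand. The sketch below assumes the usual backoff scheme: start at `initial-interval`, multiply by `multiplier` after each attempt, and cap at `max-interval`. The last two are the sibling `multiplier` and `max-interval` properties, assumed here to be at 1.0 and 10000 ms since the configuration above does not set them. `RetrySchedule` is a made-up helper, not a Spring class.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper: computes the waits (in ms) between consecutive delivery attempts. */
final class RetrySchedule {
    private RetrySchedule() {}

    static List<Long> waits(int maxAttempts, long initialInterval, double multiplier, long maxInterval) {
        List<Long> waits = new ArrayList<>();
        double interval = initialInterval;
        // maxAttempts attempts means maxAttempts - 1 pauses between them
        for (int attempt = 1; attempt < maxAttempts; attempt++) {
            waits.add(Math.min((long) interval, maxInterval));
            interval *= multiplier;
        }
        return waits;
    }

    public static void main(String[] args) {
        // The settings above, with the assumed multiplier of 1.0
        System.out.println(waits(5, 3000, 1.0, 10000)); // [3000, 3000, 3000, 3000]
        // Same settings with a multiplier of 2.0, for comparison
        System.out.println(waits(5, 3000, 2.0, 10000)); // [3000, 6000, 10000, 10000]
    }
}
```

Under these assumptions, the five attempts are spread over roughly 12 seconds before the listener gives up on the message.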