说明:本系列笔记总结自
雷丰阳
老师教学项目《谷粒商城》
- 视频地址:直达BiliBili
- 完整项目地址:直达gitee
- 项目资料获取:
一、MQ概述
-
大多应用,可通过消息服务中间件来提升系统异步通信、扩展解耦能力
-
消息服务中两个重要概念:消息代理(messagebroker)和目的地(destination)
当消息发送者发送消息以后,将由消息代理接管,消息代理保证消息传递到指定目的地。
- 消息队列主要有两种形式的目的地
- 1.队列(queue):点对点消息通信(point-to-point)
- 2.主题(topic):发布(publish)/订阅(subscribe)消息通信
- 点对点式:
- 消息发送者发送消息,消息代理将亘放入一个队列中,消息接收者从队列中获取消息内容,消息读取后被移出队列
- 消息只有唯一的发送者和接受者,但并不是说只能有一个接收者
-
发布订阅式.
发送者(发布者〕发法消息到主题,多个接收者(订阅者)监听(订阅)这个主题,那么就会在消息到达时同时收到消息 -
JMS(Java Message Service)JAVA消息服务
- 基于JVM消息代理的规范。ActiveMQ、ActiveMQ、HornetMQ是JMS实现
- https://blog.csdn.net/jiuqiyuliang/article/details/46701559
- AMQP(Advanced Message Queuing Protocol)
- 高级消息队列协议,也是一个消息代理的规范,兼容JMS
- RabbitMQ是AMQP的实现
- Spring支持
spring-jms
提供了对JMS
的支持spring-rabbit
提供了对AMQP
的支持- 需要
onnectionFactory
的实现来连接消息代理 - 提供
JmsTemplate
、RabbitTemplate
来发送消息 - @JmsListener(JMS)、@RabbitListener(AMQP)注解在方法上监听消息代理发布的消息
- @EnabIeJms、@EnableRabbit开启支持
- SpringBoot自动配置
- JmsAutoConfiguration
- RabbitAutoConfiguration
- 市面的MQ产品
ActiveMQ、RabbitMQ、RocketMQ、Kafka
二、核心概念
RabbitMQ简介:RabbitMQ是—个由erlang语言开发的AMQP(AdvanvedMessageQueueProtocoI)的开源实现。
核心概念:
- Message:消息,消息是不具名的,它由消息头和消息体组成。消息体是不透明的,而消息头则由一系列的可选属性组成,这些属性routing-key(路由键)、priority(相对于其他消息的优先权)、delivery-mode(指出该消息可能需要持久性存储)
- Publisher:消息的生产者,也是一个向交换器发布消息的客户端应用程序
- Exchange:交唤器,用来接收生产者发送的消息并将这些消息路由给服务器中的队列。
- Exchange有4种类型:direct默认fanout,topic,和headers,不同类型的Exchange转发消息的策略有所区别
- consumer:从消息队列中取得消息的客户端应用程序。
- VirtualHost:虚拟主机,表示一扌比交器、消息队列和相关对象。虚拟主机是共享相同的身份认证和加密环境的独立服务器域。每个vhost本质上就是一个mini版的RabbitMQ服务器,拥有自己的队列、交换器、绑定和权限机制。vhost是AMQP概念的基础,必须在连接时指定,RabbitMQ默认的vhost是
/
。 - Broker:表示消息队列服务器实体
三、交换机Exchange 类型
Exchange分发消息时根据类型的不同分发策略有区别,目前共四种类型:direct直接、fanout扇出、topic主题(发布订阅)、headers 。
14是点对点,23是发布订阅。4性能比较低
headers 匹配 AMQP 消息的 header 而不是路由键,此外 headers 交换器和 direct 交换器完全一致,但性能差很多,目前几乎用不到了,所以直接看另外三种类型
(1) direct
direct 交换器
消息中的路由键(routing key)如果和 Binding 中的 binding key 一致, 交换器就将消息发到对应的队列中。路由键与队列名完全匹配,如果一个队列绑定到交换机要求路由键为“dog”,则只转发 routing key 标记为“dog”的消息,不会转发“dog.puppy”,也不会转发“dog.guard”等等。它是完全匹配、单播的模式。
(2) fanout
fanout 交换器
每个发到 fanout 类型交换器的消息都会分到所有绑定的队列上去。fanout 交换器不处理路由键,只是简单的将队列绑定到交换器上,每个发送到交换器的消息都会被转发到与该交换器绑定的所有队列上。很像子网广播,每台子网内的主机都获得了一份复制的消息。fanout 类型转发消息是最快的。
(3) topic
topic 交换器
topic 交换器通过模式匹配分配消息的路由键属性,将路由键和某个模式进行匹配,此时队列需要绑定到一个模式上。它将路由键和绑定键的字符串切分成单词,这些单词之间用点隔开。它同样也会识别两个通配符:符号“#”和符号“*”。
#
匹配0个或多个单词,
*
匹配不多不少一个单词。
四、docker安装
docker run -d --name rabbitmq -p 5671:5671 -p 5672:5672 -p 4369:4369 -p 25672:25672 -p 15671:15671 -p 15672:15672 rabbitmq:management
docker update rabbitmq --restart=always
- 4369 25672 Ealang发现&集群端口
- 5672 5671 AMQP端口
- 15672 web管理后台端口(默认账号密码:guest)
- 61613 61614 STOMP协议端口
- 1883 8883 MQTT协议端口
浏览器访问:http://localhost:15672
(1) 新增交换机
(2) 新增队列
(3) 绑定交换机-队列
(4) 测试发布消息
五、Java 客户端访问
详细信息可以参考:https://blog.csdn.net/kavito/article/details/91403659?spm=1001.2014.3001.5506
RabbitMQ 支持多种语言访问,以 Java 为例看下一般使用 RabbitMQ 的步骤。
pom中添加依赖:
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>4.1.0</version>
</dependency>
消息生产者
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Producer {
public static void main(String[] args) throws IOException, TimeoutException {
//创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setUsername("guest");
factory.setPassword("guest");
//设置 RabbitMQ 地址
factory.setHost("localhost");
//建立到代理服务器到连接
Connection conn = factory.newConnection();
//获得信道
Channel channel = conn.createChannel();
//声明交换器
String exchangeName = "hello-exchange";
channel.exchangeDeclare(exchangeName, "direct", true);
String routingKey = "hola";
//发布消息
byte[] messageBodyBytes = "quit".getBytes();
channel.basicPublish(exchangeName, routingKey, null, messageBodyBytes);
channel.close();
conn.close();
}
}
消息消费者
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Consumer {
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setUsername("guest");
factory.setPassword("guest");
factory.setHost("localhost");
//建立到代理服务器到连接
Connection conn = factory.newConnection();
//获得信道
final Channel channel = conn.createChannel();
//声明交换器
String exchangeName = "hello-exchange";
channel.exchangeDeclare(exchangeName, "direct", true);
//声明队列
String queueName = channel.queueDeclare().getQueue();
String routingKey = "hola";
//绑定队列,通过键 hola 将队列和交换器绑定起来
channel.queueBind(queueName, exchangeName, routingKey);
while(true) {
//消费消息
boolean autoAck = false;
String consumerTag = "";
channel.basicConsume(queueName, autoAck, consumerTag, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body) throws IOException {
String routingKey = envelope.getRoutingKey();
String contentType = properties.getContentType();
System.out.println("消费的路由键:" + routingKey);
System.out.println("消费的内容类型:" + contentType);
long deliveryTag = envelope.getDeliveryTag();
//确认消息
channel.basicAck(deliveryTag, false);
System.out.println("消费的消息体内容:");
String bodyStr = new String(body, "UTF-8");
System.out.println(bodyStr);
}
});
}
}
}
六、SpringBoot集成
(1) 简单测试
导入依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
编写配置
#基本连接信息
spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.virtual-host=/
主启动类上开启配置
@EnableRabbit
测试代码
@Slf4j
@SpringBootTest
class GulimallOrderApplicationTests {
@Autowired
AmqpAdmin amqpAdmin;
@Autowired
RabbitTemplate rabbitTemplate;
//发送消息
@Test
void sendMessageTest(){
OrderReturnReasonEntity entity = new OrderReturnReasonEntity();
entity.setId(1L);
entity.setCreateTime(new Date());
//1.发送消息,如果发送对象使用序列化机制,将对象写出去,对象必须实现Serializable
String s = "Hello World";
//2.发送的消息变为json格式
for (int i = 0;i<10;i++) {
if(i%2==0) {
entity.setName("Vc" + i);
rabbitTemplate.convertAndSend("hello-java-exchange", "hello.java", entity);
}else {
OrderEntity orderEntity = new OrderEntity();
orderEntity.setOrderSn(UUID.randomUUID().toString());
rabbitTemplate.convertAndSend("hello-java-exchange","hello.java",orderEntity);
}
log.info("消息发送完成{}",entity);
}
}
//创建交换机
@Test
void createExchange() {
DirectExchange directExchange = new DirectExchange("hello-java-exchange",true,false);
amqpAdmin.declareExchange(directExchange);
log.info("exchange:[{}]创建成功","hello-java-exchange");
}
//创建队列
@Test
void createQueue() {
Queue queue = new Queue("hello-java-queue",true,false,false);
amqpAdmin.declareQueue(queue);
log.info("queue:[{}]创建成功","hello-java-queue");
}
//绑定交换机-队列
@Test
void createBinding(){
Binding binding = new Binding("hello-java-queue", Binding.DestinationType.QUEUE,
"hello-java-exchange",
"hello.java",null);
amqpAdmin.declareBinding(binding);
log.info("binding:[{}]创建成功","hello-java-binding");
}
}
监听队列方法
@Service("orderItemService")
@RabbitListener(queues = {"hello-java-queue"})//queues 声明需要监听的所有队列
public class OrderItemServiceImpl extends ServiceImpl<OrderItemDao, OrderItemEntity> implements OrderItemService {
@Override
public PageUtils queryPage(Map<String, Object> params) {
IPage<OrderItemEntity> page = this.page(
new Query<OrderItemEntity>().getPage(params),
new QueryWrapper<OrderItemEntity>()
);
return new PageUtils(page);
}
@RabbitHandler
public void reciveMessage(Message msg, OrderReturnReasonEntity content, Channel channel){
//消息头
MessageProperties properties = msg.getMessageProperties();
//消息体
byte[] body = msg.getBody();
//System.out.println("接收到消息...内容:"+msg+"==>类型"+content);
//System.out.println("消息头:"+properties);
//System.out.println("消息体:"+body);
//Channel内按顺序自增
long deliveryTag = msg.getMessageProperties().getDeliveryTag();
//签收消息,multiple是否批量签收消息;拒签消息,requeue=true发回服务器,服务器重新入队,false丢弃消息
try {
if(deliveryTag%2==0){
channel.basicAck(deliveryTag,false);
System.out.println("签收了消息..."+deliveryTag);
}else {
channel.basicNack(deliveryTag,false,false);
System.out.println("拒签了消息..."+deliveryTag);
}
}catch (Exception e){
//网络中断
}
System.out.println("接收到消息...内容:"+content);
}
@RabbitHandler
public void reciveMessage2(OrderEntity content){
System.out.println("接收到消息...内容:"+content);
}
}
(2) 配置集成
编写其他配置信息
# 开启发送端确认
spring.rabbitmq.publisher-confirm-type=correlated
# 开启发送端消息抵达队列的确认
spring.rabbitmq.publisher-returns=true
# 只要抵达队列,以异步发送优先回调我们这个returnconfirm
spring.rabbitmq.template.mandatory=true
# 开启手动处理队列消息
spring.rabbitmq.listener.simple.acknowledge-mode=manual
编写配置类
这两个配置不能放到同一个类中,否则会产生循环依赖
@Configuration
public class MyRabbitMessageConfig {
/**
* 使用JSON序列化机制,进行消息转换
* @return
*/
@Bean
public MessageConverter messageConverter() {
return new Jackson2JsonMessageConverter();
}
}
@Configuration
public class MyRabbitConfig {
@Autowired
RabbitTemplate rabbitTemplate;
//定制RabbitTemplate
@PostConstruct //注解表示MyRabbitConfig对象创建完成以后,执行这个方法
public void initRabbitTemplate() {
//设置确认回调(送到交换机时自动触发)
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
/*
correlationData当前消息的唯一关联数据(消息的唯一id),
ack消息是否成功收到
cause失败原因
*/
@Override
public void confirm(@Nullable CorrelationData correlationData, boolean b, @Nullable String s) {
System.out.println("confirm...CorrelationData[" + correlationData + "]==>[" + b + "]==>[" + s + "]");
}
});
rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
@Override
public void returnedMessage(ReturnedMessage returnedMessage) {
//只要消息没有投递到指定队列,就触发这个失败回调(从交换机通过路由键送达队列失败时调用)
System.out.println("失败回调:"+returnedMessage.getMessage());
returnedMessage.getExchange();
returnedMessage.getReplyCode();
returnedMessage.getReplyText();
returnedMessage.getRoutingKey();
}
});
}
}
(3) 配置类自动创建
@Configuration
public class MyMQConfig {
@Bean //结合死信模拟延迟队列-A(当到达指定ttl事件后死信队列中的信息会被重新投递)
public Queue orderDelayQueue() {
Map<String, Object> arguments = new HashMap<>();
arguments.put("x-dead-letter-exchange", "order-event-exchange"); //60秒后发给指定交换机
arguments.put("x-dead-letter-routing-key", "order.release.order"); //交换机所使用的路由键
arguments.put("x-message-ttl", 60000); //60000毫秒即60秒后信息转出
Queue orderDelayQueue = new Queue("order.delay.queue", true, false, false, arguments);
return orderDelayQueue;
}
@Bean //信死后投递的队列-B
public Queue orderReleaseQueue() {
return new Queue("order.release.order.queue", true, false, false);
}
@Bean //秒杀业务队列-C
public Queue orderSeckillOrderQueue(){
return new Queue("order.seckill.order.queue",true,false,false);
}
@Bean //一个交换机
public Exchange orderEventExchange() {
return new TopicExchange("order-event-exchange", true, false);
}
@Bean //绑定延迟队列->A
public Binding orderCreateBingding() {
return new Binding("order.delay.queue", Binding.DestinationType.QUEUE, "order-event-exchange", "order.create.order", null);
}
@Bean //绑定信死后投递的队列->B
public Binding orderReleaseBingding() {
return new Binding("order.release.order.queue", Binding.DestinationType.QUEUE, "order-event-exchange", "order.release.order", null);
}
@Bean //绑定秒杀业务队列->C
public Binding orderSeckillOrderQueueBinding(){
return new Binding("order.seckill.order.queue",Binding.DestinationType.QUEUE,"order-event-exchange","order.seckill.order",null);
}
}
(4) 监听队列
创建listener包存放各个队列监听类
@Component
@RabbitListener(queues = "order.release.order.queue") //监听信死后投递的队列-B
public class OrderCloseListener {
@Autowired
OrderService orderService;
@RabbitHandler
public void listener(OrderEntity entity, Channel channel, Message msg) throws IOException {
try {
System.out.println("收到过期的订单信息:准备关闭订单" + entity.getOrderSn());
orderService.closeOrder(entity);
channel.basicAck(msg.getMessageProperties().getDeliveryTag(), false); //手动ack确认
} catch (Exception e) {
System.out.println("订单关闭异常,库存解锁异常" + e.getMessage());
channel.basicReject(msg.getMessageProperties().getDeliveryTag(), true); //拒接消息
}
}
}
@Slf4j
@Component
@RabbitListener(queues = "order.seckill.order.queue") //监听秒杀业务队列-C
public class OrderSeckillListener {
@Autowired
OrderService orderService;
@RabbitHandler
public void listener(SeckillOrderTo orderTo, Channel channel, Message msg) throws IOException {
try {
log.info("准备创建秒杀单的详细信息...");
orderService.createSeckillOrder(orderTo);
//不批量处理
channel.basicAck(msg.getMessageProperties().getDeliveryTag(), false);
} catch (Exception e) {
//将消息重新回队
channel.basicReject(msg.getMessageProperties().getDeliveryTag(), true);
}
}
}
(5) 业务代码
//远程锁库存
R r = wmsFeignService.orderLockStock(lockVo);
if (r.getCode() == 0) {
//成功(rabbitMQ消息通知)
responseVo.setOrder(order.getOrder());
//消息发给延迟队列,半小时后重新投递被消费,执行自动解锁
rabbitTemplate.convertAndSend("order-event-exchange", "order.create.order", order.getOrder());
return responseVo;
} else {
...
}
if (count == 1) {
skuStocked = true;
WareOrderTaskDetailEntity detailEntity = new WareOrderTaskDetailEntity(null, skuId, "", hasStock.getNum(), taskEntity.getId(), wareId, 1);
orderDetailService.save(detailEntity);
StockLockedTo to = new StockLockedTo();
StockDetailTo detailTo = new StockDetailTo();
to.setId(taskEntity.getId());
BeanUtils.copyProperties(detailEntity, detailTo);
//防止回滚之后找不到数据,所以保存完整库存单
to.setDetail(detailTo);
//也是锁库存,发送给延迟队列,时间一到就执行解锁库存,通过幂等性保证程序不会出错
rabbitTemplate.convertAndSend("stock-event-exchange", "stock.locked", to);
break;
} else {
//当前仓库库存不足,尝试下一个仓库
}
七、集群搭建
后续补充。。。