消息队列-RocketMQ
使用场景:
RocketMQ特性:
- 高吞吐量:可以处理高流量、低延迟的数据流
- 高可靠性:消息持久化、多副本机制
- 灵活:支持多种模型,例如点对点pub/sub
- 顺序消息:可以保证同一个topic内,消息的顺序消费
- 事务消息:支持分布式事务,确保消息和数据库操作一致性
- 消息过滤机制:支持基于tag或属性的消息过滤
应用场景:
- 系统解耦:微服务中,可以通过消息队列解耦各个服务
- 异步处理:非实时任务可以通过通过队列异步执行
- 流量削峰:高并发可以缓冲流量
- 事件架构:实现复杂业务的事件驱动,推动各个阶段的事务流转
- 日志处理:收集和处理日志,监控数据等
存储模型:
CommitLog
:存储所有消息的原始日志,固定大小(如1G)。ConsumeQueue
:消息逻辑队列,存储CommitLog
中的物理偏移,定长存储,固定条目大小单个20字节(commitlog offset + size + tag hashcode)。IndexFile
:按照Message Key和时间区块进行快速索引,可选的,用于需要基于消息属性进行快速查找的场景。- RocketMQ使用零拷贝机制,
CommitLog
采用顺序写,提升了写入性能。 - 提供同步刷盘和异步刷盘。
- 内存映射文件(MappedFile):文件映射到内存地址空间,减少系统调用开销,读写快。
- 读写分离,生产者写Log,消费者取队列。
在Java中,java.nio
包提供了MappedByteBuffer
类来支持内存映射文件的功能,这是实现零拷贝的一种方式。
// 内存映射文件核心实现
MappedByteBuffer mappedByteBuffer = fileChannel.map(
FileChannel.MapMode.READ_WRITE, // 映射模式:读写模式
0, // 文件中的起始偏移量
fileSize // 要映射的区域大小
);
这段代码创建了一个从文件通道到内存的直接映射,允许程序像操作内存一样操作文件内容,而无需显式地执行读取和写入操作。这样做的好处是减少了数据从内核空间到用户空间的复制次数,从而提高了效率。
可靠性:
- 生产者端:同步发送、异步发送+重试、事务消息。
- Broker端:同步刷盘、主从复制。
- 消费者端:手动提交位移、消费重试、幂等性消费。
防止重复:
防止重复消费
- 幂等性设计:这是最根本的解决方案。每个消费者应当设计成对于相同的消息多次处理的结果是一致的。例如,在数据库操作中,使用唯一键约束可以防止插入重复记录。
- 消息去重:在消费者端,可以通过维护一个已处理消息的记录(比如使用哈希表或者数据库)来过滤掉重复的消息。这种方法要求对每条消息都有一个全局唯一的ID以便识别。
- 消息确认机制(ACK):大多数消息队列提供者支持消息确认机制。当一条消息被成功处理后,消费者会发送一个确认回执给队列。如果队列没有收到确认,它会认为这条消息未被成功处理,并可能重新投递该消息。
- 死信队列(DLQ):对于无法处理的消息,可以将其转移到死信队列中进行进一步分析和手动处理。这可以帮助避免因为某些消息导致的无限循环消费问题。
防止重复生产
- 事务支持:一些消息队列提供事务支持,允许生产者将消息发送作为事务的一部分。这样,如果事务失败,消息不会被实际发送到队列中。
- 消息唯一性检查:在生产者端,可以在发送之前检查消息是否已经存在于队列中。不过,这种做法取决于具体的队列实现以及性能考量,因为它可能会增加额外的开销。
- 批量消息的原子性:如果需要发送一批消息并且希望它们作为一个整体被消费,则可以考虑将这些消息封装为单个事务或者使用特定的API来保证这些消息要么全部成功,要么全部失败。
- 延迟和重试机制:合理设置消息生产和消费的重试机制,以及适当利用延迟队列,可以减少因网络抖动等原因造成的重复生产情况。
顺序消息:
- 全局顺序:一个topic内所有消息路由到一个队列,顺序消费。
- 分区顺序:自定义分区选择算法,相同分区键的消息按顺序进行消费
事务消息:
RocketMQ的事务消息主要通过两阶段提交(2PC)和补偿机制保障分布式事务的一致性,例如在订单服务中,可以先发送一个减库存的半消息(对消费者不可见),随后生产者执行本地事务,执行成功后提交事务消息给消费者,否则回滚,RocketMQ会定期轮询订单服务以确认事务最终状态。
- 原子性:半事务消息对消费者不可见,确保了事务的原子性。
- 持久性:半事务消息会被持久化,即使Broker宕机也能恢复。
- 隔离性:通过特殊的队列存储半事务消息,实现了隔离。
- 一致性:通过二阶段提交和回查机制保证了最终一致性。
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.client.producer.TransactionSendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
public class OrderService {
public static void main(String[] args) throws MQClientException {
// 初始化事务消息生产者
TransactionMQProducer producer = new TransactionMQProducer("order_transaction_group");
producer.setNamesrvAddr("localhost:9876");
// 设置事务监听器
TransactionListener transactionListener = new OrderTransactionListener();
producer.setTransactionListener(transactionListener);
producer.start();
try {
// 构建减库存半消息
Message msg = new Message("OrderTopic", "TagA", "KEY1",
("OrderID:12345, SKU:67890, Quantity:1").getBytes());
// 发送事务消息
TransactionSendResult sendResult = producer.sendMessageInTransaction(msg, null);
System.out.printf("SendResult: %s%n", sendResult);
} catch (MQClientException e) {
e.printStackTrace();
}
Runtime.getRuntime().addShutdownHook(new Thread(producer::shutdown));
}
}
class OrderTransactionListener implements TransactionListener {
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
try {
// 执行本地事务:创建订单
boolean isOrderCreated = createOrder(msg);
if (isOrderCreated) {
return LocalTransactionState.COMMIT_MESSAGE; // 提交库存扣减消息
} else {
return LocalTransactionState.ROLLBACK_MESSAGE; // 回滚库存扣减消息
}
} catch (Exception e) {
e.printStackTrace();
return LocalTransactionState.UNKNOW; // 未知状态
}
}
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
// 回查本地事务状态
boolean isOrderConfirmed = checkOrderStatus(msg.getKeys());
return isOrderConfirmed ? LocalTransactionState.COMMIT_MESSAGE : LocalTransactionState.ROLLBACK_MESSAGE;
}
private boolean createOrder(Message msg) {
// 模拟订单创建逻辑(通常为数据库操作)
System.out.println("Executing local transaction to create order: " + new String(msg.getBody()));
// 这里应该包括数据库的insert操作和相关的业务逻辑
return true; // 返回订单创建成功状态
}
private boolean checkOrderStatus(String orderKey) {
// 模拟查询订单状态
System.out.println("Checking order status for key: " + orderKey);
// 这里通常涉及查询数据库以确认订单的最终状态
return true; // 假设订单已确认
}
}
//消费者示例
public class DelayMessageConsumer {
public static void main(String[] args) throws Exception {
// 创建消费者实例
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("DelayMessageConsumerGroup");
consumer.setNamesrvAddr("localhost:9876");
// 订阅主题
consumer.subscribe("DelayTopic", "*");
// 注册消息监听器
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs) {
System.out.printf("Received Message: %s, Delay Time: %dms%n", new String(msg.getBody()), (System.currentTimeMillis() - msg.getStoreTimestamp()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
// 启动消费者
consumer.start();
}
}