消息队列-RocketMQ

使用场景:

RocketMQ特性:

  • 高吞吐量:可以处理高流量、低延迟的数据流
  • 高可靠性:消息持久化、多副本机制
  • 灵活:支持多种模型,例如点对点pub/sub
  • 顺序消息:可以保证同一个topic内,消息的顺序消费
  • 事务消息:支持分布式事务,确保消息和数据库操作一致性
  • 消息过滤机制:支持基于tag或属性的消息过滤

应用场景:

  • 系统解耦:微服务中,可以通过消息队列解耦各个服务
  • 异步处理:非实时任务可以通过通过队列异步执行
  • 流量削峰:高并发可以缓冲流量
  • 事件架构:实现复杂业务的事件驱动,推动各个阶段的事务流转
  • 日志处理:收集和处理日志,监控数据等
存储模型:
  • CommitLog:存储所有消息的原始日志,固定大小(如1G)。
  • ConsumeQueue:消息逻辑队列,存储CommitLog中的物理偏移,定长存储,固定条目大小单个20字节(commitlog offset + size + tag hashcode)。
  • IndexFile:按照Message Key和时间区块进行快速索引,可选的,用于需要基于消息属性进行快速查找的场景。
  • RocketMQ使用零拷贝机制,CommitLog采用顺序写,提升了写入性能。
  • 提供同步刷盘和异步刷盘。
  • 内存映射文件(MappedFile):文件映射到内存地址空间,减少系统调用开销,读写快。
  • 读写分离,生产者写Log,消费者取队列。

在Java中,java.nio包提供了MappedByteBuffer类来支持内存映射文件的功能,这是实现零拷贝的一种方式。

// 内存映射文件核心实现  
MappedByteBuffer mappedByteBuffer = fileChannel.map(  
    FileChannel.MapMode.READ_WRITE,   // 映射模式:读写模式  
    0,                                // 文件中的起始偏移量  
    fileSize                          // 要映射的区域大小  
);

这段代码创建了一个从文件通道到内存的直接映射,允许程序像操作内存一样操作文件内容,而无需显式地执行读取和写入操作。这样做的好处是减少了数据从内核空间到用户空间的复制次数,从而提高了效率。

可靠性:
  1. 生产者端:同步发送、异步发送+重试、事务消息。
  2. Broker端:同步刷盘、主从复制。
  3. 消费者端:手动提交位移、消费重试、幂等性消费。
防止重复:

防止重复消费

  1. 幂等性设计:这是最根本的解决方案。每个消费者应当设计成对于相同的消息多次处理的结果是一致的。例如,在数据库操作中,使用唯一键约束可以防止插入重复记录。
  2. 消息去重:在消费者端,可以通过维护一个已处理消息的记录(比如使用哈希表或者数据库)来过滤掉重复的消息。这种方法要求对每条消息都有一个全局唯一的ID以便识别。
  3. 消息确认机制(ACK):大多数消息队列提供者支持消息确认机制。当一条消息被成功处理后,消费者会发送一个确认回执给队列。如果队列没有收到确认,它会认为这条消息未被成功处理,并可能重新投递该消息。
  4. 死信队列(DLQ):对于无法处理的消息,可以将其转移到死信队列中进行进一步分析和手动处理。这可以帮助避免因为某些消息导致的无限循环消费问题。

防止重复生产

  1. 事务支持:一些消息队列提供事务支持,允许生产者将消息发送作为事务的一部分。这样,如果事务失败,消息不会被实际发送到队列中。
  2. 消息唯一性检查:在生产者端,可以在发送之前检查消息是否已经存在于队列中。不过,这种做法取决于具体的队列实现以及性能考量,因为它可能会增加额外的开销。
  3. 批量消息的原子性:如果需要发送一批消息并且希望它们作为一个整体被消费,则可以考虑将这些消息封装为单个事务或者使用特定的API来保证这些消息要么全部成功,要么全部失败。
  4. 延迟和重试机制:合理设置消息生产和消费的重试机制,以及适当利用延迟队列,可以减少因网络抖动等原因造成的重复生产情况。
顺序消息:
  1. 全局顺序:一个topic内所有消息路由到一个队列,顺序消费。
  2. 分区顺序:自定义分区选择算法,相同分区键的消息按顺序进行消费
事务消息:

RocketMQ的事务消息主要通过两阶段提交(2PC)和补偿机制保障分布式事务的一致性,例如在订单服务中,可以先发送一个减库存的半消息(对消费者不可见),随后生产者执行本地事务,执行成功后提交事务消息给消费者,否则回滚,RocketMQ会定期轮询订单服务以确认事务最终状态。

  • 原子性:半事务消息对消费者不可见,确保了事务的原子性。
  • 持久性:半事务消息会被持久化,即使Broker宕机也能恢复。
  • 隔离性:通过特殊的队列存储半事务消息,实现了隔离。
  • 一致性:通过二阶段提交和回查机制保证了最终一致性。
import org.apache.rocketmq.client.exception.MQClientException;  
import org.apache.rocketmq.client.producer.TransactionListener;  
import org.apache.rocketmq.client.producer.TransactionMQProducer;  
import org.apache.rocketmq.client.producer.TransactionSendResult;  
import org.apache.rocketmq.common.message.Message;  
import org.apache.rocketmq.common.message.MessageExt;  

public class OrderService {  

    public static void main(String[] args) throws MQClientException {  
        // 初始化事务消息生产者  
        TransactionMQProducer producer = new TransactionMQProducer("order_transaction_group");  
        producer.setNamesrvAddr("localhost:9876");  

        // 设置事务监听器  
        TransactionListener transactionListener = new OrderTransactionListener();  
        producer.setTransactionListener(transactionListener);  
        producer.start();  

        try {  
            // 构建减库存半消息  
            Message msg = new Message("OrderTopic", "TagA", "KEY1",  
                                      ("OrderID:12345, SKU:67890, Quantity:1").getBytes());  

            // 发送事务消息  
            TransactionSendResult sendResult = producer.sendMessageInTransaction(msg, null);  
            System.out.printf("SendResult: %s%n", sendResult);  

        } catch (MQClientException e) {  
            e.printStackTrace();  
        }  

        Runtime.getRuntime().addShutdownHook(new Thread(producer::shutdown));  
    }  
}  

class OrderTransactionListener implements TransactionListener {  

    @Override  
    public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {  
        try {  
            // 执行本地事务:创建订单  
            boolean isOrderCreated = createOrder(msg);  

            if (isOrderCreated) {  
                return LocalTransactionState.COMMIT_MESSAGE; // 提交库存扣减消息  
            } else {  
                return LocalTransactionState.ROLLBACK_MESSAGE; // 回滚库存扣减消息  
            }  
        } catch (Exception e) {  
            e.printStackTrace();  
            return LocalTransactionState.UNKNOW; // 未知状态  
        }  
    }  

    @Override  
    public LocalTransactionState checkLocalTransaction(MessageExt msg) {  
        // 回查本地事务状态  
        boolean isOrderConfirmed = checkOrderStatus(msg.getKeys());  
        return isOrderConfirmed ? LocalTransactionState.COMMIT_MESSAGE : LocalTransactionState.ROLLBACK_MESSAGE;  
    }  

    private boolean createOrder(Message msg) {  
        // 模拟订单创建逻辑(通常为数据库操作)  
        System.out.println("Executing local transaction to create order: " + new String(msg.getBody()));  
        // 这里应该包括数据库的insert操作和相关的业务逻辑  
        return true; // 返回订单创建成功状态  
    }  

    private boolean checkOrderStatus(String orderKey) {  
        // 模拟查询订单状态  
        System.out.println("Checking order status for key: " + orderKey);  
        // 这里通常涉及查询数据库以确认订单的最终状态  
        return true; // 假设订单已确认  
    }  
}
//消费者示例
public class DelayMessageConsumer {  
    public static void main(String[] args) throws Exception {  
        // 创建消费者实例  
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("DelayMessageConsumerGroup");  
        consumer.setNamesrvAddr("localhost:9876");  

        // 订阅主题  
        consumer.subscribe("DelayTopic", "*");  

        // 注册消息监听器  
        consumer.registerMessageListener(new MessageListenerConcurrently() {  
            @Override  
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {  
                for (MessageExt msg : msgs) {  
                    System.out.printf("Received Message: %s, Delay Time: %dms%n", new String(msg.getBody()), (System.currentTimeMillis() - msg.getStoreTimestamp()));  
                }  
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;  
            }  
        });  

        // 启动消费者  
        consumer.start();  
    }  
}
最后修改于:2025年02月21日 22:54

添加新评论