消息队列-Kafka

消息队列:

消息队列,可以解耦消息生产者和消息消费者,二者之间互不影响,只需要关注消息的处理即可;提供路由功能,保证消息可靠传输;提供消息堆积功能,可以进行异步,削峰填谷等功能。

比较:
特性ActiveMQRabbitMQRocketMQKafka
吞吐量万级万级10万10万,高吞吐,一般在大数据进行实时计算,日志采集
topic影响 同机器支持大量topic,可达几百/几千两级,吞吐量小幅下降topic到几百时吞吐量大幅下降
时效性msus,优点低延迟msms
可用性高,主从高,主从非常高,分布式非常高,分布式,多副本
可靠性低概率丢消息基本可靠参数优化,可靠可靠
功能性功能完备基于erlang开发,并发高,延迟低完善,分布式,方便拓展功能较简单,主要在大数据领域多,提供批处理API用于实时处理;提供事件溯源(事件可以在topic保留一段时间)

RocketMQ:阿里出品,广泛使用在订单,交易,流计算,消息推送,日志刘处理,binlog分发等

Kafka:大数据领域事实规划

Kafka介绍:

Kafka是一个分布式流处理系统,流处理系统使它可以像消息队列一样publish或者subscribe消息,分布式提供了容错性,并发处理消息的机制。

  • 日志聚合:Kafka 通常用于收集和聚合分布式系统中产生的日志数据,以便后续的监控、分析和故障排除。
  • 数据流处理:Kafka 可以作为数据流处理平台的基础,用于处理实时数据流,例如事件处理、实时分析和机器学习模型的训练。
  • 数据仓库集成:Kafka 可以将数据传输到数据仓库,如Hadoop或Elasticsearch,以进行高级分析和报告。
  • 应用程序集成:许多应用程序可以使用 Kafka 作为消息中间件来实现异步通信,包括微服务架构、批处理作业等。
  • 流媒体处理:Kafka 可以用于流媒体处理,例如实时监控、事件驱动的应用程序等
架构设计:

Producer - 消息生产者,就是向kafka broker发消息的客户端。
Consumer - 消息消费者,是消息的使用方,负责消费Kafka服务器上的消息。
Topic - 主题,由用户定义并配置在Kafka服务器,用于建立Producer和Consumer之间的订阅关系。生产者发送消息到指定的Topic下,消息者从这个Topic下消费消息,在Kafka集群中,消息存在与topic中,包含键值对和时间戳。

Partition - 消息分区,一个topic可以分为多个 partition,每个partition是一个有序的队列。partition中的每条消息都会被分配一个有序的id(offset)。
Broker - 一台kafka服务器就是一个broker。一个集群由多个broker组成。一个broker可以容纳多个topic。
Consumer Group - 消费者分组,用于归组同类消费者。每个consumer属于一个特定的consumer group,多个消费者可以共同消息一个Topic下的消息,每个消费者消费其中的部分消息,这些消费者就组成了一个分组,拥有同一个分组名称,通常也被称为消费者集群。
Offset - 消息在partition中的偏移量。每一条消息在partition都有唯一的偏移量,消息者可以指定偏移量来指定要消费的消息。

  • topic 是逻辑上的概念,而partition 是物理上的概念,每个partition 对应一个log 文件,该log 文件中存储的就是producer 生产的数据。Producer 生产的数据会被不断追加到该log 文件末端,且每条数据都有自己的offset。消费者组中的每个消费者,都会实时记录自己消费到了哪个offset,以便出错恢复时,从上次的位置继续消费。
  • 在 Kafka 的数据文件目录下,一个 Partition 对应一个唯一的文件夹。如果有 4 个 Topic,每个 Topic 有 5 个 Partition,那么一共会有 4 * 5 = 20 个文件夹。而在 文件夹下,Kafka 消息是采用 Segment File 的存储方式进行存储的。
    Segment File 的大概意思是:将大文件拆分成小文件来存储,这样一个大文件就变成了一段一段(Segment 段)。这样的好处是 IO 加载速度快,不会有很长的 IO 加载时间。Kafka 的消息存储就采用了这种方式。
  • 索引文件的命名统一为数字格式,其名称表示 Kafka 消息的偏移量。我们假设索引文件的数字为 N,那么就代表该索引文件存储的第一条 Kafka 消息的偏移量为 N + 1,而上个文件存储的最后一条 Kafka 消息的偏移量为 N(因为 Kafka 是顺序存储的)。例如00000.index后的第一个368769.index 索引文件,其表示文件存储的第一条 Kafka 消息的偏移量为 368770。而 368769 表示的是 0000.index 这个索引文件的最后一条消息。所以 368769.index 索引文件,其存储的 Kafka 消息偏移量范围为 (368769,737337]。(索引文件存储的是消息索引+物理偏移地址)
工作流程:
  1. producer先从zookeeper的 "/brokers/.../state"节点找到该partition的leader
  2. producer将消息发送给该leader
  3. leader将消息写入本地log
  4. followers从leader pull消息
  5. 写入本地log后向leader发送ACK
  6. leader收到所有ISR中的replication的ACK后,增加HW(high watermark,最后commit 的offset)并向producer发送ACK
文件存储优势:

写消息:1,消息从java堆写到page cache(物理内存);2,异步线程刷盘,写入磁盘。

读消息:1,从page cache转入socket发送;2,物理内存没有查磁盘,写入内存后发送。

高效:

  • topic中的一个parition大文件分成多个小文件,方便定期清除已消费文件。
  • 索引可以做到快速定位信息并确定返回的最大大小。
  • index元数据全部映射到内存,避免segment file的磁盘IO。
  • 索引文件稀疏存储,减少index文件大小。
数据一致性:

ACK:

  • acks=0:生产者发送消息,不确认
  • acks=1:需要分区leader确认
  • acks=-1:全部确认
  • retries:最大重试次数
  • retry.backoff.ms:重试时间间隔

ISR:

  • ISR(In-Sync-Replicas):和leader保持同步的副本集合,可以认为可靠。
  • OSR:同步失败的副本集合。通过延迟时间增加,新节点也会先放入。

副本数据一致性:

  • HW (High Watermark)俗称高水位,它标识了一个特定的消息偏移量(offset),消费者只能拉取到这个offset之前的消息,对于同一个副本对象而言,其 HW 值不会大于 LEO 值。小于等于 HW 值的所有消息都被认为是“已备份”的(replicated) 。所有分区副本中消息偏移量最小值。
  • LEO(Log End Offset),即日志末端位移(log end offset),记录了该副本底层日志(log)中下一条消息的位移值。注意是下一条消息!也就是说,如果 LEO =8,那么表示该副本保存了 8 条消息,位移值范围是[0, 7]。LEO 的大小相当于当前日志分区中最后一条消息的 offset 值加1,分区 ISR 集合中的每个副本都会维护自身的 LEO,而 ISR 集合中最小的 LEO 即为分区的 HW,对消费者而言只能消费 HW 之前的消息。
  • Follow异常:临时踢出 ISR 集合,恢复后读取HW,将高于HW的信息删除后进行同步,只有LEO大于等于parition的HW后,才能恢复ISR。
  • Leader异常:ISR选举后,新Leader LEO可能低于旧Leader,但是都以Leader为准,follow的LOG高于HW全部删除,重新同步(旧Leader即使恢复,也是follow)
如何防止重复:

生产者端

  1. 幂等生产者:Kafka从0.11.0版本开始引入了幂等生产者的概念。通过设置enable.idempotence=true,可以确保每条消息只会被生产一次,即使生产者重试发送。这适用于单个生产者会话内的消息去重。
  2. 事务:Kafka支持事务,允许生产者将一组消息原子性地写入多个分区。这意味着要么所有消息都成功提交,要么都不提交。事务API包括initTransactionsbeginTransactionsendOffsetsToTransactioncommitTransactionabortTransaction方法。

消费者端

  1. 手动提交Offset:避免自动提交offset,采用手动控制的方式,比如使用commitSync()commitAsync()来确保只有当消息被成功处理后才提交offset。这样可以防止由于consumer崩溃或网络问题导致的消息丢失或重复消费。
  2. 幂等性设计:在业务逻辑层面实现幂等性,例如,在数据库操作时,可以通过唯一键约束来防止重复插入数据;或者在处理消息之前检查是否已经处理过相同的消息ID。
  3. Exactly Once Semantics (EOS):Kafka提供了exactly-once语义的支持,它结合了幂等性和事务功能,确保每条消息只被处理一次,并且能够正确地更新状态(如提交offset)。要启用EOS,需要配置合适的参数并遵循特定的设计模式。
零拷贝:

文件拷贝切换流程

传统文件拷贝流程:

  1. 应用程序发起读请求。
  2. 操作系统将数据从磁盘读入内核空间的缓冲区。
  3. 数据被复制到用户空间的缓冲区供应用程序访问。
  4. 应用程序发起写请求。
  5. 数据再次被复制回内核空间的不同缓冲区(例如socket缓冲区)。
  6. 操作系统将数据从内核缓冲区发送出去(如通过网络)。

采用零拷贝技术的优化流程(以sendfile()为例):

  1. 应用程序调用sendfile()函数,指定源文件描述符(如打开的文件)和目标文件描述符(如打开的socket)。
  2. 操作系统直接在内部处理数据传输,从磁盘读取数据到内核缓冲区。
  3. 然后,它直接将这些数据传递给目标文件描述符(比如直接传输到网络接口),而不必将数据复制到用户空间。

这种方式减少了至少一次的数据复制过程以及相关的上下文切换,从而提高了效率,特别是在处理大文件传输或高吞吐量网络应用时效果尤为显著。

MQ , Kafka
最后修改于:2025年02月20日 22:56

添加新评论