消息队列-Kafka
消息队列:
消息队列,可以解耦消息生产者和消息消费者,二者之间互不影响,只需要关注消息的处理即可;提供路由功能,保证消息可靠传输;提供消息堆积功能,可以进行异步,削峰填谷等功能。
比较:
特性 | ActiveMQ | RabbitMQ | RocketMQ | Kafka |
---|---|---|---|---|
吞吐量 | 万级 | 万级 | 10万 | 10万,高吞吐,一般在大数据进行实时计算,日志采集 |
topic影响 | 同机器支持大量topic,可达几百/几千两级,吞吐量小幅下降 | topic到几百时吞吐量大幅下降 | ||
时效性 | ms | us,优点低延迟 | ms | ms |
可用性 | 高,主从 | 高,主从 | 非常高,分布式 | 非常高,分布式,多副本 |
可靠性 | 低概率丢消息 | 基本可靠 | 参数优化,可靠 | 可靠 |
功能性 | 功能完备 | 基于erlang开发,并发高,延迟低 | 完善,分布式,方便拓展 | 功能较简单,主要在大数据领域多,提供批处理API用于实时处理;提供事件溯源(事件可以在topic保留一段时间) |
RocketMQ:阿里出品,广泛使用在订单,交易,流计算,消息推送,日志刘处理,binlog分发等
Kafka:大数据领域事实规划
Kafka介绍:
Kafka是一个分布式流处理系统,流处理系统使它可以像消息队列一样publish或者subscribe消息,分布式提供了容错性,并发处理消息的机制。
- 日志聚合:Kafka 通常用于收集和聚合分布式系统中产生的日志数据,以便后续的监控、分析和故障排除。
- 数据流处理:Kafka 可以作为数据流处理平台的基础,用于处理实时数据流,例如事件处理、实时分析和机器学习模型的训练。
- 数据仓库集成:Kafka 可以将数据传输到数据仓库,如Hadoop或Elasticsearch,以进行高级分析和报告。
- 应用程序集成:许多应用程序可以使用 Kafka 作为消息中间件来实现异步通信,包括微服务架构、批处理作业等。
- 流媒体处理:Kafka 可以用于流媒体处理,例如实时监控、事件驱动的应用程序等
架构设计:
Producer - 消息生产者,就是向kafka broker发消息的客户端。
Consumer - 消息消费者,是消息的使用方,负责消费Kafka服务器上的消息。
Topic - 主题,由用户定义并配置在Kafka服务器,用于建立Producer和Consumer之间的订阅关系。生产者发送消息到指定的Topic下,消息者从这个Topic下消费消息,在Kafka集群中,消息存在与topic中,包含键值对和时间戳。
Partition - 消息分区,一个topic可以分为多个 partition,每个partition是一个有序的队列。partition中的每条消息都会被分配一个有序的id(offset)。
Broker - 一台kafka服务器就是一个broker。一个集群由多个broker组成。一个broker可以容纳多个topic。
Consumer Group - 消费者分组,用于归组同类消费者。每个consumer属于一个特定的consumer group,多个消费者可以共同消息一个Topic下的消息,每个消费者消费其中的部分消息,这些消费者就组成了一个分组,拥有同一个分组名称,通常也被称为消费者集群。
Offset - 消息在partition中的偏移量。每一条消息在partition都有唯一的偏移量,消息者可以指定偏移量来指定要消费的消息。
- topic 是逻辑上的概念,而partition 是物理上的概念,每个partition 对应一个log 文件,该log 文件中存储的就是producer 生产的数据。Producer 生产的数据会被不断追加到该log 文件末端,且每条数据都有自己的offset。消费者组中的每个消费者,都会实时记录自己消费到了哪个offset,以便出错恢复时,从上次的位置继续消费。
- 在 Kafka 的数据文件目录下,一个 Partition 对应一个唯一的文件夹。如果有 4 个 Topic,每个 Topic 有 5 个 Partition,那么一共会有 4 * 5 = 20 个文件夹。而在 文件夹下,Kafka 消息是采用 Segment File 的存储方式进行存储的。
Segment File 的大概意思是:将大文件拆分成小文件来存储,这样一个大文件就变成了一段一段(Segment 段)。这样的好处是 IO 加载速度快,不会有很长的 IO 加载时间。Kafka 的消息存储就采用了这种方式。 - 索引文件的命名统一为数字格式,其名称表示 Kafka 消息的偏移量。我们假设索引文件的数字为 N,那么就代表该索引文件存储的第一条 Kafka 消息的偏移量为 N + 1,而上个文件存储的最后一条 Kafka 消息的偏移量为 N(因为 Kafka 是顺序存储的)。例如00000.index后的第一个368769.index 索引文件,其表示文件存储的第一条 Kafka 消息的偏移量为 368770。而 368769 表示的是 0000.index 这个索引文件的最后一条消息。所以 368769.index 索引文件,其存储的 Kafka 消息偏移量范围为 (368769,737337]。(索引文件存储的是消息索引+物理偏移地址)
工作流程:
- producer先从zookeeper的 "/brokers/.../state"节点找到该partition的leader
- producer将消息发送给该leader
- leader将消息写入本地log
- followers从leader pull消息
- 写入本地log后向leader发送ACK
- leader收到所有ISR中的replication的ACK后,增加HW(high watermark,最后commit 的offset)并向producer发送ACK
文件存储优势:
写消息:1,消息从java堆写到page cache(物理内存);2,异步线程刷盘,写入磁盘。
读消息:1,从page cache转入socket发送;2,物理内存没有查磁盘,写入内存后发送。
高效:
- topic中的一个parition大文件分成多个小文件,方便定期清除已消费文件。
- 索引可以做到快速定位信息并确定返回的最大大小。
- index元数据全部映射到内存,避免segment file的磁盘IO。
- 索引文件稀疏存储,减少index文件大小。
数据一致性:
ACK:
- acks=0:生产者发送消息,不确认
- acks=1:需要分区leader确认
- acks=-1:全部确认
- retries:最大重试次数
- retry.backoff.ms:重试时间间隔
ISR:
- ISR(In-Sync-Replicas):和leader保持同步的副本集合,可以认为可靠。
- OSR:同步失败的副本集合。通过延迟时间增加,新节点也会先放入。
副本数据一致性:
- HW (High Watermark)俗称高水位,它标识了一个特定的消息偏移量(offset),消费者只能拉取到这个offset之前的消息,对于同一个副本对象而言,其 HW 值不会大于 LEO 值。小于等于 HW 值的所有消息都被认为是“已备份”的(replicated) 。所有分区副本中消息偏移量最小值。
- LEO(Log End Offset),即日志末端位移(log end offset),记录了该副本底层日志(log)中下一条消息的位移值。注意是下一条消息!也就是说,如果 LEO =8,那么表示该副本保存了 8 条消息,位移值范围是[0, 7]。LEO 的大小相当于当前日志分区中最后一条消息的 offset 值加1,分区 ISR 集合中的每个副本都会维护自身的 LEO,而 ISR 集合中最小的 LEO 即为分区的 HW,对消费者而言只能消费 HW 之前的消息。
- Follow异常:临时踢出 ISR 集合,恢复后读取HW,将高于HW的信息删除后进行同步,只有LEO大于等于parition的HW后,才能恢复ISR。
- Leader异常:ISR选举后,新Leader LEO可能低于旧Leader,但是都以Leader为准,follow的LOG高于HW全部删除,重新同步(旧Leader即使恢复,也是follow)
如何防止重复:
生产者端
- 幂等生产者:Kafka从0.11.0版本开始引入了幂等生产者的概念。通过设置
enable.idempotence=true
,可以确保每条消息只会被生产一次,即使生产者重试发送。这适用于单个生产者会话内的消息去重。 - 事务:Kafka支持事务,允许生产者将一组消息原子性地写入多个分区。这意味着要么所有消息都成功提交,要么都不提交。事务API包括
initTransactions
、beginTransaction
、sendOffsetsToTransaction
、commitTransaction
和abortTransaction
方法。
消费者端
- 手动提交Offset:避免自动提交offset,采用手动控制的方式,比如使用
commitSync()
或commitAsync()
来确保只有当消息被成功处理后才提交offset。这样可以防止由于consumer崩溃或网络问题导致的消息丢失或重复消费。 - 幂等性设计:在业务逻辑层面实现幂等性,例如,在数据库操作时,可以通过唯一键约束来防止重复插入数据;或者在处理消息之前检查是否已经处理过相同的消息ID。
- Exactly Once Semantics (EOS):Kafka提供了exactly-once语义的支持,它结合了幂等性和事务功能,确保每条消息只被处理一次,并且能够正确地更新状态(如提交offset)。要启用EOS,需要配置合适的参数并遵循特定的设计模式。
零拷贝:
文件拷贝切换流程
传统文件拷贝流程:
- 应用程序发起读请求。
- 操作系统将数据从磁盘读入内核空间的缓冲区。
- 数据被复制到用户空间的缓冲区供应用程序访问。
- 应用程序发起写请求。
- 数据再次被复制回内核空间的不同缓冲区(例如socket缓冲区)。
- 操作系统将数据从内核缓冲区发送出去(如通过网络)。
采用零拷贝技术的优化流程(以sendfile()
为例):
- 应用程序调用
sendfile()
函数,指定源文件描述符(如打开的文件)和目标文件描述符(如打开的socket)。 - 操作系统直接在内部处理数据传输,从磁盘读取数据到内核缓冲区。
- 然后,它直接将这些数据传递给目标文件描述符(比如直接传输到网络接口),而不必将数据复制到用户空间。
这种方式减少了至少一次的数据复制过程以及相关的上下文切换,从而提高了效率,特别是在处理大文件传输或高吞吐量网络应用时效果尤为显著。