Kafka权威指南
Kafka 权威指南一书学习。
参考:
初识Kafka
Apache Kafka(简称 Kafka)。
发布与订阅消息系统
发布与订阅消息系统,是数据驱动型应用程序的关键组件。
数据(消息)的发送者(发布者)不会直接把消息发送给接收者,这是发布与订阅消息系统的一个特点。发布者以某种方式对消息进行分类,接收者(订阅者)通过订阅它们来接收特定类型的消息。
发布与订阅系统一般会有一个 broker,也就是发布消息的地方。
Kafka登场
Kafka 就是为了解决上述问题而设计的一款基于发布与订阅模式的消息系统。它一般被称为 分布式提交日志 或 分布式流式平台。
文件系统或数据库提交日志旨在保持事务的持久化记录,通过重放这些日志可以重建系统状态。同样,Kafka 的数据是按照一定的顺序持久化保存的,并且可以按需读取。此外,Kafka 的数据分布在整个系统中,具备数据故障恢复能力和性能伸缩能力。
消息和批次
Kafka 的数据单元被称为 消息。你可以把消息看作类似数据库中的一条记录。消息由字节数组组成。消息可以有一个可选的元数据,也就是 键,键也是字节数组。
当需要以一种可控的方式将消息写入不同的分区时,需要用到键。最简单的例子就是为键生成一个一致性哈希值,然后用哈希值对主题分区数进行取模,为消息选取分区。这样可以保证具有相同键的消息总是会被写到相同的分区中(前提是分区数量没有变化)。
为了提高效率,消息会被分成批次写入 Kafka。批次包含了一组属于同一个主题和分区的消息。如果每一条消息都单独穿行于网络中,会导致大量的网络开销。把消息分成批次传输可以减少网络开销。不过,这需要在时间延迟和吞吐量之间做出权衡。
模式
消息模式(schema)来定义消息内容。很多 Kafka 开发者喜欢使用 Apache Avro,它提供了一种紧凑的序列化格式,模式和消息体是分开的。
数据格式的一致性对 Kafka 来说很重要,它消除了消息读写操作之间的耦合性。
主题和分区
Kafka 的消息通过 主题 进行分类。主题类似于数据库的表或文件系统的文件夹。
主题可以被分为若干个 分区,一个分区就是一个提交日志。消息会以追加的方式写入分区,然后按照先进先出的顺序读取。
需要注意的是,由于一个主题一般包含多个分区,因此无法在整个主题范围内保证消息的顺序,但可以保证消息在分区内是有序的。
Kafka 通过分区来实现数据的冗余和伸缩。分区可以分布在不同的服务器上,也就是一个主题可以横跨多个服务器。此外,分区可以被复制,相同分区的多个副本可以保存在多个服务器上,以防其中一台服务器发生故障。
通常使用 流 来描述 Kafka 这类系统中的数据。很多时候,人们会把一个主题的数据看成一个流,不管它由多少个分区。流是一组从生产者移动到消费者的数据。
Kafka Stream、Samza 和 Storm 等框架以实时的方式处理消息,这就是所谓的流式处理。区别于离线处理框架(如 Hadoop)被用于在未来某个时刻处理大量的数据。
生产者和消费者
Kafka 的客户端被分为两种类型:
- 生产者:生产者创建消息。一条消息会被发布到一个特定的主题上。默认情况下,生产者会把消息均衡地分布到主题的所有分区中。不过,生产者也可以把消息写入指定的分区(通过消息键和分区器来实现)。
- 消费者:消费者读取消息。消费者会订阅一个或多个主题,并按照消息写入分区的顺序读取它们。消费者通过检查消息的偏移量来区分已经读取过的消息。
偏移量 (不断递增的整数值)是另一种元数据,在创建消息时,Kafka 会把它添加到消息里。在给定的分区中,每一条消息的偏移量都是唯一的,越往后消息的偏移量越大(不一定是严格单调递增)。消费者会把每个分区可能的下一个偏移量保存起来(通常保存在 Kafka 中)。如果消费者关闭或重启,则其读取状态不会丢失。
消费者可以是 消费者群组 的一部分,属于同一群组的一个或多个消费者共同读取一个主题。群组可以保证每个分区只被这个群组中的一个消费者读取。
通过消费者组的方式,消费者可以读取包含大量消息的主题。而且,如果一个消费者失效,那么群组的其他消费者可以接管失效消费者的工作。
broker和集群
一台单独的 Kafka 服务器被称为 broker。它会接收来自生产者的消息,为其设置偏移量,并提交到磁盘保存。它会为消费者提供服务,对读取分区的请求做出响应,并返回已经发布的消息。
根据硬件配置和性能特征的不同,单个 broker 可以轻松处理数千个分区和每秒百万级的消息量。
brokers 组成了集群。每个集群都有一个通常充当了 集群控制器 角色的 broker(自动从活动的集群成员中选举出来)。控制器负责管理工作,包括为 broker 分配分区和监控 broker。
在集群中,一个分区从属于一个 borker,这个 broker 被称为分区的 leader。一个被分配给其他 broker 的分区部分,叫做此分区的 follower。
分区复制提供了分区的消息冗余。如果一个 broker 发生故障,则其中的一个 follower 可以接管它的领导权。所有想要发布消息的生产者必须连接到 leader,但消费者可从 leader 或 follower 处读取消息。
保留消息(在一定限期内)是 Kafka 的一个重要特性,broker 默认的消息保留策略如下。当消息数量达到如下上限时,旧的消息就会过期并被删除。所以,在任意时刻,可用消息的总量都不会超过配置参数所指的大小。主题可以配置自己的保留策略(如用户追踪数据保留几天,而应用指标保留几小时)。
- 要么保留一段时间(如 168h/7d)
- 要么保留消息总量达到一定的字节数(如 1GB)
多集群
随着 broker 数量的增加,最好使用多个集群。原因如下:
- 数据类型分离
- 安全需求隔离
- 多数据中心
Kafka 的消息复制机制只能在单个集群内进行。Kafka 提供了一个叫做 MirrorMaker 的工具,可将消息复制到其他集群中。
为什么选择Kafka
- 多个生产者
- 多个消费者
- 基于磁盘的数据保留
- 伸缩性:集群可以包含上百个 broker。
- 高性能
- 流式平台特性
数据生态系统
起源故事
Kafka 是为了解决领英数据管道问题应运而生的。它的设计目标是提供一个高性能的消息系统,该系统可以处理多种数据类型,并实时提供纯净、结构化的用户活动数据和系统指标。
Kafka 使用 Avro 作为消息序列化框架,每天可以高效处理数十亿级别的指标和用户活动跟踪信息。
2010 年底,Kafka 作为开源项目在 Github 上发布。2011 年成了 Apache 软件基金会孵化器项目。2012 年从服务器项目毕业。
Kafka 项目名称是作者喜欢的作家 Franz Kafka 而来。
安装Kafka
Kafka 使用 ZooKeeper 保存集群元数据和消费者信息。
环境配置
- 操作系统
- Java(OpenJDK)
- ZooKeeper
ZooKeeper配置
为了保证高可用,ZK 以集群(群组)的方式运行。由于使用了再均衡算法,建议应包含奇数个节点。只有当群组中的大多数节点(仲裁)处于可用状态时,ZK 才能处理外部请求。
也就是说,一个包含 3 个节点的群组允许 1 个节点失效。包含 5 个节点的群组允许 2 个节点失效。
不建议一个群组包含超过 7 个节点,因为 ZK 使用了一致性协议,节点过多会降低整个群组的性能。
如果由于客户端连接太多,5 或 7 个节点仍无法支撑负载,则可以考虑增加额外的观察者节点来分摊流量。
一个实例配置:
|
|
initLimit
表示从节点与主节点之间建立初始化连接的时间上限。syncLimit
表示允许从节点与主节点处于不同步状态的时间上限。- 这两个值都是
tickTime
的倍数。- initLimit 实际是 20x2000 ms
- syncLimit 实际是 5x2000 ms
- 服务器 ID(myid) 必须是一个唯一的整数,不一定要从 0 开始,也不要求连续。
- 群组的节点必须通过 3 个端口进行节点间通信。
peerPort
用于节点间通信的 TCP 端口,默认 2888。leaderPort
用于首领选举的 TCP 端口,默认 3888。clientPort
用于客户端连接的 TCP 端口,默认 2181。
安装borker
Kafka 软件包的示例格式 kafka_scalaVersion-kafkaVersion.tgz
(如 kafka_2.13-2.7.0.tgz
)。
请弃用 Kafka 命令行中的 ZK 连接串(--zookeeper
),使用新的 --bootstrap-server
连接串。如果是在集群内部使用命令行,可以指定集群内任一 borker 的主机地址和端口。
|
|
配置broker
常规配置项:
|
|
主题的配置项:
|
|
如何选择分区数量
选择主题的分区数量时,考虑如下因素:
- 主题需要达到多大的吞吐量?
- 从单个分区读取数据的最大吞吐量是多少?
- 可估算生产者向单个分区写入数据的吞吐量。
- 如果消息是按照不同的键写入分区,那么就很难在未来为已有的主题新增分区。所以要根据未来的预期使用量来估算吞吐量。
- 每个 broker 包含的分区数、可用的磁盘空间和网络带宽。
- 避免使用太多分区,因为每个分区都会占用内存和其他资源,还会增加元数据更新和首领选举的时间。
- 是否需要镜像数据?
- 云服务器虚拟机磁盘是否有 IOPS 限制?分区太多会导致 IOPS 数量增加。
综上所述,你可能需要多个分区,但又不能太多。如果要向主题写入和读取 1GBps 的数据,并且每个消费者可以处理 50MBps 的数据,那么至少需要 20 个分区。
如果你无法获得这些信息,根据经验,将分区每天保留的数据限制在 6GB 以内可获得比较理想的效果。先从小容量开始,再根据需要进行扩展。
选择硬件
- 磁盘吞吐量
- 磁盘容量
- 内存:Kafka 本身不需要太多内存。一个每秒处理 150 000 条消息和每秒 200 MB 数据速率的 borker,只需要 5GB 堆内存,剩下的系统内存用于页面缓存。
- 网络:网络和磁盘是制约 Kafka 伸缩规模的主要因素。
- CPU:Kafka 对计算处理能力的要求相对较低。
配置Kafka集群
使用集群的最大好处是可以跨服务器进行负载均衡,再者就是使用复制功能来避免单点故障。
操作系统调优
大部分 Linux 发行版默认的内核参数已经能够满足,不过还是可以通过调整一些参数来提升 Kafka 的性能。
- 虚拟内存
- 网络子系统
- 磁盘挂载
生产环境注意事项
- 垃圾回收器项:调整 Java 垃圾回收器选型是一门艺术。
- 数据中心布局
- 共享 ZK
Kafka 使用 ZK 保存 broker、主题和分区的元数据。只有当消费者群组成员或 Kafka 集群本身专门发生变化时才会向 ZK 写入数据。这些流量通常很小,所以没有必要为单个 Kafka 集群使用专门的 ZK 群组。
随着时间的推移,Kafka 对 ZK 的依赖在减少。在 2.8.0 版本中,Kafka 做了一个完全无 ZK 的早期尝试,但还没有做好生产就绪的准备。
不过,还有一个与消费者和 ZK 有关的问题。虽然不建议使用 ZK 来保存元数据,但消费者仍可以选择使用 ZK 还是 Kafka 来保存偏移量,还可以选择提交的时间间隔。
建议使用新版的 Kafka 消费者,并将偏移量提交到 Kafka,消除对 ZK 的依赖。
不建议把 Kafka 使用的 ZK 群组共享给其他应用使用(虽然可以更改路径)。
Kafka生产者
可以使用 Kafka 内置的客户端 API 来开发 Kafka 应用程序。
Kafka 还提供了二进制连接协议。
生产者概览
- 是否每条消息都很重要?
- 是否允许丢失一小部分消息?
- 是否可以接受偶尔出现重复消息?
- 是否有严格的延迟和吞吐量需求?
创建Kafka生产者
要向 Kafka 写入消息,首先需要创建一个生产者对象,并设置一些属性。3 个必须设置的属性:
bootstrap.servers
key.serializer
value.serializer
发送消息到Kafka
- 同步发送消息
- 异步发送消息
生产者配置
序列化器
创建一个生产者对象必须指定序列化器。
自定义分区
标头
除了键和值,记录还可以包含标头,在标头中添加一些有关记录的元数据。
拦截器
希望在不修改代码的情况下改变 Kafka 客户端的行为。
配额和节流
通过配额,限制生产和消费消息的速率。Kafka 提供了 3 种配额类型:
- 生产:限制客户端发送数据的速率(byte/s)
- 消费:限制客户端接收数据的速率
- 请求:限制 broker 用于处理客户端请求的时间百分比
Kafka消费者
消费程序向 Kafka 订阅主题,并从订阅的主题中接收消息。
消费者相关概念
消费者和消费者群组
Kafka 消费者从属于消费者群组。一个群组里的消费者订阅的是同一个主题,每个消费者负责读取这个主题的部分消息。
向群组添加消费者是横向扩展数据处理的主要方式。不要让消费者超过主题的分区数量,因为多余的消费者只会被限制。
每个应用都有自己的消费组就可以让它们获取到所有的消息。
不同于传统的消息系统,横向伸缩消费者和消费者群组并不会导致 Kafka 性能下降。
下面是关于消费者组和消费者的示例。
消费者群组和分区再均衡
消费者群组里的消费者共享主题分区的所有权。
- 当一个新消费者加入群组时,它将开始读取一部分原本由其他消费者读取的消息。
- 当一个消费者被关闭或发生崩溃时,它将离开群组,原本由它读取的分区将由群组里的其他消费者读取。
- 主题发生变化时(如增加了新分区),会导致分区重新分配。
分区的所有权从一个消费者转移到另一个消费者的行为,称为 再均衡。它为消费者群组带来了高可用性和伸缩性。不过,在正常情况下,我们并不希望发生再均衡。
根据消费者群组所使用的分区分配策略,再均衡分为两类:
- 主动再均衡
- 协作再均衡(增量再均衡)
在进行主动再均衡期间,所有消费者都会停止读取消息,放弃分区所有权,重新加入消费者群组,并获得重新分配到的分区。这样会导致整个消费者群组在一个很短的时间窗口内不可用。
协作再均衡通常是指,将一个消费者的部分分区重新分配给另一个消费者,其他消费者则继续读取没有被重新分配的分区。
消费者会向被指定为 群组协调器 的 broker(不同消费者群组的协调器不能不同) 发送心跳,以此来保持群组成员关系和对分区的所有权关系。
如果消费者在足够长的一段时间内没有发送心跳,那么它的会话就将超时,群组协调器会认为它已经死亡,进而再触发均衡。
群组固定成员
默认情况下,消费者的群组成员身份标识是临时的。当一个消费者离开群组时,分配给它的分区所有权将被撤销;而该消费者重新加入时,将通过再均衡协议为其分配一个新的成员的ID 和新分区。
可以给消费者分配一个唯一的 group.instance.id
,让它成为群组的 固定 成员。如果两个消费者使用相同的 ID 加入同一个群组,则第二个消费者会收到错误。
通常,当消费者第一次以固定成员身份加入群组时,群组协调器会按照分区分配策略给它分配一部分分区。当这个消费者被关闭时,它不会自动离开群组——它仍然是群组的成员,直到会话超时。当这个消费者重新加入群组时,它会继续持有之前的身份,并分配到之前所持有的分区。群组协调器缓存了每个成员的分区分配信息,只需要将缓存中的信息发送给重新加入的固定成员,不需要进行再均衡。
如果应用程序需要维护与消费者分区所有权相关的本地状态或缓存,那么群组固定成员关系就非常有用。
注意,群组的固定成员在关闭时不会主动离开群组,它们何时真正消失取决于会话超时参数。
创建消费者
订阅主题
轮询
消费者 API 最核心的东西是通过一个简单的轮询向服务器请求数据。
配置消费者
提交和偏移量
消费者可以用 Kafka 来追踪已读取的消息在分区中的位置(偏移量)。
我们把更新分区当前读取位置的操作叫做 偏移量提交。Kafka 不会提交每一条记录。相反,消费者会将已成功处理的最后一条消息提交给 Kafka,并假定该消息之前的每一条消息都已成功处理。
消费者向 __consumer_offset
的主题发送消息,消息里包含每个分区的偏移量。如果消费者一直处于运行状态,那么偏移量就没什么实际作用。但是,如果消费者发生崩溃或有新的消费者加入群组,则会触发再均衡。再均衡完成之后,每个消费者可能会被分配新的分区,而不知之前读取的那个。为了能够继续之前的工作,消费者需要读取每个分区的最后一次提交的偏移量,然后从偏移量指定的位置继续读取消息。
如果最后一次提交的偏移量小于客户端的最后一条消息的偏移量,那么处于两个偏移量之间的消息就会被重复处理。
如果最后一次提交的偏移量大于客户端处理的最后一条消息的偏移量,那么处于两个偏移量之间的消息就会丢失。
所以,如何管理偏移量对应用有很大的影响。KafkaConsumerAPI 提供了多种提交偏移量的方式。
自动提交
最简单的提交方式是让消费者自动提交偏移量。
如果 enable.auto.commit
被设置为 true,那么每个 5秒(默认),消费者就会自动提交 poll()
返回的最大偏移量。
虽然自动提交很方便,但是没有为避免开发者重复处理消息留有余地。
提交当前偏移量
大部分开发者通过控制提交时间来降低丢失消息的可能性和减少可能可能在再均衡期间发生的消息重复。
Consumer API 提供了另一种提交偏移量的方式,开发者可以在必要的时候手动提交偏移量,而不是基于时间间隔。
把 enable.auto.commit
设置为 false,让应用程序自己决定何时提交偏移量。使用 commitSync()
提交偏移量是最简单可靠的方式。这个 API 会提交 poll()
返回的最新偏移量,提交成功后马上返回,如果由于某些原因提交失败就抛出异常。
主要没有发生不可恢复的错误,commitSync()
方法就会一直尝试直至提交成功。如果提交失败,就把异常记录到错误日志里。
异步提交
手动提交有一个缺点,在 broker 对请求做出回应之前,应用会一直阻塞,这样会限制应用的吞吐量。
异步提交 API,只管发送请求,无须等待 broker 做出响应。
commitAsync()
有一个缺点,不会进行重试。但它支持回调,回调经常被用于记录偏移量提交错误或生成指标,如果用它来重试提交偏移量,那么一定要注意提交顺序。
异步提交中的重试
可以用一个单调递增的消费者序列号变量来维护异步提交的顺序。每次调用commitAsync()
后增加序列号,并在回调中更新序列号变量。在准备好进行重试时,先检查回调的序列号与序列号变量是否相等。
如果相等,就说明没有新的提交,可以安全地进行重试。如果序列号比较大,则说明已经有新的提交了,此时应该停止重试。
同步和异步组合提交
如果是消费者被关闭,那么一般会使用 commitAsync()
和 commitSync()
的组合。
提交特定的偏移量
消费者 API 允许在调用 commitAsync()
和 commitSync()
时传递想要的分区和偏移量。
再均衡监听器
消费者会在退出和进行分区再均衡之前做一些清理工作。
如果知道消费者即将失去对一个分区的所有权,那么你就会马上提交最后一个已处理的偏移量。可能还需要关闭句柄、数据库连接等。
消费者 API 提供了一些方法,让你可在消费者分配到新分区或就分区被移除时执行一些代码逻辑。
从特定偏移量位置读取记录
Kafka 提供了一些方法,可让 poll()
从不同的位置读取消息。
Kafka 还提供了用于查找特定偏移量的 API。
如何退出
无须担心消费者在一个无限循环里轮询消息,我们可以让其优雅地退出。
反序列化器
生产者需要用序列化器把对象转换成字节数组后再发送给 Kafka。类似地,消费者需要用反序列化器把从 Kafka 接收到的字节数组转换成 Java 对象。
独立消费者
怎样使用不属于任何群组的消费者?
如果知道需要读取哪些分区,就不需要订阅主题了,可以直接将目标分区分配给消费者。消费者既可以订阅主题(并加入消费者组),也可以为自己分配分区,但不能同时做这两件事。
比如,你只需要一个消费者读取一个主题所有的分区或某个分区。这时候不需要使用消费者组和再均衡,只需要把主题或分区分配给这个消费者,然后开始读取消息。
编程式管理Kafka
Kafka 在 0.11 版本中加入了 AdminClient,为之前只能通过命令行的管理功能提供了编程 API:查看、创建和删除主题,描述集群,管理 ACL 和修改配置。
AdminClient概览
深入Kafka
了解 Kafka 的内部工作原理。
- Kafka 控制器
- Kafka 的复制
- Kafka 如何处理来自生产者和消费者的请求
- Kafka 的存储细节,比如文件格式和索引
集群的成员关系
Kafka 目前使用 ZK 维护集群的成员信息。每个 broker 都有一个唯一的标识符。broker 在启动时通过创建 ZK 临时节点把自己的 ID 注册到 ZK 中。broker、控制器和其他的一些生态工具会订阅 ZK 的 /borker/ids
路径(broker 在 zk 上的注册路径),当有 broker 加入或退出集群时,它们可以收到通知。
如果试图启动一个具有相同 ID 的 broker,则会报错。新 broker 会尝试进行注册,但不会成功,因为 ZK 中已有一个相同的节点。
当 broker 与 zk 断开连接时,它在启动时创建的临时节点会自动从 zk 上移除。监听 broker 节点路径的 Kafka 组件会被告知这个 broker 已被移除。
broker 对应的 zk 节点会在 broker 被关闭后消失,但它的 ID 会继续存在于其他数据结构中。在完全关闭一个 broker 后,如果使用相同的 ID 启动另一个全新的 broker,则它会立即加入集群,并获得与之前相同的分区和主题。
控制器
控制器其实也是一个 broker,只不过提供一般的 broker 功能之外,它还负责选举分区首领。集群中第一个启动的 broker 会通过在 ZK 中创建一个名为 /controller
的临时节点让自己成为控制器。
其他 broker 在启动时也会这样做,由于集群已存在控制器,所以会收到节点已存在的异常。其他 broker 会在控制器节点上创建 zk watch,确保可以收到此节点的变更通知。通过此方式确保集群中只有一个控制器。
如果控制器断开连接,那么这个临时节点就会消失。集群中的其他 broker 将会收到控制器节点已消失的通知,并尝试让自己成为新的控制器。
控制器必须先从 ZK 加载最新的副本集状态,然后才能开始管理集群元数据和执行首领选举。
当控制器发现有一个 broker 离开了集群时,它知道,原先首领位于这个 broker 上的所有分区需要一个新首领。它将遍历所有需要新首领的分区,并决定应该将哪个分区作为新首领(简单一点,它可能就是副本集中的下一个副本)。然后,它会将更新后的状态持久化到 ZK 中,再向所有包含这些分区副本的 broker 发送一个 LeaderAndISR 请求,请求中包含了新首领和跟随者的信息。
每一个新首领都知道自己要开始处理来自生产者和消费者的请求,而跟随者也知道它们要开始从新首领那里复制消息。
总的来说,Kafka 会使用 ZK 的临时节点来选举控制器,并会在 broker 加入或退出集群时通知控制器。控制器负责在 broker 加入或退出集群时进行首领选举。控制器使用 epoch
来避免脑裂。
新控制器KRaft
2019 年,Kafka 社区启动了一个雄心勃勃的项目:使用基于 Raft 的控制器替换基于 ZK 的控制器。新控制器叫 KRaft,其预览版本包含在 Kafka 2.8 中。Kafka 3.0 时包含了它的第一个生产版本。
在现有架构中,ZK 起到了两个重要作用:
- 用于选举控制器
- 保存集群元数据
新控制器背后的核心思想是:Kafka 本身有一个基于日志的架构,其中用户会将状态的变化表示成一个事件流。多个消费者可通过重放事件快速赶上最新的状态。日志保留了事件之间的顺序,并能确保消费者始终沿着单个时间轴移动。
在新架构中,控制节点形成了一个 Raft 仲裁,管理着元数据事件日志。这些日志中包含了集群元数据的每个变更。原先保存在 ZK 中的所有东西都将被保存在这个日志中。
因为使用了 Raft 算法,所以控制器可在不依赖外部系统的情况下选举首领。首领节点被称为主控制器,负责处理所有来自 broker 的 RPC 调用。跟随者会从主控制器那里复制数据,并会作为主控制器的热备。在控制器发生故障转移时,很快就可以完成状态的重载。
所有涉及直接与 ZK 通信的客户端和 broker 操作,都将通过控制器来路由。这样就可以通过替换控制器来进行无缝的迁移,无须对 broker 做出任何修改。
复制
复制保证了 Kafka 集群在个别节点失效时的可用性和持久性。
Kafka 的数据保存在主题中,每个主题被分成若干个分区,每个分区可以有多个副本(默认 1个),副本保存在 broker 上。
副本有两种类型:
- 首领副本(leader):每个分区都有一个首领副本。为了保证一致性,所有生产者和消费者的请求都会经过这个副本。客户端可从首领副本或跟随者副本读取数据。
- 跟随者副本(follower):如果没有指定,则跟随者副本不处理来自客户端的请求,它的主要目的是从首领那里复制消息(数据冗余),保持与首领一致的状态。如果首领发生崩溃,其中的一个跟随者会被提拨为新首领。
处理请求
Kafka 提供了一种二进制协议(基于 TCP),指定了消息的格式以及 broker 如何对请求做出响应。
客户端总是发起连接并发起请求,而 broker 负责处理这些请求并做出响应。broker 会按照请求到达的顺序来处理它们,这种顺序既能保证让 Kafka 具备消息队列的特性,又能保证保存的消息是有序的。
生产和获取请求都必须发送给分区的首领。Kafka 客户端负责把请求发送到包含分区首领的 broker 上。
客户端也可以通过元数据请求(指明了主题所包含的分区、分区首领和副本等信息)。一般情况下,客户端会把这些信息缓存起来,并直接向目标 broker 发送请求。它们需要时不时通过发送元数据请求来刷新缓存,以便知道元数据是否发生了变化。
物理存储
Kafka 的基本存储单元是分区,分区无法在多个 broker 间再细分,也无法在同一个 broker 的多个磁盘见再细分。所以,分区大小受单个挂载点可用空间的限制。
配置参数 log.dirs
适用于保存分区数据的目录列表(不要把它搞成错误日志,日志目录配置在 log4j.properties
文件中)。
分层存储
为 Kafka 增加分层存储能力,计划在 3.0 版本中发布。
分区的分配
在创建主题时,Kafka 首先要决定如何在 broker 间分配分区。
假设有 6 个 broker,打算创建包含 10 个分区的主题,并且复制系数为 3,那么总共有 30 个分区副本,它们将分配给 6 个 broker。
在进行分区分配时,要达到以下目标:
- 在 broker 间平均分布分区副本。
- 确保每个分区的副本分布在不同的 broker 上。
为分区和副本选好合适的 broker 后,接下来要决定新分区应该放在哪个目录。
文件管理
Kafka 会为每个主题配置数据保留期限,数据在达到指定的时间或大小之后被清除。
在一个大文件中查找和删除消息既费时又容易出错,我们会把分区分成若干个片段(在默认情况下,每个片段包含 1GB 或 1周的数据)。在 broker 向分区写入数据时,如果触及任意一个上限,就关闭当前文件,并打开一个新文件。
当前正在写入的数据的片段叫做活动片段,活动片段永远不会被删除。所以,如果你配置的保留时间是 1天,但片段里包含了 5天的数据,那么这些数据就会被保留 5天,因为在片段被关闭之前,这些数据不会被删除。
broker 会为分区的每一个打开的日志片段分配一个文件句柄,哪怕是非活动片段。这样就会打开很多文件句柄,因此必须根据实际情况对操作系统做一些调优。
文件格式
每个日志片段被保存在一个单独的数据文件中,文件中包含了消息和偏移量。
保存在磁盘上的数据格式和生产者发送给服务器以及服务器发送给消费者的消息格式是一样的。因此磁盘存储和网络传输采用相同的格式,所以 Kafka 可使用零复制技术向消费者发送消息,并避免对生产者压缩过的消息进行解压和再解压。
Kafka 消息由有效负载和系统标头组成。有效负载包括一个可选的键、值和一些可选的用户标头(键值对)。
索引
消费者可以从 Kafka 任意可用的偏移量位置开始读取消息。
假设从偏移量 100 开始读取 1MB 消息,那么 broker 就必须立即定位到偏移量 100,然后从此位置开始读取消息。
为了帮助 broker 更快定位到指定的偏移量,Kafka 为每个分区维护了一个索引。该索引将偏移量与片段文件以及偏移量在文件中的位置做了映射。
类似地,Kafka 还有第二个索引。将时间戳与消息偏移量做了映射。在按时间戳搜索消息时会用到。
索引也会被分成片段,所以,在删除消息时也可以删除相应的索引。Kafka 没有为索引维护校验和,如果索引损坏,那么 Kafka 将通过重新读取消息并记录偏移量和位置来再次生成索引。Kafka 会自动重新生成索引,因此删除索引是安全的。
压实
应用程序通过 Kafka 来保存它的当前状态,每次状态发生变化,就将新状态写入 Kafka。当应用程序从故障中恢复时,它会从 Kafka 读取之前保存的消息,以便恢复到最近的状态。应用程序只关心发生崩溃前的那个状态,并不关心在运行过程中发生的所有状态的变换。
如果保留策略是压实(compact),那么只为每个键保留最新的值。
Kafka的可靠性
可靠性是系统而不是某个独立组件的一个属性,需要从系统的整体层面触发。
可靠性保证
ACID 大概是大家最熟悉的一个例子,它是关系型数据库普遍支持的标准可靠性保证。
Kafka 的保证:
- 保证分区中的消息是有序的。
- 一条消息只有在被写入分区所有的同步副本时才被认为时已提交的(不一定刷新到磁盘上)。
- 只有还有一个副本是活动的,已提交的消息就不会丢失。
- 消费者只能读取已提交的消息。
复制机制
Kafka 的复制机制和分区多副本架构是 Kafka 可靠性保证的核心。把消息写入多个副本可保证 Kafka 在发生崩溃时仍能提供消息的持久性。
一个稍有滞后的同步者副本会导致生产者和消费者变慢,因为在消息被认为提交之前,客户端会等待所有同步副本确认消息。如果一个副本变成不同步的,那么就不再需要关心它是否已经收到消息。
broker配置
broker 中有 3 个配置参数会影响 Kafka 的消息存储可靠性。它既可以配置在 broker 级别,也可以配置在主题级别。
复制系数
复制系数参数如下。
如果复制系数是 N,那么在 N-1 个 broker 失效的情况下,客户端仍能够从主题读写数据。所以,更高的复制系数参数会带来更高的可用性、可靠性和更少的灾难性事故。另外,复制系数 N 至少需要 N 个 broker,也就是有 N 个数据副本,并且会占用 N 倍的磁盘空间。
|
|
如何确定一个主题需要几个副本?
- 可用性:如果只有一个副本,那么它在 broker 例行重启期间将不可用。副本越多,可用性就越高。
- 持久性:如果只有一个副本,那么一旦磁盘损坏,这个分区的数据就丢失了。
- 吞吐量:每增加一个副本会增加 broker 内的复制流量。在规划集群大小和容量时,需要考虑这个。
- 端到端延迟:每一条记录必须被复制到所有同步副本之后才能被消费者读取。从理论上讲,副本越多,出现滞后的可能性就越大。
- 成本:出于成本考虑,非关键数据的复制系数可以小于 3。以减少存储和网络成本。
副本的位置分布也很重要。Kafka 可以确保分区的每个副本都放在不同的 broker 上。
不彻底的首领选举
unclean.leader.election.enable
只能在 broker 级别(实际上是集群级别),默认值是 false。
当分区首领不可用时,一个同步副本将被选举为新首领。如果选举过程中未丢失数据,那这个选举就是彻底的。
但如果在首领不可用时,其他副本都是不同步的,该怎么办?
默认值,也就是不允许不同步副本成为首领。它可以保证数据不丢失,但可能在极端不可用的场景中,一些分区将一直不可用,知道手动恢复。
如果允许数据丢失,以便让分区可用,可将值改为 true。
最少同步副本
min.insync.replicas
参数可配置在 broker 和主题级别。
如果想确保已提交的数据被写入不止一个副本,可以把最少同步副本设置大一些。
如果同步副本变得不可用,则必须在可用性和一致性之间做出选择。设置过大的最少同步副本数,当只剩下一个同步副本时,它就变成只读了。
保持副本同步
不同步副本会降低总体可靠性,要尽量避免出现这种情况。一个副本可能在两种情况下变得不同步:
- 与 ZK 断开连接
- 从首领复制消息滞后
zookeeper.session.timeout.ms
参数是允许 broker 不向 ZK 发送心跳的时间间隔。如果超过这个时间不发送心跳,则 ZK 会认为 broker 已经死亡,并将其从集群中移除。在 v2.5 中,默认是 18秒。
如果一个副本未能在 replica.lag.time.max.ms
时间内从首领复制数据或赶上首领,那么它将变成不同步副本。在 v2.5 中,默认值是 30秒。
持久化到磁盘
即使消息还没有被持久化到磁盘上,Kafka 也可以向生产者发出确认,这取决于已接收到的消息的副本的数量。Kafka 会在重启之前和关闭日志片段(默认 1GB)时将消息冲刷到磁盘上,或等到 Linux 系统页面缓存被填满时冲刷。
参数 flush.messages
用于控制未同步的磁盘的最大消息数量,参数 flush.ms
用于控制同步频率。在配置这些参数之前,最好先了解 fsync
是如何影响 Kafka 的吞吐量的以及如何尽力避开它的缺点。
可靠的生产者
对生产者就行可靠性方面的配置。
发送确认
发送确认 ack:
- 0:生产者发消息发出去,那么就认为消息已成功写入 Kafka。
- 1:首领在收到消息并把它写入分数数据文件时,会返回确认或错误响应。
- all:首领在返回确认或错误响应之前,会等待所有同步副本都收到消息。此配置和可以和最少同步副本参数相结合,用于控制在返回确认响应前至少要有多少个副本收到消息。此模式下的生产者延时是最大的。
配置生产者的重试参数
可重试:
- 可重试错误
- 不可重试错误
额外的错误处理
需要开发人员处理:
- 不可重试的 broker 错误,比如消息大小错误、身份验证错误等。
- 在将消息发送给 broker 之前发生的错误,比如序列化错误。
- 在生产者达到重试次数上限或重试消息占用的内存达到上限时发生的错误。
可靠的消费者
造成消费者丢失消息的最主要的一种情况是,它们提交了已读取消息的偏移量却未能全部处理完。在此情况下,如果其他消费者接手了工作,那么那些没有被处理的消息就会被忽略,永远不会得到处理。
所以我们非常重视何时以及如何提交偏移量。
消费者的可靠性配置
以下 4 个参数很重要:
|
|
手动提交偏移量
如果想要更大的灵活性,可以选择手动提交偏移量,但需要考虑正确性和性能方面的问题。
验证系统可靠性
验证配置
验证 broker 和客户端之间的配置。
Kafka 提供了两个重要的配置验证工具:org.apache.kafka.tools
包下面的 VerifiableProducer 类和 VerifiableConsumer 类。
测试的场景:
- 首领选举:如果停掉首领会发生什么事情?生产者和消费者需要多长时间来恢复状态?
- 控制器选举:重启控制器后系统需要多少时间来恢复状态?
- 滚动重启:可以滚动重启 broker 而不丢失消息吗?
- 不彻底的首领选举:如果依次停止一个分区的所有副本(确保每个副本都变为不同步的),然后启动一个不同步的 broker 会发生什么?要怎样才能恢复正常?这样做是可接受的吗?
验证应用程序
验证应用程序是否能够提供我们想要的保证。测试场景:
- 客户端与服务器断开连接
- 客户端与服务器之间存在高延迟
- 磁盘被填满
- 磁盘被挂起
- 首领选举
- 滚动重启 broker
- 滚动重启消费者
- 滚动重启生产者
监控可靠性
除了监控 Kafka 集群的健康状态,也要对客户端和数据流进行监控。
Kafka 的 Java 客户端提供了一些 JMX 指标,可用于监控客户端的状态和事件。
对生产者来说,最重要的两个可靠性指标是消息的错误率和重试率。
对消费者来说,最重要的指标是消费者滞后指标。理想情况下,这个值是 0(0 lag),也就是没有消费延迟,消费者读取的是最新的消息。
监控数据流是为了确保所有生成的数据会被即时地读取,为了确保数据能够被即时读取,需要知道数据是什么时候生成的。
精确一次性语义
Kafka 的精确一次性语义由两个关键特性组成:
- 幂等生产者:避免因重试导致的消息重复
- 事务语义:保证流式处理应用程序的精确一次性处理
幂等生产者
如果一个操作被执行多次的结果与被执行一次相同,那么这个操作就是幂等的。
一个最典型的场景是分区首领收到生产者发送的一条消息,这条消息被跟随者成功复制,然后,首领所在的 broker 在向生产者发送发送响应之前崩溃了。生产者没有收到回应,在一段时间之后将重新发送消息。消息被发送给新首领,而新首领已经有了上一次写入的消息副本,结果导致消息重复。
对于一些应用程序来说,消息重复并不是什么问题。但对于如库存、财库报表等应用程序,会产生严重的问题。
Kafka 的幂等生产者可以自动检测并解决消息重复问题。
幂等生产者的工作原理
如果启用了幂等生产者,那么每条消息都将包含生产者 ID(PID)和序列号。我们将它们与目标主题和分区组合在一起,用于唯一标识一条消息。broker 会用这些唯一标识跟踪写入每个分区的最后 5 条消息。
如果 broker 收到之前已经收到过的消息,那么它将拒绝这条消息,并返回错误。生产者会记录这个错误,并反应在指标当中,但不抛出异常,也不触发告警。
幂等生产者的局限
幂等生产者只能防止由生产者内部重试逻辑引起的消息重复。对于使用同一条消息调用两次发送就会导致消息重复的情况,即使使用幂等生产者也无法避免。因为生产者无法知道这两条信息实际上是一样的。
应用程序有一个或多个生产者的情况很常见。如果两个生产者尝试发送相同的消息,则幂等生产者将无法检测到消息重复。
幂等生产者只能防止因生产者自身的重试机制而导致的消息重复。
如何使用幂等生产者
幂等生产者配置,只需在生产者中加入 enable.idempotence=true
。在启用幂等生产者后,会发生以下变化:
- 为了获取生产者 ID,生产者在启动时会调用一个额外的 API。
- 每个消息批次里的第一条消息都将包含生产者 ID 和序列号(递增)。这些新字段给每个消息批次增加了 96 位(PID 是长整型,序列号是整型)。
- broker 将会验证来自每一个生产者实例的序列号,并保证没有重复消息。
- 每个分区的消息顺序都将得到保证。
事务
为了让流式处理应用程序生成正确的结果,要保证每个输入的消息都被精确处理一次,即使是在发生故障的情况下。
Kafka 的事务机制是专门为流式处理应用程序而添加的。因此,它非常适用于流式处理应用程序的基础模式,即 消费-处理-生产。事务可以保证流式处理的精确的一次性语义——在更新完应用程序内部状态并将结果成功写入输入主题后,对每个输入消息的处理就算完成了。
事务的应用场景
金融行业的应用程序就是典型的复杂流式处理的例子,精确一次性用于保证精确的聚合结果。
事务可以解决哪些问题
- 应用程序崩溃导致的重复处理
- 僵尸应用程序导致的重复处理
事务是如何保证精确一次性的
精确一次性处理意味着消费、处理和生产都是原子操作。我们要确保不会出现只有部分操作执行成功的情况。
为了支持这种情况,Kafka 事务引入了原子多分区写入的概念。
提交偏移量和生成结果都涉及向分区写入数据,结果会被写入输出主题,偏移量会被写入 consumer_offsets
主题。如果可以打开一个事务,向这两个主题写入消息,如果两个写入操作都成功就提交事务,如果不成功就中止,并进行重试,那么就会实现我们所追求的精确一次性语义。
事务不能解决哪些问题
Kafka 事务无法实现精确一次性保证的几种场景:
- 在流式处理中执行外部操作。
- 从 Kafka 中读取数据并写入数据库。
- 从一个数据库读取数据写入 Kafka,再从 Kafka 将数据写入另一个数据库。
- 将数据从一个集群复制到另一个集群。
- 发布和订阅模式。
如何使用事务
事务既是一个 broker 特性,也是 Kafka 协议的一部分,所有有多种客户端支持事务。
事务ID和隔离
为生产者设置事务 ID很重要。错误地分配事务 ID 有可能导致应用程序出现错误或无法提供精确一次性保证。非常关键的是,一个应用程序实例的事务 ID 在重启前后必须保持一致,而且应用程序的不同实例的事务 ID 不能一样,否则 broker 将无法隔离僵尸实例。
事务的工作原理
Kafka 事务的基本算法受到了 Chandy-Lamport 快照的启发,它会将一种被称为 “标记”(marker)的消息发送到通信通道中,并根绝标记的达到情况来确定一致性状态。Kafka 事务根据标记消息来判断跨多个分区的事务是否被提交或中止。
总的来说,此算法会执行如下步骤:
- 记录正在执行中的事务,包括所涉及的分区。
- 记录提交或中止的事务的意图——一旦被记录下来,到最后要么被提交,要么被终止。
- 将所有事务标记写入所有分区。
- 记录事务的完成情况。
要实现这个算法,Kafka 需要一个事务日志。这里使用了一个叫做 __transaction_state
的内部主题。
事务的性能
事务给生产者带来了一些额外的开销。
需要注意的是,生产者在事务方面的开销与事务包含的消息数量无关。因此,一个事务包含的消息越多,相对的开销就越少,同步调用次数也就越少,从而提高了总体吞吐量。
在消费者方面,读取提交标记会增加一些开销。提交事务的时间间隔越长,消费者在读取到消息之前需要等待的时间就越长,端到端的延迟也就越高。
但是,消费者不需要缓冲未提交事务所包含的消息,因为 broker 不会将它们返回给消费者。
构建数据管道
使用 Kafka 构建数据管道:
- 把其他媒介的数据移动到 Kafka,或把 Kafka 的数据移动到其他媒介。
- 把 Kafka 中的数据移动到 S3。
- 把 MongoDB 中的数据移动到 Kafka。
- 把 Kafka 作为中间媒介:为了把 Twitter 中的数据移动到 ES,需要先把它们移动到 Kafka,然后再从 Kafka 中移动到 ES。
Kafka 作为数据管道带来的价值在于,它可以作为数据管道各个数据阶段之间的大型缓冲区,有效解耦生产者和消费者,让同一个数据源的数据可被多个具有不同可用性需求的系统和应用使用。
跨集群数据镜像
保护Kafka
需要从系统的整体层面考虑安全性。
锁住Kafka
Kafka 采用了一系列安全措施来建立和维护数据的机密性、完整性和可用性。
- 身份验证特性用于识别和确定用户身份。
- 授权特性决定了用户可以做什么。
- 加密特性保护数据不被窃取和篡改。
- 审计特性用于跟踪用户已经做了什么或试图做什么。
- 配额特性控制用户可以使用多少资源。
安全协议
borker 上配置了一个或多个监听器,这些监听器负责接收来自客户端的连接。每个监听器(内部或外部)可以有自己的安全设置。安全协议的选择决定了数据传输的身份验证和加密级别。
Kafka使用两种标准技术(TLS 和 SASL)支持 4 种安全协议。
传输层安全(TLS)通常以安全套接字层(SSL)作为代称,支持加密以及客户端和服务器的身份验证。
简单身份验证和安全层(SASL)是一个在面向连接的协议中使用不同的机制实现身份验证的框架。
每一个 Kafka 安全协议都结合了传输层安全(PLAINTEXT 或 SSL)和可选的认证层安全(SSL 或 SASL)。
- PLAINTEXT:没有身份认证的 PLAINTEXT 传输层,只适用于在私有网络内传输不敏感的数据。它没有使用身份验证或加密。
- SSL:带有可选 SSL 客户端身份验证的 SSL 传输层,适用于不安全网络。它支持客户端和服务端的身份验证和加密。
- SASL_PLAINTEXT:带有 SASL 客户端身份验证的 PLAINTEXT 传输层。它支持身份验证,不支持加密,适用于私有网络。
- SASL_SSL:带有 SASL 身份验证的 SSL 传输层,适用于不安全网络。它支持客户端和服务端身份验证和加密。
|
|
身份验证
Kafka 用 KafkaPrincipal 实例表示客户端身份,并用它授予资源访问权限,以及为具有这个客户端身份的连接分配配额。
匿名连接 User:ANONYMOUS
被用于未经身份验证的连接。
SSL
SSL 通道是加密的,会增加 CPU 的开销。
SASL
Kafka 内置几种常用的 SASL 机制:
- GSSAPI
- PLAIN
- SCRAM-SHA-256 和 SCRAM-SHA-512
- OAUTHBEARER
重新认证
一些安全机制使用的凭证的生存期是有限的。
安全更新不停机
Kafka 需要定期轮换密钥、应用安全补丁以及更新到最新的安全协议。
加密
机密被用于保护数据的隐私和完整性。
授权
授权决定可以对哪些资源执行哪些操作的过程。
审计
broker 可以生成用于审计和调试的 log4j 日志。
保护ZK
ZooKeeper 支持基于 SASL/GSSAPI 的 Kerberos 身份验证和基于 SASL/DIGEST-MD5 的用户名和密码身份验证。
管理Kafka
主题操作
kafka-topics.sh
可执行大部分与主题有关的操作。
创建主题
|
|
列出主题
|
|
查看主题详情
|
|
增加分区
增加分区通常是为了通过降低单个分区的吞吐量来扩展主题容量。
|
|
减少分区
不可能减少主题的分区数量。如果删除了主题的一个分区,则分区里的数据也会被删除。建议删除整个主题,并重新创建。
删除主题
默认是不允许删除主题的。可以将 delete.topic.enable
参数设置为 true 来删除主题。
删除主题是异步操作,要删除的主题将被打上删除标记,但可能不会立即被删除,具体取决于数据量和清理策略。
删除一个主题会删除主题的所有数据,这是一个不可逆操作,请非常小心。
|
|
消费者群组
可以用 kafka-consumer-groups.sh
来管理和查看消费者组信息。
列出消费者组
|
|
查看消费者组详情
|
|
删除消费者组
在删除消费者组之前,必须将群组里所有的消费者都关闭,否则会抛出一个异常。
|
|
偏移量管理
尽量不要操作这个。
|
|
动态配置变更
有一些参数可在运行时动态更新,而不需重启集群。可使用 kafka-config.sh
来动态修改这些配置:broker、主题、用户和客户端。
为了便于自动化管理动态配置参数,可通过 --add-config-file
来指定包含了配置的参数的文件。
覆盖broker的默认配置
|
|
覆盖主题的默认配置
参数比较多。
覆盖客户端和用户的默认配置
|
|
查看被覆盖的配置
|
|
移除被覆盖的配置
|
|
生产和消费
Kafka 提供了 kafka-console-consumer.sh
和 kafka-console-producer.sh
两个工具,让我们手动验证生产和消费。这些工具对 Java 客户端进行了包装,让我们可以在不编写代码的情况下与 Kafka 主题发生交互。
无法充分利用控制台生产者的所有特性,正常发送字节是很困难的。建议直接使用 Java 客户端,或其他实现了 Kafka 协议的第三方客户端。
控制台工具建议只用于帮助我们实现功能的测试。
控制台生产者
使用 kafka-console-producer.sh
向主题写入消息。默认,一行输入就是一条消息,消息的键和值以 Tab 字符分割(没有分割,键就是 null)。与控制台消费者一样,生产者使用默认的序列化器(DefaultEncoder)生成原始字节。
|
|
一些有用的参数:
|
|
控制台消费者
使用 kafka-console-consumer.sh
从 Kafka 集群的一个或多个主题读取消息。它读取的消息会被打印在标准输出中,并用换行符分隔。默认,它将输出消息的原始字节,没有键,也不进行格式化。
控制台消费者在启动后会一直持续尝试读取消息,知道遇到退出命令(Ctrl-C
)。
|
|
一些有用的参数:
|
|
消息格式化器配置参数:
|
|
分区管理
Kafka 提供了一些用于管理分区的脚本,用于重新选举首领和将分区分配给 broker。有了这两个工具,就可以通过手动的方式让消息流量均衡地分布在集群的 broker 上。
首选首领选举
Kafka 会将分区副本清单中第一个 ISR 定义为首选首领。当 broker 断开连接时,分区领导权将被转一个另一个 ISR,原始副本就自动丧失了分区领导权。如果不启用自动首领均衡,那么在进行跨集群部署后可能会出现非常低效的均衡。因此,建议启用此功能。
如果发现 Kafka 就请你变得不均衡了,则可以考虑进行首选首领选举。集群控制器会为分区选择理想的首领。
|
|
修改分区的副本
某些情况下,可能需要手动修改分区的副本。场景:
- broker 的负载分布不均衡,自动首领选举也无法解决此问题。
- broker 离线,造成分区不同步。
- 新加了 broker,你想快速给它分配分区。
- 你想修改主题的复制系数。
使用 kafka-reassign-partitions.sh
来调整分区的副本。此过程包含多个步骤:
- 先是,生成迁移清单:基于 broker 和主题生成一个迁移清单。需要一个 JSON 文件,包含了要调整的主题。
- 然后,再根据迁移清单执行调整。
- 最后,跟踪和验证分区调整的进度或完成情况。
也可以用 kafka-reassign-partitions.sh
来增加或减少一个分区的副本系数。
转储日志片段
有时候,可能需要查看消息的内容。使用 kafka-dump-log.sh
来解码分区的日志片段,这样就可以在不读取和解码消息的情况下查看消息的内容。
|
|
副本验证
使用 kafka-replica-verification.sh
来验证集群分区副本的一致性。它会从指定分区的副本读取消息,检查所有副本是否包含了相同的消息,并打印出指定分区的最大延迟。
|
|
不安全的操作
虽然一些操作在技术上是可行的,但这些操作是危险的,不建议执行。
- 移动集群控制器
- 移除待删除的主题
- 手动删除主题
监控Kafka
指标基础
Kafka 提供的所有指标都可以通过 Java 管理扩展(JMX)接口访问。
JMX 端口是 broker 配置信息的一部分,保存在 ZK 中。如果监控系统要直连到 Kafka 的 JMX 端口,可先从 ZK 获取端口信息。/brokers/ids/<ID>
节点包含了 broker 的配置信息,其中就有 jmx_port。需要注意的是,出于安全方面的考虑,Kafka 默认禁用了远程 JMX。如果要启用它,则必须保护好端口,因为 JMX 还允许执行代码。
不管使用哪种指标,都要确保有健康检测来监控应用程序的整体健康状况。
服务级别目标
服务级别指标(SLI),是一种用于描述服务可靠性的指标。
服务级别目标(SLO),也叫做服务级别阈值(SLT),它将 SLI 与目标值组合在一起。常见表示方法是使用数字 9(如 99.9%)。
服务级别协议(SLA),是服务商可客户之间的一种契约。
broker的指标
Kafka 集群一般问题:
- 单个 broker 的问题
- CPU
- 磁盘 IO:磁盘是最重要的子系统,所有消息都保存在磁盘上。
- 网络吞吐量
- 活跃控制器数量
- 控制器队列大小
- 请求处理器空闲率
- 主题流入字节
- 主题流出字节
- 主题流入消息
- 分区数量
- 首领数量
- 离线分区数量
- 集群过载
- 控制器的问题
- 集群级别问题
- 负载不均衡
- 资源过度消耗
- 主题指标和分区指标
- Java 虚拟机(JVM)
- JVM 频繁垃圾回收会影响 broker 的性能
- 操作系统层面
- 日志
客户端监控
- 生产者指标
- 消费者指标
- 配额指标
滞后指标
对消费者来说,最需要被监控的指标是滞后消息数量,也就是分区生产的最后一条消息和消费者读取的最后一条消息之间的差值。
监控消费者滞后最好使用外部工具。
端到端的监控
- 可以向 Kafka 集群写入消息吗?
- 可以从 Kafka 集群读取消息吗?
流式处理
从 0.10 版本开始,除了被用作流式处理框架可靠的数据来源,Kafka 还提供了一个强大的流式处理开发库(Kafka Stream)。
什么是流式处理
什么是数据流?数据流是无边界数据集的抽象表示。无边界意味着无限和持续增长。随着时间的推移,会有新纪录不断加入。
事件流某型的一些属性:
- 数据流无边界
- 数据流是有序的
- 不可变的数据记录
- 事件流是可重放的
流式处理是指实时地处理一个或多个事件流。流式处理是一种编程范式。
下面 3种范式:
- 请求与响应
- 批处理
- 流式处理
流式处理相关概念
拓扑
一个流式处理应用程序包含一个或多个处理拓扑。
时间
时间可能是流式处理中最为重要的概念。在流式处理中,形成一个通用的时间概念非常重要,因为大部分流式应用程序的操作是基于时间窗口的。
流式处理一般包含以下几种时间:
- 事件时间
- 日志追加时间
- 处理时间