kafka消息的管理
kafka producer将消息发送给broker后,消息日志会被存储在broker的磁盘上,采用顺序写入的方式。顺序写可以加快磁盘访问速度,并且可以将将多个小型的逻辑写合并成一次大型的物理磁盘写入,官方数据显示顺序写比随机写入快6000倍以上。另外,操作系统使用内存对磁盘进行缓存即pagecache,pagecache完全由操作系统管理,这也使得写数据变得即简洁也快速。
配置中可以调整过期时间,超过改时间的消息日志将移除,默认值为7天;也可配置文件大小阈值,达到该阈值后,从最旧消息开始删除。配置项为:
从文件到套接字的常见数据传输路径有4步:
1).操作系统从磁盘读取数据到内核空间的 pagecache
2).应用程序读取内核空间的数据到用户空间的缓冲区
3).应用程序将数据(用户空间的缓冲区)写回内核空间到套接字缓冲区(内核空间)
4).操作系统将数据从套接字缓冲区(内核空间)复制到通过网络发送的 NIC 缓冲区
kafka使用 producer ,broker 和 consumer 都共享的标准化的二进制消息格式,这样数据块不用修改就能在他们之间传递。kafka采用Linux 中系统调用sendfile的方式,直接将数据从 pagecache 转移到 socket 网络连接中。这种零拷贝方式使得kafka数据传输更加高效。
以前面文章中安装的kafka为例: Mac 安装kafka
kafka本地文件存储目录可以在配置文件server.properties中设置,参数及默认值为:
进入该目录,可以看到kafka保存的cosumer offset和topic消息:
其中__consumer_offsets开头的为消费的offset信息,test1开头的即为之前创建的topic “test1”,该topic有三个分区,分区编号从0开始,分别是test1-0、test1-1、test1-2。
进入test1-0,查看包含文件如下:
可以看到kafka消息按partition存储的,每个partition一个目录。partition下消息分段(segment)存储,默认每段最大1G,通过参数log.segment.bytes可配置。segment包含索引文件index、消息文件log,分别存储消息的索引和内容,以.index和.log结尾,文件命名为当前segment第一个消息offset。index文件在log每隔一定数据量之间建立索引,可以通过参数index.interval.bytes配置。
通过kafka命令查看00000000000000000000.index内容如下:
00000000000000000000.log内容如下:
其中索引文件中包含两个字段:(offset,position),分别表示消息offset和该消息在log文件的偏移量。如上图中offset=0的消息对应的position=0;对应的就是00000000000000000000.log中的第一条消息:
其中payload为具体的消息内容。
另外里面还有一个以".timeindex"结尾的文件,查看其内容:
该日志文件是kafka0.10.1.1加入的,其中保存的为:(消息时间戳,offset)。时间戳是该segment最后一个消息对应的时间戳(与log文件中最后一条记录时间戳一致),kafka也支持根据时间来读取消息。
由上可知消息是按partition来存储的,partition可以配置n个副本followers。多个partition和其follower在broker上是怎么分配的呢?
partition和broker都进行了排序,下标从0开始;
假设有k个broker,第i个partition被分配到到 i%k 个broker上;
第i%k个broker即为partition i 的leader,负责这个partition的读写 ;
partition的followers也进行排序,从leader的后续broker开始分配,第i个partition的第j个副本broker为 (j+ i%k)%k。
一个有3个broker的kafka集群,包含3个partition,每个partition副本数为1的topic如下图:
总结:
kafka将消息日志采用顺序写入的方式存放在broker磁盘中;数据传输通过系统调用sendfile零拷贝方式;消息日志分段存放,可配置清除时间或大小阈值;每段包含消息索引、消息内容两个文件,通过索引实现快速查找;按照/topic/partition的目录结构分开存储,且均匀分布到集群各broker上。
参考:
https://kafka.apachecn.org/documentation.html#design