kafka实时监控

在kafka的开发和维护中,我们经常需要了解kafka topic以及连接在其上的consumer的实时信息,比如logsize,offset,owner等。为此kafka提供了ConsumerOffsetChecker,它的用法很简单

bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --group <group>

输出结果类似于

Group Topic Pid Offset logSize Lag Owner
Group1 a.topic 0 2 2 0 none
Group1 a.topic 1 0 0 0 none
Group1 a.topic 2 2 2 0 none

我们也可以通过kafka web console一类的工具直观地获取kafka信息,但如果我们要构建自己的监控系统,需要抓取这些信息的话,有两种办法:一种是运行ConsumerOffsetChecker然后解析输出的字符串;另一种就是通过SimpleConsumer和Zookeeper实时抓取信息(换句话说就是把ConsumerOffsetChecker翻译一下:)),以下介绍第二种方法的思路。

首先我们看kafka信息在zookeeper的存储结构

1,/brokers/topics/[topic]/partitions/[partitionId]/state
2,/brokers/ids/[0…N]
3,/consumers/[groupId]/ids/[consumerId]
4,/consumers/[groupId]/owners/[topic]/[partitionId]
5,/consumers/[groupId]/offsets/[topic]/[partitionId]

对于指定的topic和groupid,通过(1)可以拿到所有的partition信息(Pid),然后通过(5)可以拿到offset,通过(4)可以拿到owner。就差logsize还没法拿到,事实上logsize在zookeeper中并没有记录,它必须通过kafka consumer的low level的api取得。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
private Long getLogSize(String topicName, String partitionId, long offsetRequestTime) {  
SimpleConsumer consumer = new SimpleConsumer(server, port, 10000, 1024000, "ConsumerOffsetChecker");
Long logSize = null;
TopicAndPartition topicAndPartition = new TopicAndPartition(topicName, Integer.parseInt(partitionId));
Map<TopicAndPartition, PartitionOffsetRequestInfo> map = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
map.put(topicAndPartition, new PartitionOffsetRequestInfo(offsetRequestTime, 1));
OffsetRequest request = new OffsetRequest(map, kafka.api.OffsetRequest.CurrentVersion(), kafka.api.OffsetRequest.DefaultClientId());
OffsetResponse response = consumer.getOffsetsBefore(request);
long[] aa = response.offsets(topicName, Integer.parseInt(partitionId));
if (aa.length != 0) {
logSize = aa[0];
}
return logSize;
}

第二行,创建simple consumer

第四行,创建topic和partition对象

第六行,创建offset request,第一个参数是记录写入的时间,如果是kafka.api.OffsetRequest.EarliestTime(),则代表当前最早的一条记录,也就是当前最小offset;如果是kafka.api.OffsetRequest.LatestTime(),则代表最新的一条记录,也就是当前最大offset。第二个参数是获取offset的个数。

由max offset和current offset,我们可以获得当前还有多少消息没有被消费(lag),由(lag/(maxoffset-minoffset)),我们可以算出当前还没有被消费的消息占的百分比,如果这个百分比接近100%,那么接下来很可能会导致offset out of range exception而丢失数据。