先看一个简单的KafkaConsumer例子:
1 | Properties props = new Properties(); |
我们看到0.9的consumer最大的变化是:
- 通过consumer.subscribe(Arrays.asList(“test_topic”))来声明要订阅的topic,而之前的版本是用Whitelist声明。
- 通过consumer.poll(100)直接抓取消息,而之前需要遍历KafkaStream的迭代器。(这个比之前方便太多了。。。)。
- MessageAndMetadata变成了ConsumerRecords
enable.auto.commit表示已一个固定的时间间隔自动提交offsets,时间间隔由auto.commit.interval.ms控制。
bootstrap.servers表示kafka集群的broker列表。客户端会连接到这个列表中的任意一台机器,获取到整个集群的信息,因此理论上只需要在bootstrap.servers列出一台机器就够了,但是考虑到灾备,建议在bootstrap.servers中包含所有broker。
key.deserializer和value.deserializer指定了如何解析记录的key和value,在本例中我们认定key和value都是字符串。
在本例中,client端启动了一个从属于test_group的consumer来订阅test_topic。当其中一个consumer process断开之后,kafka broker会通过心跳机制自动检测到,因此集群始终能够知道哪些consumer是活着的。只要被认为是活着,这个consumer就能够从分配给它的partition中获取数据;一旦心跳丢失超过session.timeout.ms,consumer会被认为死掉,它所占有的partition将会被分配给别的process。