最近要把Kafka集群从0.10.1.1升级到1.0.0,主要是为了使用新的ProducerRecord和ConsumerRecord中的Header来保存和传输一些metadata,要求升级过程中不能服务不能中断,所以不考虑整体停机升级的方案,升级过程如下:
- 安装Kafka1.0,安装方法略,注意这里只是安装新版本Kafka,但并不启动,旧版本的Kafka保持正常工作
- 更新Kafka 1.0的server.properties,添加以下配置
1 | inter.broker.protocol.version=0.10.1 |
inter.broker.protocol.version
用于指定broker之间的通信协议版本,默认是和当前Kafka版本一致,即新版本中inter.broker.protocol.version为1.0。如果使用默认值,在轮询重启过程中,先重启的Broker将尝试使用1.0的协议与其他未重启的broker通信,由于未重启的broker仍旧是0.10.1版本,并不支持1.0的协议,会导致server.log中频繁报错,新版本broker上的partition无法被加回到ISR中1
[2018-01-30 11:19:14,020] INFO [ReplicaFetcher replicaId=81, leaderId=83, fetcherId=0] Retrying leaderEpoch request for partition test_kane-2 as the leader reported an error: UNKNOWN_SERVER_ERROR (kafka.server.ReplicaFetcherThread)
log.message.format.version
用于指定broker持久化数据的格式,默认是和当前Kafka版本一致,即新版本log.message.format.version为1.0
轮询重启集群:关闭旧版本broker,再启动新版本broker
再次更新server.properties,并轮询重启集群
1
2inter.broker.protocol.version=1.0
log.message.format.version=1.0
这里由于我需要使用Record的Header,因此直接把log.message.format.version改成了1.0,否则会报错java.lang.IllegalArgumentException: Magic v1 does not support record headers
。但这样做会有风险,即如果producer的版本为1.0,consumer的版本为0.10.1,则consumer会因为无法解析1.0格式的数据而报错。所以建议确保所有consumer都升级到1.0之后,再把log.message.format.version改成1.0