Kafka从0.10.1升级到1.0.0

最近要把Kafka集群从0.10.1.1升级到1.0.0,主要是为了使用新的ProducerRecord和ConsumerRecord中的Header来保存和传输一些metadata,要求升级过程中不能服务不能中断,所以不考虑整体停机升级的方案,升级过程如下:

  • 安装Kafka1.0,安装方法略,注意这里只是安装新版本Kafka,但并不启动,旧版本的Kafka保持正常工作
  • 更新Kafka 1.0的server.properties,添加以下配置
1
2
inter.broker.protocol.version=0.10.1
log.message.format.version=0.10.1

inter.broker.protocol.version用于指定broker之间的通信协议版本,默认是和当前Kafka版本一致,即新版本中inter.broker.protocol.version为1.0。如果使用默认值,在轮询重启过程中,先重启的Broker将尝试使用1.0的协议与其他未重启的broker通信,由于未重启的broker仍旧是0.10.1版本,并不支持1.0的协议,会导致server.log中频繁报错,新版本broker上的partition无法被加回到ISR中

1
[2018-01-30 11:19:14,020] INFO [ReplicaFetcher replicaId=81, leaderId=83, fetcherId=0] Retrying leaderEpoch request for partition test_kane-2 as the leader reported an error: UNKNOWN_SERVER_ERROR (kafka.server.ReplicaFetcherThread)

log.message.format.version用于指定broker持久化数据的格式,默认是和当前Kafka版本一致,即新版本log.message.format.version为1.0

  • 轮询重启集群:关闭旧版本broker,再启动新版本broker

  • 再次更新server.properties,并轮询重启集群

    1
    2
    inter.broker.protocol.version=1.0
    log.message.format.version=1.0

这里由于我需要使用Record的Header,因此直接把log.message.format.version改成了1.0,否则会报错java.lang.IllegalArgumentException: Magic v1 does not support record headers。但这样做会有风险,即如果producer的版本为1.0,consumer的版本为0.10.1,则consumer会因为无法解析1.0格式的数据而报错。所以建议确保所有consumer都升级到1.0之后,再把log.message.format.version改成1.0