Sending and Receiving Oversize Messages with Kafka

The default size limit along Kafka's entire message pipeline is 1M. In other words, by default a Producer can only send messages of at most 1M to Kafka, Kafka itself only handles messages of at most 1M internally, and a Consumer can only consume messages of at most 1M.

If you send 2M of data (for ease of calculation, 1M = 1000K throughout this post), the client throws an exception:

```
Exception in thread "main" java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.RecordTooLargeException: The message is 2000037 bytes when serialized which is larger than the maximum request size you have configured with the max.request.size configuration.
```

Following the hint, set max.request.size in the Producer config to something above 2M. Note that it must be strictly greater than 2M, because the message's own metadata also takes up space: in the log above, a record carrying a 2M payload serializes to 2M + 37 bytes.
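As a minimal sketch, the producer-side change might look like this, assuming a local broker, string keys, and byte-array values (the topic name test, the broker address, and the 2100000 figure are all illustrative):

```java
import java.util.Properties;
import java.util.concurrent.ExecutionException;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class OversizeProducer {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumed broker address
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        // Must exceed payload size plus record metadata (2,000,037 bytes in the log above).
        props.put("max.request.size", "2100000");

        KafkaProducer<String, byte[]> producer = new KafkaProducer<>(props);
        byte[] payload = new byte[2_000_000]; // the 2M message body
        // send() is asynchronous; get() blocks so any failure surfaces here as an ExecutionException
        producer.send(new ProducerRecord<>("test", "key", payload)).get();
        producer.close();
    }
}
```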

But the server returns a new exception:

```
Exception in thread "main" java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.RecordTooLargeException: The request included a message larger than the max message size the server will accept.
```

The server cannot accept a message this large. So edit Kafka's server.properties, add a message.max.bytes entry with a value above 2M, and restart the Kafka cluster. The send now succeeds.
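In server.properties, the change might look like this (2100000 is an assumed value that, like the producer setting, leaves headroom for record overhead):

```
# server.properties: broker-wide cap on the size of accepted messages
message.max.bytes=2100000
```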

Then try to consume the message, which fails:

```
Exception in thread "main" org.apache.kafka.common.errors.RecordTooLargeException: There are some messages at [Partition=Offset]: {test-1=6863354} whose size is larger than the fetch size 1048576 and hence cannot be ever returned. Increase the fetch size on the client (using max.partition.fetch.bytes), or decrease the maximum message size the broker will allow (using message.max.bytes).
```

Following the hint, set max.partition.fetch.bytes in the Consumer config to at least 2M. Consumption now succeeds.
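A matching consumer-side sketch (the group id, topic name, and broker address are again assumptions):

```java
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class OversizeConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumed broker address
        props.put("group.id", "oversize-demo");           // assumed consumer group
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        // Per-partition fetch cap; must cover the largest message stored in the topic.
        props.put("max.partition.fetch.bytes", "2100000");

        KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList("test"));
        ConsumerRecords<String, byte[]> records = consumer.poll(1000);
        for (ConsumerRecord<String, byte[]> record : records) {
            System.out.println(record.offset() + ": " + record.value().length + " bytes");
        }
        consumer.close();
    }
}
```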

But this is not the end of it. At this point Kafka's bytes_out turns out to be abnormally high, nearly saturating the network bandwidth, even though no client is sending data to Kafka at the time. Checking the Kafka logs reveals an error:

```
[2016-10-25 03:15:08,361] ERROR [ReplicaFetcherThread-0-1], Replication is failing due to a message that is greater than replica.fetch.max.bytes for partition [test,1]. This generally occurs when the max.message.bytes has been overridden to exceed this value and a suitably large message has also been sent. To fix this problem increase replica.fetch.max.bytes in your broker config to be equal or larger than your settings for max.message.bytes, both at a broker and topic level. (kafka.server.ReplicaFetcherThread)
```

The cause: after message.max.bytes was raised, the Kafka broker can accept 2M messages, but the replica fetcher fails when it tries to replicate them, because replica.fetch.max.bytes caps the size of a replicated message at 1M. Since the replica fetcher retries the replication indefinitely, bytes_out climbs sharply. Setting replica.fetch.max.bytes to 2M and restarting the brokers solves the problem.
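In server.properties, the fix keeps the two limits consistent, as the error message demands (same assumed 2100000 value as before):

```
# server.properties: the replication fetch cap must cover the largest accepted message
message.max.bytes=2100000
replica.fetch.max.bytes=2100000
```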

Since the topic-level parameter max.message.bytes controls the maximum message size each individual topic accepts, you can avoid future restarts by setting replica.fetch.max.bytes to a fairly large value up front (say, 10M). After that, whenever some topic needs to send and receive oversize messages, you only have to change that topic's max.message.bytes; no cluster restart is needed.
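For example, a per-topic override can be applied at runtime with the kafka-configs tool (the topic name test and the value are illustrative):

```sh
bin/kafka-configs.sh --zookeeper localhost:2181 --alter \
  --entity-type topics --entity-name test \
  --add-config max.message.bytes=2100000
```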