解决实时平台Avro Schema兼容性问题

在大数据平台或应用中,为了降低数据传输消耗,通常我们会使用某种紧凑型的数据格式,比如Avro

Avro的序列化/反序列化JAVA API主要有两种,DataFileWriter/DataFileReader和GenericDatumWriter/GenericDatumReader

GenericDatumWriter/GenericDatumReader

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public static GenericRecord genericDatumSerde(Schema schema, GenericRecord record) throws IOException {
byte[] array = null;
// serialize
try (ByteArrayOutputStream outputStream = new ByteArrayOutputStream();) {
BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(outputStream, null);
DatumWriter<GenericRecord> writer = new GenericDatumWriter<GenericRecord>(schema);
writer.write(record, encoder);
encoder.flush();
array = outputStream.toByteArray();
}
// deserialize
DatumReader<GenericRecord> reader = new GenericDatumReader<GenericRecord>(schema);
return reader.read(null, DecoderFactory.get().binaryDecoder(array, null));
}

这是比较常用的API,序列化之后的byte array不包含schema信息,因此反序列化构建GenericDatumReader时需要提供schema

DataFileWriter/DataFileReader

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public static GenericRecord dataFileSerde(Schema schema, GenericRecord record) throws IOException {
byte[] array = null;
// serialize
try (ByteArrayOutputStream out = new ByteArrayOutputStream();
DataFileWriter<GenericRecord> writer = new DataFileWriter<GenericRecord>(
new GenericDatumWriter<GenericRecord>(schema)).create(schema, out);) {
writer.append(record);
writer.flush();
array = out.toByteArray();
}
// deserialize
try (DataFileReader<GenericRecord> reader = new DataFileReader<>(new SeekableByteArrayInput(array),
new GenericDatumReader<GenericRecord>());) {
return reader.next();
}
}

GenericDatumSerde相比,这种方式序列化之后的数据带有schema信息,因此byte array明显要大很多,但好处是反序列化构建GenericDatumReader时不需要提供schema

元数据管理

一般来说在大数据平台或应用中,从节省存储空间和带宽的角度来说,GenericDatumSerde会比较常用(以下如果没有特殊说明,都只讨论GenericDatumSerde)。但由于本身不带schema信息,上下游系统就必须额外的维护相同的或兼容的schema。在复杂的数据平台中,假如每个应用都在本地维护schema,那么数据格式的依赖关系是一种网状结构,当schema需要变更时,会带来相当大的麻烦

这时我们需要引入一个第三方的元数据管理系统,比如SchemaRegistry,这样数据格式的依赖关系就变成了星型结构

以Kafka作为数据交互中间件为例,Producer首先到SchemaRegistry中获取当前topic的schema,对数据进行avro序列化之后发送到Kafka,Consumer读取二进制数据,从SchemaRegistry中获取schema,对数据进行反序列化

Schema兼容性

考虑这样一种场景,Producer使用v1版本的schema对数据进行序列化,然后schema在SchemaRegistry中被更新到v2,比如添加了一个字段A,然后Consumer使用v2版本的schema对数据进行反序列化,就会抛出java.io.EOFException,原因是当尝试用新的schema解析字段A时,byte array中已经没有更多的数据可供读取

解决这个问题有两种办法:

  • 用旧版本的schema反序列化

这里涉及到schema版本号传递的问题,最简单的办法当然是使用DataFileSerde,数据自带schema,不会出现版本不一致的问题,但刚才提到过,由于占用空间太大一般不用这种方式。换一种思路,数据本身只携带schema的id或version,本身占用空间很小,并且可以根据id找到序列化的旧版本schema

携带id有两种做法,一种是在byte array中分配一段固定长度的空间用于保存id,例如SchemaRegistry自带的KafkaAvroSerializer.java就是在byte array中预留前五个字节,其中第一个字节是魔法字节,4个字节用于保存id。读取时先根据第一个字节判断该条记录是否带有schema信息,如果是魔法字节,则从2-5个字节取出id,到SchemaRegistry中获取序列化所用的schema,然后对byte array中剩下的数据进行反序列化,这种做法实际上是对数据有侵入,导致byte array可读性降低(虽然本来可读性就不高)。另一种做法是使用额外的meta空间保存id,例如Kafka message的header,Flume event的header等等

  • 忽略新增字段
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
package org.apache.avro.generic;

import java.io.EOFException;
import java.io.IOException;

import org.apache.avro.Schema;
import org.apache.avro.Schema.Field;
import org.apache.avro.io.ResolvingDecoder;
import org.apache.avro.specific.SpecificDatumReader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
*
* 新版本schema兼容老版本的数据
*
* @author xiejing.kane
*
* @param <D>
*/
public class CompatibleDatumReader<D> extends GenericDatumReader<D> {
private static Logger LOGGER = LoggerFactory.getLogger(CompatibleDatumReader.class);

public CompatibleDatumReader(Schema schema) {
super(schema);
}

@Override
protected Object readRecord(Object old, Schema expected, ResolvingDecoder in) throws IOException {
Object r = getData().newRecord(old, expected);
Object state = getData().getRecordState(r, expected);
for (Field f : in.readFieldOrder()) {
int pos = f.pos();
String name = f.name();
Object oldDatum = (old != null) ? getData().getField(r, name, pos, state) : null;
try {
getData().setField(r, name, pos, read(oldDatum, f.schema(), in), state);
} catch (EOFException e) {
// 如果没有更多的数据可读,说明read schema比write schema版本高,多出了新字段
// 这种情况直接忽略新字段
continue;
}
}
return r;
}
}

继承GenericDatumReader.java,重写readRecord方法,如果没有更多的数据可供读取,则直接忽略该字段。这样对于字段的增删都能够很好的兼容,但对于字段更新就没办法了(包括更新字段名和字段类型)