SnowFlake算法在数据链路中的应用

数据链路追踪

在实时数据平台中,为了保证数据是可追溯的,一般会在数据生成时给每一条数据分配一个唯一的ID,以及生成的时间,最好再带上数据源的信息,这样DEBUG的时候就能知道数据是从哪来的。然后在数据的每个流转环节中,将每一条数据的ID和到达时间记录到日志或者其他存储中

这样在发生问题或者调试时,就能通过ID重现数据的轨迹,包括数据行经的每个组件以及消耗的时间。这个过程能够通过ELK轻松完成,将所有组件的日志导入ES,然后使用ID在Kibana中搜索,甚至可以通过定制图表来实现数据链路可视化,这里不做展开

以Flume为例,我们可以在最外层的Agent(即跟数据源最近的一层,了解Flume分层架构请自行搜索)通过UUIDInterceptor生成唯一ID,然后连同数据源信息(机房ID,机器ID)和数据接收的时间戳一起放到event header中,最后带到数据流里

注: 一般情况下数据的唯一ID和生成时间event_time应该由数据的生成方指定,如果没有,则由Flume生成。当然也可以强制由Flume生成,主要取决于是否将平台与用户之间的网络视同内网

这里有两个问题:

1.一般来讲实时平台的组件数据结构中都有一个rowkey来保存唯一ID,比如kafka message key,es index id,hbase rowkey等等,但不一定有header或者metamap之类的数据结构来保存数据源信息和数据接收时间,因此,如果能把这些额外的信息都保存到ID中就再好不过了

2.UUID太长了,36个字符,并且是字母数字和符号的组合,这非常影响存储的效率

所以可以考虑用SnowFlake算法来解决这两个问题

SnowFlake算法

先贴代码,来自Twitter雪花算法SnowFlake的Java实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
public class SnowFlake {

/**
* 起始的时间戳
*/
private final static long START_STMP = 1514736000000L;

/**
* 每一部分占用的位数
*/
private final static long SEQUENCE_BIT = 12; // 序列号占用的位数
private final static long MACHINE_BIT = 5; // 机器标识占用的位数
private final static long DATACENTER_BIT = 5;// 数据中心占用的位数

/**
* 每一部分的最大值
*/
private final static long MAX_DATACENTER_NUM = -1L ^ (-1L << DATACENTER_BIT);
private final static long MAX_MACHINE_NUM = -1L ^ (-1L << MACHINE_BIT);
private final static long MAX_SEQUENCE = -1L ^ (-1L << SEQUENCE_BIT);

/**
* 每一部分向左的位移
*/
private final static long MACHINE_LEFT = SEQUENCE_BIT;
private final static long DATACENTER_LEFT = MACHINE_LEFT + MACHINE_BIT;
private final static long TIMESTMP_LEFT = DATACENTER_LEFT + DATACENTER_BIT;

private long datacenterId; // 数据中心
private long machineId; // 机器标识
private long sequence = 0L; // 序列号
private long lastStmp = -1L;// 上一次时间戳

public SnowFlake(long datacenterId, long machineId) {
if (datacenterId > MAX_DATACENTER_NUM || datacenterId < 0) {
throw new IllegalArgumentException("datacenterId can't be greater than MAX_DATACENTER_NUM or less than 0");
}
if (machineId > MAX_MACHINE_NUM || machineId < 0) {
throw new IllegalArgumentException("machineId can't be greater than MAX_MACHINE_NUM or less than 0");
}
this.datacenterId = datacenterId;
this.machineId = machineId;
}

/**
* 产生下一个ID
*
* @return
*/
public synchronized long nextId() {
long currStmp = getNewstmp();
if (currStmp < lastStmp) {
throw new RuntimeException("Clock moved backwards. Refusing to generate id");
}

if (currStmp == lastStmp) {
// 相同毫秒内,序列号自增
sequence = (sequence + 1) & MAX_SEQUENCE;
// 同一毫秒的序列数已经达到最大
if (sequence == 0L) {
currStmp = getNextMill();
}
} else {
// 不同毫秒内,序列号置为0
sequence = 0L;
}

lastStmp = currStmp;

return (currStmp - START_STMP) << TIMESTMP_LEFT // 时间戳部分
| datacenterId << DATACENTER_LEFT // 数据中心部分
| machineId << MACHINE_LEFT // 机器标识部分
| sequence; // 序列号部分
}

private long getNextMill() {
long mill = getNewstmp();
while (mill <= lastStmp) {
mill = getNewstmp();
}
return mill;
}

private long getNewstmp() {
return System.currentTimeMillis();
}

public static void main(String[] args) {
SnowFlake snowFlake = new SnowFlake(2, 4);
System.out.println(snowFlake.nextId());
}
}

SnowFlake算法用来生成64位的ID,刚好可以用long整型存储,能够用于分布式系统中生产唯一的ID, 并且生成的ID有大致的顺序。以上代码生成的64位ID可以分成5个部分:

0 - 41位时间戳 - 5位数据中心标识 - 5位机器标识 - 12位序列号

41位时间戳可以存储大概69年的时间,所以以1970/01/01为基准的话,能保存到2039年,而如果以START_STMP(2018/01/01)为基准存储(currentTimeMillis - START_STMP),就能够延长到2087年

12位序列号,时间戳相同的记录通过使用递增的序列号来避免冲突,12位序列号能支持每毫秒4096个ID(如果一毫秒内请求书超过4096个,则超过的请求阻塞到下一毫秒,并发量会受到影响),即409.6万QPS,一般能够满足需求,如果不够的话,可以通过减少数据中心或机器的存储位数来增加序列化存储

10位数据源标识,这里拆分为5位数据中心标识和5位机器标识,其实只是逻辑上的区分,总共可支持标记1024个数据源

获得数据的ID之后,可以通过以下代码解析出数据的接收时间以及数据源信息

1
2
3
4
System.out.println("SEQUENCE = " + ((id) & ~(-1L << SEQUENCE_BIT)));
System.out.println("MACHINE = " + ((id >> MACHINE_LEFT) & ~(-1L << MACHINE_BIT)));
System.out.println("DATACENTER = " + ((id >> DATACENTER_LEFT) & ~(-1L << DATACENTER_BIT)));
System.out.println("TIMESTAMP = " + ((id >> TIMESTMP_LEFT) + START_STMP));

Flume

SnowFlake在Flume中的使用就很简单了,使用SnowFlakeIntercepter替换掉UUIDInterceptor即可,代码如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
public class SnowFlakeIntercepter implements Interceptor {

private String headerName;
private boolean preserveExisting;
private SnowFlake snowFlake;
private boolean reverse;

public static final String HEADER_NAME = "headerName";
public static final String PRESERVE_EXISTING_NAME = "preserveExisting";
public static final String DATACENTER_ID = "datacenter";
public static final String MACHINE_ID = "machine";
public static final String REVERSE = "reverse";

protected SnowFlakeIntercepter(Context context) {
reverse = context.getBoolean(REVERSE, true);
headerName = context.getString(HEADER_NAME, "id");
preserveExisting = context.getBoolean(PRESERVE_EXISTING_NAME, true);
snowFlake = new SnowFlake(context.getLong(DATACENTER_ID), context.getLong(MACHINE_ID));
}

@Override
public void initialize() {
}

@Override
public Event intercept(Event event) {
Map<String, String> headers = event.getHeaders();
if (preserveExisting && headers.containsKey(headerName)) {
// preserve the existing id
} else {
String id = snowFlake.nextId() + "";
headers.put(headerName, reverse ? StringUtils.reverse(id) : id);
}
return event;
}

@Override
public List<Event> intercept(List<Event> events) {
List<Event> results = new ArrayList<Event>(events.size());
for (Event event : events) {
event = intercept(event);
if (event != null) {
results.add(event);
}
}
return results;
}

@Override
public void close() {
}

public static class Builder implements Interceptor.Builder {
private Context context;

public Builder() {
}

@Override
public SnowFlakeIntercepter build() {
return new SnowFlakeIntercepter(context);
}

@Override
public void configure(Context context) {
this.context = context;
}

}
}

flume-conf.properties中配置的时候需要指定数据中心标识datacenter和机器标识machine,也可以通过指定reverse=false来关闭ID翻转,默认是true,因为SnowFlake生成的ID默认是递增的,虽然存储效率较高,但应用到某些存储系统时会产生热点问题,比如HBase

1
2
3
4
5
a2.sources.test_src.interceptors=i1
a2.sources.test_src.interceptors.i1.type=com.sdo.dw.rtc.flume.interceptor.SnowFlakeIntercepter$Builder
a2.sources.test_src.interceptors.i1.datacenter=2
a2.sources.test_src.interceptors.i1.machine=4
a2.sources.test_src.interceptors.i1.reverse=false

这里还有个坑,假如使用了ID翻转,并且在数据流中将ID当做数值类型处理,有可能会产生ID冲突,比如12345和123450在翻转并转成Long型之后都是54321。解决的办法也很简单,将START_STMP设置的足够小,比如0L,以保证生成的ID都足够大就没问题了

———————————- 8/14 更新 ——————————————–

之前说到把START_STMP设置的足够小,比如0L,来避免翻转之后产生的冲突。但这里也有问题,SnowFlake算法是能保证产生的id是长整型,但19位的长整型翻转之后就有可能超出Long.MAX_VALUE,比如6433192668775325729,这会给后续的存储带来很大的麻烦。目前我的想法是把生成的id限制在18位,具体做法是数据中心和机器标识一共使用8位存储,然后START_STMP设置为2014/01/01,可以在30年之内保证id的位数维持在18位