数据链路追踪
在实时数据平台中,为了保证数据是可追溯的,一般会在数据生成时给每一条数据分配一个唯一的ID,以及生成的时间,最好再带上数据源的信息,这样DEBUG的时候就能知道数据是从哪来的。然后在数据的每个流转环节中,将每一条数据的ID和到达时间记录到日志或者其他存储中
这样在发生问题或者调试时,就能通过ID重现数据的轨迹,包括数据行经的每个组件以及消耗的时间。这个过程能够通过ELK轻松完成,将所有组件的日志导入ES,然后使用ID在Kibana中搜索,甚至可以通过定制图表来实现数据链路可视化,这里不做展开
以Flume为例,我们可以在最外层的Agent(即跟数据源最近的一层,了解Flume分层架构请自行搜索)通过UUIDInterceptor生成唯一ID,然后连同数据源信息(机房ID,机器ID)和数据接收的时间戳一起放到event header中,最后带到数据流里
注: 一般情况下数据的唯一ID和生成时间event_time应该由数据的生成方指定,如果没有,则由Flume生成。当然也可以强制由Flume生成,主要取决于是否将平台与用户之间的网络视同内网
这里有两个问题:
1.一般来讲实时平台的组件数据结构中都有一个rowkey来保存唯一ID,比如kafka message key,es index id,hbase rowkey等等,但不一定有header或者metamap之类的数据结构来保存数据源信息和数据接收时间,因此,如果能把这些额外的信息都保存到ID中就再好不过了
2.UUID太长了,36个字符,并且是字母数字和符号的组合,这非常影响存储的效率
所以可以考虑用SnowFlake算法来解决这两个问题
SnowFlake算法
先贴代码,来自Twitter雪花算法SnowFlake的Java实现
1 | public class SnowFlake { |
SnowFlake算法用来生成64位的ID,刚好可以用long整型存储,能够用于分布式系统中生产唯一的ID, 并且生成的ID有大致的顺序。以上代码生成的64位ID可以分成5个部分:
0 - 41位时间戳 - 5位数据中心标识 - 5位机器标识 - 12位序列号
41位时间戳
可以存储大概69年的时间,所以以1970/01/01为基准的话,能保存到2039年,而如果以START_STMP(2018/01/01)为基准存储(currentTimeMillis - START_STMP),就能够延长到2087年
12位序列号
,时间戳相同的记录通过使用递增的序列号来避免冲突,12位序列号能支持每毫秒4096个ID(如果一毫秒内请求书超过4096个,则超过的请求阻塞到下一毫秒,并发量会受到影响),即409.6万QPS,一般能够满足需求,如果不够的话,可以通过减少数据中心或机器的存储位数来增加序列化存储
10位数据源标识
,这里拆分为5位数据中心标识和5位机器标识,其实只是逻辑上的区分,总共可支持标记1024个数据源
获得数据的ID之后,可以通过以下代码解析出数据的接收时间以及数据源信息
1 | System.out.println("SEQUENCE = " + ((id) & ~(-1L << SEQUENCE_BIT))); |
Flume
SnowFlake在Flume中的使用就很简单了,使用SnowFlakeIntercepter替换掉UUIDInterceptor即可,代码如下
1 | public class SnowFlakeIntercepter implements Interceptor { |
flume-conf.properties中配置的时候需要指定数据中心标识datacenter
和机器标识machine
,也可以通过指定reverse
=false来关闭ID翻转,默认是true,因为SnowFlake生成的ID默认是递增的,虽然存储效率较高,但应用到某些存储系统时会产生热点问题,比如HBase
1 | a2.sources.test_src.interceptors=i1 |
这里还有个坑,假如使用了ID翻转,并且在数据流中将ID当做数值类型处理,有可能会产生ID冲突,比如12345和123450在翻转并转成Long型之后都是54321。解决的办法也很简单,将START_STMP设置的足够小,比如0L,以保证生成的ID都足够大就没问题了
———————————- 8/14 更新 ——————————————–
之前说到把START_STMP设置的足够小,比如0L,来避免翻转之后产生的冲突。但这里也有问题,SnowFlake算法是能保证产生的id是长整型,但19位的长整型翻转之后就有可能超出Long.MAX_VALUE,比如6433192668775325729,这会给后续的存储带来很大的麻烦。目前我的想法是把生成的id限制在18位,具体做法是数据中心和机器标识一共使用8位存储,然后START_STMP设置为2014/01/01
,可以在30年之内保证id的位数维持在18位