需求
我司的实时平台是用Flume来做实时数据的采集,多数场景下是用户发送TCP请求,Flume通过MultiportSyslogTCPSource接收
流平台的一个关键的监控环节是数据来源,但通过Flume采集下来数据并不带有数据来源的信息,这样就给整个数据流的监控带来很大的麻烦,比如收到某些不合规范的数据,导致Avro转换失败,如果能定位到某个表的话还不算太糟糕,至少还能联系到owner,更坏的情况是收到一些垃圾信息,根本定位不到属于哪个应用或者表,冤无头债无主,一脸懵逼
解决方案1.0
改造的思路是,在MultiportSyslogTCPSource中接收数据的同时,将client端的ip写到flume event header中,再把header的信息通过kafka sink带到kafka message的headers中,这样后续流处理发生异常时就可以把client ip打印出来,便于找人背锅
代码也很简单,在MultiportSyslogTCPSource.java的内部静态类MultiportSyslogHandler的messageReceived函数中,从session中取出RemoteAddress,设置到header中
1 | if (lineSplitter.parseLine(buf, savedBuf, parsedLine)) { |
在KafkaSink.java的process函数中,把flume header的所有entry(或者可以只添加key为REMOTE_ADDRESS的entry)添加到kafka record的headers中
1 | Map<String, String> headers = event.getHeaders(); |
解决方案2.0
问题还没完,由于我们使用nginx来做的HA和负载均衡,后来发现从header里取出来的client ip都是nginx的ip而不是真正客户端的ip。这是因为nginx做反向代理会把request和response转一手,屏蔽掉client和server的信息,如果直接修改request,把nginx的ip改为真正client的ip,server端就会把response发送到真正的client,这样很可能会由于网络不通或者被防火墙拦截掉而响应失败
这个问题在nginx本身是无解的,所以考虑引入某种机制,在请求体中携带client ip,对比了几种方案之后最终确定用代理协议Proxy Protocol
Nginx开启Proxy Protocol只需要添加配置proxy_protocol on;
,例如
1 | server { |
开启之后,每条数据都会添加一个header(实际上就是在发送原数据之前先发送一条proxy protocol信息),proxy protocol的格式如下
1 | PROXY 协议栈 源IP 目的IP 源端口 目的端口 |
例如:
1 | PROXY TCP4 10.0.0.2 192.168.0.1 49863 6240 |
这样flume只需要从header中解析出源IP然后event header中就可以了,具体做法如下(在MultiportSyslogTCPSource基础上改造):
1 | String msg = parseMsg(parsedLine, decoder); |
从ParsedBuffer中构造一行数据,然后通过ProxyProtocolUtil.acceptProxyMetadata判断是否符合proxy protocol格式,如果符合,则将源IP提取出来并放到session中;如果不符合,则说明这是原数据,用来构造event,并把session中的源IP写入header中。后续处理和解决方案1.0中一样。ProxyProtocolUtil.acceptProxyMetadata的代码如下:
1 | public class ProxyProtocolUtil { |