Tracking Data Sources in Flume

Requirement

Our real-time platform uses Flume to collect real-time data. In most cases, users send data over TCP, and Flume receives it through MultiportSyslogTCPSource.

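A key part of monitoring a streaming platform is knowing where the data comes from. However, the data Flume collects carries no information about its source, which makes monitoring the whole pipeline a real headache. Suppose we receive malformed data that makes Avro conversion fail. If we can trace it to a particular table, that's not too bad, because we can at least contact the table's owner. It gets worse when we receive pure garbage that can't be traced to any application or table at all: there is no one to hold responsible, and we're left completely baffled.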

Solution 1.0

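The idea is this: when MultiportSyslogTCPSource receives data, it also writes the client's IP into the Flume event header. The Kafka sink then carries that header into the headers of the Kafka message. That way, when downstream stream processing hits an exception, it can log the client IP, which makes it easy to find out who to blame.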

The code is simple too. In the messageReceived method of MultiportSyslogHandler, the static nested class inside MultiportSyslogTCPSource.java, take the RemoteAddress from the session and put it in the header:

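if (lineSplitter.parseLine(buf, savedBuf, parsedLine)) {
    Event event = parseEvent(parsedLine, decoder);
    if (portHeader != null) {
        event.getHeaders().put(portHeader, String.valueOf(port));
    }
    // Added: record the peer's IP. REMOTE_ADDRESS is a header-key constant defined
    // separately (not shown); InetSocketAddress is already imported by this class.
    InetSocketAddress remote = (InetSocketAddress) session.getRemoteAddress();
    event.getHeaders().put(REMOTE_ADDRESS, remote.getAddress().getHostAddress());
    events.add(event);
} else {
    logger.trace("Parsed null event");
}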

In the process method of KafkaSink.java, add every entry of the Flume header (or only the entry keyed REMOTE_ADDRESS) to the headers of the Kafka record:

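// record is the ProducerRecord built for this event; record headers need Kafka 0.11+ clients
Map<String, String> headers = event.getHeaders();
for (Map.Entry<String, String> entry : headers.entrySet()) {
    // Encode explicitly (java.nio.charset.StandardCharsets): a bare getBytes() uses the platform default charset
    record.headers().add(entry.getKey(), entry.getValue().getBytes(StandardCharsets.UTF_8));
}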
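Kafka header values are raw bytes, so the charset the sink encodes with has to match the one the downstream job decodes with. The JDK-only sketch below models both ends with a plain list of key/byte[] pairs instead of the real kafka-clients types. HeaderRoundTrip, toKafkaHeaders and lastHeader are hypothetical names, and the header key value "remoteAddress" is an assumption, since the article does not show what REMOTE_ADDRESS is set to. In a real consumer, the lookup would go through record.headers().lastHeader(key).value().

```java
import java.nio.charset.StandardCharsets;
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Hypothetical model of the sink-to-consumer header round trip (no Kafka dependency).
final class HeaderRoundTrip {
    // Assumed key value; the article's REMOTE_ADDRESS constant is not shown.
    static final String REMOTE_ADDRESS = "remoteAddress";

    // Sink side: copy every Flume header into (key, byte[]) pairs, encoded as UTF-8.
    static List<Map.Entry<String, byte[]>> toKafkaHeaders(Map<String, String> flumeHeaders) {
        List<Map.Entry<String, byte[]>> out = new ArrayList<>();
        for (Map.Entry<String, String> e : flumeHeaders.entrySet()) {
            out.add(new SimpleImmutableEntry<>(e.getKey(), e.getValue().getBytes(StandardCharsets.UTF_8)));
        }
        return out;
    }

    // Consumer side: like Kafka's Headers.lastHeader, return the last value stored under the key.
    static Optional<String> lastHeader(List<Map.Entry<String, byte[]>> headers, String key) {
        String found = null;
        for (Map.Entry<String, byte[]> e : headers) {
            if (e.getKey().equals(key)) {
                found = new String(e.getValue(), StandardCharsets.UTF_8);
            }
        }
        return Optional.ofNullable(found);
    }

    public static void main(String[] args) {
        Map<String, String> flumeHeaders = new LinkedHashMap<>();
        flumeHeaders.put(REMOTE_ADDRESS, "10.0.0.2");
        List<Map.Entry<String, byte[]>> kafkaHeaders = toKafkaHeaders(flumeHeaders);
        // A failing downstream job can now report where the bad record came from.
        System.out.println("client ip = " + lastHeader(kafkaHeaders, REMOTE_ADDRESS).orElse("UNKNOWN"));
        // prints: client ip = 10.0.0.2
    }
}
```

Using an explicit charset on both sides keeps the IP readable regardless of each JVM's default encoding.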

Solution 2.0

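That wasn't the end of it. Because we use nginx for HA and load balancing, we later found that the client IP taken from the header was always nginx's IP rather than the real client's. This happens because nginx, acting as a reverse proxy, relays requests and responses on behalf of both sides, hiding the client and the server from each other. If nginx rewrote the request directly, replacing its own IP with the real client's IP, the server would send its response straight to the real client, and that response would likely fail because there is no network route or a firewall blocks it.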

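This can't be solved within nginx itself, so we looked for a mechanism that carries the client IP inside the data stream. After comparing several options, we settled on the PROXY protocol.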

Enabling the PROXY protocol in nginx only requires adding proxy_protocol on; to the server block, for example:

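# Inside the stream { } block (TCP proxying); tcp_5240 refers to an upstream group not shown here
server {
    listen 6240;
    proxy_connect_timeout 3s;
    proxy_timeout 120s;
    proxy_pass tcp_5240;
    proxy_protocol on;    # send a PROXY header at the start of each upstream connection
}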

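Once it is enabled, nginx sends one PROXY protocol line at the start of each upstream connection, before forwarding any of the original data, so the line acts as a header for everything that follows on that connection. The line is terminated by CRLF, and the protocol field is TCP4, TCP6, or UNKNOWN. The format is: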

PROXY <protocol> <source IP> <destination IP> <source port> <destination port>

For example:

PROXY TCP4 10.0.0.2 192.168.0.1 49863 6240
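Because the header is a single text line, it is easy to reproduce when testing the Flume side without nginx in front of it. The sketch below is a hypothetical helper (ProxyV1Header.build, not part of nginx or Flume) that formats the same line from a source address and a destination address, choosing TCP4 or TCP6 from the source address family.

```java
import java.net.Inet4Address;
import java.net.InetSocketAddress;
import java.util.Locale;

// Hypothetical test helper: builds the PROXY protocol v1 line that nginx would send.
final class ProxyV1Header {
    // "PROXY <TCP4|TCP6> <src ip> <dst ip> <src port> <dst port>\r\n"
    static String build(InetSocketAddress source, InetSocketAddress destination) {
        String family = (source.getAddress() instanceof Inet4Address) ? "TCP4" : "TCP6";
        // Locale.ROOT keeps the port numbers in plain ASCII digits
        return String.format(Locale.ROOT, "PROXY %s %s %s %d %d\r\n",
                family,
                source.getAddress().getHostAddress(),
                destination.getAddress().getHostAddress(),
                source.getPort(),
                destination.getPort());
    }

    public static void main(String[] args) {
        // IP literals are parsed directly; no DNS lookup happens here
        String header = build(new InetSocketAddress("10.0.0.2", 49863),
                              new InetSocketAddress("192.168.0.1", 6240));
        System.out.print(header); // PROXY TCP4 10.0.0.2 192.168.0.1 49863 6240 (followed by CRLF)
    }
}
```

Writing this string to a socket connected to the Flume port, followed by a few newline-terminated records, should exercise the same code path as traffic arriving through nginx.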

So Flume only needs to parse the source IP out of this line and write it into the event header. Here is how we did it (a modification of MultiportSyslogTCPSource):

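// parseMsg and the three-argument parseEvent are our own helpers, not stock Flume:
// parseMsg decodes parsedLine into a String, and parseEvent builds the event from it
String msg = parseMsg(parsedLine, decoder);
if (StringUtils.isNotEmpty(msg) && !ProxyProtocolUtil.acceptProxyMetadata(session, msg)) {
    // Not a PROXY line, so this is real data
    Event event = parseEvent(parsedLine, decoder, msg);
    if (portHeader != null) {
        event.getHeaders().put(portHeader, String.valueOf(port));
    }
    // Client IP from this connection's PROXY line if one was seen, otherwise the direct peer
    event.getHeaders().put(REMOTE_ADDRESS, ProxyProtocolUtil.getRemoteAddress(session));
    events.add(event);
}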

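We build a line of text from the ParsedBuffer, then use ProxyProtocolUtil.acceptProxyMetadata to check whether it is a PROXY protocol line. If it is, the source IP is extracted and stored in the session. If not, the line is original data, so we build an event from it and write the source IP stored in the session into its header. Because a MINA session corresponds to one TCP connection, the IP from the PROXY line applies to every record that follows on that connection. The rest of the processing is the same as in Solution 1.0. Here is ProxyProtocolUtil, including acceptProxyMetadata: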

import java.net.InetSocketAddress;

import org.apache.mina.core.session.IoSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ProxyProtocolUtil {
    // Session attribute holding the real client address parsed from the PROXY line
    private static final String CLIENT_ADDRESS = "ClientAddress";
    private static final Logger LOGGER = LoggerFactory.getLogger(ProxyProtocolUtil.class);
    // 108 bytes is the largest buffer needed for the PROXY protocol, but we are a
    // bit more lenient
    public static final int MAX_PROXY_HEADER_LENGTH = Byte.MAX_VALUE;
    public static final String PROXY_PROTOCOL_PREFIX = "PROXY";

    /**
     * Returns true if msg is a PROXY protocol line (recording the client address in the
     * session), or false if msg is ordinary data that should become an event.
     */
    public static boolean acceptProxyMetadata(IoSession session, String msg) {
        if (!msg.startsWith(PROXY_PROTOCOL_PREFIX) || msg.length() > MAX_PROXY_HEADER_LENGTH) {
            LOGGER.debug("acceptProxyMetadata(session={}) mismatched protocol header: expected={}",
                    session, PROXY_PROTOCOL_PREFIX);
            return false;
        }
        try {
            return parseProxyHeader(session, msg);
        } catch (RuntimeException e) {
            // Malformed header: treat the line as ordinary data
            LOGGER.debug("Not Proxy-Protocol. Msg = {}", msg, e);
            return false;
        }
    }

    /** Client IP from the PROXY line if one was seen, otherwise the direct peer's IP. */
    public static String getRemoteAddress(IoSession session) {
        try {
            InetSocketAddress address = session.containsAttribute(CLIENT_ADDRESS)
                    ? (InetSocketAddress) session.getAttribute(CLIENT_ADDRESS)
                    : (InetSocketAddress) session.getRemoteAddress();
            return address.getAddress().getHostAddress();
        } catch (RuntimeException e) {
            return "UNKNOWN";
        }
    }

    private static boolean parseProxyHeader(IoSession session, String msg) {
        // Split on runs of whitespace; trim() also drops a trailing '\r' from the CRLF terminator
        String[] proxyFields = msg.trim().split("\\s+");
        String proxyProtocolPrefix = proxyFields[0];
        if (proxyFields.length < 2 || !PROXY_PROTOCOL_PREFIX.equalsIgnoreCase(proxyProtocolPrefix)) {
            throw new IllegalArgumentException("Mismatched protocol prefix: " + proxyProtocolPrefix);
        }

        String protocolVersion = proxyFields[1];
        if ("TCP4".equalsIgnoreCase(protocolVersion) || "TCP6".equalsIgnoreCase(protocolVersion)) {
            if (proxyFields.length < 6) {
                throw new IllegalArgumentException("Truncated PROXY header: " + msg);
            }
            String layer3SrcAddress = proxyFields[2];
            String layer3DstAddress = proxyFields[3];
            String layer3SrcPort = proxyFields[4];
            String layer3DstPort = proxyFields[5];
            LOGGER.debug("parseProxyHeader(session={}) using {}:{} -> {}:{} proxy",
                    session, layer3SrcAddress, layer3SrcPort, layer3DstAddress, layer3DstPort);

            // The source is an IP literal, so this constructor does not trigger a DNS lookup
            session.setAttribute(CLIENT_ADDRESS,
                    new InetSocketAddress(layer3SrcAddress, Integer.parseInt(layer3SrcPort)));
        } else if ("UNKNOWN".equalsIgnoreCase(protocolVersion)) {
            // The proxy could not determine the source: consume the line, keep the direct peer address
            LOGGER.debug("parseProxyHeader(session={}) unknown source - continue as usual", session);
        } else {
            // Not a valid v1 header (e.g. data that merely starts with "PROXY"): keep it as data
            throw new IllegalArgumentException("Unsupported sub-protocol: " + protocolVersion);
        }

        return true;
    }
}
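To check the control flow without a running Flume/MINA stack, the sketch below replays it with a HashMap standing in for the per-connection IoSession attributes. ProxyAwareConnection, onLine and the "remoteAddress" header key are hypothetical. The sketch shows the ordering: the PROXY line is consumed and remembered for the connection, every later line becomes an event tagged with that address, and a connection without a PROXY line falls back to the direct peer's address.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical JDK-only model of the Solution 2.0 flow for a single TCP connection.
final class ProxyAwareConnection {
    static final String CLIENT_ADDRESS = "ClientAddress"; // session attribute key, as in ProxyProtocolUtil
    static final String REMOTE_ADDRESS = "remoteAddress"; // assumed event header key

    private final Map<String, String> sessionAttributes = new HashMap<>(); // stands in for IoSession attributes
    private final String directPeerIp; // what session.getRemoteAddress() would report (nginx's IP)
    final List<Map<String, String>> events = new ArrayList<>(); // each event: headers plus a "body" entry

    ProxyAwareConnection(String directPeerIp) {
        this.directPeerIp = directPeerIp;
    }

    // Mirrors acceptProxyMetadata: true means "PROXY line, consumed"; false means "ordinary data".
    private boolean acceptProxyMetadata(String line) {
        String[] f = line.trim().split("\\s+");
        if (!"PROXY".equals(f[0]) || f.length < 2) {
            return false;
        }
        if (("TCP4".equals(f[1]) || "TCP6".equals(f[1])) && f.length >= 6) {
            sessionAttributes.put(CLIENT_ADDRESS, f[2]); // remember the real client for this connection
            return true;
        }
        return "UNKNOWN".equals(f[1]); // UNKNOWN is consumed; anything else stays data
    }

    // Called once per decoded line, in arrival order.
    void onLine(String line) {
        if (line.isEmpty() || acceptProxyMetadata(line)) {
            return;
        }
        Map<String, String> event = new HashMap<>();
        event.put("body", line);
        event.put(REMOTE_ADDRESS, sessionAttributes.getOrDefault(CLIENT_ADDRESS, directPeerIp));
        events.add(event);
    }

    public static void main(String[] args) {
        ProxyAwareConnection conn = new ProxyAwareConnection("172.16.0.10"); // hypothetical nginx IP
        conn.onLine("PROXY TCP4 10.0.0.2 192.168.0.1 49863 6240\r");
        conn.onLine("hello");
        conn.onLine("world");
        for (Map<String, String> e : conn.events) {
            // prints "10.0.0.2 hello", then "10.0.0.2 world"
            System.out.println(e.get(REMOTE_ADDRESS) + " " + e.get("body"));
        }
    }
}
```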