Kane Xie


  • 首页

  • 标签

  • 归档

  • 搜索

Kafka0.10升级潜在影响(二)

发表于 2016-07-05 | | 阅读次数:

接上篇,今天做了个测试,看log.message.format.version对consumer性能的影响到底有多大,一共20万条数据,每条数据大小1K,总共200M。

首先在Kafka 0.9上的平均测试结果是8.362秒。

而升级到Kafka 0.10之后,平均测试结果如下:

阅读全文 »

Kafka0.10升级潜在影响(一)

发表于 2016-07-04 | | 阅读次数:

Kafka 0.10的消息格式有新的变化,加入了timestamp字段。磁盘存储的消息格式可以通过server.properties的log.message.format.version配置,默认值是0.10.0.0。

阅读全文 »

Kafka安全认证-SASL

发表于 2016-07-02 | | 阅读次数:

Kafka0.10提供SASL/PLAIN,这是一种简单的用户名/密码安全认证机制。

配置Kafka Brokers

  • 创建kafka_server_jaas.conf
阅读全文 »

Kafka多数据中心架构设计(二)

发表于 2016-06-30 | | 阅读次数:

用mirrormaker在多个数据中心做Kafka的数据同步还需要考虑几个问题:

如何提高同步效率?

跨数据中心的数据传输,瓶颈在于带宽。我们采取的办法是启多个mirrormaker进程和压缩。

阅读全文 »

Kafka多数据中心架构设计(一)

发表于 2016-06-25 | | 阅读次数:

将Kafka部署在单一数据中心是一种相对比较常见的架构,所有的broker,producer和consumer都在同一个数据中心内,一般来说,也都在同一个内网里。

但考虑这样一个问题,公司在北京,上海和深圳都有业务系统,其中上海和北京会生成一些数据或日志,并且这些信息都需要被三地的业务系统所使用。假如我们考虑用Kafka来作为消息系统,那么就不得不考虑多数据中心的部署问题。

阅读全文 »

Kafka Rest Consumer性能优化

发表于 2016-06-17 | | 阅读次数:

之前有多语言consume kafka的需求,因此决定采用Confluent的Kafka Rest2.0.1。虽然测试的结果不如人意(java consumer大约60M/s,但用rest的话只有5M/s),考虑到消息量不大,也能凑合用

但最近在kafka集群中新创建了200个测试的空topic,rest consumer的性能急剧降低,惨不忍睹。。。

阅读全文 »

Kafka Consumer Session Timeout

发表于 2016-06-13 | | 阅读次数:

在kafka client(consumer)的log中看到这样的异常:

1
2
[13/06/16 16:01:26:153 PM CST] 550 ERROR ConsumerCoordinator: Error ILLEGAL_GENERATION occurred while committing offsets for group group7
`[13/06/16 16:01:26:154 PM CST] 424 WARN ConsumerCoordinator: Auto offset commit failed: Commit cannot be completed due to group rebalance
阅读全文 »

Kafka0.10新特性

发表于 2016-05-26 | | 阅读次数:

5月23日Confluent官方宣布Apache Kafka 0.10正式发布。该版本包含了很多新功能和优化,这里列出比较重要的几项:

Streams


如果你有这样的需求,从Kafka拉取数据进行流处理然后再推送回Kafka,那么你会喜欢0.10的Kafka Streams。Kafka Streams是一个类库,它实现了一系列流处理动作(例如join,filter,aggregate等),能够帮助你构建一个功能齐全的低延迟的流处理系统。它支持有状态或无状态的处理,并且能够被部署在各种框架和容器中(例如YARN,Mesos,Docker),也可以集成在Java应用里。

阅读全文 »

Kafka MirrorMaker实践

发表于 2016-04-20 | | 阅读次数:

最近准备使用Kafka Mirrormaker做两个数据中心的数据同步,以下是一些要点:

  1. mirrormaker必须提供一个或多个consumer配置,一个producer配置,一个whitelist或一个blacklist(支持java正则表达式)
  2. 启动多个mirrormaker进程,单个进程启动多个consuemr streams, 可以提高吞吐量和提供容
    阅读全文 »

KafkaConsumer0.9(三)

发表于 2016-01-18 | | 阅读次数:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
Properties props = new Properties();  
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test_group");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("session.timeout.ms", "30000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
List<TopicPartition> list = new ArrayList<TopicPartition>();
TopicPartition tp = new TopicPartition("test_topic", 0);
list.add(tp);
consumer.assign(list);
consumer.seek(tp, 96);
// consumer.seekToBeginning(tp);
// consumer.seekToEnd(tp);
int commitInterval = 200;
List<ConsumerRecord<String, String>> buffer = new ArrayList<ConsumerRecord<String, String>>();
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
buffer.add(record);
if (buffer.size() >= commitInterval) {
batchProcessRecords(buffer);
consumer.commitSync();
buffer.clear();
}
}
}
阅读全文 »
1…456…8
Kane Xie

Kane Xie

不吃早餐才是一件很嘻哈的事

74 日志
29 标签
GitHub E-Mail
© 2015 — 2019 Kane Xie
蚂蚁金服大量职位内推,请将简历投递至kane.xiejing@qq.com,欢迎你的加入!