Kane Xie



Elasticsearch Environment Configuration

Posted on 2017-08-04

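Raise the maximum number of open files to 65536 or more.

Temporary (current shell session only):

ulimit -n 65536

Permanent (edit limits.conf; `@search` targets the `search` group, and the change applies to new login sessions, so log in again or reboot):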

vi /etc/security/limits.conf
@search soft nofile 65536
@search hard nofile 65536
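Because Elasticsearch runs on the JVM, one way to confirm that the new limit actually reached the process is to read it from inside a JVM started by the same user. A minimal sketch, assuming a HotSpot/OpenJDK JVM on a Unix-like system where the `com.sun.management.UnixOperatingSystemMXBean` extension is available (`FdLimitCheck` is a hypothetical name):

```java
import java.lang.management.ManagementFactory;
import java.lang.management.OperatingSystemMXBean;

// Reads the open-file limit that this JVM process actually inherited.
class FdLimitCheck {
    static final long REQUIRED = 65536L;

    /** Returns the max open-file count, or -1 if the platform does not expose it. */
    static long maxFileDescriptors() {
        OperatingSystemMXBean os = ManagementFactory.getOperatingSystemMXBean();
        if (os instanceof com.sun.management.UnixOperatingSystemMXBean) {
            return ((com.sun.management.UnixOperatingSystemMXBean) os).getMaxFileDescriptorCount();
        }
        return -1L; // e.g. Windows, or a JVM without the com.sun.management extension
    }

    static boolean meetsRequirement(long limit) {
        return limit >= REQUIRED;
    }

    public static void main(String[] args) {
        long limit = maxFileDescriptors();
        System.out.println("max open files = " + limit
                + (meetsRequirement(limit) ? " (OK)" : " (below " + REQUIRED + ")"));
    }
}
```

Run it as the user that starts Elasticsearch; a value below 65536 means the `limits.conf` change has not been picked up by that session yet.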


Kafka Streams State Store

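Posted on 2017-07-25

By default, Kafka Streams uses RocksDB to store state.

In-memory or persistent?

State can be kept in memory or persisted to local disk (persistent stores are backed by RocksDB), and the Stores factory API lets you switch between the two. Once a StateStoreSupplier has been created, it can be used both with the Kafka Streams DSL (high-level API) and with the Processor API (low-level API).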
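A persistent store is a RocksDB directory on the instance's local disk, while an in-memory store keeps nothing there. The hypothetical helper below computes that directory, assuming the layout used by Kafka 0.10.x/0.11.x, `<state.dir>/<application.id>/<task id>/rocksdb/<store name>`, with `state.dir` defaulting to `/tmp/kafka-streams`; check it against your version before relying on it.

```java
import java.nio.file.Path;
import java.nio.file.Paths;

// Hypothetical helper: local directory of a persistent (RocksDB) state store,
// assuming the 0.10.x/0.11.x layout <state.dir>/<application.id>/<taskId>/rocksdb/<storeName>.
class StateStorePaths {
    // Assumed default of the state.dir setting in those versions.
    static final String DEFAULT_STATE_DIR = "/tmp/kafka-streams";

    static Path rocksDbDir(String stateDir, String applicationId, String taskId, String storeName) {
        return Paths.get(stateDir, applicationId, taskId, "rocksdb", storeName);
    }

    public static void main(String[] args) {
        // Task ids look like "0_0" (topic group 0, partition 0).
        System.out.println(rocksDbDir(DEFAULT_STATE_DIR, "my-first-streams-application", "0_0", "COUNTS"));
    }
}
```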


Real-time Computing Platform Architecture

Posted on 2017-07-23


Kafka Streams Automatically Creating Internal Topics

Posted on 2017-07-22

Running the following Kafka Streams program failed with the exception below:

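import java.util.Properties;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.processor.TopologyBuilder;
import org.apache.kafka.streams.state.Stores;
public class MyStreamsApp {
    public static void main(String[] args) {
        Properties settings = new Properties();
        settings.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-first-streams-application");
        settings.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "10.128.74.83:9092");
        StreamsConfig config = new StreamsConfig(settings);
        // source_topic -> PROCESS1 (MyProcessor, the application's own Processor, with in-memory store "COUNTS") -> sink_topic
        TopologyBuilder builder = new TopologyBuilder();
        builder.addSource("SOURCE", "source_topic").addProcessor("PROCESS1", MyProcessor::new, "SOURCE")
                .addStateStore(Stores.create("COUNTS").withStringKeys().withIntegerValues().inMemory().build(),
                        "PROCESS1")
                .addSink("SINK3", "sink_topic", "PROCESS1");
        KafkaStreams streams = new KafkaStreams(builder, config);
        streams.start();
    }
}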
Exception in thread "StreamThread-1" org.apache.kafka.streams.errors.StreamsException: stream-thread [StreamThread-1] Failed to rebalance
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:410)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:242)
Caused by: org.apache.kafka.streams.errors.StreamsException: task [0_0] Could not find partition info for topic: my-first-streams-application1-COUNTS1-changelog
at org.apache.kafka.streams.processor.internals.ProcessorStateManager.register(ProcessorStateManager.java:174)
……
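The missing topic in the trace is the changelog topic that backs the state store. Kafka Streams names such topics `<application.id>-<store name>-changelog`; the trace shows `my-first-streams-application1` and `COUNTS1`, so it evidently came from a run with slightly different names than the code above. A minimal sketch of the naming rule (`ChangelogTopics` is a hypothetical helper), handy for checking whether the expected topic exists on the brokers:

```java
// Hypothetical helper mirroring the Kafka Streams changelog naming rule:
// "<application.id>-<storeName>-changelog".
class ChangelogTopics {
    static String changelogTopic(String applicationId, String storeName) {
        return applicationId + "-" + storeName + "-changelog";
    }

    public static void main(String[] args) {
        // For the program above: my-first-streams-application-COUNTS-changelog
        System.out.println(changelogTopic("my-first-streams-application", "COUNTS"));
    }
}
```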

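Using Lambda Expressions in Flink

Posted on 2017-07-12

Since Java 8, lambda expressions have become common in stream processing because they are concise and easy to read.

Flink supports lambda expressions as well. As an example, the WordCount sample can be rewritten with lambdas.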
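As an illustration of the same lambda style (plain JDK `java.util.stream`, not the Flink API; `LambdaWordCount` is a hypothetical class name), a word count splits the text into words and counts each one:

```java
import java.util.Arrays;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

// Word count written with Java 8 lambdas over java.util.stream (JDK only, not Flink).
class LambdaWordCount {
    static Map<String, Long> count(String text) {
        return Arrays.stream(text.toLowerCase().split("\\W+"))
                .filter(word -> !word.isEmpty()) // drop the empty token produced by leading punctuation
                .collect(Collectors.groupingBy(word -> word, TreeMap::new, Collectors.counting()));
    }

    public static void main(String[] args) {
        System.out.println(count("To be, or not to be")); // {be=2, not=1, or=1, to=2}
    }
}
```

The `TreeMap` only makes the output order deterministic; a plain `groupingBy(word -> word, Collectors.counting())` works the same way.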


Failed to allocate memory within the configured max blocking time

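Posted on 2016-10-09

With Kafka 0.9.0.1, sending data failed with:

Failed to allocate memory within the configured max blocking time

The cause is straightforward: when the producer-side buffer (buffer.memory, default 32 MB) is full and a record cannot be placed into it within a certain time (max.block.ms, default 60 s), this exception is thrown.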
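To make the mechanism concrete, here is a toy model: a hypothetical `ToyBufferPool`, not Kafka's internal buffer pool, in which a `Semaphore` stands for the free bytes of `buffer.memory` and a timed `tryAcquire` plays the role of `max.block.ms`. It throws `java.util.concurrent.TimeoutException`, whereas the real producer throws Kafka's own `org.apache.kafka.common.errors.TimeoutException`.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Toy model of a bounded producer buffer (buffer.memory) with a blocking timeout (max.block.ms).
class ToyBufferPool {
    private final Semaphore freeBytes;
    private final long maxBlockMs;

    ToyBufferPool(int bufferMemory, long maxBlockMs) {
        this.freeBytes = new Semaphore(bufferMemory);
        this.maxBlockMs = maxBlockMs;
    }

    /** Reserves size bytes, waiting at most maxBlockMs for other batches to free space. */
    void allocate(int size) throws InterruptedException, TimeoutException {
        if (!freeBytes.tryAcquire(size, maxBlockMs, TimeUnit.MILLISECONDS)) {
            throw new TimeoutException("Failed to allocate memory within the configured max blocking time");
        }
    }

    /** Returns bytes to the pool, as happens once a batch has been sent. */
    void deallocate(int size) {
        freeBytes.release(size);
    }

    public static void main(String[] args) throws Exception {
        ToyBufferPool pool = new ToyBufferPool(1024, 100);
        pool.allocate(1024);      // buffer now full and nothing is being sent
        try {
            pool.allocate(1);     // waits about 100 ms, then gives up
        } catch (TimeoutException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

In the model, as in the producer, hitting the timeout repeatedly means records are appended faster than they are drained to the brokers (or the brokers are unreachable); a larger buffer only delays the error.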


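Sending and Receiving Oversized Messages with Kafka

Posted on 2016-10-01

By default, the whole Kafka message pipeline is sized for 1 MB. In other words, the producer can only send messages of at most 1 MB to Kafka, the brokers can only handle messages of at most 1 MB, and consumers can only consume messages of at most 1 MB.

Sending a 2 MB message (for easy arithmetic, 1M = 1000K below) makes the client throw:
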
Exception in thread "main" java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.RecordTooLargeException: The message is 2000037 bytes when serialized which is larger than the maximum request size you have configured with the max.request.size configuration.
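These limits come from several independent settings. As a hedged sketch, the client-side overrides might look like the following; the 3 MB value is an arbitrary example, `max.request.size` is the producer setting named in the error, and `max.partition.fetch.bytes` is the corresponding per-partition limit of the Java consumer introduced in 0.9. The broker side (`message.max.bytes`, plus `replica.fetch.max.bytes` for replication) has to be raised as well and is not shown. `LargeMessageConfig` is a hypothetical name.

```java
import java.util.Properties;

// Client-side size limits raised to fit ~2 MB records (3 MB is an arbitrary example value).
// Merge these into the usual producer/consumer configs; brokers need matching changes.
class LargeMessageConfig {
    static final int MAX_BYTES = 3 * 1024 * 1024;

    static Properties producerOverrides() {
        Properties p = new Properties();
        p.put("max.request.size", String.valueOf(MAX_BYTES)); // producer default: 1048576
        return p;
    }

    static Properties consumerOverrides() {
        Properties p = new Properties();
        p.put("max.partition.fetch.bytes", String.valueOf(MAX_BYTES)); // Java consumer default: 1048576
        return p;
    }

    public static void main(String[] args) {
        System.out.println("producer: " + producerOverrides());
        System.out.println("consumer: " + consumerOverrides());
    }
}
```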

Elasticsearch Plugins

Posted on 2016-09-24

A few Elasticsearch plugins worth recommending:

sql

GitHub: https://github.com/NLPchina/elasticsearch-sql/blob/master/README.md?utm_source=tuicool&utm_medium=referral
Online installation: ./bin/plugin -u https://github.com/NLPchina/elasticsearch-sql/releases/download/{version}/elasticsearch-sql-{version}.zip --install sql
Web UI: http://localhost:9200/_plugin/sql/
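Besides the web UI, the plugin's README documents a `/_sql` REST endpoint that takes the query in a `sql` URL parameter (along the lines of `http://localhost:9200/_sql?sql=select * from indexName limit 10`); check this against the plugin version you installed. A minimal sketch that builds such a URL with proper encoding (`EsSqlUrl` and the index name `logs` are hypothetical):

```java
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;

// Builds a GET URL for the elasticsearch-sql /_sql endpoint, URL-encoding the query text.
class EsSqlUrl {
    static String queryUrl(String baseUrl, String sql) {
        try {
            return baseUrl + "/_sql?sql=" + URLEncoder.encode(sql, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            throw new IllegalStateException("UTF-8 is always supported", e);
        }
    }

    public static void main(String[] args) {
        // Spaces are encoded as '+', '*' is left as-is by URLEncoder.
        System.out.println(queryUrl("http://localhost:9200", "select * from logs limit 10"));
    }
}
```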


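Memory Leak Caused by ShutdownHooks That Were Never Removed

Posted on 2016-09-21

During a high-concurrency Kafka test today, a memory leak occurred. The direct cause was that memory was not released after the producers were closed. The root cause:

  • To make sure each thread's producer connection was closed, a ShutdownHook was added in the thread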
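`Runtime.addShutdownHook` keeps every registered hook thread in a JVM-wide registry until the JVM exits, so a hook that references a producer keeps that producer (and its buffers) reachable even after it has been closed. A minimal sketch of one remedy, using a hypothetical `HookedResource` wrapper as a stand-in for the producer: keep a reference to the hook and unregister it with `Runtime.removeShutdownHook` on normal close.

```java
// Hypothetical wrapper standing in for a producer that registers its own shutdown hook.
class HookedResource implements AutoCloseable {
    private final Thread shutdownHook;
    private volatile boolean closed;
    private boolean hookRemoved;

    HookedResource() {
        // The hook releases the resource if the JVM exits while it is still open.
        shutdownHook = new Thread(this::release);
        Runtime.getRuntime().addShutdownHook(shutdownHook);
    }

    // Stand-in for the real cleanup, e.g. producer.close().
    private void release() {
        closed = true;
    }

    @Override
    public void close() {
        release();
        try {
            // Without this, the Runtime keeps the hook, and through it this object, reachable until JVM exit.
            hookRemoved = Runtime.getRuntime().removeShutdownHook(shutdownHook);
        } catch (IllegalStateException e) {
            // The JVM is already shutting down; hooks can no longer be removed.
        }
    }

    boolean isClosed() { return closed; }

    boolean hookRemoved() { return hookRemoved; }

    public static void main(String[] args) {
        try (HookedResource r = new HookedResource()) {
            // ... use the resource ...
        }
        // After close(), the Runtime no longer holds a reference to r.
    }
}
```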

Cloudera Manager Installation

Posted on 2016-09-07
  • Prepare a virtual machine to serve as cm-server
  • Configure passwordless SSH login by running the following script on cm-server
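    # Comma-separated list of cluster hosts (localhost is a placeholder)
    CLUSTER_NODES=localhost
    # Replace ',' with spaces so the list can be iterated
    CLUSTER_NODES=${CLUSTER_NODES//,/ }

    FILE=~/.ssh/id_rsa.pub
    # Generate a passphrase-less RSA key pair without prompting, if none exists yet
    if [ ! -f "${FILE}" ]; then
      mkdir -p ~/.ssh && chmod 700 ~/.ssh
      ssh-keygen -t rsa -P '' -f ~/.ssh/id_rsa
    fi

    # Append the public key to root's authorized_keys on every node
    for i in $CLUSTER_NODES
    do
      scp "${FILE}" root@${i}:/tmp
      ssh root@${i} "mkdir -p /root/.ssh && chmod 700 /root/.ssh && cat /tmp/id_rsa.pub >> /root/.ssh/authorized_keys"
    done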
© 2015 — 2019 Kane Xie