如何将Kafka读取批处理到Elasticsearch



我对卡夫卡不太熟悉,但我想知道什么是最好的方法从Kafka批量读取数据,这样我就可以使用Elasticsearch Bulk Api更快、更可靠地加载数据。

顺便说一句,我正在为我的卡夫卡消费者使用Vertx

谢谢你,

我不知道这是否是最好的方法,但当我开始寻找类似的功能时,我找不到任何现成的框架。我发现了这个项目:

https://github.com/reachkrishnaraj/kafka-elasticsearch-standalone-consumer/tree/branch2.0

并开始为它做出贡献,因为它并没有做到我想要的一切,而且也不容易扩展。现在2.0版本非常可靠,我们在生产中使用它,我们公司每天处理/索引300M以上的事件。

这不是自我推销:)——只是分享我们如何做同样类型的工作。当然,现在可能还有其他选择。

https://github.com/confluentinc/kafka-connect-elasticsearch

或者你可以试试这个源

https://github.com/reachkrishnaraj/kafka-elasticsearch-standalone-consumer

作为标准Jar 运行

**1.将代码下载到$INDEXER_HOME目录.中

**2.cp$INDEXER_HOME/src/main/resources/kafka-es-INDEXER.properties.template/your/absolute/path/kafka-es-dexer.properties文件-按照注释中的说明更新所有相关属性

**3.cp$INDEXER_HOME/src/main/resources/logback.xml.template/your/absolute/path/logback.xml

指定要存储日志的目录:

根据需要调整日志文件的最大大小和数量的值

**4.构建/创建应用程序jar(确保您安装了MAven):

cd $INDEXER_HOME
mvn clean package

kafka-es-indexer-2.0.jar将在$indexer_HOME/bin中创建。所有依赖项都将被放入$INDEXER_HOME/bin/lib中。所有JAR依赖项都通过kafka-es-indexer-2.0.JAR清单链接。

**5.编辑$INDEXER_HOME/run_INDEXER.sh脚本:--如果需要,使其可执行(chmod a+x$INDEXERR_HOME/run _INDEXER.sh)--根据您的环境更新标有"CHANGE FOR your ENV"注释的属性

**6.运行应用程序[使用JDK1.8]:

./run_indexer.sh

我使用了spark流,这是一个使用Scala的非常简单的实现。

最新更新