我是Scala/flink/spark的新手,会有几个问题。 现在正在使用带有 flink 的 scala。
数据流的一般思路是这样的:
csv 文件 -> flink -> 弹性> flink(进程数据( -> MongoDB -> Tableau
有大量的日志文件以分号分隔。 我想将这些文件写入 elasticsearch 作为我的数据库。(这已经有效了(
现在需要各种分析(例如一致性报告/生产力报告(。 对于这些报表,需要不同类型的列。
这个想法是使用 flink 从 elasticsearch 导入基础数据,编辑数据并将其保存到 mongodb 中,以便可以使用 Tableau 完成数据可视化。
编辑将包括添加其他列,如工作日和不同状态的开始/结束时间
// +-------+-----+-----+
// | status|date |time |
// +-------+-----+-----+
// | start | 1.1 |7:00 |
// | run_a | 1.1 |7:20 |
// | run_b | 1.1 |7:50 |
// +-------+-----+-----+
// +-------+-------+-------+----+
// | status|s_time |e_time |day |
// +-------+-------+-------+----|
// | start | 7:00 |7:20 | MON|
// | run_a | 7:20 |7:50 | MON|
// | run_b | 7:50 |nextVal| MON|
// +-------+-------+-------+----+
经过一番研究,我发现 flink 没有提供使用 elastic 作为数据源的可能性。 有一个 github 项目 https://github.com/mnubo/flink-elasticsearch-source-connector 但它已经一年多没有更新了。这似乎无法正常工作,因为它给我的点击次数较少,然后我会使用相同的查询进入 kibana。 还有其他选择吗?为什么默认情况下不支持此功能?
这些表转换可以使用 flink 吗?用flink做它们有意义吗?(因为我很难实现它们(
我是否为这个项目使用了正确的框架?我应该切换到 Spark 因为它提供了更多功能/社区项目吗?
首先,如果您的目标只是使用日志(健壮搜索、可视化、存储(进行处理,则不能重新发明轮子并使用 ELK 堆栈 您将获得下一个能力 -
- 具有
Logstash
的数据收集和日志解析引擎 - 使用
Kibana
进行分析和可视化 Elasticsearch
喜欢搜索引擎- 与云(AWS 或弹性云(无缝集成
但是这个软件是shareware
- 您将无法访问免费版本中的全部功能,我可以根据我的个人经验说 - 试用版适合在生产中使用 - 它确实让生活更轻松。
如果您想为存储、转换和处理日志或其他文件制作自己的自定义管道Apache Spark
这是为此目的的绝佳解决方案 - 您可以使用Spark
ETL
解决方案来操作您想要的一切 - 构建数据管道非常容易(read from elasticsearch
-->process it
-->save to mongo
;take from mongo
-->send to visualisation
等( - 你可以利用Spark 2.0实现加速(与早期版本的Spark相比(。
此外,已经有集成Spark - Mongo - ES
的现成解决方案,或者您可以通过使用 ES 和 Mongo 的连接器来制作自己的解决方案。关于Flink
- 您可以使用它而不是Spark
但Spark
是更成熟的技术,并且拥有更广泛的社区。与其他方法一样,您可以使用 ETL 解决方案在系统之间快速开发/原型设计数据流(用鼠标拖动必要的组件(,如流集或 NiFi。