我是否使用了正确的框架?



我是Scala/flink/spark的新手,会有几个问题。 现在正在使用带有 flink 的 scala。

数据流的一般思路是这样的:
csv 文件 -> flink -> 弹性> flink(进程数据( -> MongoDB -> Tableau

有大量的日志文件以分号分隔。 我想将这些文件写入 elasticsearch 作为我的数据库。(这已经有效了(
现在需要各种分析(例如一致性报告/生产力报告(。 对于这些报表,需要不同类型的列。

这个想法是使用 flink 从 elasticsearch 导入基础数据,编辑数据并将其保存到 mongodb 中,以便可以使用 Tableau 完成数据可视化。

编辑将包括添加其他列,如工作日和不同状态的开始/结束时间

// +-------+-----+-----+  
// | status|date |time |  
// +-------+-----+-----+  
// | start | 1.1 |7:00 |  
// | run_a | 1.1 |7:20 |  
// | run_b | 1.1 |7:50 |  
// +-------+-----+-----+  

// +-------+-------+-------+----+  
// | status|s_time |e_time |day |  
// +-------+-------+-------+----|  
// | start | 7:00  |7:20   | MON|  
// | run_a | 7:20  |7:50   | MON|  
// | run_b | 7:50  |nextVal| MON|  
// +-------+-------+-------+----+  

经过一番研究,我发现 flink 没有提供使用 elastic 作为数据源的可能性。 有一个 github 项目 https://github.com/mnubo/flink-elasticsearch-source-connector 但它已经一年多没有更新了。这似乎无法正常工作,因为它给我的点击次数较少,然后我会使用相同的查询进入 kibana。 还有其他选择吗?为什么默认情况下不支持此功能?

这些表转换可以使用 flink 吗?用flink做它们有意义吗?(因为我很难实现它们(

我是否为这个项目使用了正确的框架?我应该切换到 Spark 因为它提供了更多功能/社区项目吗?

首先,如果您的目标只是使用日志(健壮搜索、可视化、存储(进行处理,则不能重新发明轮子并使用 ELK 堆栈 您将获得下一个能力 -

  • 具有Logstash的数据收集和日志解析引擎
  • 使用Kibana进行分析和可视化
  • Elasticsearch喜欢搜索引擎
  • 与云(AWS 或弹性云(无缝集成

但是这个软件是shareware- 您将无法访问免费版本中的全部功能,我可以根据我的个人经验说 - 试用版适合在生产中使用 - 它确实让生活更轻松。

如果您想为存储、转换和处理日志或其他文件制作自己的自定义管道Apache Spark这是为此目的的绝佳解决方案 - 您可以使用SparkETL解决方案来操作您想要的一切 - 构建数据管道非常容易(read from elasticsearch-->process it-->save to mongo;take from mongo-->send to visualisation等( - 你可以利用Spark 2.0实现加速(与早期版本的Spark相比(。

此外,已经有集成Spark - Mongo - ES的现成解决方案,或者您可以通过使用 ES 和 Mongo 的连接器来制作自己的解决方案。关于Flink- 您可以使用它而不是SparkSpark是更成熟的技术,并且拥有更广泛的社区。与其他方法一样,您可以使用 ETL 解决方案在系统之间快速开发/原型设计数据流(用鼠标拖动必要的组件(,如流集或 NiFi。

相关内容

  • 没有找到相关文章